In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time

In [None]:
# Grid world parameters
GRID_ROWS = 10
GRID_COLS = 10

N_STATES = GRID_ROWS * GRID_COLS  # 100 个状态

START_STATE = 1     # 起始状态（左上角）
GOAL_STATE = 100    # 目标状态（右下角）

# Actions: 0 = up, 1 = down, 2 = left, 3 = right
ACTIONS = [0, 1, 2, 3]


In [None]:
def state_to_position(s):
  
    s_index = s - 1  # 变成 0~99
    row = s_index // GRID_ROWS
    col = s_index % GRID_COLS
    return row, col


def position_to_state(row, col):
   
    return row * GRID_COLS + col + 1


In [None]:
def step(state, action):
   
    row, col = state_to_position(state)

    if action == 0:      # up
        new_row, new_col = row - 1, col
    elif action == 1:    # down
        new_row, new_col = row + 1, col
    elif action == 2:    # left
        new_row, new_col = row, col - 1
    elif action == 3:    # right
        new_row, new_col = row, col + 1
    else:
        raise ValueError("Unknown action!")

    # 边界条件：不能走出 0~9 的范围，超出就停在原地
    if new_row < 0 or new_row >= GRID_ROWS or new_col < 0 or new_col >= GRID_COLS:
        new_row, new_col = row, col  # 保持不变

    next_state = position_to_state(new_row, new_col)
    return next_state


In [None]:
def reward(state):
   
    if state == GOAL_STATE:
        return 100.0
    else:
        return 0.0


In [None]:
def run_episode_random(max_steps=1000):
  
    current_state = START_STATE
    total_reward = 0.0
    steps = 0

    for t in range(max_steps):
        # 随机选一个动作
        action = np.random.choice(ACTIONS)

        # 根据状态转移函数得到下一个状态
        next_state = step(current_state, action)

        # 根据新状态计算奖励
        r = reward(next_state)

        total_reward += r
        steps += 1

        # 如果到达目标状态，本回合结束
        if next_state == GOAL_STATE:
            break

        # 否则继续
        current_state = next_state

    # 平均每步奖励（避免除 0）
    avg_reward_per_step = total_reward / steps if steps > 0 else 0.0

    return avg_reward_per_step, steps


In [None]:
def run_multiple_episodes(n_runs=30, max_steps=1000):
   
    avg_rewards = []
    steps_list = []
    runtimes = []

    for i in range(n_runs):
        start_time = time.time()

        avg_r, steps = run_episode_random(max_steps=max_steps)

        end_time = time.time()
        runtime = end_time - start_time

        avg_rewards.append(avg_r)
        steps_list.append(steps)
        runtimes.append(runtime)

        print(f"Run {i+1}/{n_runs}: avg_reward = {avg_r:.4f}, steps = {steps}, runtime = {runtime:.4f} s")

    return np.array(avg_rewards), np.array(steps_list), np.array(runtimes)


In [None]:
avg_rewards, steps_list, runtimes = run_multiple_episodes(n_runs=30, max_steps=1000)


In [None]:
def summarize_results(avg_rewards, steps_list, runtimes):
  
    print("=== Summary over runs ===")
    print(f"Average reward per step: mean = {avg_rewards.mean():.4f}, std = {avg_rewards.std():.4f}")
    print(f"Steps to goal:          mean = {steps_list.mean():.2f}, std = {steps_list.std():.2f}")
    print(f"Runtime (seconds):      mean = {runtimes.mean():.4f}, std = {runtimes.std():.4f}")


summarize_results(avg_rewards, steps_list, runtimes)


In [None]:
# 为了图更好看一点，可以把图像放大一些
plt.figure(figsize=(10, 4))

# 1. 平均奖励的 boxplot
plt.subplot(1, 3, 1)
plt.boxplot(avg_rewards)
plt.title("Avg Reward per Step")

# 2. 步数的 boxplot
plt.subplot(1, 3, 2)
plt.boxplot(steps_list)
plt.title("Steps to Goal")

# 3. 运行时间的 boxplot
plt.subplot(1, 3, 3)
plt.boxplot(runtimes)
plt.title("Runtime (s)")

plt.tight_layout()
plt.show()


---

# Exercise 2 – Q-learning

在这一部分中，我们在与 Exercise 1 相同的 10×10 网格环境中，引入 **Q-learning 算法**，
让智能体通过试错学习每个状态–动作对的价值，并在不同训练步数时测试其表现。


## 2.1 Initialize Q-table

在这一小节中，我们创建一个大小为 (100 × 4) 的 Q 表，用来存储每个状态–动作对的估计价值，
并将所有元素初始化为 0。


In [None]:
# Q-learning parameters
ALPHA = 0.7   # learning rate
GAMMA = 0.99  # discount factor

def initialize_q_table():
  
    Q = np.zeros((N_STATES, len(ACTIONS)))
    return Q


## 2.2 Q-learning Update Rule

实现基于公式

\\[
Q[s, a] = (1 - \\alpha) Q[s, a] + \\alpha (r(s') + \\gamma \\max_{a'} Q[s', a'])
\\]

的更新函数，其中学习率 \\(\\alpha = 0.7\\)，折扣因子 \\(\\gamma = 0.99\\)。


In [None]:
def q_learning_update(Q, state, action, reward_value, next_state, alpha=ALPHA, gamma=GAMMA):
   
    # 注意：Python 索引从 0 开始，所以要减 1
    s_idx = state - 1
    s_next_idx = next_state - 1
    a_idx = action  # 这里动作本来就用 0,1,2,3 表示

    old_value = Q[s_idx, a_idx]
    next_max = np.max(Q[s_next_idx, :])

    new_value = (1 - alpha) * old_value + alpha * (reward_value + gamma * next_max)
    Q[s_idx, a_idx] = new_value

    return Q


## 2.3 Training with Random Policy

使用与 Exercise 1 相同的随机策略进行 20,000 步交互，每一步都根据 Q-learning 公式更新 Q 表。
在若干关键步数（如 100, 200, 500, …, 20000）暂停学习，运行测试回合并记录平均奖励。


In [None]:
def run_greedy_test_episode(Q, max_steps=1000):
  
    current_state = START_STATE
    total_reward = 0.0
    steps = 0

    for t in range(max_steps):
        s_idx = current_state - 1
        q_values = Q[s_idx, :]

        # 找到当前状态下 Q 值最大的动作（可能不止一个）
        max_q = np.max(q_values)
        best_actions = np.where(q_values == max_q)[0]
        action = int(np.random.choice(best_actions))

        # 与环境交互
        next_state = step(current_state, action)
        r = reward(next_state)

        total_reward += r
        steps += 1

        if next_state == GOAL_STATE:
            breakb

        current_state = next_state

    avg_reward = total_reward / steps if steps > 0 else 0.0
    return avg_reward, steps


In [None]:
# 设定训练的总步数和测试点
TOTAL_STEPS = 20000
TEST_POINTS = [100, 200, 500, 600, 700, 800, 900, 1000,
               2500, 5000, 7500, 10000, 12500, 15000, 17500, 20000]

def train_q_learning_random_policy(total_steps=TOTAL_STEPS,
                                   test_points=TEST_POINTS,
                                   alpha=ALPHA, gamma=GAMMA,
                                   max_test_steps=1000):
   
    Q = initialize_q_table()
    current_state = START_STATE

    test_avg_rewards = []
    next_test_idx = 0  # 当前要检测的 test_points 索引

    start_time = time.time()

    for step_idx in range(1, total_steps + 1):
        # 1. 随机选择一个动作（探索）
        action = np.random.choice(ACTIONS)

        # 2. 与环境交互，得到下一状态和奖励
        next_state = step(current_state, action)
        r = reward(next_state)

        # 3. Q-learning 更新
        Q = q_learning_update(Q, current_state, action, r, next_state,
                              alpha=alpha, gamma=gamma)

        # 4. 如果到达目标，重置到起始状态；否则继续
        if next_state == GOAL_STATE:
            current_state = START_STATE
        else:
            current_state = next_state

        # 5. 如果当前步数是一个测试点，进行一次贪心测试 episode
        if next_test_idx < len(test_points) and step_idx == test_points[next_test_idx]:
            avg_r_test, _ = run_greedy_test_episode(Q, max_steps=max_test_steps)
            test_avg_rewards.append(avg_r_test)
            next_test_idx += 1

    end_time = time.time()
    runtime = end_time - start_time

    return Q, np.array(test_avg_rewards), runtime


In [None]:
N_RUNS_Q = 30

all_test_rewards = []  # 形状将会是 (30, len(TEST_POINTS))
q_runtimes = []

for i in range(N_RUNS_Q):
    print(f"Run {i+1}/{N_RUNS_Q} ...")
    Q_final, test_rewards, runtime = train_q_learning_random_policy()

    all_test_rewards.append(test_rewards)
    q_runtimes.append(runtime)

all_test_rewards = np.vstack(all_test_rewards)  # (N_RUNS_Q, len(TEST_POINTS))
q_runtimes = np.array(q_runtimes)

print("Shape of all_test_rewards:", all_test_rewards.shape)


## 2.4 Evaluation Episodes (Greedy Policy)

在测试回合中，不再更新 Q 表，而是总是选择具有最大 Q 值的动作（贪心策略），
运行最多 1000 步并计算平均奖励，用于评估当前 Q 表的质量。


In [None]:
# 计算每个测试点的平均测试奖励和标准差
mean_test_rewards = all_test_rewards.mean(axis=0)
std_test_rewards = all_test_rewards.std(axis=0)

print("Test points:", TEST_POINTS)
print("Mean test rewards:", mean_test_rewards)
print("Std  test rewards:", std_test_rewards)

# 运行时间统计
print("\n=== Runtime over Q-learning runs ===")
print(f"Runtime: mean = {q_runtimes.mean():.4f} s, std = {q_runtimes.std():.4f} s")


In [None]:
plt.figure(figsize=(6, 4))
plt.errorbar(TEST_POINTS, mean_test_rewards, yerr=std_test_rewards, marker='o')
plt.xlabel("Training Steps")
plt.ylabel("Average Reward (test episode)")
plt.title("Q-learning Performance (Random Policy Training)")
plt.grid(True)
plt.show()


## 2.5 Result Visualization

1. 绘制“训练步数 vs 测试平均奖励”的曲线；
2. 使用热力图展示每个状态的最大 Q 值，以可视化智能体在网格中的价值分布。


In [None]:
# 取最近一次 run 得到的 Q_final（如果你想用平均的也可以扩展）
Q_learned = Q_final

# 对每个状态取 max_a Q[s,a]，并 reshape 成 10×10
state_values = np.max(Q_learned, axis=1)          # shape = (100,)
value_grid = state_values.reshape((GRID_ROWS, GRID_COLS))

plt.figure(figsize=(5, 5))
plt.imshow(value_grid, origin='upper')
plt.colorbar(label="Max Q-value")
plt.title("Learned State Values (Max Q over Actions)")
plt.xlabel("Column")
plt.ylabel("Row")
plt.show()
