# Q Learning coding
We build a maze environment and use it to train a tabular Q-learning model.

In [None]:
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import os

# Output directory for the per-episode policy frames. The name must match the
# "frames/" prefix used when the training loop saves its figures; otherwise
# the first savefig call fails because the target directory does not exist.
os.makedirs("frames", exist_ok=True)

In [None]:
# Arrow glyph drawn for each action index when a policy is rendered.
action_symbols = dict(enumerate("↑↓←→"))

# Maze layout, one integer code per cell:
#   2 marks the start cell, 1 the goal cell, 0 a free cell, -1 a wall.
maze = np.array(
    [
        [2, 0, 0, 0],
        [0, -1, -1, 0],
        [0, -1, -1, 0],
        [0, 0, 0, 0],
        [0, -1, -1, 1],
    ]
)
rows = maze.shape[0]
cols = maze.shape[1]
start = (0, 0)
end = (4, 3)

# Q-learning hyperparameters.
alpha, gamma, epsilon = (
    0.1,  # learning rate
    0.9,  # discount factor
    0.5,  # exploration probability
)

# Action index -> (row delta, column delta), in the order up, down, left, right.
actions = dict(enumerate([(-1, 0), (1, 0), (0, -1), (0, 1)]))

# Q-table: one value per (row, column, action), initialised to zero.
Q = np.zeros((rows, cols, len(actions)))

In [None]:
def plot_policy_with_maze(Q, maze_map, shape=(4, 4), show=False, title=None):
    """Draw the greedy policy encoded in a Q-table on top of the maze grid.

    Every cell is filled according to its maze code. Each non-wall cell is
    additionally annotated with the arrow of its highest-valued action; wall
    cells are left unlabelled.

    Args:
        Q: Q-table whose last axis indexes the actions. The argmax over that
            axis must be reshapeable to ``shape``.
        maze_map: 2-D array of cell codes indexed as ``maze_map[row, col]``:
            ``-1`` is drawn gold (wall), ``0`` sky blue (free cell), ``1``
            red, and any other value green.
        shape: ``(rows, cols)`` of the grid to draw.
        show: When true, call ``plt.show()`` before returning.
        title: Axes title; ``"Maze Policy"`` is used when ``None``.

    Returns:
        The matplotlib figure holding the drawing.
    """
    n_rows, n_cols = shape[0], shape[1]
    policy = np.argmax(Q, axis=-1).reshape(shape)

    fig, ax = plt.subplots()
    # Minor ticks sit on the cell borders so the minor grid outlines each cell.
    ax.set_xticks(np.arange(n_cols + 1) - 0.5, minor=True)
    ax.set_yticks(np.arange(n_rows + 1) - 0.5, minor=True)
    ax.grid(which="minor", color="black", linewidth=1)
    ax.tick_params(which="both", bottom=False, left=False, labelbottom=False, labelleft=False)
    ax.set_title("Maze Policy" if title is None else title)

    # Fill colour per maze code; codes not listed here fall back to green.
    fill_colors = {-1: 'gold', 0: 'skyblue', 1: 'red'}

    for i in range(n_rows):
        for j in range(n_cols):
            cell = maze_map[i, j]
            ax.add_patch(
                plt.Rectangle((j - 0.5, i - 0.5), 1, 1, color=fill_colors.get(cell, 'green'))
            )
            if cell != -1:
                # Only non-wall cells get an action arrow.
                ax.text(j, i, action_symbols[policy[i, j]],
                        ha='center', va='center', fontsize=20, color='black')

    # Flip the y axis so row 0 is drawn at the top, matching array layout.
    # Operate on our own axes instead of whatever pyplot considers current.
    ax.invert_yaxis()
    if show:
        plt.show()
    return fig

def plot_q_values_with_maze(Q, maze_map, shape=(4, 4)):
    """Show the maximum Q-value of every cell as a shaded, labelled grid.

    Wall cells (code ``-1`` in ``maze_map``) are filled gold and left
    unlabelled. Every other cell is shaded with the ``Blues`` colormap in
    proportion to its max Q-value relative to the largest max Q-value on the
    grid, and labelled with that value to two decimals. When the largest
    value is not positive, all non-wall cells use colormap position 0.

    The figure is displayed with ``plt.show()``; nothing is returned.

    Args:
        Q: Q-table whose last axis indexes the actions. The max over that
            axis must be reshapeable to ``shape``.
        maze_map: 2-D array of cell codes indexed as ``maze_map[row, col]``.
        shape: ``(rows, cols)`` of the grid to draw.
    """
    max_q = np.max(Q, axis=-1).reshape(shape)
    # Normalisation constant is the same for every cell, so compute it once.
    peak = np.max(max_q)

    _, ax = plt.subplots()
    ax.set_title("Max Q-Value per State with Maze")

    for i in range(shape[0]):
        for j in range(shape[1]):
            if maze_map[i, j] == -1:
                color = 'gold'
            else:
                norm_val = max_q[i, j] / peak if peak > 0 else 0
                color = plt.cm.Blues(norm_val)
                ax.text(j, i, f"{max_q[i, j]:.2f}", ha='center', va='center', color='black')

            ax.add_patch(plt.Rectangle((j - 0.5, i - 0.5), 1, 1, color=color))

    # Minor ticks sit on the cell borders so the minor grid outlines each cell.
    ax.set_xticks(np.arange(shape[1] + 1) - 0.5, minor=True)
    ax.set_yticks(np.arange(shape[0] + 1) - 0.5, minor=True)
    ax.grid(which="minor", color="black", linewidth=1)
    ax.tick_params(which="both", bottom=False, left=False, labelbottom=False, labelleft=False)
    # Flip the y axis so row 0 is drawn at the top, matching array layout.
    ax.invert_yaxis()
    plt.show()

In [None]:
# Training loop: epsilon-greedy tabular Q-learning on the maze.
max_count = 40          # step budget per episode
success_episodes = []   # indices of the episodes that reached the goal

# The policy snapshots below are written under "frames/"; make sure the
# directory exists so the first savefig call cannot fail on a missing path.
os.makedirs("frames", exist_ok=True)

for episode in tqdm(range(5000)):
    state = start
    count = 0
    success = False  # whether this episode reached the goal

    while state != end and count < max_count:
        # Epsilon-greedy action selection.
        if np.random.rand() < epsilon:
            action_index = np.random.choice([0, 1, 2, 3])
        else:
            action_index = np.argmax(Q[state[0], state[1]])

        action = actions[action_index]
        next_state = (state[0] + action[0], state[1] + action[1])

        # A move is legal when it stays on the grid and does not enter a wall.
        if 0 <= next_state[0] < rows and 0 <= next_state[1] < cols and maze[next_state[0], next_state[1]] != -1:
            reward = 0
            if next_state == end:
                reward = 1
                success = True  # goal reached
            # Standard Q-learning update towards reward + discounted best next value.
            Q[state[0], state[1], action_index] += alpha * (
                reward + gamma * np.max(Q[next_state[0], next_state[1]]) - Q[state[0], state[1], action_index]
            )
            state = next_state
        else:
            # Illegal move: the agent stays in place and the action value is
            # pulled towards a fixed penalty of -1 (no bootstrapping).
            Q[state[0], state[1], action_index] += alpha * (-1 - Q[state[0], state[1], action_index])

        count += 1

    # Per-episode bookkeeping. Reaching the goal ends the step loop, so a
    # successful episode is recorded exactly once here.
    if success:
        success_episodes.append(episode)
        if len(success_episodes) > 1:
            # After the second success, reduce exploration for all later steps.
            epsilon = 0.1

    # Save a snapshot of the greedy policy for the first 200 episodes.
    if episode < 200:
        fig = plot_policy_with_maze(Q, maze, maze.shape, show=False, title=f"Episode: {episode}, Success: {success}")
        fig.savefig(f"frames/policy_{episode:04d}.png")
        plt.close(fig)

## Optional

In [None]:
# Timeline of the episodes that reached the goal: one green tick per
# successful episode, drawn on the "Success" row.
fig, ax = plt.subplots(figsize=(12, 4))
ax.scatter(success_episodes, [1 for _ in success_episodes], marker='|', color='green')
ax.set_yticks([0, 1])
ax.set_yticklabels(["Failure", "Success"])
ax.set_xlabel("Episode Index")
ax.set_title("Episode of successful arrival, reward=0 and epsilon=0.5")
ax.grid(True, axis='x')
fig.tight_layout()
plt.show()

In [None]:
# Render the greedy policy of the current Q-table and write it to disk
# without displaying it.
fig = plot_policy_with_maze(Q, maze, shape=maze.shape)
fig.savefig("images.png")
plt.close(fig)

In [None]:
# Display the per-cell maximum Q-value of the current Q-table over the maze.
plot_q_values_with_maze(Q, maze, shape=maze.shape)