### 动作值估计Sarsa

In [None]:
import numpy as np
import time
from rl_utils.GNGridWorldEnv import GridWorldEnv

# Epsilon-greedy action selection
def epsilon_greedy_action(q_table, state, epsilon, n_actions):
    """Choose an action index for ``state`` with an epsilon-greedy rule.

    One sample from ``np.random.rand()`` is compared against ``epsilon``.
    When the sample is below ``epsilon`` a uniformly random index in
    ``[0, n_actions)`` is returned; otherwise the index of the largest
    entry of ``q_table[state]`` (as given by ``np.argmax``) is returned.
    """
    explore = np.random.rand() < epsilon
    if not explore:
        # Exploit: greedy action for this state.
        return np.argmax(q_table[state])
    # Explore: uniformly random action.
    return np.random.randint(0, n_actions)

# Environment setup
grid_world_size = 10
obstacle_count = 20
env = GridWorldEnv(size=grid_world_size,obstacle_count=obstacle_count)
n_actions = env.action_space.n
# One row per flattened grid cell (agent[0] * grid_world_size + agent[1]),
# one column per action.
q_table = np.zeros((grid_world_size * grid_world_size, n_actions))

env.render()  # show the window


# Hyperparameters
gamma = 0.95
epsilon = 1.0   # start fully random (strong exploration)
epsilon_decay = 0.9995   # multiplicative decay applied to epsilon after every episode
epsilon_min = 0.01 # lower bound for epsilon
max_iterate = 10000
seed = 20
learning_rate = 0.01

state_count = np.zeros((grid_world_size * grid_world_size, n_actions))  # visit counts (allocated here, not updated in this cell)
for it_index in range(max_iterate):

    # NOTE(review): the same seed is passed to every reset - presumably to keep
    # the layout identical across episodes; confirm against GridWorldEnv.
    observation, _ = env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    # Sarsa is on-policy: the action used in the TD target must be the action
    # that is actually executed next, so the first action is drawn once here
    # and every later action is carried over from the previous update.
    action = epsilon_greedy_action(q_table, state, epsilon, n_actions)
    done = False
    episode_max = 200  # remaining step budget for this episode
    while not done and episode_max > 0:
        next_observation, reward, terminated, truncated, _ = env.step(action)
        next_state = int((next_observation['agent'][0] * grid_world_size) + next_observation['agent'][1])

        # Core Sarsa update: Q(S,A) += lr * (R + gamma * Q(S',A') - Q(S,A))
        next_action = epsilon_greedy_action(q_table, next_state, epsilon, n_actions)
        # A terminal transition has no successor value to bootstrap from.
        bootstrap = 0.0 if terminated else q_table[next_state][next_action]
        td_target = reward + gamma * bootstrap
        q_table[state][action] += learning_rate * (td_target - q_table[state][action])

        state = next_state
        action = next_action
        done = terminated or truncated
        episode_max -= 1

    # Decay epsilon once per episode, never below epsilon_min
    epsilon = max(epsilon * epsilon_decay,epsilon_min)
    # Progress report
    if it_index % 100 == 0:
        print(f"Episode {it_index}, epsilon: {epsilon:.4f}")


# Run one rendered episode that follows the greedy policy of the learned q_table.
print("\n正在用新策略运行一个可视化 episode...")
test_env = GridWorldEnv(size=grid_world_size, render_mode='human',obstacle_count=obstacle_count)
try:
    n_actions = test_env.action_space.n
    observation, _ = test_env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    total_reward = 0
    # A deterministic greedy policy can cycle forever without reaching a
    # terminal state, so the rollout is capped (same budget as training).
    max_test_steps = 200
    steps_taken = 0

    while not done and steps_taken < max_test_steps:
        action = np.argmax(q_table[state])
        next_state, reward, terminated, truncated, _ = test_env.step(action)
        state = int((next_state['agent'][0] * grid_world_size) + next_state['agent'][1])
        total_reward += reward
        done = terminated or truncated
        steps_taken += 1
        time.sleep(0.5)  # slow the rollout down so it can be watched

    print(f"测试 episode 总奖励: {total_reward}")
finally:
    # Close both environments even if the rollout raises.
    test_env.close()
    env.close()

### Expected Sarsa算法

In [None]:
import numpy as np
import time
from rl_utils.GNGridWorldEnv import GridWorldEnv

# Epsilon-greedy action selection
def epsilon_greedy_action(q_table, state, epsilon, n_actions):
    """Return an epsilon-greedy action index for ``state``.

    With a ``np.random.rand()`` draw below ``epsilon`` the result is a
    uniformly random index in ``[0, n_actions)``; otherwise it is
    ``np.argmax(q_table[state])``.
    """
    draw = np.random.rand()
    if draw < epsilon:
        chosen = np.random.randint(0, n_actions)
    else:
        chosen = np.argmax(q_table[state])
    return chosen

# Expected action value of the next state under the epsilon-greedy policy
def compute_expected_q(q_table, next_state, epsilon, n_actions):
    """Return the epsilon-greedy expectation of ``q_table[next_state]``.

    The greedy action (``np.argmax`` of the row) is weighted by
    ``1 - epsilon + epsilon / n_actions``; the remaining probability mass is
    split evenly over the other ``n_actions - 1`` actions.
    """
    row = q_table[next_state]
    greedy = np.argmax(row)
    greedy_weight = 1.0 - epsilon + (epsilon / n_actions)
    if n_actions > 1:
        rest_weight = (1.0 - greedy_weight) / (n_actions - 1)
    else:
        # No non-greedy action exists, so this weight is never applied.
        rest_weight = 1.0

    # Accumulate the non-greedy contributions in index order.
    rest_total = 0.0
    for idx in range(n_actions):
        if idx == greedy:
            continue
        rest_total += rest_weight * row[idx]

    return greedy_weight * row[greedy] + rest_total

# Environment setup
grid_world_size = 10
obstacle_count = 20
env = GridWorldEnv(size=grid_world_size,obstacle_count=obstacle_count)
n_actions = env.action_space.n
# One row per flattened grid cell (agent[0] * grid_world_size + agent[1]),
# one column per action.
q_table = np.zeros((grid_world_size * grid_world_size, n_actions))

env.render()  # show the window


# Hyperparameters
gamma = 0.95
epsilon = 1.0   # start fully random (strong exploration)
epsilon_decay = 0.9995   # multiplicative decay applied to epsilon after every episode
epsilon_min = 0.01 # lower bound for epsilon
max_iterate = 10000
seed = 20
learning_rate = 0.01

state_count = np.zeros((grid_world_size * grid_world_size, n_actions))  # visit counts (allocated here, not updated in this cell)
for it_index in range(max_iterate):

    # NOTE(review): the same seed is passed to every reset - presumably to keep
    # the layout identical across episodes; confirm against GridWorldEnv.
    observation, _ = env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    episode_max = 200  # remaining step budget for this episode
    while not done and episode_max > 0:
        action = epsilon_greedy_action(q_table, state, epsilon, n_actions)
        next_observation, reward, terminated, truncated, _ = env.step(action)
        next_state = int((next_observation['agent'][0] * grid_world_size) + next_observation['agent'][1])

        # Core Expected Sarsa update: the target averages Q(S', .) over the
        # epsilon-greedy policy, so no next action needs to be sampled here.
        # A terminal transition has no successor value to bootstrap from.
        if terminated:
            expected = 0.0
        else:
            expected = compute_expected_q(q_table, next_state, epsilon, n_actions)
        q_table[state][action] += learning_rate * (reward + gamma * expected - q_table[state][action])

        state = next_state
        done = terminated or truncated
        episode_max -= 1

    # Decay epsilon once per episode, never below epsilon_min
    epsilon = max(epsilon * epsilon_decay,epsilon_min)
    # Progress report
    if it_index % 100 == 0:
        print(f"Episode {it_index}, epsilon: {epsilon:.4f}")


# Run one rendered episode that follows the greedy policy of the learned q_table.
print("\n正在用新策略运行一个可视化 episode...")
test_env = GridWorldEnv(size=grid_world_size, render_mode='human',obstacle_count=obstacle_count)
try:
    n_actions = test_env.action_space.n
    observation, _ = test_env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    total_reward = 0
    # A deterministic greedy policy can cycle forever without reaching a
    # terminal state, so the rollout is capped (same budget as training).
    max_test_steps = 200
    steps_taken = 0

    while not done and steps_taken < max_test_steps:
        action = np.argmax(q_table[state])
        next_state, reward, terminated, truncated, _ = test_env.step(action)
        state = int((next_state['agent'][0] * grid_world_size) + next_state['agent'][1])
        total_reward += reward
        done = terminated or truncated
        steps_taken += 1
        time.sleep(0.5)  # slow the rollout down so it can be watched

    print(f"测试 episode 总奖励: {total_reward}")
finally:
    # Close both environments even if the rollout raises.
    test_env.close()
    env.close()

### n-step Sarsa算法

In [10]:
import numpy as np
import time
from rl_utils.GNGridWorldEnv import GridWorldEnv

# Epsilon-greedy action selection
def epsilon_greedy_action(q_table, state, epsilon, n_actions):
    """Select an action index for ``state`` epsilon-greedily.

    A single ``np.random.rand()`` sample decides the branch: below
    ``epsilon`` a uniformly random index in ``[0, n_actions)`` is drawn,
    otherwise ``np.argmax(q_table[state])`` is returned.
    """
    if not (np.random.rand() < epsilon):
        # Greedy branch.
        return np.argmax(q_table[state])
    # Exploration branch.
    return np.random.randint(0, n_actions)

# Environment setup
grid_world_size = 10
obstacle_count = 20
env = GridWorldEnv(size=grid_world_size,obstacle_count=obstacle_count)
n_actions = env.action_space.n
# One row per flattened grid cell (agent[0] * grid_world_size + agent[1]),
# one column per action.
q_table = np.zeros((grid_world_size * grid_world_size, n_actions))

env.render()  # show the window


# Hyperparameters
gamma = 0.95
epsilon = 1.0   # start fully random (strong exploration)
epsilon_decay = 0.9995   # multiplicative decay applied to epsilon after every episode
epsilon_min = 0.01 # lower bound for epsilon
max_iterate = 10000
seed = 98
learning_rate = 0.01
n_step = 10  # number of real rewards used before bootstrapping

state_count = np.zeros((grid_world_size * grid_world_size, n_actions))  # visit counts (allocated here, not updated in this cell)
for it_index in range(max_iterate):

    # NOTE(review): the same seed is passed to every reset - presumably to keep
    # the layout identical across episodes; confirm against GridWorldEnv.
    observation, _ = env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    episode_max = 0  # steps taken so far in this episode
    episode = []     # (state, action, reward) for every step of this episode
    while not done and episode_max < 200:
        action = epsilon_greedy_action(q_table, state, epsilon, n_actions)
        next_observation, reward, terminated, truncated, _ = env.step(action)
        next_state = int((next_observation['agent'][0] * grid_world_size) + next_observation['agent'][1])
        episode.append((state, action, reward))

        # Core n-step Sarsa update. The newest entry sits at index
        # tau + n_step, so tau is the step whose n-step return just became
        # available. tau == 0 must be included, otherwise the first
        # transition of every episode longer than n_step is never updated.
        tau = len(episode) - n_step - 1
        if tau >= 0:
            G = 0.0
            for t in range(n_step):
                G += (gamma ** t) * episode[tau + t][2]
            # Bootstrap from the state-action pair that was just executed.
            G += (gamma ** n_step) * q_table[state][action]
            before_step, before_action, _ = episode[tau]
            q_table[before_step][before_action] += learning_rate * (G - q_table[before_step][before_action])

        state = next_state
        done = terminated or truncated
        episode_max += 1

    # Flush the last (up to) n_step transitions: no further pair exists to
    # bootstrap from, so they use the plain discounted sum of remaining rewards.
    T = len(episode)
    for tau in range(max(0, T - n_step), T):
        s_tau, a_tau, _ = episode[tau]
        G = 0.0
        for i in range(tau, T):
            G += (gamma ** (i - tau)) * episode[i][2]
        q_table[s_tau][a_tau] += learning_rate * (G - q_table[s_tau][a_tau])
    # Decay epsilon once per episode, never below epsilon_min
    epsilon = max(epsilon * epsilon_decay,epsilon_min)
    # Progress report
    if it_index % 100 == 0:
        print(f"Episode {it_index}, epsilon: {epsilon:.4f}")


# Run one rendered episode that follows the greedy policy of the learned q_table.
print("\n正在用新策略运行一个可视化 episode...")
test_env = GridWorldEnv(size=grid_world_size, render_mode='human',obstacle_count=obstacle_count)
try:
    n_actions = test_env.action_space.n
    observation, _ = test_env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    total_reward = 0
    # A deterministic greedy policy can cycle forever without reaching a
    # terminal state, so the rollout is capped (same budget as training).
    max_test_steps = 200
    steps_taken = 0

    while not done and steps_taken < max_test_steps:
        action = np.argmax(q_table[state])
        next_state, reward, terminated, truncated, _ = test_env.step(action)
        state = int((next_state['agent'][0] * grid_world_size) + next_state['agent'][1])
        total_reward += reward
        done = terminated or truncated
        steps_taken += 1
        time.sleep(0.5)  # slow the rollout down so it can be watched

    print(f"测试 episode 总奖励: {total_reward}")
finally:
    # Close both environments even if the rollout raises.
    test_env.close()
    env.close()

Episode 0, epsilon: 0.9995
Episode 100, epsilon: 0.9507
Episode 200, epsilon: 0.9044
Episode 300, epsilon: 0.8602
Episode 400, epsilon: 0.8183
Episode 500, epsilon: 0.7784
Episode 600, epsilon: 0.7404
Episode 700, epsilon: 0.7043
Episode 800, epsilon: 0.6699
Episode 900, epsilon: 0.6372
Episode 1000, epsilon: 0.6062
Episode 1100, epsilon: 0.5766
Episode 1200, epsilon: 0.5485
Episode 1300, epsilon: 0.5217
Episode 1400, epsilon: 0.4963
Episode 1500, epsilon: 0.4720
Episode 1600, epsilon: 0.4490
Episode 1700, epsilon: 0.4271
Episode 1800, epsilon: 0.4063
Episode 1900, epsilon: 0.3865
Episode 2000, epsilon: 0.3676
Episode 2100, epsilon: 0.3497
Episode 2200, epsilon: 0.3326
Episode 2300, epsilon: 0.3164
Episode 2400, epsilon: 0.3010
Episode 2500, epsilon: 0.2863
Episode 2600, epsilon: 0.2723
Episode 2700, epsilon: 0.2590
Episode 2800, epsilon: 0.2464
Episode 2900, epsilon: 0.2344
Episode 3000, epsilon: 0.2229
Episode 3100, epsilon: 0.2121
Episode 3200, epsilon: 0.2017
Episode 3300, epsilon:

### 最优动作值估计Q-learning

In [None]:
import numpy as np
import time
from rl_utils.GNGridWorldEnv import GridWorldEnv

# Epsilon-greedy action selection
def epsilon_greedy_action(q_table, state, epsilon, n_actions):
    """Pick an action index for ``state`` using epsilon-greedy selection.

    Draws one ``np.random.rand()`` sample; if it is below ``epsilon`` a
    uniformly random index in ``[0, n_actions)`` is returned, otherwise
    the ``np.argmax`` of ``q_table[state]``.
    """
    should_explore = np.random.rand() < epsilon
    return (
        np.random.randint(0, n_actions)
        if should_explore
        else np.argmax(q_table[state])
    )

# Environment setup
grid_world_size = 10
obstacle_count = 20
env = GridWorldEnv(size=grid_world_size,obstacle_count=obstacle_count)
n_actions = env.action_space.n
# One row per flattened grid cell (agent[0] * grid_world_size + agent[1]),
# one column per action.
q_table = np.zeros((grid_world_size * grid_world_size, n_actions))

env.render()  # show the window


# Hyperparameters
gamma = 0.95
epsilon = 1.0   # start fully random (strong exploration)
epsilon_decay = 0.9995   # multiplicative decay applied to epsilon after every episode
epsilon_min = 0.01 # lower bound for epsilon
max_iterate = 10000
seed = 20
learning_rate = 0.01

state_count = np.zeros((grid_world_size * grid_world_size, n_actions))  # visit counts (allocated here, not updated in this cell)
for it_index in range(max_iterate):

    # NOTE(review): the same seed is passed to every reset - presumably to keep
    # the layout identical across episodes; confirm against GridWorldEnv.
    observation, _ = env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    episode_max = 200  # remaining step budget for this episode
    while not done and episode_max > 0:
        action = epsilon_greedy_action(q_table, state, epsilon, n_actions)
        next_observation, reward, terminated, truncated, _ = env.step(action)
        next_state = int((next_observation['agent'][0] * grid_world_size) + next_observation['agent'][1])

        # Core Q-learning update: bootstrap from the best action value of the
        # next state, regardless of which action is executed next.
        # A terminal transition has no successor value to bootstrap from.
        best_next = 0.0 if terminated else np.max(q_table[next_state])
        q_table[state][action] += learning_rate * (reward + gamma * best_next - q_table[state][action])

        state = next_state
        done = terminated or truncated
        episode_max -= 1

    # Decay epsilon once per episode, never below epsilon_min
    epsilon = max(epsilon * epsilon_decay,epsilon_min)
    # Progress report
    if it_index % 100 == 0:
        print(f"Episode {it_index}, epsilon: {epsilon:.4f}")


# Run one rendered episode that follows the greedy policy of the learned q_table.
print("\n正在用新策略运行一个可视化 episode...")
test_env = GridWorldEnv(size=grid_world_size, render_mode='human',obstacle_count=obstacle_count)
try:
    n_actions = test_env.action_space.n
    observation, _ = test_env.reset(seed=seed, options={'enable_random_pos': True})
    state = int((observation['agent'][0] * grid_world_size) + observation['agent'][1])
    done = False
    total_reward = 0
    # A deterministic greedy policy can cycle forever without reaching a
    # terminal state, so the rollout is capped (same budget as training).
    max_test_steps = 200
    steps_taken = 0

    while not done and steps_taken < max_test_steps:
        action = np.argmax(q_table[state])
        next_state, reward, terminated, truncated, _ = test_env.step(action)
        state = int((next_state['agent'][0] * grid_world_size) + next_state['agent'][1])
        total_reward += reward
        done = terminated or truncated
        steps_taken += 1
        time.sleep(0.5)  # slow the rollout down so it can be watched

    print(f"测试 episode 总奖励: {total_reward}")
finally:
    # Close both environments even if the rollout raises.
    test_env.close()
    env.close()