# 值迭代算法
作者：stzhao
github: https://github.com/zhaoshitian

# 一、定义环境


In [1]:
import numpy as np
import sys,os
curr_path = os.path.abspath('')
parent_path = os.path.dirname(curr_path)
sys.path.append(parent_path)
from envs.simple_grid import DrunkenWalkEnv

In [2]:
def all_seed(env,seed = 1):
    ## 这个函数主要是为了固定随机种子
    import numpy as np
    import random
    import os
    env.seed(seed) 
    np.random.seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed) 


In [3]:
#env = DrunkenWalkEnv(map_name="theAlley")
env = DrunkenWalkEnv(map_name="4x4")
all_seed(env, seed = 1) # 设置随机种子为1

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import ListedColormap
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False    # 用来正常显示负号

def visualize_env(env):
    # 获取地图
    desc = env.desc.tolist()
    desc = [[c.decode('utf-8') for c in line] for line in desc]
    
    # 创建图形
    fig, ax = plt.subplots(figsize=(10, 2))
    
    # 设置颜色映射
    colors = ['white', 'gray', 'green', 'red']  # 对应 '.', 'H', 'S', 'G'
    cmap = ListedColormap(colors)
    
    # 创建数值矩阵
    value_map = {
        '.': 0,
        'H': 1,
        'S': 2,
        'G': 3
    }
    
    # 转换为数值矩阵
    grid = np.array([[value_map[cell] for cell in row] for row in desc])
    
    # 绘制网格
    im = ax.imshow(grid, cmap=cmap, aspect='equal')
    
    # 添加文字标注
    for i in range(len(desc)):
        for j in range(len(desc[0])):
            text = desc[i][j]
            ax.text(j, i, text, ha='center', va='center', color='black')
    
    # 设置轴标签
    ax.set_xticks(range(len(desc[0])))
    ax.set_yticks(range(len(desc)))
    
    # 添加标题
    plt.title('醉汉走路环境', fontsize=12)
    
    # 添加图例
    legend_elements = [plt.Rectangle((0,0),1,1, facecolor=color, label=label)
                      for color, label in [
                          ('green', '起点 (S)'),
                          ('red', '终点 (G)'),
                          ('gray', '坑洞 (H)'),
                          ('white', '正常路面 (.)')
                      ]]
    ax.legend(handles=legend_elements, bbox_to_anchor=(1.05, 1), loc='upper left')
    
    plt.tight_layout()
    plt.show()

# 使用示例
env = DrunkenWalkEnv(map_name="4x4")
visualize_env(env)


# 二、价值迭代算法


In [5]:
def value_iteration(env, theta=0.005, discount_factor=0.9, max_count = 100):
    Q = np.zeros((env.nS, env.nA)) # 初始化一个Q表格
    count = 0
    deltas = []  # 记录每次迭代的delta值
    while True:
        delta = 0.0
        Q_tmp = np.zeros((env.nS, env.nA))
        for state in range(env.nS):
            for a in range(env.nA):
                accum = 0.0
                reward_total = 0.0
                for prob, next_state, reward, done in env.P[state][a]:
                    accum += prob* np.max(Q[next_state, :])
                    reward_total += prob * reward
                Q_tmp[state, a] = reward_total + discount_factor * accum
                delta = max(delta, abs(Q_tmp[state, a] - Q[state, a]))
        Q = Q_tmp.copy()
        deltas.append(delta)
        count += 1
        if delta < theta or count > max_count: # 这里设置了即使算法没有收敛，跑100次也退出循环
            print(f"算法在第 {count} 次迭代后收敛")
            break 
    # 绘制收敛过程
    plt.figure(figsize=(10, 5))
    plt.plot(deltas)
    plt.xlabel('迭代次数')
    plt.ylabel('最大变化量 (delta)')
    plt.title('价值迭代收敛过程')
    plt.yscale('log')  # 使用对数尺度更容易观察
    plt.grid(True)
    plt.show()
    return Q

In [None]:
Q = value_iteration(env, theta=0.005, discount_factor=0.9, max_count=100)
print(Q)

In [None]:
policy = np.zeros([env.nS, env.nA]) # 初始化一个策略表格
for state in range(env.nS):
    best_action = np.argmax(Q[state, :]) #根据价值迭代算法得到的Q表格选择出策略
    policy[state, best_action] = 1

policy = [int(np.argwhere(policy[i]==1)) for i in range(env.nS) ]
print(policy)

# 三、测试

In [8]:
num_episode = 1000 # 测试1000次

def test(env,policy):
    
    rewards = []  # 记录所有回合的奖励
    success = []  # 记录该回合是否成功走到终点
    for i_ep in range(num_episode):
        ep_reward = 0  # 记录每个episode的reward
        state = env.reset()  # 重置环境, 重新开一局（即开始新的一个回合） 这里state=0
        while True:
            action = policy[state]  # 根据算法选择一个动作
            next_state, reward, done, _ = env.step(action)  # 与环境进行一个交互
            state = next_state  # 更新状态
            ep_reward += reward
            if done:
                break
        if state==15: # 即走到终点
            success.append(1)
        else:
            success.append(0)
        rewards.append(ep_reward)
    acc_suc = np.array(success).sum()/num_episode
    print("测试的成功率是：", acc_suc)
    

In [None]:
test(env, policy)
# 打印策略
print("\n最优策略：")
for i in range(env.nrow):
    row = ""
    for j in range(env.ncol):
        state = i * env.ncol + j
        action = policy[state]
        if env.desc[i][j] in b'GH':
            row += " * "  # 目标或坑洞
        else:
            row += " " + ["←", "↓", "→", "↑"][action] + " "
    print(row)