In [1]:
'''
深度强化学习——原理、算法与PyTorch实战
'''
import numpy as np

class sweeprobot():
    def __init__(self):
        # 状态空间
        self.S = [[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4], [0, 5]],
                  [[1, 0], [1, 1], [1, 2], [1, 3], [1, 4], [1, 5]],
                  [[2, 0], [2, 1], [2, 2], [2, 3], [2, 4], [2, 5]],
                  [[3, 0], [3, 1], [3, 2], [3, 3], [3, 4], [4, 5]],
                  [[4, 0], [4, 1], [4, 2], [4, 3], [4, 4], [4, 5]],
                  [[5, 0], [5, 1], [5, 2], [5, 3], [5, 4], [5, 5]]]
        # 动作空间
        self.A = [[None, None], [-1, 0], [1, 0], [0, -1], [0, 1]]
        # 状态值
        self.V = [[None for i in range(6)] for j in range(6)]
        self.V[1][1] = 0
        self.V[5][4] = 0
        # 策略
        self.pi = None
        self.gamma = 0.8

    def reward(self, s, a):
        # 奖励函数
        [truth1, truth2] = np.add(s, a) == [5, 4]
        [truth3, truth4] = np.add(s, a) == [1, 1]
        [truth5, truth6] = np.add(s, a) == [3, 3]
        # 若状态s转移到[5,4](收集垃圾)
        if s != [5, 4] and (truth1 and truth2):
            return 3
        # 若状态s转移到[1,1](充电)
        if s != [1, 1] and (truth3 and truth4):
            return 1
        # 若状态s转移到[3,3](撞到障碍物)
        if truth5 and truth6:
            return -10
        return 0

    def cal_coefficient(self):
        # 该函数用来计算出线性方程组的系数矩阵和向量值
        # 首先初始化一个25 * 25的系数矩阵和25个元素的向量
        coef_Matrix = [[0 for i in range(25)] for j in range(25)]
        b = [0 for i in range(25)]
        for i in range(1, 6):
            for j in range(1, 6):
                # 判断是否是终止情况，如果是的话直接计算下一个
                [truth1, truth2] = [i == 5, j == 4]
                [truth3, truth4] = [i == 1, j == 1]
                [truth5, truth6] = [i == 3, j == 3]
                if truth1 and truth2:
                    continue
                if truth3 and truth4:
                    continue
                if truth5 and truth6:
                    continue
                # 计算当前状态下的动作数，以用于计算策略pi
                count_action = 0
                if i - 1 >= 1:
                    count_action += 1
                if i + 1 <= 5:
                    count_action += 1
                if j - 1 >= 1:
                    count_action += 1
                if j + 1 <= 5:
                    count_action += 1
                self.pi = 1 / count_action
                # 具体计算每一个状态值的函数
                b_value = 0
                coef_CurrentState = 0
                # 向上的情况
                if i - 1 >= 1:
                    b_value = b_value + self.pi * self.reward(self.S[i][j], self.A[1])
                    if i - 1 == 3 and j == 3:
                        coef_CurrentState = self.pi * self.gamma
                    else:
                        coef1 = self.pi * self.gamma
                        coef_Matrix[(i - 1) * 5 + j - 1][((i - 1) - 1) * 5 + j - 1] = coef1
                # 向下的情况
                if i + 1 <= 5:
                    b_value = b_value + self.pi * self.reward(self.S[i][j], self.A[2])
                    if i + 1 == 3 and j == 3:
                        coef_CurrentState = self.pi * self.gamma
                    else:
                        coef2 = self.pi * self.gamma
                        coef_Matrix[(i - 1) * 5 + j - 1][((i + 1) - 1) * 5 + j - 1] = coef2
                # 向左的情况
                if j - 1 >= 1:
                    b_value = b_value + self.pi * self.reward(self.S[i][j], self.A[3])
                    if j - 1 == 3 and i == 3:
                        coef_CurrentState = self.pi * self.gamma
                    else:
                        coef3 = self.pi * self.gamma
                        coef_Matrix[(i - 1) * 5 + j - 1][(i - 1) * 5 + (j - 1) - 1] = coef3
                # 向右的情况
                if j + 1 <= 5:
                    b_value = b_value + self.pi * self.reward(self.S[i][j], self.A[4])
                    if j + 1 == 3 and i == 3:
                        coef_CurrentState = self.pi * self.gamma
                    else:
                        coef4 = self.pi * self.gamma
                        coef_Matrix[(i - 1) * 5 + j - 1][(i - 1) * 5 + (j + 1) - 1] = coef4
                # 将左边的移项，所以系数为-1 (单位矩阵减系数矩阵)
                coef_Matrix[(i - 1) * 5 + j - 1][(i - 1) * 5 + j - 1] = -1 + coef_CurrentState
                # 同理，将常数项移项需要乘-1
                b[(i - 1) * 5 + j - 1] = -1 * b_value
        # 因为状态[1,1]和状态[5,4]可以确定其状态值为0,状态[3,3]不存在，所以其实只需求22*22的矩阵和22个元素的向量值
        # 把矩阵和向量第[(1-1)*5+1-1]和[(5-1)*5+4-1]删除
        del coef_Matrix[23]
        del b[23]
        del coef_Matrix[12]
        del b[12]
        del coef_Matrix[0]
        del b[0]
        # 把矩阵每一行的[(1-1)*5+1-1]和[(5-1)*5+4-1]和[(3-1)*5+3-1]删除
        for item in coef_Matrix:
            del item[23]
            del item[12]
            del item[0]
        # 得到系数矩阵coef_Matrix = (γP-I)与 b = -R,其中γ为衰退因子，P为状态转移矩阵，I为单位矩阵，R为奖励函数
        return coef_Matrix, b

    def solve_equation(self, coef_Matrix, b):
        # 计算状态值函数
        # 解方程组A*x = b,其中A = (γP-I)，b = -R 
        A = np.array(coef_Matrix)
        b = np.array(b)
        x = np.linalg.solve(A, b)
        x = list(x)
        for i in range(1, 6):
            for j in range(1, 6):
                [truth1, truth2] = [i == 5, j == 4]
                [truth3, truth4] = [i == 1, j == 1]
                [truth5, truth6] = [i == 3, j == 3]
                if truth1 and truth2:
                    continue
                if truth3 and truth4:
                    continue
                if truth5 and truth6:
                    continue
                self.V[i][j] = x.pop(0)

    def print_value(self):
        # 输出扫地机器人的状态值
        print('扫地机器人在随机策略下的状态值：')
        for i in range(1, 6):
            for j in range(1, 6):
                if self.V[j][6 - i] != None:
                    print('%.3f'%self.V[j][6 - i], end=" ")
                else:
                    print(self.V[j][6 - i], end=" ")
            print()

if __name__ == '__main__':
    sr = sweeprobot()
    A, b = sr.cal_coefficient()
    sr.solve_equation(A, b)
    sr.print_value()

扫地机器人在随机策略下的状态值：
-1.111 -1.359 -1.615 -0.329 1.368 
-1.417 -2.372 -4.369 -0.987 0.000 
-1.831 -4.716 None -3.987 -0.300 
-0.731 -2.162 -4.649 -2.160 -0.887 
0.000 -0.716 -1.772 -1.280 -0.867 


In [2]:
'''
深度强化学习——原理、算法与PyTorch实战
'''
import numpy as np

class SweepRobot():
    def __init__(self):
        # 状态空间
        self.S = [[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4], [0, 5]],
                  [[1, 0], [1, 1], [1, 2], [1, 3], [1, 4], [1, 5]],
                  [[2, 0], [2, 1], [2, 2], [2, 3], [2, 4], [2, 5]],
                  [[3, 0], [3, 1], [3, 2], [3, 3], [3, 4], [4, 5]],
                  [[4, 0], [4, 1], [4, 2], [4, 3], [4, 4], [4, 5]],
                  [[5, 0], [5, 1], [5, 2], [5, 3], [5, 4], [5, 5]]]
        # 动作空间
        self.A = [[None, None], [-1, 0], [1, 0], [0, -1], [0, 1]]
        # 状态值
        self.V = [[None for i in range(6)] for j in range(6)]
        self.V[1][1] = 0
        self.V[5][4] = 0
        # 无策略
        self.gamma = 0.8
        self.theta = 0.0001

    def reward(self, s, a):
        # 奖励函数
        [truth1, truth2] = np.add(s, a) == [5, 4]
        [truth3, truth4] = np.add(s, a) == [1, 1]
        [truth5, truth6] = np.add(s, a) == [3, 3]
        # 若状态s转移到[5,4](收集垃圾)
        if s != [5, 4] and (truth1 and truth2):
            return 3
        # 若状态s转移到[1,1](充电)
        if s != [1, 1] and (truth3 and truth4):
            return 1
        # 若状态s转移到[3,3](撞到障碍物)
        if truth5 and truth6:
            return -10
        return 0

    def cal_optimal_value(self):
        # 建立V的副本
        copy_V = self.V
        # 首先初始化V值,便于计算，都初始化为0
        for i in range(1, 6):
            for j in range(1, 6):
                # 判断是否是终止情况，如果是的话直接计算下一个
                [truth1, truth2] = [i == 5, j == 4]
                [truth3, truth4] = [i == 1, j == 1]
                [truth5, truth6] = [i == 3, j == 3]
                if truth1 and truth2:
                    continue
                if truth3 and truth4:
                    continue
                if truth5 and truth6:
                    continue
                self.V[i][j] = 0
                copy_V[i][j] = 0
        while True:
            Delta = 0
            for i in range(1, 6):
                for j in range(1, 6):
                    # 判断是否是终止情况，如果是的话直接计算下一个
                    [truth1, truth2] = [i == 5, j == 4]
                    [truth3, truth4] = [i == 1, j == 1]
                    [truth5, truth6] = [i == 3, j == 3]
                    if truth1 and truth2:
                        continue
                    if truth3 and truth4:
                        continue
                    if truth5 and truth6:
                        continue
                    v = self.V[i][j]
                    # 因为每个状态的动作空间不一样，所以需要分情况讨论
                    max_value = 0
                    # 向上的情况
                    if i - 1 >= 1:
                        if i - 1 == 3 and j == 3:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[1]) + self.gamma * self.V[i][j])
                        else:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[1]) + self.gamma * self.V[i - 1][j])
                    # 向下的情况
                    if i + 1 <= 5:
                        if i + 1 == 3 and j == 3:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[1]) + self.gamma * self.V[i][j])
                        else:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[2]) + self.gamma * self.V[i + 1][j])
                    # 向左的情况
                    if j - 1 >= 1:
                        if j - 1 == 3 and i == 3:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[1]) + self.gamma * self.V[i][j])
                        else:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[3]) + self.gamma * self.V[i][j - 1])
                    # 向右的情况
                    if j + 1 <= 5:
                        if j + 1 == 3 and i == 3:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[1]) + self.gamma * self.V[i][j])
                        else:
                            max_value = max(max_value, self.reward(self.S[i][j], self.A[4]) + self.gamma * self.V[i][j + 1])
                    copy_V[i][j] = max_value
                    Delta = max(Delta, abs(v - copy_V[i][j]))
            self.V = copy_V
            if Delta < self.theta:
                break

    def print_value(self):
        # 输出扫地机器人的状态值
        print('扫地机器人最优状态值：')
        for i in range(1, 6):
            for j in range(1, 6):
                if self.V[j][6 - i] != None:
                    print('%.3f'%self.V[j][6 - i], end=" ")
                else:
                    print(self.V[j][6 - i], end=" ")
            print()

if __name__ == '__main__':
    sr = SweepRobot()
    sr.cal_optimal_value()
    sr.print_value()

扫地机器人最优状态值：
1.229 1.536 1.920 2.400 3.000 
1.536 1.920 2.400 3.000 0.000 
1.229 1.536 None 2.400 3.000 
1.000 1.229 1.536 1.920 2.400 
0.000 1.000 1.229 1.536 1.920 


In [4]:
# 代05-例4.1-基于状态值函数的确定环境扫地机器人任务策略评估
import numpy as np
import sys
import os

# 添加当前目录到路径，确保可以导入自定义模块
sys.path.append(os.path.dirname(__file__))

# 尝试导入自定义的 GridWorldEnv，如果失败则使用我们之前定义的类
try:
    from book_gridword import GridWorldEnv
    print("成功导入 book_gridword 模块")
except ImportError:
    print("无法导入 book_gridword，使用内置的 GridWorldEnv 类")
    # 这里放置我们之前定义的完整 GridWorldEnv 类
    import gymnasium as gym
    from gymnasium import spaces
    from gymnasium.utils import seeding
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
    
    class Grid(object):
        def __init__(self, x: int = None, y: int = None, grid_type: int = 0, enter_reward: float = 0.0):
            self.x = x
            self.y = y
            self.grid_type = grid_type
            self.enter_reward = enter_reward
            self.name = "X{0}-Y{1}".format(self.x, self.y)
    
    class GridMatrix(object):
        def __init__(self, n_width: int, n_height: int, default_type: int = 0, default_reward: float = 0.0):
            self.n_height = n_height
            self.n_width = n_width
            self.default_reward = default_reward
            self.default_type = default_type
            self.grids = None
            self.len = n_width * n_height
            self.reset()
        
        def reset(self):
            self.grids = []
            for x in range(self.n_height):
                for y in range(self.n_width):
                    self.grids.append(Grid(x, y, self.default_type, self.default_reward))
        
        def get_grid(self, x, y=None):
            xx, yy = None, None
            if isinstance(x, int):
                xx, yy = x, y
            elif isinstance(x, tuple):
                xx, yy = x[0], x[1]
            assert (0 <= xx < self.n_width and 0 <= yy < self.n_height)
            index = yy * self.n_width + xx
            return self.grids[index]
        
        def set_reward(self, x, y, reward):
            grid = self.get_grid(x, y)
            if grid is not None:
                grid.enter_reward = reward
        
        def set_type(self, x, y, grid_type):
            grid = self.get_grid(x, y)
            if grid is not None:
                grid.grid_type = grid_type
        
        def get_reward(self, x, y):
            grid = self.get_grid(x, y)
            if grid is None:
                return None
            return grid.enter_reward
        
        def get_type(self, x, y):
            grid = self.get_grid(x, y)
            if grid is None:
                return None
            return grid.grid_type
    
    class GridWorldEnv(gym.Env):
        metadata = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 30}
        
        def __init__(self, n_width: int = 5, n_height: int = 5, u_size=40, default_reward: float = 0, default_type=0):
            self.n_width = n_width
            self.n_height = n_height
            self.default_reward = default_reward
            self.default_type = default_type
            self.u_size = u_size
            
            self.grids = GridMatrix(n_width=self.n_width, n_height=self.n_height,
                                   default_reward=self.default_reward, default_type=self.default_type)
            self.reward = 0
            self.action = None
            
            self.action_space = spaces.Discrete(4)
            self.observation_space = spaces.Discrete(self.n_height * self.n_width)
            
            self.state = None
            self.ends = [(0, 0), (4, 3)]
            self.start = (0, 4)
            self.types = [(2, 2, 1)]
            self.rewards = [(0, 0, 1), (4, 3, 5), (2, 2, -10)]
            self.refresh_setting()
            self.viewer = None
            self.seed()
            self.reset()
        
        def seed(self, seed=None):
            self.np_random, seed = seeding.np_random(seed)
            return [seed]
        
        def step(self, action):
            assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
            self.action = action
            old_x, old_y = self._state_to_xy(self.state)
            new_x, new_y = old_x, old_y
            
            if action == 0: new_x -= 1  # left
            elif action == 1: new_x += 1  # right
            elif action == 2: new_y += 1  # up
            elif action == 3: new_y -= 1  # down
            
            if new_x < 0: new_x = 0
            if new_x >= self.n_width: new_x = self.n_width - 1
            if new_y < 0: new_y = 0
            if new_y >= self.n_height: new_y = self.n_height - 1
            
            if self.grids.get_type(new_x, new_y) == 1:
                new_x, new_y = old_x, old_y
            
            self.reward = self.grids.get_reward(new_x, new_y)
            done = self._is_end_state(new_x, new_y)
            self.state = self._xy_to_state(new_x, new_y)
            info = {"x": new_x, "y": new_y, "grids": self.grids}
            terminated = done
            truncated = False
            return self.state, self.reward, terminated, truncated, info
        
        def _state_to_xy(self, s):
            x = s % self.n_width
            y = int((s - x) / self.n_width)
            return x, y
        
        def _xy_to_state(self, x, y=None):
            if isinstance(x, int):
                return x + self.n_width * y
            elif isinstance(x, tuple):
                return x[0] + self.n_width * x[1]
            return -1
        
        def refresh_setting(self):
            for x, y, r in self.rewards:
                self.grids.set_reward(x, y, r)
            for x, y, t in self.types:
                self.grids.set_type(x, y, t)
        
        def reset(self, seed=None, options=None):
            if seed is not None:
                self.seed(seed)
            self.state = self._xy_to_state(self.start)
            info = {}
            return self.state, info
        
        def _is_end_state(self, x, y=None):
            if y is not None:
                xx, yy = x, y
            elif isinstance(x, int):
                xx, yy = self._state_to_xy(x)
            else:
                assert (isinstance(x, tuple)), "坐标数据不完整"
                xx, yy = x[0], x[1]
            for end in self.ends:
                if xx == end[0] and yy == end[1]:
                    return True
            return False
        
        def render(self):
            fig, ax = plt.subplots(figsize=(10, 10))
            
            for x in range(self.n_width):
                for y in range(self.n_height):
                    grid_type = self.grids.get_type(x, y)
                    reward = self.grids.get_reward(x, y)
                    
                    color = 'white'
                    if grid_type == 1: color = 'gray'
                    elif (x, y) in self.ends: color = 'gold'
                    elif (x, y) == self.start: color = 'lightblue'
                    elif reward > 0: color = 'lightgreen'
                    elif reward < 0: color = 'lightcoral'
                    
                    rect = patches.Rectangle((x, y), 1, 1, linewidth=2, edgecolor='black', facecolor=color, alpha=0.8)
                    ax.add_patch(rect)
                    
                    if reward != 0:
                        ax.text(x + 0.5, y + 0.5, f'{reward}', ha='center', va='center', 
                               fontsize=14, fontweight='bold',
                               bbox=dict(boxstyle="round,pad=0.3", facecolor='white', alpha=0.8))
            
            agent_x, agent_y = self._state_to_xy(self.state)
            agent_circle = plt.Circle((agent_x + 0.5, agent_y + 0.5), 0.3, color='red', alpha=0.8, zorder=10)
            ax.add_patch(agent_circle)
            
            ax.set_xlim(-0.5, self.n_width + 0.5)
            ax.set_ylim(-0.5, self.n_height + 0.5)
            ax.set_aspect('equal')
            ax.set_xticks(range(self.n_width + 1))
            ax.set_yticks(range(self.n_height + 1))
            ax.grid(True, linestyle='-', linewidth=1, alpha=0.3)
            ax.set_title('扫地机器人环境 - 策略评估', fontsize=16, fontweight='bold', pad=20)
            ax.set_xlabel('X 坐标', fontsize=12)
            ax.set_ylabel('Y 坐标', fontsize=12)
            
            legend_elements = [
                plt.Rectangle((0,0),1,1, facecolor='lightblue', edgecolor='black', label='起点'),
                plt.Rectangle((0,0),1,1, facecolor='gold', edgecolor='black', label='终点'),
                plt.Rectangle((0,0),1,1, facecolor='gray', edgecolor='black', label='障碍物'),
                plt.Rectangle((0,0),1,1, facecolor='lightgreen', edgecolor='black', label='正奖励'),
                plt.Rectangle((0,0),1,1, facecolor='lightcoral', edgecolor='black', label='负奖励'),
                plt.Circle((0,0),0.3, facecolor='red', edgecolor='black', label='智能体')
            ]
            ax.legend(handles=legend_elements, loc='upper right', bbox_to_anchor=(1.15, 1), fontsize=10)
            ax.invert_yaxis()
            plt.tight_layout()
            plt.show()
            return fig, ax
        
        def close(self):
            if hasattr(self, 'viewer') and self.viewer is not None:
                plt.close('all')

# 创建环境实例
env = GridWorldEnv()

"""定义格子世界参数"""
world_h = 5
world_w = 5
length = world_h * world_w
gamma = 0.8
state = [i for i in range(length)]  # 状态（编号）
action = ['n', 's', 'w', 'e']  # 动作名称
ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}
policy = np.zeros([length, len(action)])
suqe = [20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4]

# 定义奖励函数 - 注意：这里的奖励设置与GridWorldEnv中的不同
def reward(s):
    if s == 20:  # 到充电站
        return 1
    elif s == 12:  # 到陷阱中
        return -10
    elif s == 9:  # 到垃圾处
        return 3
    else:
        return 0  # 其他

# 将动作名称转换为数字索引
def getAction(a):
    if a == 'n':
        return 0
    elif a == 'e':
        return 3
    elif a == 's':
        return 1
    elif a == 'w':
        return 2

# 在s状态下执行动作a，返回下一状态（编号）
def next_states(s, a):
    # 越过边界时pass
    if (s < world_w and a == 'n') \
            or (s % world_w == 0 and a == 'w') \
            or (s > length - world_w - 1 and a == 's') \
            or ((s + 1) % world_w == 0 and a == 'e'):
        next_state = s  # 表现为next_state不变
    else:
        next_state = s + ds_action[a]  # 进入下一个状态
    return next_state

# 在s状态下执行动作，返回所有可能的下一状态（编号）list
def getsuccessor(s):
    successor = []
    for a in action:  # 遍历四个动作
        if s == next_states(s, a):
            continue
        else:
            next = next_states(s, a)  # 得到下一个状态（编号）
        successor.append(next)  # 以list保存当前状态s下执行四个动作的下一状态
    return successor

# 初始化策略 - 均匀随机策略
def initPolicy():
    for s in range(length):
        for a in action:
            if next_states(s, a) == s:
                continue
            newAction = getAction(a)
            policy[s][newAction] = 1 / len(getsuccessor(s))
    print("策略初始化完成")
    print("策略矩阵形状:", policy.shape)

# 策略评估函数
def policy_eval(theta=0.0001):
    V = np.zeros(length)  # 初始化状态值函数列表
    iter = 0

    while True:
        k = -1
        delta = 0  # 定义最大差值，判断是否有进行更新
        
        print(f"\n=== 第 {iter+1} 次迭代 ===")
        
        for s in suqe:  # 遍历所有状态 [20,21,...,0,1,2,3,4]
            k += 1
            if s in [9, 20, 12]:  # 若当前状态为终止状态，则直接pass不做操作
                continue
                
            v = 0  # 针对每个状态值函数进行计算
            print(f"状态 {s}: ", end="")
            
            # 遍历所有可能的动作
            for a in action:
                newAction = getAction(a)
                next_state = next_states(s, a)
                rewards = reward(next_state)
                
                # 计算值函数
                if next_state == 12:  # 特殊处理陷阱状态
                    v += policy[s][newAction] * (rewards + gamma * V[s])
                    print(" %.2f*(%d+%.1f*%.3f)+" % (policy[s][newAction], rewards, gamma, V[next_state]), end="")
                else:
                    v += policy[s][newAction] * (rewards + gamma * V[next_state])
                    print(" %.2f*(%d+%.1f*%.3f)+" % (policy[s][newAction], rewards, gamma, V[next_state]), end="")
            
            print(" => v = %.3f" % (v))
            delta = max(delta, np.abs(v - V[s]))  # 更新差值
            V[s] = v  # 存储(更新)每个状态下的状态值函数
        
        # 将值函数重塑为网格形式并显示
        value = np.array(V).reshape(world_h, world_w)
        iter += 1
        
        print('\n迭代次数 k =', iter)
        print("当前的状态值函数为：")
        print(np.round(value, decimals=3))
        print(f"最大变化量 delta = {delta:.6f}")
        
        if delta < theta:  # 收敛判断
            print(f"\n策略评估收敛！共迭代 {iter} 次")
            break
            
        if iter >= 100:  # 防止无限循环
            print("达到最大迭代次数，强制停止")
            break
            
    return V  # 返回最终的状态值函数

# 可视化值函数
def plot_value_function(V):
    value_grid = np.array(V).reshape(world_h, world_w)
    
    fig, ax = plt.subplots(figsize=(10, 8))
    im = ax.imshow(value_grid, cmap='viridis', interpolation='nearest')
    
    # 添加数值标签
    for i in range(world_h):
        for j in range(world_w):
            text = ax.text(j, i, f'{value_grid[i, j]:.2f}',
                          ha="center", va="center", color="w", fontweight='bold')
    
    # 设置坐标轴
    ax.set_xticks(np.arange(world_w))
    ax.set_yticks(np.arange(world_h))
    ax.set_xticklabels(np.arange(world_w))
    ax.set_yticklabels(np.arange(world_h))
    ax.set_xlabel('X 坐标')
    ax.set_ylabel('Y 坐标')
    ax.set_title('状态值函数 V(s)')
    
    # 添加颜色条
    cbar = ax.figure.colorbar(im, ax=ax)
    cbar.ax.set_ylabel('值函数大小', rotation=-90, va="bottom")
    
    plt.tight_layout()
    plt.show()

# 主程序
if __name__ == "__main__":
    print("=" * 60)
    print("基于状态值函数的确定环境扫地机器人任务策略评估")
    print("=" * 60)
    
    # 初始化策略
    initPolicy()
    
    # 执行策略评估
    print("\n开始策略评估...")
    final_values = policy_eval(theta=0.001)
    
    # 可视化最终的值函数
    print("\n最终状态值函数:")
    final_value_grid = np.array(final_values).reshape(world_h, world_w)
    print(np.round(final_value_grid, decimals=3))
    
    # 绘制值函数热力图
    print("\n生成值函数可视化...")
    plot_value_function(final_values)
    
    # 使用环境渲染显示最终状态
    print("\n显示环境状态...")
    env.reset()
    env.render()
    
    print("\n策略评估完成！")

NameError: name '__file__' is not defined