In [59]:
# grid_world.py

import numpy as np

class GridWorld:
    def __init__(self, grid_matrix, default_reward=-1, terminal_rewards=None, gamma=0.9):
        """
        初始化网格世界环境。

        :param grid_matrix: n x m 的矩阵，定义网格的结构和奖励。
                            使用以下规则定义矩阵元素：
                            - 'S' : 起始位置
                            - 'G' : 目标位置
                            - 'X' : 障碍物
                            - 数值 : 特定位置的奖励
                            - 'O' 或其他字符 : 默认奖励
        :param default_reward: 默认步进奖励，默认为-1。
        :param terminal_rewards: 字典，指定特定位置的终止奖励。
        :param gamma: 折扣因子
        """
        self.grid_matrix = grid_matrix
        self.grid_size = (len(grid_matrix), len(grid_matrix[0]))
        self.default_reward = default_reward
        self.gamma = gamma
        self.terminal_rewards = terminal_rewards if terminal_rewards else {}
        
        # 动作空间：0: 上, 1: 右, 2: 下, 3: 左
        self.action_space = 4
        self.state_space = self.grid_size[0] * self.grid_size[1]

        # 解析网格矩阵，找到起始位置和目标位置
        self.start = None
        self.goal = None
        self.obstacles = []
        self.reward_map = {}  # {state: reward}

        for i in range(self.grid_size[0]):
            for j in range(self.grid_size[1]):
                cell = grid_matrix[i][j]
                state = self._pos_to_state((i, j))
                if cell == 'S':
                    self.start = (i, j)
                elif cell == 'G':
                    self.goal = (i, j)
                    self.reward_map[state] = self.terminal_rewards.get((i, j), 10)
                elif cell == 'X':
                    self.obstacles.append((i, j))
                elif isinstance(cell, (int, float)):
                    self.reward_map[state] = cell

        if self.start is None:
            raise ValueError("Grid must have a start position marked with 'S'.")
        if self.goal is None:
            raise ValueError("Grid must have a goal position marked with 'G'.")

        # 初始化 agent_pos 和 done 状态
        self.reset()

    def reset(self):
        """
        重置环境到起始状态。

        :return: 初始状态的整数表示。
        """
        self.agent_pos = self.start
        self.done = False
        return self._pos_to_state(self.agent_pos)

    def step(self, action):
        """
        执行动作，返回下一个状态、奖励、是否结束以及额外信息。

        :param action: 动作，整数0-3。
        :return: next_state, reward, done, info
        """
        if self.done:
            raise Exception("Episode has finished. Please reset the environment.")

        # 计算新位置
        new_pos = self._move(self.agent_pos, action)

        # 检查是否撞墙或进入障碍物
        if self._is_valid(new_pos):
            self.agent_pos = new_pos
        else:
            # 撞墙或障碍物，位置不变
            pass

        # 检查是否达到终止状态
        state = self._pos_to_state(self.agent_pos)
        if self.agent_pos == self.goal or self.agent_pos in self.terminal_rewards:
            reward = self.reward_map.get(state, self.default_reward)
            self.done = True
        else:
            reward = self.reward_map.get(state, self.default_reward)
            self.done = False

        next_state = state
        info = {}

        return next_state, reward, self.done, info

    def _move(self, position, action):
        """
        根据动作计算新位置。

        :param position: 当前的位置，元组(x, y)。
        :param action: 动作，整数0-3。
        :return: 新的位置，元组(x, y)。
        """
        x, y = position
        if action == 0:  # 上
            x -= 1
        elif action == 1:  # 右
            y += 1
        elif action == 2:  # 下
            x += 1
        elif action == 3:  # 左
            y -= 1
        else:
            raise ValueError("Invalid action.")

        return (x, y)

    def _is_valid(self, position):
        """
        检查位置是否有效（在网格内且不是障碍物）。

        :param position: 位置，元组(x, y)。
        :return: 如果有效返回True，否则返回False。
        """
        x, y = position
        # 检查边界
        if x < 0 or x >= self.grid_size[0] or y < 0 or y >= self.grid_size[1]:
            return False
        # 检查障碍物
        if position in self.obstacles:
            return False
        return True

    def _pos_to_state(self, position):
        """
        将位置转换为状态编号。

        :param position: 位置，元组(x, y)。
        :return: 状态编号，整数。
        """
        x, y = position
        return x * self.grid_size[1] + y

    def _state_to_pos(self, state):
        """
        将状态编号转换为位置。

        :param state: 状态编号，整数。
        :return: 位置，元组(x, y)。
        """
        x = state // self.grid_size[1]
        y = state % self.grid_size[1]
        return (x, y)

    def render(self):
        """
        打印当前网格的状态。
        """
        grid = [['O' for _ in range(self.grid_size[1])] for _ in range(self.grid_size[0])]

        # 设置障碍物
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = 'X'

        # 设置奖励位置
        for state, reward in self.reward_map.items():
            pos = self._state_to_pos(state)
            if grid[pos[0]][pos[1]] == 'O':
                grid[pos[0]][pos[1]] = f"{reward}"

        # 设置目标
        grid[self.goal[0]][self.goal[1]] = 'G'

        # 设置智能体
        grid[self.agent_pos[0]][self.agent_pos[1]] = 'A'

        # 打印网格
        for row in grid:
            print(' '.join(row))
        print()


In [60]:
def simulate_step(env, state, action):
    """
    模拟在特定状态下执行动作的结果。

    :param env: 环境对象。
    :param state: 当前状态。
    :param action: 动作。
    :return: next_state, reward, done, info
    """
    # 保存当前状态
    original_pos = env.agent_pos
    env.agent_pos = env._state_to_pos(state)

    # 执行动作
    next_state, reward, done, info = env.step(action)

    # 恢复原始状态
    env.agent_pos = original_pos
    env.done = False

    return next_state, reward, done, info


In [61]:
grid_matrix = [
    ['S', 'O', 'O', 'O', 'O'],
    ['O', 'X',  -5, 'X', 'O'],
    ['O', 'O', 'X', 'O', 'O'],
    ['O', 'O', 'O', 'X', 'O'],
    ['O', 'O', 'O', 'O', 'G']
]

# 定义终止奖励（例如目标位置）
terminal_rewards = {
    (4, 4): 10
}

# 创建 GridWorld 环境
env = GridWorld(
    grid_matrix=grid_matrix,
    default_reward=-1,
    terminal_rewards=terminal_rewards,
    gamma=0.9
)

# 初始化价值函数 V(s) = 0
V = np.zeros(env.state_space)
gamma = env.gamma

# 设置收敛阈值和最大迭代次数
theta = 1e-4
max_iterations = 1000

# 进行一次迭代
i = 0
delta = 0
state = 0  # 示例状态
print("价值函数 V(s) 在第 1 次迭代前的值:")
print(V.reshape(env.grid_size))
print(f"开始第 {i + 1} 次迭代")
for state in range(env.state_space):
    pos = env._state_to_pos(state)

    # 跳过障碍物
    if pos in env.obstacles:
        print(f"状态 {state} 位置 {pos} 是障碍物，跳过")
        continue

    # 计算所有动作的价值
    action_values = []
    print(f"评估状态 {state} 位置 {pos}")
    for action in range(env.action_space):
        next_state, reward, done, _ = simulate_step(env, state, action)
        if done:
            action_value = reward
            print(f"  动作 {action} 导致终止状态: 奖励 = {reward}")
        else:
            action_value = reward + gamma * V[next_state]
            print(f"  动作 {action}: 下一状态 {next_state}, 奖励 = {reward}, 价值 = {V[next_state]:.2f}, Q值 = {action_value:.2f}")
        action_values.append(action_value)

    # 更新价值函数
    max_action_value = max(action_values)
    print(f"  状态 {state} 的最大动作价值: {max_action_value:.2f}")
    delta = max(delta, abs(max_action_value - V[state]))
    print(f"  更新前 V[{state}] = {V[state]:.2f}, 更新后 V[{state}] = {max_action_value:.2f}\n")
    V[state] = max_action_value
    print("grid_matrix:")
    print(V.reshape(env.grid_size))

print(f"第 {i + 1} 次迭代完成，delta={delta:.6f}")

# 显示价值函数
print("价值函数 V(s) 在第 1 次迭代后的值:")
print(V.reshape(env.grid_size))

价值函数 V(s) 在第 1 次迭代前的值:
[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
开始第 1 次迭代
评估状态 0 位置 (0, 0)
  动作 0: 下一状态 0, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 1: 下一状态 1, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 2: 下一状态 5, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 3: 下一状态 0, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  状态 0 的最大动作价值: -1.00
  更新前 V[0] = 0.00, 更新后 V[0] = -1.00

grid_matrix:
[[-1.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]
评估状态 1 位置 (0, 1)
  动作 0: 下一状态 1, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 1: 下一状态 2, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 2: 下一状态 1, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 3: 下一状态 0, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  状态 1 的最大动作价值: -1.00
  更新前 V[1] = 0.00, 更新后 V[1] = -1.00

grid_matrix:
[[-1. -1.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]
评估状态 2 位置 (0, 2)
  动作 0: 下一状态 2, 奖励 = -1, 价值 = 0.00, Q值 = -1.00
  动作 1: 下一状态 3, 奖励 = -1, 

假设只存在value 函数，并且policy就只是选择value最大的那个选项

In [62]:
# secont iteration

# 进行第二次迭代
i += 1
delta = 0
print("价值函数 V(s) 在第 2 次迭代前的值:")
print(V.reshape(env.grid_size))
print(f"开始第 {i + 1} 次迭代")
for state in range(env.state_space):
    pos = env._state_to_pos(state)

    # 跳过障碍物
    if pos in env.obstacles:
        print(f"状态 {state} 位置 {pos} 是障碍物，跳过")
        continue

    # 计算所有动作的价值
    action_values = []
    print(f"评估状态 {state} 位置 {pos}")
    for action in range(env.action_space):
        next_state, reward, done, _ = simulate_step(env, state, action)
        if done:
            action_value = reward
            print(f"  动作 {action} 导致终止状态: 奖励 = {reward}")
        else:
            action_value = reward + gamma * V[next_state]
            print(f"  动作 {action}: 下一状态 {next_state}, 奖励 = {reward}, 价值 = {V[next_state]:.2f}, Q值 = {action_value:.2f}")
        action_values.append(action_value)

    # 更新价值函数
    max_action_value = max(action_values)
    print(f"  状态 {state} 的最大动作价值: {max_action_value:.2f}")
    delta = max(delta, abs(max_action_value - V[state]))
    print(f"  更新前 V[{state}] = {V[state]:.2f}, 更新后 V[{state}] = {max_action_value:.2f}\n")
    V[state] = max_action_value
    print("grid_matrix:")
    print(V.reshape(env.grid_size))

print(f"第 {i + 1} 次迭代完成，delta={delta:.6f}")

# 显示价值函数
print("价值函数 V(s) 在第 2 次迭代后的值:")
print(V.reshape(env.grid_size))

价值函数 V(s) 在第 2 次迭代前的值:
[[-1.  -1.  -1.  -1.  -1. ]
 [-1.   0.  -1.9  0.  -1. ]
 [-1.  -1.   0.  -1.  -1. ]
 [-1.  -1.  -1.   0.  10. ]
 [-1.  -1.  -1.  10.  10. ]]
开始第 2 次迭代
评估状态 0 位置 (0, 0)
  动作 0: 下一状态 0, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 1: 下一状态 1, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 2: 下一状态 5, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 3: 下一状态 0, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  状态 0 的最大动作价值: -1.90
  更新前 V[0] = -1.00, 更新后 V[0] = -1.90

grid_matrix:
[[-1.9 -1.  -1.  -1.  -1. ]
 [-1.   0.  -1.9  0.  -1. ]
 [-1.  -1.   0.  -1.  -1. ]
 [-1.  -1.  -1.   0.  10. ]
 [-1.  -1.  -1.  10.  10. ]]
评估状态 1 位置 (0, 1)
  动作 0: 下一状态 1, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 1: 下一状态 2, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 2: 下一状态 1, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 3: 下一状态 0, 奖励 = -1, 价值 = -1.90, Q值 = -2.71
  状态 1 的最大动作价值: -1.90
  更新前 V[1] = -1.00, 更新后 V[1] = -1.90

grid_matrix:
[[-1.9 -1.9 -1.  -1.  -1. ]
 [-1.   0.  -1.9  0.  -1. ]
 [-1.  -1.   0.  -1.  -1. ]
 [-1.  -1.  -1.   0.  10. ]
 [-1.  -1

我们可以把代码对应到公式

The actual value function is the expected return

$$
v^\pi(s) = \mathbb{E}[G_t \mid S_t = s, \pi] = \mathbb{E}[R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots \mid S_t = s, \pi]
$$


Bellman Optimality Equation：

$$
V^*(s) = \max_a \mathbb{E} \left[ R_{t+1} + \gamma V^*(S_{t+1}) \mid S_t = s, A_t = a \right]
$$



### Expanding Expectation

By definition of expectation:

$$
V^*(s) = \max_a \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V^*(s') \right]
$$

where:
- \( P(s' \mid s, a) \) is the transition probability from \( s \) to \( s' \) given action \( a \).
- \( R(s, a, s') \) is the reward received when transitioning to \( s' \).
- \( V^*(s') \) is the estimated value of the next state.

### Deterministic Case

If the environment is deterministic, then for a given \( s, a \), there is only one possible next state \( s' \). This simplifies the equation to:

$$
V^*(s) = \max_a \left[ R(s, a) + \gamma V^*(s') \right]
$$

### Iterative Approximation

To approximate \( V^*(s) \), we define an iterative update:

$$
V_{k+1}(s) = \max_a \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V_k(s') \right]
$$

where:
- $ V_k(s) $ is the value function at iteration k
- We iterate until convergence, where $V_{k+1}(s) \approx V_k(s) $

This forms the basis of the **Value Iteration Algorithm**.


In [63]:
"""
grid_matrix:
[[-1.9  -1.9  -1.9  -1.9  -1.9 ]
 [-1.9   0.   -2.71  0.   -1.9 ]
 [-1.9  -1.9   0.   -1.9   8.  ]
 [-1.9  -1.9  -1.9   0.   10.  ]
 [-1.9  -1.9  -1.   10.   10.  ]]
评估状态 22 位置 (4, 2)
  动作 0: 下一状态 17, 奖励 = -1, 价值 = -1.90, Q值 = -2.71
  动作 1: 下一状态 23, 奖励 = -1, 价值 = 10.00, Q值 = 8.00
  动作 2: 下一状态 22, 奖励 = -1, 价值 = -1.00, Q值 = -1.90
  动作 3: 下一状态 21, 奖励 = -1, 价值 = -1.90, Q值 = -2.71
  状态 22 的最大动作价值: 8.00
  更新前 V[22] = -1.00, 更新后 V[22] = 8.00
"""
print('')




其中一个print
更新了-1到 8的迭代

对于动作0 $R_{t+1}=-1 V(S_{t+1})=1.90$

对于动作1 $R_{t+1}=-1 V(S_{t+1})=10$ 

对于动作2 $R_{t+1}=-1 V(S_{t+1})=-1$

对于动作3 $R_{t+1}=-1 V(S_{t+1})=-1.90$



reward + gamma * V[next_state]

其中最大的是动作1，所以把V(s)更新为-1+10*0.9(gamma)=8

## Proof of Value Iteration Convergence

### **1. Bellman Optimality Operator**
Define the Bellman optimality operator $\mathcal{T}^*$:

$$
(\mathcal{T}^*[V])(s) = \max_a \sum_{s'} P(s' \mid s,a) 
\Big( R(s,a,s') + \gamma V(s') \Big).
$$

### **2. Contraction Property**
For any two value functions $V_1$ and $V_2$:

$$
\begin{aligned}
\bigl\lVert \mathcal{T}^*[V_1] - \mathcal{T}^*[V_2] \bigr\rVert_\infty
&= \max_{s} \Bigl\lvert \max_a \sum_{s'} P(s' \mid s,a)\bigl(R + \gamma V_1(s')\bigr) \\
&\quad - \max_a \sum_{s'} P(s' \mid s,a)\bigl(R + \gamma V_2(s')\bigr) \Bigr\rvert.
\end{aligned}
$$

Using the **triangle inequality** and properties of the max operator, we get:

$$
\bigl\lVert \mathcal{T}^*[V_1] - \mathcal{T}^*[V_2] \bigr\rVert_\infty
\leq \gamma\, \bigl\lVert V_1 - V_2 \bigr\rVert_\infty.
$$

Since $0 \leq \gamma < 1$, $\mathcal{T}^*$ is a **contraction mapping**.

### **3. Existence of Unique Fixed Point $V^*$**
Define $V^*$ as the unique fixed point of $\mathcal{T}^*$:

$$
\mathcal{T}^*[V^*] = V^*.
$$

The existence and uniqueness of $V^*$ follows from the **Banach Fixed-Point Theorem**.

### **4. Convergence of Value Iteration**
Starting from any initial $V_0$, the iteration:

$$
V_{k+1} = \mathcal{T}^*[V_k]
$$

produces a sequence $V_0, V_1, V_2, \dots$ that converges to $V^*$ as $k \to \infty$.

Thus, **Value Iteration approximates $V^*$ as the number of iterations increases**.


In [64]:
import numpy as np

def value_iteration(env, theta=1e-4, max_iterations=1000):
    """
    价值迭代算法。

    :param env: 环境对象。
    :param theta: 收敛阈值。
    :param max_iterations: 最大迭代次数。
    :return: 最优价值函数和最优策略。
    """
    V = np.zeros(env.state_space)
    gamma = env.gamma

    for i in range(max_iterations):
        delta = 0
        for state in range(env.state_space):
            pos = env._state_to_pos(state)

            # 跳过障碍物
            if pos in env.obstacles:
                continue

            # 计算所有动作的价值
            action_values = []
            for action in range(env.action_space):
                next_state, reward, done, _ = simulate_step(env, state, action)
                action_value = reward + gamma * V[next_state] if not done else reward
                action_values.append(action_value)

            # 更新价值函数
            max_action_value = max(action_values)
            delta = max(delta, abs(max_action_value - V[state]))
            V[state] = max_action_value

        print(f"Iteration {i + 1}: delta={delta}")
        if delta < theta:
            print("价值迭代收敛")
            break

    # 导出最优策略
    policy = np.zeros(env.state_space, dtype=int)
    for state in range(env.state_space):
        pos = env._state_to_pos(state)

        # 跳过障碍物
        if pos in env.obstacles:
            policy[state] = -1  # 表示无效动作
            continue

        # 选择最佳动作
        action_values = []
        for action in range(env.action_space):
            next_state, reward, done, _ = simulate_step(env, state, action)
            action_value = reward + gamma * V[next_state] if not done else reward
            action_values.append(action_value)
        best_action = np.argmax(action_values)
        policy[state] = best_action

    return V, policy



grid_matrix = [
    ['S', 'O', 'O', 'O', 'O'],
    ['O', 'X',  -5, 'X', 'O'],
    ['O', 'O', 'X', 'O', 'O'],
    ['O', 'O', 'O', 'X', 'O'],
    ['O', 'O', 'O', 'O', 'G']
]

# 定义终止奖励（例如目标位置）
terminal_rewards = {
    (4, 4): 10
}

env = GridWorld(
    grid_matrix=grid_matrix,
    default_reward=-1,
    terminal_rewards=terminal_rewards,
    gamma=0.9
)

V, policy = value_iteration(env)

print("最优价值函数:")
print(V.reshape(env.grid_size))

print("最优策略:")
action_mapping = {0: '↑', 1: '→', 2: '↓', 3: '←', -1: 'X'}
policy_grid = []
for state in range(env.state_space):
    action = policy[state]
    policy_grid.append(action_mapping.get(action, '?'))
policy_grid = np.array(policy_grid).reshape(env.grid_size)
print(policy_grid)

# 可视化策略
print("\n策略可视化:")
for i in range(env.grid_size[0]):
    row = ""
    for j in range(env.grid_size[1]):
        state = env._pos_to_state((i, j))
        if (i, j) in env.obstacles:
            row += " X "
        elif (i, j) == env.goal:
            row += " G "
        elif (i, j) == env.start:
            row += " S "
        else:
            action = policy[state]
            row += f" {action_mapping.get(action, '?')} "
    print(row)
print()


Iteration 1: delta=10.0
Iteration 2: delta=9.0
Iteration 3: delta=8.1
Iteration 4: delta=7.29
Iteration 5: delta=6.561
Iteration 6: delta=5.9049000000000005
Iteration 7: delta=5.3144100000000005
Iteration 8: delta=4.7829690000000005
Iteration 9: delta=0
价值迭代收敛
最优价值函数:
[[-0.434062  0.62882   1.8098    3.122     4.58    ]
 [ 0.62882   0.        0.62882   0.        6.2     ]
 [ 1.8098    3.122     0.        6.2       8.      ]
 [ 3.122     4.58      6.2       0.       10.      ]
 [ 4.58      6.2       8.       10.       10.      ]]
最优策略:
[['→' '→' '→' '→' '↓']
 ['↓' 'X' '↑' 'X' '↓']
 ['→' '↓' 'X' '→' '↓']
 ['→' '→' '↓' 'X' '↓']
 ['→' '→' '→' '→' '→']]

策略可视化:
 S  →  →  →  ↓ 
 ↓  X  ↑  X  ↓ 
 →  ↓  X  →  ↓ 
 →  →  ↓  X  ↓ 
 →  →  →  →  G 

