### 悬崖漫步问题：
1. 要求一个智能体从起点出发，避开悬崖行走，最终到达目标位置。有一个 4×12 的网格世界，每一个网格表示一个状态。
2. 智能体的起点是左下角的状态，目标是右下角的状态，智能体在每一个状态都可以采取 4 种动作：上、下、左、右。
3. 如果智能体采取动作后触碰到边界墙壁则状态不发生改变，否则就会相应到达下一个状态。
4. 环境中有一段悬崖，智能体掉入悬崖或到达目标状态都会结束动作并回到起点，也就是说掉入悬崖或者达到目标状态是终止状态。
5. 智能体每走一步的奖励是 −1，掉入悬崖的奖励是 −100。

#### ReplayBuffer类：支撑off-policy

In [7]:
import random
import gym
import numpy as np
import abc # 用于定义抽象类和抽象方法的模块。

In [None]:
class ReplayBuffer():
    def __init__(self, max_size = 80):
        # 存储状态对的总量
        self.max_size = max_size
        self.buffer = []
    def push_transition(self, transition):
        if transition not in self.buffer: # 这是遍历做法，适合小样本
            self.buffer.append(transition)
            if len(self.buffer) > self.max_size:
                self.buffer = self.buffer[-self.max_size:] # 只保留最新的max_size个状态对(只去掉第一个)
                
    def sample(self, batch_size = 5):
        # 判定batch_size是否大于buffer长度
        if batch_size > self.max_size:
            raise ValueError("采样的长度大于经验回放池大小！")
        # 随机采样
        return random.sample(self.buffer, min(batch_size, len(self.buffer)))
    def isfull(self):
        """某些强化学习算法（如 off-policy Q-learning）会等 buffer 满了才开始更新；"""
        return len(self.buffer) == self.max_size

#### 对于各种 TD Learning 方法而言，除了更新Q估计的具体操作不同外，其他几乎都相同。故抽象出基类class Solver

强化学习中时序差分（TD）控制算法的抽象基类。

本类提供了基于值函数的控制方法（如 **Q-learning**、**SARSA**、**Expected SARSA**）的通用框架结构，  
封装了 ε-greedy 策略、Q 表初始化、贪婪策略更新、状态值计算等通用功能。

子类需实现 `update_Q_table()` 方法，以定义具体的值函数更新规则。

---

##### 参数说明

| 参数名 | 类型 | 说明 |
|--------|------|------|
| `env` | `gym.Env` | Gym 环境实例，需具有 **离散的 observation 和 action 空间** |
| `alpha` | `float` | 学习率，控制 Q 值更新的幅度 |
| `gamma` | `float` | 折扣因子，用于对未来奖励进行折现 |
| `epsilon` | `float` | ε-greedy 策略中的探索概率 |
| `seed` | `int or None` | 随机种子，确保探索策略可复现 |
| `replay_buffer_size` | `int` | 经验回放池最大容量，支持 off-policy 算法 |

---

##### 属性说明
| 属性名 | 类型 | 说明 |
|--------|------|------|
| `Q_table` | `np.ndarray` | 状态-动作值函数 Q(s, a) |
| `V_table` | `np.ndarray` | 状态值函数 V(s)，仅用于可视化 |
| `greedy_policy` | `List[np.ndarray]` | 每个状态下的最优动作集合（支持多个并列最优） |
| `policy_is_updated` | `bool` | 策略是否已根据最新 Q 表更新 |
| `rng` | `np.random.RandomState` | 控制探索行为的随机数生成器 |
| `replay_buffer` | `ReplayBuffer` | 经验回放缓存器，用于支持经验采样 |

---
##### 注意事项

- 本类为 **抽象基类**（使用 `abc` 模块），不能直接实例化；
- 必须由子类实现 `update_Q_table()` 方法；
- 仅支持 **离散状态空间和动作空间**；
- 策略提升使用 `greedy_policy`，支持多最优动作随机选择；
- 与 `ReplayBuffer` 配合可实现 off-policy 学习结构（如 DQN、Q-learning）。


In [None]:
class Solver():
    def __init__(self,env:gym.Env, alpha=0.1, gamma=0.9, epsilon=0.1, seed=None, replay_buffer_size=80):
        # 初始化函数
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        # 环境中提取动作空间大小和状态空间大小（要求 env.action_space 和 env.observation_space 必须是 gym.spaces.Discrete 类型）
        self.n_action = env.action_space.n
        self.n_state = env.observation_space.n
        # 初始化Q值表,每个状态-动作对的初始值设为 0（代表“完全不了解环境”）
        self.Q_table = np.zeros((self.n_state, self.n_action),dtype = np.float32)
        # 初始化当前策略的状态值
        self.V_table = np.zeros((self.n_state),dtype = np.float32)
        # 初始时默认每个动作都是最优的(每个状态对应的最优动作列表，而这个列表的长度是不确定的、可能不同的->object)
        self.greedy_policy = np.array([np.arange(self.n_action)] * self.n_state,dtype = object)
        # 标志变量：表示当前 greedy_policy 是否与最新 Q 表匹配(当 Q 值更新后，应将其设为 False)
        self.policy_is_updated = False
        self.rng = np.random.RandomState(1) # 每个智能体独立随机数，不影响全局
        # 设置经验回放池
        self.replay_buffer = ReplayBuffer(replay_buffer_size)
    
    def take_action(self, state):
        """用于epsilon-greedy策略选择动作,该部分属于policy improvement的范畴"""
        # 确保策略已经更新
        if not self.policy_is_updated:
            self.update_policy()
        # epsilon-greedy 策略选择动作
        if np.random.rand() < self.epsilon:
            return self.rng.randint(self.n_action) # 随机选择动作(从 [0, n) 中选一个整数)
        else:
            return self.rng.choice(self.greedy_policy[state]) # 从当前最优动作中随机选择一个，鼓励策略多样性
    
    def update_policy(self):
        """更新当前策略,从Q_table中提取最优动作"""
        # 找出所有最大的Q值对应的动作
        # 这里的np.where()返回的是一个元组，元组中包含了所有最大值的索引[0]将ndarray提取出来
        self.greedy_policy = np.array([np.where(self.Q_table[s] == np.max(self.Q_table[s]))[0] 
                                        for s in range(self.n_state)], dtype=object)
        # 策略更新标志设为 True
        self.policy_is_updated = True
    
    def update_V_table(self):
        """根据当前 Q 表和贪婪策略计算每个状态的状态值函数 V(s)
        若某个状态是接近终点或高奖励区域，那么它的状态值函数 V(s) 会较高；
        若某个状态是接近障碍物或低奖励区域，那么它的状态值函数 V(s) 会较低；
        如果 V 值从左到右、从起点向终点逐渐升高，说明策略在学习从起点走向目标；"""
        # 判断策略是否更新
        if not self.policy_is_updated:
            self.update_policy()
        # 计算每个状态对的状态函数(V(s)=max_a Q(s,a)=E_(a~pi(·|s))[Q(s,a)])
        for s in range(self.n_state):
            self.V_table[s] = self.Q_table[s][self.greedy_policy[s][0]]
            
    @abc.abstractmethod
    def update_Q_table(self):
        """抽象实现"""
        pass

#### 一、Sarsa：
1. Sarsa为一种on-policy算法，使用当前策略π来于环境交互。在每轮交互时先采样得到transition $(s_t,a_t,r_t,s_{t+1},a_{t+1})$。
2. **Bellman Expectation Equation**进行更新，并对当前策略进行评估：
$$
Q_\pi(s_t, a_t) \leftarrow Q_\pi(s_t, a_t) + \alpha \Bigl( r + \gamma Q_\pi(s_{t+1}, a_{t+1}) - Q_\pi(s_t, a_t) \Bigr)
$$
其中：
- $\alpha$：学习率
- $\gamma$：折扣因子
- $Q_\pi(s,a)$：当前策略下的状态-动作值
- $a_{t+1} \sim \pi(\cdot | s_{t+1})$：下一步动作依旧从当前策略中采样（如 ε-greedy）
3. 再使用贪婪算法选取某个状态下动作价值最大的那个动作。

In [10]:
class Sarsa(Solver):
    def __init__(self, env:gym.Env, alpha=0.1, gamma=0.9,epsilon=0.1,seed=None):
        # 让 Sarsa 自动执行它继承的父类,可以自动执行父类 __init__() 中的所有初始化逻辑。
        super().__init__(env, alpha, gamma, epsilon, seed)
        
    def update_Q_table(self, state, action, reward, next_state, next_action):
        td_target = reward + self.gamma * self.Q_table[next_state, next_action] # 计算 TD 目标
        td_error = td_target - self.Q_table[state, action]
        self.Q_table[state, action] += self.alpha * td_error
        self.policy_is_updated = False # 更新 Q 表后，策略需要更新

#### 二、Expected Sarsa:
1. **Expected SARSA 的更新公式：**
    $$
    Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r + \gamma \mathbb{E}_{a_{t+1} \sim \pi(\cdot|s_{t+1})} \left[ Q(s_{t+1}, a_{t+1}) \right] - Q(s_t, a_t) \right]
    $$
    在 ε-greedy 策略下，动作概率为：贪婪动作以概率 $ 1 - \epsilon + \frac{\epsilon}{|\mathcal{A}|} $ 被选择，其他动作以 $ \frac{\epsilon}{|\mathcal{A}|} $ 被选择。将其代入期望项可得：
    $$
    \mathbb{E}_{a' \sim \pi} \left[ Q(s', a') \right] =
    \epsilon \cdot \frac{1}{|\mathcal{A}|} \sum_{a'} Q(s', a') +
    (1 - \epsilon) \cdot \max_{a'} Q(s', a')
    $$
    因此，TD 目标为：
    $$
    Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r + \gamma \left( \epsilon \cdot \frac{1}{|\mathcal{A}|} \sum_{a'} Q(s_{t+1}, a') + (1 - \epsilon) \cdot \max_{a'} Q(s_{t+1}, a') \right) - Q(s_t, a_t) \right]
    $$
2. **Expected Sarsa** 每次给出的 TD target 都是无偏的，因此每一步更新的移动方向都会确定性地减小 TD error，这样下一步更新的移动距离也会相应减小，直到最后 TD error = 0 时，再优化的移动距离也减小到 0，就好像实现了一种自适应学习率的梯度下降优化，所以其学习率可以设置为1。
3. **SARSA 依赖行为策略，不能 Off-policy 使用**  
   SARSA 的 TD 目标使用的是行为策略 $ b $ 采样得到的动作 $ a' $，即：
   $$
   \text{TD-target}_{\text{SARSA}} = r + \gamma Q_b(s', b(s'))
   $$
   所以更新必须依赖生成数据的策略，不能复用旧数据。
4. **Expected SARSA 只依赖目标策略，可 Off-policy 使用**  
   Expected SARSA 的 TD 目标是：
   $$
   \text{TD-target}_{\text{Expected}} = r + \gamma \mathbb{E}_{a' \sim \pi(s')} [Q(s', a')]
   $$
   不依赖行为策略，可直接使用经验回放等旧样本实现 Off-policy 学习。
5. **Q-learning 是 Expected SARSA 的特例**  
   若目标策略$ \pi $ 是 greedy（贪婪）策略，则：
   $$
   \mathbb{E}_{a' \sim \pi(s')} [Q(s', a')] = \max_{a'} Q(s', a')
   $$
   即 Q-learning = Expected SARSA + 贪婪策略。


In [None]:
class ExpectedSarsa(Solver):
    def __init__(self, env:gym.Env, alpha=0.1, gamma=0.9,epsilon=0.1,seed=None):
        # 让 Sarsa 自动执行它继承的父类,可以自动执行父类 __init__() 中的所有初始化逻辑。
        super().__init__(env, alpha, gamma, epsilon, seed)
        
    def update_Q_table(self, state, action, reward, next_state, batch_size=0):
        # batch_size = 0为on-policy,否则为off-policy
        if batch_size == 0: 
            Q_Exp = (1-self.epsilon) * self.Q_table[next_state].max() + self.epsilon * self.Q_table[next_state].mean()
            td_target = reward + self.gamma * Q_Exp # 计算 TD 目标
            td_error = td_target - self.Q_table[state, action]
            self.Q_table[state, action] += self.alpha * td_error
        else:
            self.replay_buffer.push_transition(transition=(state, action, reward, next_state))
            transitions = self.replay_buffer.sample(batch_size)
            for s,a,r,s_ in transitions:
                Q_Exp = (1-self.epsilon) * self.Q_table[s_].max() + self.epsilon * self.Q_table[s_].mean()
                td_target = r + self.gamma * Q_Exp # 计算 TD 目标
                td_error = td_target - self.Q_table[s, a]
                self.Q_table[s, a] += self.alpha * td_error
        self.policy_is_updated = False # 更新 Q 表后，策略需要更新