# 第9课：强化学习入门

## 学习目标
- 理解强化学习的基本概念
- 掌握 Q-Learning 算法
- 实现简单的强化学习环境
- 了解深度强化学习 (DQN)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import random
import warnings
warnings.filterwarnings('ignore')

## 1. 强化学习基本概念

强化学习是一种通过与环境交互来学习的方法。

### 核心要素

- **Agent (智能体)**：学习和做决策的主体
- **Environment (环境)**：智能体交互的外部世界
- **State (状态)**：环境的当前情况
- **Action (动作)**：智能体可以采取的行为
- **Reward (奖励)**：环境对动作的反馈
- **Policy (策略)**：从状态到动作的映射

### 学习目标

最大化累积折扣奖励：$R = \sum_{t=0}^{\infty} \gamma^t r_t$，其中 $\gamma \in [0, 1)$ 为折扣因子，$r_t$ 为第 $t$ 步获得的奖励。

In [None]:
# Draw the agent-environment interaction loop as a schematic diagram.
fig, ax = plt.subplots(figsize=(10, 6))

# The agent is a circle on the left, the environment a box on the right;
# both are labelled at mid-height.
agent_circle = plt.Circle((0.2, 0.5), 0.1, color='lightblue', ec='black')
env_rect = plt.Rectangle((0.6, 0.3), 0.3, 0.4, color='lightgreen', ec='black')
for shape, label, label_x in ((agent_circle, 'Agent', 0.2),
                              (env_rect, 'Environment', 0.75)):
    ax.add_patch(shape)
    ax.text(label_x, 0.5, label, ha='center', va='center', fontsize=12)

# Each arrow is described as (tail, head, colour, caption, caption height).
arrow_specs = (
    ((0.3, 0.6), (0.6, 0.6), 'blue', 'Action', 0.65),
    ((0.6, 0.4), (0.3, 0.4), 'red', 'State, Reward', 0.35),
)
for tail, head, colour, caption, caption_y in arrow_specs:
    ax.annotate('', xy=head, xytext=tail,
                arrowprops=dict(arrowstyle='->', color=colour, lw=2))
    ax.text(0.45, caption_y, caption, ha='center', fontsize=10, color=colour)

# Fix the canvas to the unit square and hide the axes.
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
ax.set_title('Reinforcement Learning Loop', fontsize=14)
plt.show()

## 2. 网格世界环境

In [None]:
class GridWorld:
    """Small square grid world with a fixed start, goal and static obstacles.

    States are ``(row, col)`` tuples. The agent starts at ``(0, 0)`` and the
    episode ends when it reaches ``(size - 1, size - 1)``.

    Actions: 0 = up, 1 = down, 2 = left, 3 = right.

    Rewards returned by :meth:`step`:
        * ``+10`` when the agent is on the goal (``done`` is True),
        * ``-5`` when the attempted move was blocked by an obstacle,
        * ``-1`` for every other step (including bumping into the border).
    """

    def __init__(self, size=4):
        """Create a ``size`` x ``size`` grid with the agent on the start cell."""
        self.size = size
        self.start = (0, 0)
        self.goal = (size - 1, size - 1)
        # Default obstacle layout. Cells that lie outside the grid or that
        # coincide with the start/goal are dropped, otherwise a small grid
        # (e.g. size=3, whose goal is (2, 2)) could never be solved.
        self.obstacles = [
            cell for cell in [(1, 1), (2, 2)]
            if all(0 <= coord < size for coord in cell)
            and cell not in (self.start, self.goal)
        ]
        self.state = self.start

        # Actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.actions = [0, 1, 2, 3]
        # (row delta, column delta) applied by each action.
        self.action_effects = {
            0: (-1, 0),  # up
            1: (1, 0),   # down
            2: (0, -1),  # left
            3: (0, 1)    # right
        }

    def reset(self):
        """Move the agent back to the start cell and return that state."""
        self.state = self.start
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        A move that would leave the grid or enter an obstacle leaves the
        agent where it is. Raises ``KeyError`` for an unknown action.
        """
        dr, dc = self.action_effects[action]
        candidate = (self.state[0] + dr, self.state[1] + dc)

        in_bounds = all(0 <= coord < self.size for coord in candidate)
        # The agent never enters an obstacle cell, so the penalty has to be
        # keyed on the *attempted* move rather than on the resulting state.
        hit_obstacle = in_bounds and candidate in self.obstacles
        if in_bounds and not hit_obstacle:
            self.state = candidate

        if self.state == self.goal:
            return self.state, 10, True

        # Small per-step penalty encourages short paths to the goal.
        reward = -5 if hit_obstacle else -1
        return self.state, reward, False

    def render(self):
        """Draw the grid with the agent (A), goal (G) and obstacles (X)."""
        grid = np.zeros((self.size, self.size))

        # Cell values only drive the colour map: obstacle=-1, goal=2, agent=1.
        for obs in self.obstacles:
            grid[obs] = -1
        grid[self.goal] = 2
        # The agent is written last so it wins if it stands on the goal.
        grid[self.state] = 1

        plt.figure(figsize=(6, 6))
        plt.imshow(grid, cmap='RdYlGn')
        plt.grid(True)

        for i in range(self.size):
            for j in range(self.size):
                # imshow puts columns on the x axis, hence text(j, i, ...).
                if (i, j) == self.state:
                    plt.text(j, i, 'A', ha='center', va='center', fontsize=20)
                elif (i, j) == self.goal:
                    plt.text(j, i, 'G', ha='center', va='center', fontsize=20)
                elif (i, j) in self.obstacles:
                    plt.text(j, i, 'X', ha='center', va='center', fontsize=20)

        plt.title('Grid World')
        plt.show()

# Smoke-test the environment: build a 4x4 grid, put the agent on the start
# cell and draw the initial layout.
env = GridWorld(size=4)
env.reset()
env.render()

## 3. Q-Learning 算法

Q-Learning 是一种经典的强化学习算法，学习状态-动作值函数 Q(s, a)。

### Q 值更新公式

$$Q(s, a) \leftarrow Q(s, a) + \alpha [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]$$

其中：
- $\alpha$: 学习率
- $\gamma$: 折扣因子
- $r$: 即时奖励
- $s'$: 执行动作 $a$ 后到达的下一个状态；$\max_{a'} Q(s', a')$ 表示在 $s'$ 下所有动作中最大的 Q 值

In [None]:
class QLearning:
    """Tabular Q-Learning agent with an epsilon-greedy behaviour policy.

    Q-values live in ``q_table``, a ``defaultdict`` keyed by (hashable) state
    whose entries are arrays of length ``n_actions`` created as zeros on
    first access.

    Args:
        n_actions: number of discrete actions, indexed ``0 .. n_actions - 1``.
        learning_rate: step size applied to the TD error.
        gamma: discount factor for the bootstrapped next-state value.
        epsilon: probability of taking a uniformly random action in training.
    """

    def __init__(self, n_actions, learning_rate=0.1, gamma=0.99, epsilon=0.1):
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon

        # Q table: state -> array of per-action values (zeros by default).
        self.q_table = defaultdict(lambda: np.zeros(n_actions))

    def choose_action(self, state, training=True):
        """Return an action index (plain ``int``) for ``state``.

        With ``training=True`` the choice is epsilon-greedy and ties between
        equally valued greedy actions are broken uniformly at random.
        With ``training=False`` the choice is the deterministic argmax
        (lowest index wins a tie), so evaluation is repeatable.
        """
        if training and random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)

        q_values = self.q_table[state]
        if not training:
            return int(np.argmax(q_values))

        # np.argmax alone would always pick action 0 on a freshly initialised
        # (all-zero) row, which stalls exploration when rewards are sparse.
        best_actions = np.flatnonzero(q_values == q_values.max()).tolist()
        return random.choice(best_actions)

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-Learning update for the observed transition.

        When ``done`` is true the target is just ``reward``; otherwise it is
        ``reward + gamma * max_a Q(next_state, a)``.
        """
        current_q = self.q_table[state][action]

        if done:
            target_q = reward
        else:
            target_q = reward + self.gamma * np.max(self.q_table[next_state])

        # Move the estimate a fraction ``lr`` of the way towards the target.
        self.q_table[state][action] += self.lr * (target_q - current_q)

    def get_policy(self):
        """Return ``{state: greedy action}`` for every state in the Q table."""
        return {
            state: int(np.argmax(q_values))
            for state, q_values in self.q_table.items()
        }

In [None]:
def train_qlearning(env, agent, n_episodes=1000, max_steps=1000):
    """Train ``agent`` on ``env`` and return the total reward of each episode.

    Args:
        env: environment exposing ``reset() -> state`` and
            ``step(action) -> (next_state, reward, done)``.
        agent: learner exposing ``choose_action(state)`` and
            ``update(state, action, reward, next_state, done)``.
        n_episodes: number of episodes to run.
        max_steps: upper bound on steps per episode, so an environment whose
            goal is unreachable cannot hang the loop. ``None`` disables the cap.

    Returns:
        List with one total-reward entry per episode.
    """
    rewards_history = []

    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done and (max_steps is None or steps < max_steps):
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)

            # Learn from the transition before moving on.
            agent.update(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward
            steps += 1

        rewards_history.append(total_reward)

        # Progress report every 200 episodes, averaged over the last 100.
        if (episode + 1) % 200 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            print(f'Episode {episode+1}, Average Reward (last 100): {avg_reward:.2f}')

    return rewards_history

# Train a tabular Q-Learning agent (4 actions, epsilon = 0.2) on a fresh
# 4x4 grid world; `rewards` holds the total reward of each episode.
env = GridWorld(size=4)
agent = QLearning(n_actions=4, learning_rate=0.1, gamma=0.99, epsilon=0.2)

rewards = train_qlearning(env, agent, n_episodes=1000)

In [None]:
# Plot the learning curve next to a heat map of the learned state values.
plt.figure(figsize=(12, 5))

# Left panel: raw per-episode reward plus a moving average.
plt.subplot(1, 2, 1)
plt.plot(rewards, alpha=0.3)
# Clamp the window to the number of episodes: with mode='valid' a window
# longer than the data would yield an array that no longer lines up with
# the x range below.
window = min(50, len(rewards))
if window > 0:
    smoothed = np.convolve(rewards, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(rewards)), smoothed, color='red', linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Learning Curve')
plt.grid(True, alpha=0.3)

# Right panel: heat map of the Q table.
plt.subplot(1, 2, 2)
# Size the array from the environment and agent instead of assuming 4x4x4;
# cells never visited during training keep the value 0.
q_values = np.zeros((env.size, env.size, agent.n_actions))  # row x col x action
for state, values in agent.q_table.items():
    q_values[state[0], state[1]] = values

# Show the best action value of each cell.
max_q = np.max(q_values, axis=2)
plt.imshow(max_q, cmap='viridis')
plt.colorbar(label='Max Q Value')
plt.title('Learned Value Function')

plt.tight_layout()
plt.show()

In [None]:
# Visualise the learned policy
def visualize_policy(env, agent):
    """Draw the agent's greedy action for every free cell of the grid.

    The goal is shown as a green 'G', obstacles as a red 'X', and every other
    cell as an arrow for ``agent.choose_action(cell, training=False)``.
    """
    action_symbols = ['↑', '↓', '←', '→']
    text_style = dict(ha='center', va='center', fontsize=20)

    plt.figure(figsize=(6, 6))

    cells = ((row, col) for row in range(env.size) for col in range(env.size))
    for row, col in cells:
        cell = (row, col)
        # Columns map to the x axis, rows to the y axis.
        if cell == env.goal:
            plt.text(col, row, 'G', color='green', **text_style)
            continue
        if cell in env.obstacles:
            plt.text(col, row, 'X', color='red', **text_style)
            continue
        greedy_action = agent.choose_action(cell, training=False)
        plt.text(col, row, action_symbols[greedy_action], **text_style)

    plt.xlim(-0.5, env.size - 0.5)
    # Inverted y limits put row 0 at the top.
    plt.ylim(env.size - 0.5, -0.5)
    plt.grid(True)
    plt.title('Learned Policy')
    plt.show()

visualize_policy(env, agent)

In [None]:
# 测试学习到的策略
def test_policy(env, agent, n_episodes=10):
    """测试学习到的策略"""
    total_rewards = []
    
    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        done = False
        steps = 0
        
        while not done and steps < 50:
            action = agent.choose_action(state, training=False)
            state, reward, done = env.step(action)
            total_reward += reward
            steps += 1
        
        total_rewards.append(total_reward)
        print(f'Episode {episode+1}: Reward={total_reward}, Steps={steps}, Success={done}')
    
    print(f'\nAverage Reward: {np.mean(total_rewards):.2f}')
    print(f'Success Rate: {sum(r > 0 for r in total_rewards) / n_episodes * 100:.1f}%')

# Evaluate the trained grid-world agent with its greedy (non-exploring) policy.
test_policy(env, agent)

## 4. 使用 Gymnasium

Gymnasium (原 OpenAI Gym) 是强化学习的标准环境库。

In [None]:
# Install with: pip install gymnasium
# GYMNASIUM_AVAILABLE records whether the optional dependency could be imported.
try:
    import gymnasium as gym
except ImportError:
    GYMNASIUM_AVAILABLE = False
    print("请先安装: pip install gymnasium")
else:
    GYMNASIUM_AVAILABLE = True
    print("Gymnasium 已安装")

In [None]:
if not GYMNASIUM_AVAILABLE:
    print("跳过 Gymnasium 环境（未安装）")
else:
    import gymnasium as gym

    # Build FrozenLake with slipping disabled.
    env_gym = gym.make('FrozenLake-v1', is_slippery=False)

    # Report the observation/action spaces and the action encoding.
    print(f"状态空间: {env_gym.observation_space}")
    print(f"动作空间: {env_gym.action_space}")
    print(f"动作含义: 0=左, 1=下, 2=右, 3=上")

In [None]:
if GYMNASIUM_AVAILABLE:
    # Train the tabular agent on a Gymnasium environment.
    def train_on_gym(env, n_episodes=2000):
        """Train a fresh ``QLearning`` agent on ``env``.

        ``env`` must follow the Gymnasium API: ``reset()`` returns
        ``(state, info)`` and ``step()`` returns a 5-tuple ending in
        ``(done, truncated, info)``. Returns ``(agent, rewards_history)``.
        """
        learner = QLearning(n_actions=env.action_space.n,
                            learning_rate=0.8, gamma=0.95, epsilon=0.1)
        episode_returns = []

        for episode_number in range(1, n_episodes + 1):
            obs, _ = env.reset()
            episode_return = 0
            terminated = truncated = False

            # An episode ends on either flag from env.step().
            while not (terminated or truncated):
                action = learner.choose_action(obs)
                next_obs, reward, terminated, truncated, _ = env.step(action)
                # Only `terminated` is passed as `done`; truncation is not.
                learner.update(obs, action, reward, next_obs, terminated)
                obs = next_obs
                episode_return += reward

            episode_returns.append(episode_return)

            # Every 500 episodes, report the mean return of the last 100 as a percentage.
            if episode_number % 500 == 0:
                success_rate = np.mean(episode_returns[-100:]) * 100
                print(f'Episode {episode_number}, Success Rate: {success_rate:.1f}%')

        return learner, episode_returns

    agent_gym, rewards_gym = train_on_gym(env_gym)
else:
    print("跳过 Gymnasium 训练（未安装）")

## 5. Deep Q-Network (DQN) 简介

当状态空间很大时，Q 表不再适用，需要用神经网络来近似 Q 函数。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

class DQN(nn.Module):
    """Fully connected Q-network.

    Maps a state vector of length ``state_dim`` through two hidden ReLU
    layers of width ``hidden_dim`` to one output per action.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()

        # Two Linear+ReLU hidden stages followed by a linear output head.
        widths = [state_dim, hidden_dim, hidden_dim]
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, action_dim))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input ``x``."""
        q_values = self.network(x)
        return q_values

class ReplayBuffer:
    """Fixed-capacity experience replay buffer.

    Transitions are stored in a ``deque`` with ``maxlen=capacity``, so the
    oldest entries are discarded once the buffer is full.
    """

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions at random.

        Returns a 5-tuple of numpy arrays
        ``(states, actions, rewards, next_states, dones)``.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one array per field.
        states, actions, rewards, next_states, dones = map(np.array, zip(*picked))
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

# Print the layer layout of a network built for 4 state inputs and 2 actions.
print("DQN 网络结构:")
print(DQN(state_dim=4, action_dim=2))

In [None]:
class DQNAgent:
    """DQN agent: online and target Q-networks, replay buffer, epsilon-greedy policy.

    Args:
        state_dim: length of the state vector fed to the networks.
        action_dim: number of discrete actions.
        lr: Adam learning rate for the online network.
        gamma: discount factor used in the TD target.
        epsilon: initial exploration rate; multiplied by ``epsilon_decay``
            (0.995) after every optimisation step, down to ``epsilon_min`` (0.01).
    """

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon=1.0):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

        # Online network is trained; the target network starts as a copy and
        # only changes when update_target() is called.
        self.q_network = DQN(state_dim, action_dim)
        self.target_network = DQN(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.buffer = ReplayBuffer()

    def choose_action(self, state, training=True):
        """Return an action index for ``state``.

        With ``training=True`` (the default) a uniformly random action is
        taken with probability ``epsilon``. With ``training=False`` the
        action is always the argmax of the online network's output.
        """
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)

        # Add a batch dimension of 1 before the forward pass.
        state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            q_values = self.q_network(state_tensor)
        return q_values.argmax().item()

    def train(self, batch_size=32):
        """Run one gradient step on a random minibatch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. Decays ``epsilon`` after each optimisation step.
        """
        if len(self.buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)

        states = torch.as_tensor(states, dtype=torch.float32)
        actions = torch.as_tensor(actions, dtype=torch.long)
        rewards = torch.as_tensor(rewards, dtype=torch.float32)
        next_states = torch.as_tensor(next_states, dtype=torch.float32)
        dones = torch.as_tensor(dones, dtype=torch.float32)

        # Q(s, a) of the actions actually taken. squeeze(1) removes only the
        # gather column, so the result stays shape (batch,) even for a batch
        # of one (a bare squeeze() would collapse it to a scalar).
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # TD target from the target network; (1 - dones) removes the
        # bootstrap term for terminal transitions.
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q

        loss = nn.MSELoss()(current_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration, never below epsilon_min.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

In [None]:
if not GYMNASIUM_AVAILABLE:
    print("跳过 CartPole 环境（未安装 Gymnasium）")
else:
    # Create the CartPole environment used to try out DQN.
    env_cartpole = gym.make('CartPole-v1')

    # Network input size and number of actions, read from the env's spaces.
    state_dim = env_cartpole.observation_space.shape[0]
    action_dim = env_cartpole.action_space.n

    print(f"状态维度: {state_dim}")
    print(f"动作数量: {action_dim}")

In [None]:
if GYMNASIUM_AVAILABLE:
    def train_dqn(env, agent, n_episodes=500):
        """Train ``agent`` on a Gymnasium-style ``env``.

        Every transition is pushed to ``agent.buffer`` and followed by one
        ``agent.train()`` call. The target network is refreshed every 10th
        episode. Returns the list of per-episode total rewards.
        """
        episode_returns = []

        for episode in range(n_episodes):
            obs, _ = env.reset()
            episode_return = 0
            terminated = truncated = False

            # An episode ends on either flag from env.step().
            while not (terminated or truncated):
                action = agent.choose_action(obs)
                next_obs, reward, terminated, truncated, _ = env.step(action)

                # Only `terminated` is stored as the done flag.
                agent.buffer.push(obs, action, reward, next_obs, terminated)
                agent.train()

                obs = next_obs
                episode_return += reward

            # Periodically sync the target network with the online network.
            if episode % 10 == 0:
                agent.update_target()

            episode_returns.append(episode_return)

            # Progress report every 50 episodes.
            finished = episode + 1
            if finished % 50 == 0:
                avg_reward = np.mean(episode_returns[-50:])
                print(f'Episode {finished}, Avg Reward: {avg_reward:.1f}, Epsilon: {agent.epsilon:.3f}')

        return episode_returns

    # Train with a reduced episode count to keep the demo short.
    dqn_agent = DQNAgent(state_dim, action_dim)
    dqn_rewards = train_dqn(env_cartpole, dqn_agent, n_episodes=200)
else:
    print("跳过 DQN 训练（未安装 Gymnasium）")

## 6. 练习题

### 练习1：改进 Q-Learning
实现 ε 衰减策略，让智能体逐渐减少探索

In [None]:
# 在这里编写代码


### 练习2：扩展网格世界
增加更多障碍物和奖励，观察学习效果

In [None]:
# 在这里编写代码


## 7. 本课小结

### 强化学习核心概念

1. **状态 (State)**：环境的观测
2. **动作 (Action)**：智能体的行为
3. **奖励 (Reward)**：即时反馈
4. **策略 (Policy)**：状态到动作的映射
5. **价值函数**：长期奖励的期望

### 算法对比

| 算法 | 状态空间 | 优点 | 缺点 |
|------|----------|------|------|
| Q-Learning | 离散小规模 | 简单易实现 | 不适合大状态空间 |
| DQN | 连续/大规模 | 处理复杂环境 | 训练不稳定 |
| Policy Gradient | 连续/大规模（支持连续动作） | 直接优化策略 | 高方差 |

### 进一步学习

1. **Policy Gradient 方法**：REINFORCE、A2C
2. **Actor-Critic 方法**：A3C、PPO、SAC
3. **模型学习**：World Models、Dreamer