In [1]:
import numpy as np
import random

# 网格世界环境
class GridWorld:
    def __init__(self, size=5, goal=(4, 4)):
        self.size = size  # 网格大小
        self.goal = goal  # 目标位置
        self.actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 上下左右
        self.reset()

    def reset(self):
        self.agent_pos = (0, 0)  # 起始位置
        return self.agent_pos

    def step(self, action):
        next_pos = (self.agent_pos[0] + self.actions[action][0],
                    self.agent_pos[1] + self.actions[action][1])
        # 边界限制
        next_pos = (max(0, min(self.size - 1, next_pos[0])),
                    max(0, min(self.size - 1, next_pos[1])))
        self.agent_pos = next_pos
        # 奖励
        reward = 10 if self.agent_pos == self.goal else -1
        done = self.agent_pos == self.goal
        return self.agent_pos, reward, done

# 线性逼近 Q 学习
class LinearApproximationQLearning:
    def __init__(self, action_dim, alpha=0.1, gamma=0.99, epsilon=0.1):
        self.state_dim = len(self.get_features((0, 0)))  # 状态特征维度
        self.action_dim = action_dim  # 动作数量
        self.alpha = alpha  # 学习率
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # 探索率
        # 初始化权重
        self.weights = np.random.randn(action_dim, self.state_dim) * 0.01

    def get_features(self, state):
        # 归一化位置 (x, y)，将状态映射为特征向量
        x, y = state
        return np.array([x / 4.0, y / 4.0, (x / 4.0) ** 2, (y / 4.0) ** 2, (x * y) / 16.0, 1.0])  # 多项式

    def q_value(self, state, action):
        # 计算 Q 值：线性函数 w^T * phi(s)
        features = self.get_features(state)
        return np.dot(self.weights[action], features)

    def choose_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)  # 探索
        else:
            # 贪婪选择最大 Q 值的动作
            q_values = [self.q_value(state, a) for a in range(self.action_dim)]
            return np.argmax(q_values)

    def update_incremental(self, state, action, reward, next_state, done):
        # 提取特征向量
        features = self.get_features(state)
        # 计算目标
        if done:
            target = reward
        else:
            next_q_values = [self.q_value(next_state, a) for a in range(self.action_dim)]
            target = reward + self.gamma * max(next_q_values)
        # Q 学习更新权重
        td_error = target - self.q_value(state, action)
        self.weights[action] += self.alpha * td_error * features
    
    def update_batch(self, batch):
        # batch: [(state, action, reward, next_state, done), ...]
        total_gradients = np.zeros_like(self.weights)  # 梯度初始化
        for state, action, reward, next_state, done in batch:
            features = self.get_features(state)
            if done:
                target = reward
            else:
                next_q_values = [self.q_value(next_state, a) for a in range(self.action_dim)]
                target = reward + self.gamma * max(next_q_values)
            td_error = target - self.q_value(state, action)
            total_gradients[action] += td_error * features
        # 平均梯度更新权重
        self.weights += self.alpha * total_gradients / len(batch)

# 训练线性逼近 Q 学习
def train(grid_world, agent, episodes=500):
    rewards = []
    for episode in range(episodes):
        update_epsilon(agent, episode, episodes)
        state = grid_world.reset()
        total_reward = 0
        done = False
        # 步数统计
        steps = 0
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done = grid_world.step(action)
            agent.update_incremental(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            steps += 1
        rewards.append(total_reward)
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}, Total Reward: {total_reward}, Steps: {steps}")
    return rewards

def update_epsilon(agent, episode, max_episodes):
    agent.epsilon = max(0.01, agent.epsilon * (1 - episode / max_episodes))

# 初始化环境和代理
grid_world = GridWorld(size=5, goal=(4, 4))
agent = LinearApproximationQLearning(action_dim=4, epsilon=1.0)

# 训练
rewards = train(grid_world, agent, episodes=500)

# 测试最优策略
state = grid_world.reset()
done = False
path = [state]
while not done:
    action = agent.choose_action(state)
    state, _, done = grid_world.step(action)
    path.append(state)

print("Optimal Path:", path)


Episode 50, Total Reward: 3, Steps: 8
Episode 100, Total Reward: 2, Steps: 9
Episode 150, Total Reward: 3, Steps: 8
Episode 200, Total Reward: 3, Steps: 8
Episode 250, Total Reward: 3, Steps: 8
Episode 300, Total Reward: 3, Steps: 8
Episode 350, Total Reward: 3, Steps: 8
Episode 400, Total Reward: 3, Steps: 8
Episode 450, Total Reward: 3, Steps: 8
Episode 500, Total Reward: 3, Steps: 8
Optimal Path: [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]


In [2]:
# 批量训练线性逼近 Q 学习
def train_batch(grid_world, agent, episodes=500):
    rewards = []
    for episode in range(episodes):
        update_epsilon(agent, episode, episodes)
        state = grid_world.reset()
        total_reward = 0
        done = False
        batch = []
        # 步数统计
        steps = 0
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done = grid_world.step(action)
            batch.append((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward
            steps += 1
        rewards.append(total_reward)
        # 批量更新
        agent.update_batch(batch)
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}, Total Reward: {total_reward}, Steps: {steps}")
    return rewards
agent = LinearApproximationQLearning(action_dim=4, epsilon=1.0)

# 训练
rewards = train_batch(grid_world, agent, episodes=500)

# 测试最优策略
state = grid_world.reset()
done = False
path = [state]
while not done:
    action = agent.choose_action(state)
    state, _, done = grid_world.step(action)
    path.append(state)

print("Optimal Path:", path)

Episode 50, Total Reward: 3, Steps: 8
Episode 100, Total Reward: 3, Steps: 8
Episode 150, Total Reward: 3, Steps: 8
Episode 200, Total Reward: 3, Steps: 8
Episode 250, Total Reward: 3, Steps: 8
Episode 300, Total Reward: -99, Steps: 110
Episode 350, Total Reward: 3, Steps: 8
Episode 400, Total Reward: 3, Steps: 8
Episode 450, Total Reward: 3, Steps: 8
Episode 500, Total Reward: 3, Steps: 8
Optimal Path: [(0, 0), (1, 0), (2, 0), (2, 1), (3, 1), (3, 0), (3, 1), (3, 2), (3, 3), (4, 3), (4, 4)]


In [None]:
import random
from collections import deque

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def store(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

# 经验回放+批量训练线性逼近 Q 学习
def train_batch_ReplayBuffer(grid_world, agent, episodes=500, batch_size=8):
    rewards = []
    # 初始化经验池
    buffer = ReplayBuffer(capacity=10000)
    for episode in range(episodes):
        update_epsilon(agent, episode, episodes)
        state = grid_world.reset()
        total_reward = 0
        done = False
        batch = []
        # 步数统计
        steps = 0
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done = grid_world.step(action)
            buffer.store((state, action, reward, next_state, done))
            state = next_state
            # 如果经验池足够大，开始采样并更新
            if len(buffer) >= batch_size:
                batch = buffer.sample(batch_size)
                agent.update_batch(batch)
            total_reward += reward
            steps += 1
        rewards.append(total_reward)
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}, Total Reward: {total_reward}, Steps: {steps}")
    return rewards

# 训练
rewards = train_batch_ReplayBuffer(grid_world, agent, episodes=500, batch_size=8)

# 测试最优策略
state = grid_world.reset()
done = False
path = [state]
while not done:
    action = agent.choose_action(state)
    state, _, done = grid_world.step(action)
    path.append(state)

print("Optimal Path:", path)

Episode 50, Total Reward: 3, Steps: 8
Episode 100, Total Reward: 3, Steps: 8
Episode 150, Total Reward: 3, Steps: 8
Episode 200, Total Reward: 3, Steps: 8
Episode 250, Total Reward: 3, Steps: 8
Episode 300, Total Reward: 3, Steps: 8
Episode 350, Total Reward: 3, Steps: 8
Episode 400, Total Reward: 3, Steps: 8
Episode 450, Total Reward: 3, Steps: 8
Episode 500, Total Reward: 3, Steps: 8
Optimal Path: [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]
