# In [9]:
# agent_pytorch.ipynb
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

class DQN(nn.Module):
    """Convolutional Q-network (ported from the original TF architecture).

    Three ReLU-activated convolutions feed a two-layer fully connected head
    that emits one value per action.

    Args:
        input_shape: Shape of a single observation without the batch axis;
            ``input_shape[0]`` is used as the number of input channels.
        num_actions: Width of the final linear layer.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()

        in_channels = input_shape[0]

        # Attribute names and layer order are kept as-is: state_dict keys
        # ("conv_layers.0.weight", ...) are derived from them.
        feature_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv_layers = nn.Sequential(*feature_layers)

        # The head's input width depends on the observation size, so it is
        # measured by probing the conv stack instead of being hard-coded.
        flat_features = self._get_conv_out(input_shape)

        head_layers = [
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions),
        ]
        self.fc_layers = nn.Sequential(*head_layers)

    def _get_conv_out(self, shape):
        """Return the element count of the conv output for one zero input of ``shape``."""
        probe = torch.zeros(1, *shape)
        conv_out = self.conv_layers(probe)
        return int(np.prod(conv_out.size()))

    def forward(self, x):
        """Map a batch of observations to a ``(batch, num_actions)`` tensor."""
        features = self.conv_layers(x)
        flattened = features.view(features.size(0), -1)
        return self.fc_layers(flattened)

class StateProcessor:
    """Convert raw frames into stacked, normalised grayscale observations.

    Implemented with NumPy/SciPy only (no OpenCV).

    Args:
        stack_size: Number of most recent frames kept and stacked.
        frame_size: ``(height, width)`` every frame is resized to.

    Attributes:
        state_buffer: Bounded deque holding the last ``stack_size`` processed
            frames. Call :meth:`reset` at the start of every episode so frames
            from the previous episode do not leak into the new stack.
    """

    def __init__(self, stack_size=4, frame_size=(84, 84)):
        self.stack_size = stack_size
        self.frame_size = tuple(frame_size)
        self.state_buffer = deque(maxlen=stack_size)

    def reset(self):
        """Forget all buffered frames (use at episode boundaries)."""
        self.state_buffer.clear()

    def process_state(self, state):
        """Convert one frame to a float32 array of shape ``frame_size``.

        Args:
            state: Frame as ``(H, W)``, ``(H, W, 1)`` or ``(H, W, 3)`` ndarray.
                The result is divided by 255, so pixel values are expected to
                be in the 0-255 range. Non-ndarray inputs are returned as-is.

        Returns:
            The processed frame, or ``state`` unchanged when it is not an
            ndarray.
        """
        if not isinstance(state, np.ndarray):
            return state

        if state.ndim == 3:
            if state.shape[2] == 3:
                # Manual RGB -> luminance: 0.299*R + 0.587*G + 0.114*B
                state = np.dot(state[..., :3], [0.299, 0.587, 0.114])
            elif state.shape[2] == 1:
                # Single-channel frame with a trailing axis: drop the axis so
                # the 2-D resize below applies.
                state = state[..., 0]

        if state.shape != self.frame_size:
            # Imported lazily so SciPy is only needed when a resize happens.
            from scipy.ndimage import zoom

            target_h, target_w = self.frame_size
            zoom_factors = (target_h / state.shape[0], target_w / state.shape[1])
            # order=0 -> nearest-neighbour resampling.
            state = zoom(state, zoom_factors, order=0)

        return state.astype(np.float32) / 255.0

    def get_state_stack(self, state):
        """Process ``state``, push it into the buffer and return the stack.

        On the first call after construction or :meth:`reset` the buffer is
        filled with copies of the first frame so the stack is always full.

        Returns:
            ndarray of shape ``(stack_size, *frame_size)``, oldest frame first.
        """
        processed_state = self.process_state(state)

        if not self.state_buffer:
            self.state_buffer.extend([processed_state] * self.stack_size)
        else:
            self.state_buffer.append(processed_state)

        return np.stack(self.state_buffer, axis=0)

class DeepQLearningAgent:
    """Deep Q-Learning agent with replay memory and a target network.

    Args:
        state_size: Observation shape passed to ``DQN`` (no batch axis).
        action_size: Number of discrete actions.
        learning_rate: Adam learning rate for the online network.

    Attributes:
        model: Online Q-network that is trained and used to act.
        target_model: Copy of ``model`` used for bootstrap targets; it only
            changes when :meth:`update_target_network` (or :meth:`load`) runs.
        memory: Bounded deque of ``(state, action, reward, next_state, done)``.
        epsilon: Current exploration rate; multiplied by ``epsilon_decay``
            after every training step until it drops to ``epsilon_min``.
    """

    def __init__(self, state_size, action_size, learning_rate=1e-4):
        self.state_size = state_size
        self.action_size = action_size

        # Hyperparameters
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = learning_rate
        self.memory = deque(maxlen=10000)
        self.batch_size = 32

        # Device selection
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Networks
        self.model = DQN(state_size, action_size).to(self.device)
        self.target_model = DQN(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

        # Start with both networks holding identical weights.
        self.update_target_network()

    def _to_tensor(self, values, dtype):
        """Stack a sequence of per-sample values into one tensor on ``self.device``.

        Going through a single contiguous ndarray avoids PyTorch's slow
        element-by-element path for sequences of ndarrays.
        """
        return torch.tensor(np.asarray(values, dtype=dtype), device=self.device)

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: A single observation (no batch axis).

        Returns:
            A random action index with probability ``epsilon``, otherwise the
            index of the largest Q-value predicted by the online network.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        batch = self._to_tensor(state, np.float32).unsqueeze(0)
        with torch.no_grad():
            q_values = self.model(batch)
        return torch.argmax(q_values).item()

    def replay(self):
        """Run one optimisation step on a random minibatch from memory.

        Does nothing until at least ``batch_size`` transitions are stored.
        After the step, ``epsilon`` is decayed once.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = self._to_tensor(states, np.float32)
        actions = self._to_tensor(actions, np.int64)
        rewards = self._to_tensor(rewards, np.float32)
        next_states = self._to_tensor(next_states, np.float32)
        dones = self._to_tensor(dones, np.bool_)

        # Q(s, a) for the actions actually taken. squeeze(1) removes only the
        # gathered column, so the batch axis survives even when batch_size == 1.
        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Bootstrap target r + gamma * max_a' Q_target(s', a'); the bootstrap
        # term is zeroed for terminal transitions.
        with torch.no_grad():
            next_q = self.target_model(next_states).max(1)[0]
            not_done = (~dones).to(next_q.dtype)
            target_q = rewards + self.gamma * next_q * not_done

        loss = self.loss_fn(current_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load online-network weights from ``name`` and resync the target.

        Args:
            name: Path or file-like object previously written by :meth:`save`.
        """
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from trusted sources (recent PyTorch versions offer weights_only=True).
        # map_location lets a checkpoint saved on GPU load on a CPU-only host.
        state_dict = torch.load(name, map_location=self.device)
        self.model.load_state_dict(state_dict)
        # Without this the target network would keep its previous weights.
        self.update_target_network()

    def save(self, name):
        """Write the online network's ``state_dict`` to ``name``."""
        torch.save(self.model.state_dict(), name)

# Smoke test
if __name__ == "__main__":
    # Build an agent for 4 stacked 84x84 frames and 4 discrete actions.
    state_size = (4, 84, 84)
    action_size = 4

    agent = DeepQLearningAgent(state_size, action_size)
    print("✅ Agent created successfully!")

    # Exercise the state processor with a uint8 RGB frame: process_state
    # divides by 255, so the input should cover the 0-255 pixel range rather
    # than floats already in [0, 1).
    processor = StateProcessor()
    test_state = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8)
    processed = processor.get_state_stack(test_state)
    print(f"✅ State processing successful! Input: {test_state.shape}, Output: {processed.shape}")

# Output:
# Using device: cpu
# ✅ Agent created successfully!
# ✅ State processing successful! Input: (100, 100, 3), Output: (4, 84, 84)
