In [1]:
import gym
import numpy as np
np.bool8 = np.bool_

In [2]:
# 创建环境
env = gym.make("Pong-v4")

A.L.E: Arcade Learning Environment (version 0.8.1+53f58b7)
[Powered by Stella]


In [3]:
# env.reset()
# state,_ = env.reset()
# done = False
# count = 0
# while not done:
# #     env.render()
#     action = int(np.random.choice([2,3]))
#     next_state, reward, done, truncated, _ = env.step(action)
#     print(action,reward)
#     count += 1
# print(count)

In [4]:
# env.close()

In [5]:
from torch import nn
import torch

In [6]:
def prepro(I):
    """将 210x160x3 uint8 帧预处理为 6400 (80x80) 1D float 向量"""
    I = I[35:195]  # 裁剪
    I = I[::2, ::2, 0]  # 下采样因子为 2
    I[I == 144] = 0  # 删除背景类型 1
    I[I == 109] = 0  # 删除背景类型 2
    I[I != 0] = 1  # 其他设置为 1
    return I.astype(np.float32).ravel()

In [145]:
import torch
import torch.nn as nn

class ImageCNN(nn.Module):
    def __init__(self, output_dim: int = 10):
        super(ImageCNN, self).__init__()
        
        # 特征提取部分（总参数：234）
        self.features = nn.Sequential(
            # 卷积层1：kernel_size=2 减少参数
            nn.Conv2d(in_channels=3, out_channels=3, kernel_size=8),  # 输出形状: (12, 209, 159)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=8)  # 输出形状: (12, 104, 79)
        )
        
        # 全局平均池化 + 分类器（总参数：12*output_dim + output_dim）
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),  # 输出形状: (12, 1, 1)
            nn.Flatten(),
            nn.Linear(12, output_dim)       # 输出形状: (batch_size, output_dim)
        )
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 输入归一化
        x = x.float() / 255.0
        x = self.features(x)
        return self.classifier(x)

In [138]:
state,_ = env.reset()

In [139]:
state.shape

(210, 160, 3)

In [140]:
model = ImageCNN()

In [120]:
import numpy as np
import torch

def numpy_to_tensor(numpy_data: np.ndarray) -> torch.Tensor:
    """
    将形状为 (210, 160, 3) 的NumPy数组转换为PyTorch模型所需的Tensor格式
    步骤：
      1. 归一化到 [0,1]
      2. 转换通道顺序 HWC -> CHW
      3. 添加batch维度
      4. 转为PyTorch Tensor
    """
    # 验证输入形状
    assert numpy_data.shape == (210, 160, 3), \
        f"输入形状应为 (210, 160, 3)，但得到 {numpy_data.shape}"
    
    # 1. 转换为float32并归一化
    tensor_data = numpy_data.astype(np.float32) / 255.0
    
    # 2. 调整通道顺序 (HWC -> CHW)
    tensor_data = np.transpose(tensor_data, (2, 0, 1))  # 输出形状: (3, 210, 160)
    
    # 3. 添加batch维度 (CHW -> BCHW)
    tensor_data = np.expand_dims(tensor_data, axis=0)  # 输出形状: (1, 3, 210, 160)
    
    # 4. 转为PyTorch Tensor
    return torch.from_numpy(tensor_data)

In [146]:
tt = numpy_to_tensor(state)

In [147]:
tt.shape

torch.Size([1, 3, 210, 160])

In [148]:
model = ImageCNN()

In [149]:
model(tt).shape

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x3 and 12x10)

In [130]:
class PolicyNet(nn.Module):
    
    def __init__(self,input_dim,output_dim):
        super().__init__()
        self.project = ImageCNN(100)
        self.linear1 = nn.Linear(input_dim,200)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(200,output_dim)
        self.softmax = nn.Softmax(dim=-1)
    
    def forward(self,image_data):
        ### n
        state = self.project(image_data)
        state = state.squeeze(0)
#         print(state.shape)
        x = self.linear1(state)
        x = self.relu(x)
        x = self.linear2(x) # n
        x = self.softmax(x) # n
        return x

In [131]:
from torch.distributions import Categorical
import numpy as np
np.bool8 = np.bool_

from torch.optim import AdamW

In [132]:
class Agent:
    
    def __init__(self):
        self.policy_net = PolicyNet(100,2)
        self.optimizer = AdamW(self.policy_net.parameters(),lr=1e-3)
    
    def sample_action(self,state):
        probs = self.policy_net(state) # 4
        if np.random.uniform() < 0.2:
            action = np.random.randint(0,2)
            return action + 2, torch.log(probs[action]+1e-8)
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item()+2,log_prob
    
    def update(self,rewards,log_probs):
        ### 一次游戏时间
        ret = []
        adding = 0
        for r in rewards[::-1]:
            if r != 0:
                adding = 0
            adding = adding * 0.99 + r
            ret.insert(0,adding)
        ret = torch.FloatTensor(ret)
        ret = ret - ret.mean()
        ret = ret / (ret.std()+1e-8)
        
        r_log_probs = []
        for r,log_prob in zip(ret,log_probs):
            r_log_probs.append(-r*log_prob)
        r_log_probs = torch.vstack(r_log_probs)
        
        loss = r_log_probs.sum()
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss


In [133]:
def train(agent,env):
    success_count = []
    max_size = 2000
    for epoch in range(20000):
        rewards = []
        log_probs = []
        terminated = False
        state,_ = env.reset()
        prev_x = None
        while not terminated:
            diff = numpy_to_tensor(state)
            action, log_prob = agent.sample_action(diff)
            next_state, reward, terminated, truncated, _ = env.step(action)
            state = next_state
            rewards.append(reward)
            log_probs.append(log_prob)
        
        loss = agent.update(rewards,log_probs) 
        
        
        if (epoch+1) % 10 == 0:
#             torch.save('pong.pt',agent.policy_net)
            torch.save(agent.policy_net,'pong.pt')
            print(f'epoch: {epoch}, loss: {loss}, rewards: {sum(rewards)}, count: {len(rewards)}')

In [134]:
agent = Agent()

In [135]:
# torch.save(agent.policy_net,'pong.pt')

In [136]:
# torch.load('pong.pt')

In [137]:
env = gym.make("Pong-v4")
train(agent,env)

epoch: 9, loss: -1.5405373573303223, rewards: -21.0, count: 1264
epoch: 19, loss: 9.316274642944336, rewards: -20.0, count: 1303
epoch: 29, loss: -2.579474925994873, rewards: -21.0, count: 1452
epoch: 39, loss: 0.5740667581558228, rewards: -20.0, count: 1223
epoch: 49, loss: 0.934499204158783, rewards: -21.0, count: 1255
epoch: 59, loss: 1.7972705364227295, rewards: -20.0, count: 1536
epoch: 69, loss: -5.789192199707031, rewards: -21.0, count: 1437
epoch: 79, loss: -2.7945001125335693, rewards: -21.0, count: 1280
epoch: 89, loss: -4.030975818634033, rewards: -21.0, count: 1176
epoch: 99, loss: 3.4906625747680664, rewards: -21.0, count: 1500
epoch: 109, loss: 0.192344069480896, rewards: -21.0, count: 1181
epoch: 119, loss: 1.6203093528747559, rewards: -20.0, count: 1385
epoch: 129, loss: -6.714287757873535, rewards: -21.0, count: 1105
epoch: 139, loss: 1.8146634101867676, rewards: -21.0, count: 1427
epoch: 149, loss: 3.6673386096954346, rewards: -19.0, count: 1517
epoch: 159, loss: -3.0

epoch: 1249, loss: 0.1621636152267456, rewards: -21.0, count: 1154
epoch: 1259, loss: 1.0230016708374023, rewards: -20.0, count: 1391
epoch: 1269, loss: 3.307645082473755, rewards: -20.0, count: 1385
epoch: 1279, loss: 1.2871754169464111, rewards: -21.0, count: 1120
epoch: 1289, loss: -0.08570241928100586, rewards: -20.0, count: 1228
epoch: 1299, loss: 4.659457206726074, rewards: -21.0, count: 1268
epoch: 1309, loss: 6.132356643676758, rewards: -21.0, count: 1103
epoch: 1319, loss: -5.305800914764404, rewards: -21.0, count: 1181
epoch: 1329, loss: 3.8089215755462646, rewards: -21.0, count: 1352
epoch: 1339, loss: 7.208146095275879, rewards: -21.0, count: 1094
epoch: 1349, loss: 6.443857669830322, rewards: -20.0, count: 1469
epoch: 1359, loss: 0.12514591217041016, rewards: -21.0, count: 1423
epoch: 1369, loss: -7.249266624450684, rewards: -21.0, count: 1344
epoch: 1379, loss: 2.024204969406128, rewards: -21.0, count: 1406
epoch: 1389, loss: -1.8568624258041382, rewards: -21.0, count: 11

KeyboardInterrupt: 

In [None]:
def sample_action(self,state):
    probs = self.policy_net(state) # 4
    if np.random.uniform() < 0.0:
        action = np.random.randint(0,2)
        return action + 2, torch.log(probs[action]+1e-8)
    dist = Categorical(probs)
    action = dist.sample()
    log_prob = dist.log_prob(action)
    return action.item()+2,log_prob

# 替换方法
import types
agent.sample_action = types.MethodType(sample_action, agent)

In [None]:
import time
def visualize_agent(env, agent, num_episodes=5):
    """
    渲染显示智能体的行动
    """
    env = gym.make('CliffWalking-v0', render_mode='human')  # 创建可视化环境
    
    for episode in range(num_episodes):
        state_tuple = env.reset()
        state = state_tuple[0] if isinstance(state_tuple, tuple) else state_tuple
        total_reward = 0
        steps = 0
        done = False
        
        print(f"\nEpisode {episode + 1}")
        
        while not done:
            env.render()  # 渲染当前状态
            
            # 将状态转换为one-hot编码
            state_onehot = np.zeros(48)
            state_onehot[state] = 1
            
            # 使用训练好的策略选择动作
            with torch.no_grad():
                if np.random.random() < 0.0:
                    action = np.random.randint(0, 4)
                else:
                    state_tensor = torch.FloatTensor(state_onehot)
                    probs = agent.policy_net(state_tensor)
                    action = probs.argmax().item()  # 使用最可能的动作
            
            # 执行动作
            step_result = env.step(action)
            if len(step_result) == 4:
                next_state, reward, done, _ = step_result
            else:
                next_state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            
            total_reward += reward
            steps += 1
            state = next_state
            
            # 添加小延迟使动作更容易观察
            time.sleep(0.5)
        
        print(f"Episode finished after {steps} steps. Total reward: {total_reward}")
    
    env.close()

# 在主程序最后添加：
if __name__ == "__main__":    
    # 训练完成后显示智能体行动
    print("\nVisualizing trained agent behavior...")
    env = gym.make('CliffWalking-v0',render_mode='human')
    visualize_agent(env, agent)

In [None]:
env.close()