In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

In [2]:
# Device configuration.
# Prefer the Apple-Silicon MPS backend (the original setting), then CUDA, and
# finally fall back to the CPU so that a fresh "Restart & Run All" also works
# on machines where MPS is not available.
_mps_backend = getattr(torch.backends, "mps", None)  # absent on old torch builds
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [3]:
# Actor network (the policy network)
class Actor(nn.Module):
    """Deterministic policy mapping a state to a bounded action.

    The network is a 3-layer MLP (state_dim -> 400 -> 300 -> action_dim) with
    ReLU activations and a final Tanh, so the raw output lies in [-1, 1].
    ``forward`` multiplies that output by ``max_action``, which bounds every
    action component to [-max_action, max_action].

    Args:
        state_dim: Size of the state vector fed to the first linear layer.
        action_dim: Size of the action vector produced by the last layer.
        max_action: Scale factor applied to the Tanh output.
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.max_action = max_action
        # Layers are created in input-to-output order and stored under the
        # attribute name ``layer`` so parameter names stay ``layer.<idx>.*``.
        stack = [
            nn.Linear(state_dim, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, action_dim),
            nn.Tanh(),  # squashes the output into [-1, 1]
        ]
        self.layer = nn.Sequential(*stack)

    def forward(self, state):
        """Return the action for ``state``, scaled by ``max_action``."""
        squashed = self.layer(state)
        return self.max_action * squashed

In [4]:
# Critic network (the value network)
class Critic(nn.Module):
    """Action-value estimator producing one scalar per (state, action) pair.

    The state and action are concatenated along dimension 1 and passed through
    a 3-layer MLP (state_dim + action_dim -> 400 -> 300 -> 1) with ReLU
    activations and no output activation.

    Args:
        state_dim: Number of state features per sample.
        action_dim: Number of action features per sample.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        joint_dim = state_dim + action_dim
        # Stored under the attribute name ``net`` so parameter names stay
        # ``net.<idx>.*``.
        self.net = nn.Sequential(
            nn.Linear(joint_dim, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, 1),
        )

    def forward(self, state, action):
        """Return the value estimate for the given state/action batch."""
        joint_input = torch.cat((state, action), dim=1)
        return self.net(joint_input)

In [13]:
# Demo: zip(*rows) pairs up the i-th elements of every row, i.e. it
# transposes a list of rows into tuples of columns.
rows = [
    [1, 2, 3, 4, 5],
    [6, 7, 8, 9, 10],
]
for x in zip(*rows):
    print(x)

(1, 6)
(2, 7)
(3, 8)
(4, 9)
(5, 10)


In [5]:
# DDPG algorithm implementation
class DDPG:
    """Deep Deterministic Policy Gradient agent.

    Holds an online actor/critic pair, a target copy of each (initialised with
    the same weights), one Adam optimizer per online network and a bounded
    replay buffer of ``(state, action, reward, next_state, done)`` tuples.
    All networks are placed on the module-level ``device``.

    Args:
        state_dim: Size of the state vector.
        action_dim: Size of the action vector.
        max_action: Scale applied by the actor to its Tanh output; actions
            therefore lie in ``[-max_action, max_action]`` and exploration
            noise is clipped back into that range.
        actor_lr: Adam learning rate for the actor.
        critic_lr: Adam learning rate for the critic.
        gamma: Discount factor used in the bootstrapped target.
        tau: Soft-update coefficient for the target networks.
        batch_size: Number of transitions sampled per training step.
        buffer_size: Maximum number of transitions kept in the replay buffer;
            the oldest ones are dropped first once it is full.
    """

    def __init__(self, state_dim, action_dim, max_action,
                 actor_lr=1e-4, critic_lr=1e-3, gamma=0.99, tau=0.005,
                 batch_size=100, buffer_size=1000000):
        # Online networks
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.critic = Critic(state_dim, action_dim).to(device)

        # Target networks
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)

        # Start the targets as exact copies of the online networks
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        # Experience replay buffer (double-ended queue with a size cap)
        self.replay_buffer = deque(maxlen=buffer_size)

        # Hyperparameters
        self.gamma = gamma            # discount factor
        self.tau = tau                # soft-update coefficient
        self.batch_size = batch_size  # mini-batch size

    @staticmethod
    def _to_tensor(values):
        """Convert array-like ``values`` to a float32 tensor on ``device``."""
        return torch.tensor(np.asarray(values, dtype=np.float32), device=device)

    @staticmethod
    def _soft_update(online, target, tau):
        """Blend ``online`` parameters into ``target``: t <- tau*o + (1-tau)*t."""
        with torch.no_grad():
            for param, target_param in zip(online.parameters(), target.parameters()):
                target_param.copy_(tau * param + (1 - tau) * target_param)

    def select_action(self, state, noise_scale=0.1):
        """Return the actor's action for ``state`` plus Gaussian exploration noise.

        Args:
            state: Array-like state accepted by the actor.
            noise_scale: Standard deviation of the zero-mean Gaussian noise.

        Returns:
            A NumPy array clipped to ``[-max_action, max_action]``.
        """
        state = self._to_tensor(state)
        # Pure inference: no autograd graph is needed here.
        with torch.no_grad():
            action = self.actor(state).cpu().numpy()
        # Add exploration noise (mean 0, standard deviation ``noise_scale``)
        action += np.random.normal(0, noise_scale, size=action.shape)
        # ``max_action`` is the factor the actor multiplies its Tanh output by,
        # so it is also the valid bound for every action component.
        return np.clip(action, -self.actor.max_action, self.actor.max_action)

    def store_transition(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    def train(self):
        """Run one critic update, one actor update and a soft target update.

        Does nothing (returns ``None``) until the buffer holds at least
        ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        # Sample a uniformly random mini-batch from the buffer
        batch = random.sample(self.replay_buffer, self.batch_size)

        # zip(*batch) regroups the tuples field by field: ``state`` is the
        # tuple of all states in the batch, ``action`` of all actions, etc.
        state, action, reward, next_state, done = zip(*batch)

        state = self._to_tensor(state)
        action = self._to_tensor(action)
        reward = self._to_tensor(reward).unsqueeze(1)
        next_state = self._to_tensor(next_state)
        done = self._to_tensor(done).unsqueeze(1)

        # Target Q value; computed without gradients because the target
        # networks are only changed through the soft update below.
        with torch.no_grad():
            next_action = self.actor_target(next_state)
            target_Q = self.critic_target(next_state, next_action)
            target_Q = reward + (1 - done) * self.gamma * target_Q

        # Update the critic
        current_Q = self.critic(state, action)
        critic_loss = nn.functional.mse_loss(current_Q, target_Q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update the actor by maximising the critic's value of its actions.
        # This backward pass also writes gradients into the critic's
        # parameters; they are discarded by the next critic zero_grad().
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft-update the target networks
        self._soft_update(self.critic, self.critic_target, self.tau)
        self._soft_update(self.actor, self.actor_target, self.tau)



In [7]:
# Usage example
env = YourEnvironment()  # placeholder: the user must supply an environment
ddpg = DDPG(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    max_action=env.action_space.high[0],
)

for episode in range(1000):
    state = env.reset()
    episode_reward = 0
    done = False
    while not done:
        # Pick an action with the online actor (exploration noise included).
        action = ddpg.select_action(state)
        # Apply the action in the environment.
        next_state, reward, done, _ = env.step(action)
        # Record the transition so it can be sampled for training.
        ddpg.store_transition(state, action, reward, next_state, done)
        # One training step on the stored transitions.
        ddpg.train()
        episode_reward += reward
        state = next_state

NameError: name 'YourEnvironment' is not defined