In [23]:

import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import deque
import random
import torch.optim as optim
import numpy as np
import gym
import os
import torch
import numpy as np
import random
import Env
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
from torch.utils.tensorboard import SummaryWriter
from torchviz import make_dot, make_dot_from_trace
from IPython import display


In [24]:
# 超参数
train_eps = 2000
test_eps = 20
max_steps = 100000
gamma = 0.99
batch_size = 256
device = torch.device('cuda')
tau = 1e-3


In [25]:
class Actor(nn.Module):  #定义Actor网络
    def __init__(self, num_states, num_actions):
        super(Actor, self).__init__()
        self.linear1 = nn.Linear(num_states, 128)
        self.linear2 = nn.Linear(128, 128)
        self.linear3 = nn.Linear(128, 128)
        self.linear4 = nn.Linear(128, num_actions)


    def change(self, x):
        y = torch.zeros(x.shape).to(device)
        if len(x.shape) == 1:
            y[0] = x[0]*17.25 - 6.75
            y[1] = x[1]*25.0
            y[2] = x[2]*30
            y[3] = torch.abs(x[3])
        else :
            y[:,0] = x[:,0]*17.25 - 6.75
            y[:,1] = x[:,1]*25.0
            y[:,2] = x[:,2]*30
            y[:,3] = torch.abs(x[:,3])
        return y

    def forward(self, x):
        x = F.leaky_relu(self.linear1(x))
        x = F.leaky_relu(self.linear2(x))
        x = F.leaky_relu(self.linear3(x))
        x = torch.tanh(self.linear4(x))
        x = self.change(x)

        return x

In [26]:
class Critic(nn.Module):  #定义Critic网络
    def __init__(self, num_state_action, num_action_value=1, init_w=3e-2):
        super(Critic, self).__init__()
        self.linear1 = nn.Linear(num_state_action, 128)
        self.linear2 = nn.Linear(128, 128)
        self.linear3 = nn.Linear(128, num_action_value)

    def forward(self, state, action):
        # 按维数1拼接
        x = torch.cat([state, action], 1)
        x = F.leaky_relu(self.linear1(x))
        x = F.leaky_relu(self.linear2(x))
        x = self.linear3(x)
        return x

In [27]:
class ReplayBuffer:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):
        '''_summary_
        Args:
            trainsitions (tuple): _description_
        '''
        self.buffer.append(transitions)

    def sample(self, batch_size: int, sequential: bool = True):
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential:  # sequential sampling
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else:
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)

    def clear(self):
        self.buffer.clear()

    def __len__(self):
        return len(self.buffer)

In [28]:
class DDPG:
    def __init__(self, device, action_space, state_space, batch_size, gamma, tau):
        self.device = device
        self.critic = Critic(action_space + state_space, 1).to(device)
        self.actor = Actor(state_space, action_space).to(device)

        if os.path.exists('actor_dic'):
            self.actor.load_state_dict(torch.load('actor_dic'))
            self.critic.load_state_dict(torch.load('critic_dic'))

        self.target_critic = Critic(action_space + state_space, 1).to(device)
        self.target_actor = Actor(state_space, action_space).to(device)

        # 复制参数到目标网络
        for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
            target_param.data.copy_(param.data)

        for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
            target_param.data.copy_(param.data)

        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=5e-3)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-3)

        self.memory = ReplayBuffer(capacity=10000)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau

    def sample_action(self, state):
        state = torch.FloatTensor(state).to(self.device)
        action = self.actor(state)
        return action.detach().cpu().numpy()

    @torch.no_grad()
    def predict_action(self, state):
        state = torch.FloatTensor(state).to(self.device)
        action = self.actor(state)
        return action.cpu().numpy()

    def update(self):
        if len(self.memory) < self.batch_size:  # 当memory中不满足一个批量时，不更新策略
            # print('no')
            return
        # 从经验回放中中随机采样一个批量的transition
        state, action, reward, next_state, done = self.memory.sample(self.batch_size)
        # 转变为张量

        state = torch.FloatTensor(np.array(state)).to(self.device)
        next_state = torch.FloatTensor(np.array(next_state)).to(self.device)
        action = torch.FloatTensor(np.array(action)).to(self.device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
        done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(self.device)

        # print(state,'state',next_state,'next_state',action,'action',reward,'reward',done,'done')

        # 计算下一时刻的预测动作价值
        next_action = self.target_actor(next_state)
        target_value = self.target_critic(next_state, next_action.detach())

        # 计算y_t
        expected_value = reward + (1.0 - done) * self.gamma * target_value
        expected_value = torch.clamp(expected_value, -np.inf, np.inf)

        # 计算critic_loss
        # print(state, 'state', action, 'action')
        actual_value = self.critic(state, action)

        # print(actual_value, 'actual_value', expected_value, 'expected_value')

        critic_loss = nn.MSELoss()(actual_value, expected_value.detach())
        # print("critic_loss",critic_loss)

        # 计算actor_loss
        actor_loss = self.critic(state, self.actor(state)).mean()

        # print('actor_loss',actor_loss)

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # 软更新
        for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
            target_param.data.copy_(
                target_param.data * (1.0 - self.tau) +
                param.data * self.tau
            )
        for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
            target_param.data.copy_(
                target_param.data * (1.0 - self.tau) +
                param.data * self.tau
            )

In [29]:
def test(env, agent, test_eps, max_steps):
    print("Start Testing")
    rewards = []  # 记录所有回合的奖励

    for episode in range(test_eps):
        state = env.reset()
        ep_reward = 0

        fig = plt.figure(figsize=(4, 5))
        ax1 = Axes3D(fig)
        ax1.plot3D([0], [0], [0], 'red')

        x = []
        y = []
        z = []
        for step in range(max_steps):
            action = agent.predict_action(state)
            next_state, reward, done = env.step(action, 0.1)
            ep_reward += reward
            state = next_state

            x.append(env.x_b)
            y.append(env.y_b)
            z.append(-env.z_b)

            ax1.plot3D(x, y, z, 'red')
            display.display(fig)
            plt.pause(0.001)
            display.clear_output(wait=True)

            if done:
                break
        rewards.append(ep_reward)
        print(f"Episode：{episode + 1}/{test_eps}，Reward：{ep_reward:.2f}")
    print("Testing Complete")
    return rewards

In [30]:
def train(env, agent, train_eps, max_steps):
    print("Start Training")
    rewards = []  # 记录所有回合的奖励

    for episode in range(train_eps):

        state = env.reset()
        ep_reward = 0
        x = []
        y = []
        z = []

        if episode > 11100:
            fig = plt.figure(figsize=(4, 5))
            ax1 = Axes3D(fig)
            draw = 1
        else:
            draw = 0

        for step in range(max_steps):
            # print(state,'state')
            action = agent.sample_action(state)
            # print(action,'action')
            # print(action.shape,'action.shape')
            action = action + np.random.normal(0, 4, action.shape)

            next_state, reward, done = env.step(action, 0.1)
            # next_state, reward, done,_  = env.step(action)

            # print(reward)
            ep_reward += reward
            x.append(env.x_b)
            y.append(env.y_b)
            z.append(-env.z_b)
            agent.memory.push((state, action, reward, next_state, done))
            agent.update()
            state = next_state

            # print(next_state,'next_state','episode',episode)

            # display.clear_output(wait=True)

            if draw:
                print(next(agent.actor.parameters()), episode, 'actor')
                print(next(agent.critic.parameters()), episode, 'critic')
                ax1.plot3D(x, y, z, 'red')
                display.display(fig)
                plt.pause(0.001)
                display.clear_output(wait=True)

            if done:
                # display.clear_output(wait=True)
                break

        # print(ep_reward)

        if (episode + 1) % 10 == 0:
            print(next(agent.actor.parameters()), episode, 'actor')
            print(next(agent.critic.parameters()), episode, 'critic')
            print(f"Episode：{episode + 1}/{train_eps}，Reward：{ep_reward:.2f}")
        rewards.append(ep_reward)
        # print("X",x[-1])
        # print('--------------------------------------------------')
        # print("Y",y[-1])
        # print('--------------------------------------------------')
        # print("Z",z[-1])
        # print('--------------------------------------------------')
    print("Training Complete")
    return rewards

In [31]:
# env = gym.make('Pendulum-v1')
# env.action_space.shape[0], env.observation_space.shape[0]

In [32]:
env = Env.Plane(1, 60, 45, 0, 100)

In [33]:
agent = DDPG(device, env.action_space, env.observation_space, batch_size, gamma, tau)
train_res = train(env, agent, train_eps, max_steps)
test_res = test(env, agent, test_eps, max_steps)

torch.save(agent.target_critic.state_dict(), 'critic_dict')
torch.save(agent.target_actor.state_dict(), 'actor_dict')

Start Training
Parameter containing:
tensor([[-0.2634,  0.0504, -0.0517, -0.2467,  0.1856,  0.1771,  0.0969,  0.0209,
         -0.0363,  0.0901,  0.1060, -0.1355, -0.1766,  0.1491],
        [ 0.1980, -0.2405,  0.2095, -0.0072, -0.0700, -0.1445,  0.0481,  0.1439,
          0.2310, -0.1358,  0.2319,  0.1242,  0.1101, -0.2651],
        [ 0.1316, -0.0737,  0.2276,  0.0536,  0.1476, -0.1756,  0.2323,  0.0076,
         -0.1849,  0.1094,  0.1982,  0.0014,  0.2022,  0.1081],
        [ 0.0817, -0.0070,  0.0143, -0.2575,  0.2168, -0.2769, -0.1152, -0.2184,
          0.1386, -0.1119, -0.2183, -0.2596, -0.0675,  0.1359],
        [-0.0080,  0.2474, -0.1284, -0.2229,  0.0174, -0.0309,  0.0751,  0.0469,
         -0.0245, -0.1546, -0.1557,  0.2665, -0.0253, -0.0607],
        [ 0.2594,  0.2150,  0.2584,  0.1316, -0.1730,  0.2063, -0.1555,  0.1750,
         -0.2157, -0.2247, -0.0512,  0.0380, -0.0827, -0.1008],
        [ 0.1579,  0.1147, -0.0441, -0.0713,  0.0940,  0.0738,  0.1248,  0.2188,
         -0.

KeyboardInterrupt: 