# 十分钟强化学习第十一讲：DDPG方法

- Policy-based + Value-based方法
- 先使用经验池数据训练Q网络
- 再基于Q网络训练策略网络
- 在action中增加噪音进行探索
- 适合于连续的行动空间场景

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gym
import numpy as np
import random
import copy
from collections import deque

In [4]:
env = gym.make("Pendulum-v1")


In [5]:
print(env.action_space)

Box(-2.0, 2.0, (1,), float32)


In [6]:
print(env.observation_space)

Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)


In [7]:
env.reset()

(array([-0.6418603 , -0.76682156, -0.13603155], dtype=float32), {})

In [8]:
env.step([-1.0])

  if not isinstance(terminated, (bool, np.bool8)):


(array([-0.6742726, -0.7384826, -0.8611477], dtype=float32),
 -5.145396125494486,
 False,
 False,
 {})

In [9]:
class Pocily_net(nn.Module):
    def __init__(self, input_size, h1_size, h2_size,output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, h1_size)
        self.linear2 = nn.Linear(h1_size, h2_size)
        self.linear3 = nn.Linear(h2_size, output_size)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = F.tanh(self.linear3(x))
        return x

In [10]:
class Value_net(nn.Module):
    def __init__(self, input_size, h1_size,h2_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, h1_size)
        self.linear2 = nn.Linear(h1_size+output_size, h2_size)
        self.linear3 = nn.Linear(h2_size, 1)

    def forward(self, x, action):
        x = F.relu(self.linear1(x))
        x = torch.cat((x, action), dim = 1)
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x  

In [11]:
class Noise:

    def __init__(self, size, mu=0., theta=0.15, sigma=0.2):
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        self.state = copy.copy(self.mu)

    def sample(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.array([random.random() for i in range(len(x))])
        self.state = x + dx
        return self.state

In [12]:
class Agent:
    def __init__(self,state_space, action_space,h1_size = 200,h2_size = 100,  gamma = 0.99,
                max_memory=50000, lr=0.001):
        self.memory = deque(maxlen=max_memory) 
        self.gamma = gamma
        self.online_value_model = Value_net(state_space,h1_size,h2_size,action_space)
        self.target_value_model = Value_net(state_space,h1_size,h2_size,action_space)
        self.online_policy_model = Pocily_net(state_space,h1_size,h2_size,action_space)
        self.target_policy_model = Pocily_net(state_space,h1_size,h2_size,action_space)
        self.value_optimizer = optim.Adam(self.online_value_model.parameters(), lr=lr)
        self.policy_optimizer = optim.Adam(self.online_policy_model.parameters(), lr=lr)
        self.noise = Noise(action_space)
        self.criterion = nn.MSELoss()
        self.copy_model()

    def copy_model(self):
        self.target_value_model.load_state_dict(self.online_value_model.state_dict())
        self.target_policy_model.load_state_dict(self.online_policy_model.state_dict())

    def train_step(self, experiences):
        states, actions, rewards, next_states, dones = experiences

        Q_value = self.online_value_model(states,actions)

        next_policy_action = self.target_policy_model(next_states)
        next_Q_value = self.target_value_model(next_states, next_policy_action)

        target_Q_value = (rewards + self.gamma * next_Q_value * (1 - dones))

        value_loss = self.criterion(Q_value,target_Q_value)
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()

        policy_action = self.online_policy_model(states)
        policy_action_q = self.online_value_model(states,policy_action)
        policy_loss = -policy_action_q.mean()
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()


    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) 

    def train_long_memory(self,batch_size):
        if len(self.memory) > batch_size:
            mini_sample = random.sample(self.memory, batch_size) # list of tuples

            states, actions, rewards, next_states, dones = zip(*mini_sample)
            states = np.array(states)
            actions = np.array(actions)
            next_states = np.array(next_states)
            experiences = self.load((states, actions, rewards, next_states, dones))
            self.train_step(experiences)


    def get_action(self, state, add_noise = True):
        state = torch.tensor(state, dtype=torch.float)
        action = self.online_policy_model(state).detach().numpy()
        action *= 2
        if add_noise:
            action += self.noise.sample()

        return np.clip(action, -2, 2)

    def reset(self):
        self.noise.reset()

    
    @staticmethod
    def load(experiences):
        states, actions, rewards, next_states, dones = experiences
        states = torch.tensor(states, dtype=torch.float)
        next_states = torch.tensor(next_states, dtype=torch.float)
        actions = torch.tensor(actions, dtype=torch.float)
        #actions = torch.unsqueeze(actions, -1)
        rewards = torch.tensor(rewards, dtype=torch.float)
        rewards =torch.unsqueeze(rewards, -1)
        dones = torch.tensor(dones, dtype=torch.long)
        dones =torch.unsqueeze(dones, -1)
        return states, actions, rewards, next_states, dones

In [13]:
def train(env, max_game=1000,  max_step=200, evl_step = 100):
    agent = Agent(state_space = 3, action_space = 1)
    scores = []

    for i in  range(max_game):

        state_new, _ = env.reset()
        agent.reset()
        done = False
        score = 0

        for t in range(max_step):
            state_old = state_new
            action = agent.get_action(state_old)
            state_new, reward, done, _, _ = env.step(action)
            agent.remember(state_old, action, reward, state_new, done)
            agent.train_long_memory(batch_size=256)
            score += reward

            if done:
                break

        agent.copy_model()
        scores.append(score)

        if (i>0) and (i % evl_step ==0):         
            print("Running episode  {}, avg reward {:.2f}. ".format(
                i, np.mean(scores[-100:])))


In [14]:
env = gym.make("Pendulum-v1")
train(env) 

  if not isinstance(terminated, (bool, np.bool8)):


Running episode  100, avg reward -388.93. 
Running episode  200, avg reward -208.34. 
Running episode  300, avg reward -151.56. 
Running episode  400, avg reward -194.24. 
Running episode  500, avg reward -179.47. 
Running episode  600, avg reward -254.59. 
Running episode  700, avg reward -181.43. 
Running episode  800, avg reward -168.52. 
Running episode  900, avg reward -194.77. 
