## Control in a continuous action space with DDPG

### Heuristic policy

In [1]:
import gym 
import numpy as np
from helpers import NormalizedEnv, RandomAgent
from matplotlib import pyplot
import torch

In [2]:
# Initialization

env = gym.make("Pendulum-v1")
norm_env = NormalizedEnv(env) # accept actions between -1 and 1

rand_ag = RandomAgent(norm_env)

In [3]:
# one episode with a defined number of steps

def episode(agent): 
    state, info = norm_env.reset()
    tot_reward = 0
    truncated = False

    while not truncated:
        action = agent.compute_action(state)
        next_state, reward, terminated, truncated, info = norm_env.step(action)
        tot_reward += reward
        
        if truncated:
            state, info = norm_env.reset()
            
    return tot_reward

In [4]:
# Execute 10 episodes 

arr_reward = []
num_ep = 10
for x in range(num_ep): 
    reward = episode(rand_ag)
    arr_reward.append(reward)

print("array of reward :", arr_reward)

av_reward = sum(arr_reward)/num_ep
print("average cumulative reward :", av_reward)

array of reward : [-1289.6196920408427, -1542.2102992915043, -1413.0309438154127, -918.1516068433682, -862.1930697438868, -1101.0063537917836, -1346.0926994053716, -1514.9667587227898, -1300.0919531774455, -1077.0732295215737]
average cumulative reward : -1236.443660635398


In [5]:
# Implementation of a heuristic policy for the pendulum
    
class HeuristicPendulumAgent:
    def __init__(self, env):
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.shape[0]
        
    def compute_action(self, state):
        _, y, v = state
        torque = env.m*env.g*env.l*y # fixed torque
        action = np.empty((1,))
        
        if (y < 0):
            np.append(action, np.sign(v)*torque) # same direction to angular velocity
        else:
            np.append(action, (-1)*np.sign(v)*torque) # opposite direction to angular velocity
        return action


In [6]:
# Execute 10 episodes with Heuristic agent
heur_ag = HeuristicPendulumAgent(norm_env)

arr_reward = []
num_ep = 10
for x in range(num_ep): 
    reward = episode(heur_ag)
    arr_reward.append(reward)

print("array of reward :", arr_reward)

av_reward = sum(arr_reward)/num_ep
print("average cumulative reward :", av_reward)

array of reward : [-1666.0455191434114, -1687.151799593187, -1681.0061164486185, -1666.8698794844715, -1388.1506374083795, -1654.700067899993, -1639.1654516194753, -1650.1731060060715, -1676.455174969775, -1668.7891495776178]
average cumulative reward : -1637.8506902151003


## QNetwork

In [7]:
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

class ReplayBuffer:
    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []
        self.idx = 0

    def __len__(self):
        return len(self.buffer)

    def add(self, state, action, reward, next_state, trunc):
        transition = (state, action, reward, next_state, trunc)
        if len(self.buffer) < self.max_size:
            self.buffer.append(transition)
        else:
            # not sure about the behaviour when buffer is overloaded.
            self.buffer[self.idx] = transition
            self.idx = (self.idx + 1) % self.max_size

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, truncs = zip(*batch)
        return states, actions, rewards, next_states, truncs
    
class QNetwork(nn.Module):
    def __init__(self, agent, norm_env):
        super(QNetwork, self).__init__()
        self.agent = agent
        self.norm_env = norm_env
        self.fc1 = nn.Linear(4, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
    def update(self, transition):
        state = transition[:, :3]
        action = transition[:, 3]
        # Compute the TD target
        with torch.no_grad():
            targets = []
            for s, a in zip(state, action):
                next_state, reward, terminated, truncated, info = norm_env.step(a.numpy()) 
                next_actions = self.agent.compute_action(next_state)
                
                next_state, next_actions  = torch.Tensor(next_state).view(1, -1), torch.Tensor(next_actions).view(1, -1)
                q_next = self.forward(torch.cat([next_state, next_actions], dim=1))
                target = reward + gamma * q_next * (1 - truncated)
                targets.append(target[0])  
            targets = torch.Tensor(targets)
            
        q_values = self.forward(transition)
        
        loss = F.mse_loss(q_values.view(-1), targets)
        return loss
    

In [8]:
import torch.optim as optim
gamma = 0.01
model = QNetwork(heur_ag, norm_env)

state, info = norm_env.reset()

action = heur_ag.compute_action(state)

action = torch.Tensor([action])

  action = torch.Tensor([action])


In [9]:
state_t = torch.Tensor([state, state, state])
action_t = torch.Tensor([action, action, action]).view(-1, 1)

transition = torch.cat([state_t, action_t], dim=1)
model.update(transition)

tensor(2.8912, grad_fn=<MseLossBackward0>)

In [10]:
model.update(transition)

tensor(23.0153, grad_fn=<MseLossBackward0>)