# Control in a continuous action space with DDPG
_Authors:_ Aristotelis Dimitriou, Konstantinos Spinakis

---

### Introduction

In this reinforcement learning project, we implement the Deep Deterministic Policy Gradient (DDPG) algorithm to handle continuous action spaces while maintaining the benefits of Deep Q-learning (DQN). The objective is to stabilize an inverted pendulum in the Pendulum-v1 environment from OpenAI Gym.

DDPG is an actor-critic algorithm that uses one neural network (the critic) to estimate the Q-function and another (the actor) to select the action. It is based on the deterministic policy gradient theorem, which allows both the actor and the critic to be trained off-policy from a replay buffer. Because the policy network outputs a specific action rather than a probability distribution, exploration has to be added separately, for example by perturbing the action with noise, which leaves the choice of exploration strategy open.
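As a rough illustration of exploration with a deterministic policy, the sketch below adds Gaussian noise to an action and clips the result to the action bounds. The function name `noisy_action`, the noise scale `sigma=0.3`, and the example action value are assumptions made for this illustration; they are not taken from the project code.

```python
import random


def noisy_action(deterministic_action, sigma=0.3, low=-2.0, high=2.0):
    """Perturb a deterministic action with Gaussian noise, then clip it.

    `sigma`, `low`, and `high` are illustrative defaults (assumptions);
    the bounds match the torque range of Pendulum-v1 described in this section.
    """
    noise = random.gauss(0.0, sigma)
    return max(low, min(high, deterministic_action + noise))


# Hypothetical actor output for a single state
actor_output = 1.9
explored = [noisy_action(actor_output) for _ in range(5)]
```

Setting `sigma=0.0` recovers the deterministic action, which is one simple way to switch exploration off at evaluation time.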

* The `Pendulum-v1` environment provides a three-dimensional observation vector $(\cos(\alpha), \sin(\alpha), \dot{\alpha})$ where $\alpha$ represents the angle between the pendulum and the vertical line. 

* The action is a scalar value between -2 and 2, representing the torque applied to the pendulum's single joint. 

* The control policy must learn to swing the pendulum to gain momentum before stabilizing it in a vertical position with minimal torque. 

* The reward function is defined as $-(\alpha^2 + 0.1\cdot\dot{\alpha}^2 + 0.001\cdot\tau^2)$, with the maximum reward of 0 achieved when the pendulum is vertically positioned, motionless, and with no torque applied.
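To make the reward formula concrete, here is a small sketch that evaluates it for two configurations. The helper names are hypothetical, and wrapping the angle to $[-\pi, \pi)$ before squaring is an assumption about how the angle is measured rather than something stated in the formula above.

```python
import math


def normalize_angle(alpha):
    """Wrap an angle in radians to the interval [-pi, pi) (assumed convention)."""
    return (alpha + math.pi) % (2 * math.pi) - math.pi


def pendulum_reward(alpha, alpha_dot, torque):
    """Evaluate -(alpha^2 + 0.1 * alpha_dot^2 + 0.001 * torque^2)."""
    a = normalize_angle(alpha)
    return -(a ** 2 + 0.1 * alpha_dot ** 2 + 0.001 * torque ** 2)


# Upright, motionless, no torque: the best case
upright = pendulum_reward(0.0, 0.0, 0.0)

# Hanging straight down, motionless, no torque: the angle term alone costs pi^2
hanging = pendulum_reward(math.pi, 0.0, 0.0)
```

The angle term dominates: hanging motionless already costs about 9.87 per step, whereas the torque term contributes at most 0.004 per step for a torque of magnitude 2.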


In [21]:
import gym
import numpy as np
from helpers import NormalizedEnv, RandomAgent
import torch
import torch.nn as nn

___
### Heuristic Policy

In this section, we will familiarize ourselves with the `Pendulum-v1` environment by implementing a simple heuristic policy to attempt stabilizing the pendulum. We will compare the heuristic policy with a random policy to verify the increase in average reward.

_**Tasks:**_


1. Create an instance of the `Pendulum-v1` environment and wrap it in a `NormalizedEnv` class.

In [22]:
env = NormalizedEnv(gym.make('Pendulum-v1'))

2. Implement a function that simulates the interaction between the environment and the agent and returns the average cumulative reward.

In [23]:
def run_agent(agent, env, episodes=10, verbose=False):
    rewards = []
    for i in range(episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.compute_action(state)
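            # step() returns (obs, reward, terminated, truncated, info)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated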
            total_reward += reward
            state = next_state
            if verbose:
                print(f'Episode {i+1}/{episodes}')
                print(f'State: {state}')
                print(f'Action: {action}')
                print(f'Reward: {reward}')
                print(f'Done: {done}')
                print('------------------')
        rewards.append(total_reward)
    return np.mean(rewards)

3. Implement a heuristic policy for the pendulum (`HeuristicPendulumAgent`).

In [24]:
class HeuristicPendulumAgent:
    def __init__(self, env, fixed_torque=0.5, verbose=False):
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.shape[0]
        self.fixed_torque = fixed_torque
        self.verbose = verbose

    def compute_action(self, state):
        if self.verbose:
            print(f'State: {state}')
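        # The observation is (cos(alpha), sin(alpha), angular velocity)
        cos_angle, sin_angle, angular_velocity = state
        if cos_angle < 0:   # Lower half: push with the motion to gain momentum
            action = np.sign(angular_velocity) * self.fixed_torque
        else:               # Upper half: push against the motion to slow down
            action = -np.sign(angular_velocity) * self.fixed_torque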
        return np.array([action])

5. Compare the average cumulative reward obtained by the heuristic policy with that of the random agent.

In [25]:
heuristic_agent = HeuristicPendulumAgent(env, verbose=False)
random_agent = RandomAgent(env)

heuristic_agent_avg_reward = run_agent(heuristic_agent, env, verbose=False)
random_agent_avg_reward = run_agent(random_agent, env, verbose=False)
print(f'Random agent average reward: {random_agent_avg_reward:.2f}')
print(f'Heuristic agent average reward: {heuristic_agent_avg_reward:.2f}')

Random agent average reward: -1024.38
Heuristic agent average reward: -1173.38


In [26]:
class ReplayBuffer:
    """ A buffer for storing transitions sampled from the environment. """
    def __init__(self, max_size, verbose=False):
        self.max_size = max_size
        self.transitions = []
        self.verbose = verbose
        
    def store(self, transition):
        """ Store a transition. """
        if self.verbose:
            print(f'Storing transition {transition}')
            
        if len(self.transitions) < self.max_size:
            self.transitions.append(transition)
        else:
            self.transitions.pop(0)
            self.transitions.append(transition)
        
    def sample(self, batch_size):
        """ Sample a batch of transitions. """
        batch = []
        for _ in range(batch_size):
            idx = np.random.randint(0, len(self.transitions))
            batch.append(self.transitions[idx])
            if self.verbose:
                print(f'Sampling transition {self.transitions[idx]}')
        return batch
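As a possible alternative to the list-based buffer, the sketch below uses `collections.deque` with `maxlen`, which discards the oldest transition automatically once the buffer is full. The class name `DequeReplayBuffer` is hypothetical; sampling is done with replacement, as in the `ReplayBuffer` above.

```python
import random
from collections import deque


class DequeReplayBuffer:
    """Fixed-size buffer that drops the oldest transition when full (sketch)."""

    def __init__(self, max_size):
        self.transitions = deque(maxlen=max_size)

    def store(self, transition):
        # deque(maxlen=...) evicts from the left once max_size is reached
        self.transitions.append(transition)

    def sample(self, batch_size):
        # Sample with replacement, so batch_size may exceed the buffer length
        return random.choices(self.transitions, k=batch_size)

    def __len__(self):
        return len(self.transitions)


# Integers stand in for transitions in this usage example
buffer = DequeReplayBuffer(max_size=3)
for i in range(5):
    buffer.store(i)
batch = buffer.sample(10)
```

The main difference is eviction cost: `list.pop(0)` shifts every remaining element, while a bounded deque evicts in constant time.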
        

In [27]:
class QNetwork(torch.nn.Module):
    def __init__(self):
        super(QNetwork, self).__init__()
        self.activation = nn.ReLU()
        self.fc1 = nn.Linear(4, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x):
        """ Forward pass of the network. """
        x = x.view(-1, 4)
        x = self.activation(self.fc1(x))
        x = self.activation(self.fc2(x))
        x = self.fc3(x)

        return x
        

In [32]:
def train_q_network(q_network, transitions, optimizer, gamma):
    """
    Train Q-Network using 1-step TD-learning rule.

    Parameters:
    q_network (QNetwork): The Q-Network instance to be trained.
    transitions (list): A list of tuples containing the transitions used for training.
                        Each tuple should have the format (state, action, reward, next_state, trunc).
    optimizer (torch.optim.Optimizer): The optimizer used for updating the Q-Network's weights.
    gamma (float): The discount factor for future rewards (0 <= gamma <= 1).

    Returns:
    loss (float): The loss value after training the Q-Network with the given batch of transitions.
    """

    # Unpack transitions
    states, actions, rewards, next_states, trunc = zip(*transitions)
    print(f'States: {states}')
    print(f'Actions: {actions}')
    print(f'Rewards: {rewards}')
    print(f'Next states: {next_states}')
    print(f'Trunc: {trunc}')
    

    # Convert lists to PyTorch tensors
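    # Stack each field into a single NumPy array first; building a tensor
    # directly from a tuple of arrays is slow
    states = torch.tensor(np.array(states), dtype=torch.float32)
    actions = torch.tensor(np.array(actions), dtype=torch.float32)
    rewards = torch.tensor(np.array(rewards), dtype=torch.float32)
    next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
    trunc = torch.tensor(np.array(trunc), dtype=torch.float32)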

    # Concatenate states and actions to form the input to the Q-network
    state_action_pair = torch.cat((states, actions), dim=-1)
    print(f'State-action pair: {state_action_pair}')
    

    # Compute Q-values for the given state-action pairs
    #   Use `QNetwork` class
    q_values = q_network(state_action_pair)
    print(f'Predicted Q-values: {q_values}')

    # Compute target Q-values
    #   Use torch.no_grad() or something like that

    # Compute loss
    #   Use MSE
    # torch.nn.functional.mse_loss

    # Perform backpropagation
    #   zero_grad() -> backward() -> step()

    # return loss.item()
    return None
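The steps still marked as comments in `train_q_network` amount to computing a 1-step TD target and a mean squared error. The framework-free sketch below shows that arithmetic on plain lists. It assumes the Q-values of the next state-action pairs are already available (in the notebook they would come from the Q-network evaluated without gradient tracking, with the next action chosen by some policy), and it assumes bootstrapping is switched off when the stored flag is true. The names `td_targets` and `mse` and all numbers are illustrative.

```python
def td_targets(rewards, next_q_values, dones, gamma):
    """1-step TD targets: r + gamma * Q(s', a'), without bootstrap when done."""
    return [
        r + gamma * q_next * (0.0 if done else 1.0)
        for r, q_next, done in zip(rewards, next_q_values, dones)
    ]


def mse(predictions, targets):
    """Mean squared error between predicted Q-values and TD targets."""
    return sum((p - t) ** 2 for p, t in zip(predictions, targets)) / len(targets)


# Made-up batch of two transitions
rewards = [-1.0, -2.0]
next_q_values = [-10.0, -20.0]   # hypothetical Q(s', a') values
dones = [False, True]

targets = td_targets(rewards, next_q_values, dones, gamma=0.5)
loss = mse([-5.0, -2.0], targets)
```

Whether a truncated episode should stop bootstrapping is a design choice: a time-limit truncation does not mean the state is terminal, so some implementations keep the bootstrap term in that case.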



In [39]:

# Set up the arguments passed to train_q_network
q_network = QNetwork()
optimizer = torch.optim.Adam(q_network.parameters(), lr=0.001)
gamma = 0.99

# Make a replay buffer
replay_buffer = ReplayBuffer(max_size=1000, verbose=False)

# Make a random agent
random_agent = RandomAgent(env)

# Collect some transitions
transitions = []
for _ in range(3):
    state, _ = env.reset()
    action = random_agent.compute_action(state)
    next_state, reward, terminated, truncated, _ = env.step(action)
    transition = (state, action, reward, next_state, truncated)
    transitions.append(transition)
    replay_buffer.store(transition)


train_q_network(q_network, transitions, optimizer, gamma)

States: (array([-0.03370433, -0.99943185,  0.3547029 ], dtype=float32), array([-0.88589674, -0.46388254, -0.75104225], dtype=float32), array([ 0.7519081 ,  0.6592679 , -0.03331237], dtype=float32))
Actions: (array([0.24059471]), array([0.11533584]), array([0.08065662]))
Rewards: (-2.5872557846275797, -7.127909880672323, -0.5183133532392202)
Next states: (array([-0.0498247 , -0.99875796, -0.32269257], dtype=float32), array([-0.9093176, -0.4161027, -1.0643535], dtype=float32), array([0.73569   , 0.6773184 , 0.48533553], dtype=float32))
Trunc: (False, False, False)
State-action pair: tensor([[-0.0337, -0.9994,  0.3547,  0.2406],
        [-0.8859, -0.4639, -0.7510,  0.1153],
        [ 0.7519,  0.6593, -0.0333,  0.0807]])
Predicted Q-values: tensor([[0.0725],
        [0.1138],
        [0.1549]], grad_fn=<AddmmBackward0>)


In [None]:
def create_heatmap(q_network, action_range, velocity_range, n_points):
    """Create heatmap of Q-values for the pendulum environment."""