# Reinforcement Learning : Acrobot

## Description
The system consists of two links connected linearly to form a chain, with one end of the chain fixed. The joint between the two links is actuated. The goal is to apply torques on the actuated joint to swing the free end of the linear chain above a given height while starting from the initial state of hanging downwards.

Two blue links connected by two green joints. The joint in between the two links is actuated. The goal is to swing the free end of the outer-link to reach the target height (black horizontal line above system) by applying torque on the actuator.

### Action Space
The action is discrete, deterministic, and represents the torque applied on the actuated joint between the two links.
- 0 : apply -1 torque to the actuated joint
- 1 : apply 0 torque to the actuated joint
- 2 : apply 1 torque to the actuated joint

### Observation Space
The observation is an ndarray with shape (6,) that provides information about the two rotational joint angles as well as their angular velocities.
- 0 : cosine of theta1 [-1, 1]
- 1 : sine of theta1 [-1, 1]
- 2 : cosine of theta2 [-1, 1]
- 3 : sine of theta2 [-1, 1]
- 4 : angular velocity of theta1 [-12.566 (-4 * pi), 12.566 (4 * pi)]
- 5 : angular velocity of theta2 [-28.274 (-9 * pi), 28.274 (9 * pi)]

### Reward
The goal is to have the free end reach a designated target height in as few steps as possible, and as such all steps that do not reach the goal incur a reward of -1. Achieving the target height results in termination with a reward of 0. The reward threshold is -100.

### Starting State
Each parameter in the underlying state (theta1, theta2, and the two angular velocities) is initialized uniformly between -0.1 and 0.1. This means both links are pointing downwards with some initial stochasticity.

### Episode End
The episode ends if one of the following occurs:
- Termination: The free end reaches the target height, which is constructed as: -cos(theta1) - cos(theta2 + theta1) > 1.0
- Truncation: Episode length is greater than 500 (200 for v0)

In [1]:
import sys
import tqdm
import logging
import itertools
import numpy as np
np.random.seed(0)
import pandas as pd
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as dist
torch.manual_seed(0)
import matplotlib.pyplot as plt

In [2]:
# Create the Acrobot-v1 environment, then seed NumPy and the environment
# with the same value so the first reset is reproducible.
env = gym.make("Acrobot-v1")
np.random.seed(42)
# reset() returns two values; only the initial observation is kept here.
observation, _ = env.reset(seed=42)

In [3]:
# Display the observation space; the recorded output below shows a
# 6-element float32 Box with the per-component lower and upper bounds.
env.observation_space

Box([ -1.        -1.        -1.        -1.       -12.566371 -28.274334], [ 1.        1.        1.        1.       12.566371 28.274334], (6,), float32)

In [4]:
# Display the action space; the recorded output below shows Discrete(3).
env.action_space

Discrete(3)

In [5]:
# Log every instance attribute of the environment object first, then every
# attribute of its spec, one "name: value" record per attribute.
for described in (env, env.spec):
    for attr_name, attr_value in vars(described).items():
        logging.info('%s: %s', attr_name, attr_value)

## A2C : Advantage Actor-Critic

**A2C module implemented with PyTorch:**

In [6]:
## A2C module
class A2C:
    """One-step advantage actor-critic agent for a discrete action space.

    The actor is a softmax policy network and the critic is a state-value
    network. Both are MLPs with one hidden layer of 100 units, each trained
    with its own Adam optimizer (learning rate 0.001).

    In ``'train'`` mode every call to :meth:`play_step` appends
    ``observation, reward, done, action`` to ``self.traj`` and, once two
    consecutive steps are recorded, performs one update from the latest
    transition.
    """

    ## initialization
    def __init__(self, env, gamma=0.99):
        """Build the actor and critic networks for ``env``.

        Args:
            env: Object exposing ``observation_space.shape`` and
                ``action_space.n``.
            gamma: Discount factor used in the TD target and in the
                running discount applied to the actor loss.
        """
        self.gamma = gamma
        self.discount = 1.
        # Defaults so that play_step() is usable even when reset_mode() has
        # not been called yet.
        self.mode = None
        self.traj = []
        self.action_n = env.action_space.n
        observation_size = env.observation_space.shape[0]
        # The actor is built before the critic; keeping this order keeps the
        # random weight initialisation reproducible for a fixed seed.
        self.actor_net = self.build_net(input_size=observation_size,
                                        hidden_sizes=[100,],
                                        output_size=self.action_n,
                                        output_activator=nn.Softmax(1))
        self.actor_optimizer = optim.Adam(self.actor_net.parameters(), 0.001)
        self.critic_net = self.build_net(input_size=observation_size,
                                         hidden_sizes=[100,])
        self.critic_optimizer = optim.Adam(self.critic_net.parameters(), 0.001)
        self.critic_loss = nn.MSELoss()

    ## set A2C mode to None or train, default None
    def reset_mode(self, mode=None):
        """Start a new episode in the given mode.

        Args:
            mode: ``'train'`` to record the trajectory and learn online;
                any other value (default ``None``) only acts.
        """
        self.mode = mode
        if self.mode == 'train':
            self.traj = []
            self.discount = 1.

    ## release resources held by the agent (nothing to release here)
    def close(self):
        """No-op hook so episode runners can call ``agent.close()``."""

    ## take in the current observation and reward -> take an action
    def play_step(self, observation, reward, done):
        """Sample an action for ``observation`` and, in train mode, learn.

        Args:
            observation: Current observation (array-like of floats).
            reward: Reward received for the previous action.
            done: Whether ``observation`` is a terminal state.

        Returns:
            The sampled action index.
        """
        state_tensor = torch.as_tensor(observation, dtype=torch.float).reshape(1, -1)
        # Sampling needs no gradient; reinforce() recomputes the policy
        # output when it needs to differentiate through it.
        with torch.no_grad():
            proba_tensor = self.actor_net(state_tensor)
        action_tensor = dist.Categorical(proba_tensor).sample()
        action = action_tensor.numpy()[0]

        if self.mode == 'train':
            self.traj += [observation, reward, done, action]
            # Two recorded steps (2 x 4 entries) make one full transition.
            if len(self.traj) >= 8:
                self.reinforce()
            self.discount *= self.gamma

        return action

    ## build a neural network with ReLU activation function
    def build_net(self, input_size, hidden_sizes, output_size=1,
                  output_activator=None):
        """Build an MLP with ReLU between consecutive linear layers.

        Args:
            input_size: Number of input features.
            hidden_sizes: Sizes of the hidden layers, in order.
            output_size: Number of output features.
            output_activator: Optional module appended after the last
                linear layer.

        Returns:
            An ``nn.Sequential`` network.
        """
        sizes = [input_size, *hidden_sizes, output_size]
        layers = []

        # build layers
        for in_features, out_features in zip(sizes[:-1], sizes[1:]):
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.ReLU())
        # The output layer is not followed by a ReLU.
        layers.pop()
        if output_activator is not None:
            layers.append(output_activator)

        # build network
        return nn.Sequential(*layers)

    # one-step actor-critic update from the most recent transition
    def reinforce(self):
        """Update actor and critic from the last two recorded steps."""
        state, _, _, action, next_state, reward, done, _ = self.traj[-8:]
        state_tensor = torch.as_tensor(state, dtype=torch.float).unsqueeze(0)
        next_state_tensor = torch.as_tensor(next_state, dtype=torch.float).unsqueeze(0)

        # TD error
        next_v_tensor = self.critic_net(next_state_tensor)
        # No bootstrapping from a terminal next state.
        target_tensor = reward + (1. - float(done)) * self.gamma * next_v_tensor
        v_tensor = self.critic_net(state_tensor)
        # The actor only uses the TD error as a weight, so detach it: the
        # actor backward pass then never touches the critic graph.
        td_error_tensor = (target_tensor - v_tensor).detach()

        # train actor network
        # proba of taking current action at current state
        pi_tensor = self.actor_net(state_tensor)[0, action]
        # Clamp to avoid log(0).
        logpi_tensor = torch.log(pi_tensor.clamp(1e-6, 1.))
        actor_loss_tensor = -(self.discount * td_error_tensor * logpi_tensor).squeeze()
        self.actor_optimizer.zero_grad()
        # gradients of the actor loss
        actor_loss_tensor.backward()
        # update the parameters of actor network using gradients
        self.actor_optimizer.step()

        # train critic network
        # NOTE: the target is not detached here, so the critic gradient also
        # flows through the bootstrapped value of the next state.
        critic_loss_tensor = self.critic_loss(v_tensor, target_tensor)
        self.critic_optimizer.zero_grad()
        # gradients of the critic loss
        critic_loss_tensor.backward()
        # update the parameters of critic network using gradients
        self.critic_optimizer.step()
        
## simulate an episode
## return the total reward of the episode and elapsed steps
def play_epis(env, agent, max_episode_steps=None, 
              mode=None, render=False):
    """Run one episode of ``agent`` in ``env``.

    Uses the gymnasium API: ``env.reset()`` returns ``(observation, info)``
    and ``env.step()`` returns
    ``(observation, reward, terminated, truncated, info)``.

    Args:
        env: Environment to interact with.
        agent: Object providing ``reset_mode(mode=...)`` and
            ``play_step(observation, reward, done)``; its ``close()`` is
            called at the end of the episode when it exists.
        max_episode_steps: Optional cap on the number of environment
            steps; ``None`` or 0 means no cap.
        mode: Passed through to ``agent.reset_mode``.
        render: When truthy, ``env.render()`` is called after every
            action selection.

    Returns:
        Tuple ``(episode_reward, elapsed_steps)``.
    """
    observation, _ = env.reset()
    reward = 0.
    terminated = False
    truncated = False
    agent.reset_mode(mode=mode)
    episode_reward = 0.
    elapsed_steps = 0

    while True:
        # The agent is told only about true termination; a time-limit
        # truncation is not a terminal state for bootstrapping purposes.
        action = agent.play_step(observation, reward, terminated)
        if render:
            env.render()
        # Checked after play_step so the agent still sees the final
        # transition of the episode.
        if terminated or truncated:
            break
        observation, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward
        elapsed_steps += 1
        if max_episode_steps and elapsed_steps >= max_episode_steps:
            break

    # Cleanup and return happen once, after the loop has finished.
    close_agent = getattr(agent, 'close', None)
    if callable(close_agent):
        close_agent()

    return episode_reward, elapsed_steps

## Main function

In [7]:
# Route DEBUG-and-above records to stdout with a short timestamp.
# force=True replaces any handler already attached to the root logger:
# an earlier bare logging.info() call implicitly installs a default
# WARNING-level handler, after which basicConfig() would otherwise be a
# silent no-op and every info/debug record below would be dropped.
logging.basicConfig(level=logging.DEBUG, 
                    format='%(asctime)s [%(levelname)s] %(message)s', 
                    stream=sys.stdout, 
                    datefmt='%H:%M:%S',
                    force=True)

In [None]:
agent = A2C(env)

## train
# Train until the mean reward over the last 10 episodes exceeds -120.
logging.info('==== train ====')
episode_rewards = []
# Read the step cap from the environment spec: the private
# `_max_episode_steps` attribute is not reachable through gymnasium's
# wrapper chain.
max_episode_steps = env.spec.max_episode_steps
for episode in itertools.count():
    # Rendering is left off: the environment was created without a
    # render_mode, so env.render() would only emit warnings.
    episode_reward, elapsed_steps = play_epis(env, agent, 
                                         max_episode_steps=max_episode_steps, 
                                         mode='train')
    episode_rewards.append(episode_reward)
    logging.debug('train episode %d: reward = %.2f, steps = %d', 
                  episode, episode_reward, elapsed_steps)
    if np.mean(episode_rewards[-10:])>-120:
        break
plt.plot(episode_rewards)

## test
# Evaluate the trained agent for 100 episodes without learning.
logging.info('==== test ====')
episode_rewards = []
for episode in range(100):
    episode_reward, elapsed_steps = play_epis(env, agent)
    episode_rewards.append(episode_reward)
    logging.debug('test episode %d: reward = %.2f, steps = %d', 
                  episode, episode_reward, elapsed_steps)
logging.info('average episode reward = %.2f', 
             np.mean(episode_rewards))

env.close()