In [None]:
"""
Created on Aug. 12, 2021
Group: Alpha IceCream
Title: A2C on LunarLander
"""

Install dependencies

In [None]:
# # @title Install dependencies
!pip install rarfile --quiet
!pip install stable-baselines3 > /dev/null
!pip install box2d-py > /dev/null
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!sudo apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

Imports

In [None]:
import io
import os
import glob
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import base64
import stable_baselines3

import numpy as np
import matplotlib.pyplot as plt

import gym
from gym import spaces
from gym.wrappers import Monitor

Set device

In [None]:
def set_device():
  """Choose the torch device string for this session and report the choice.

  Returns:
    "cuda" when torch reports an available CUDA device, otherwise "cpu".
  """
  if torch.cuda.is_available():
    print("GPU is enabled in this notebook.")
    return "cuda"

  # No CUDA device was found: tell the reader how to enable one.
  print("WARNING: For this notebook to perform best, "
        "if possible, in the menu under `Runtime` -> "
        "`Change runtime type.`  select `GPU` ")
  return "cpu"
set_device()

In [None]:
# # @title Plotting/Video functions
# from IPython.display import HTML
# from pyvirtualdisplay import Display
# from IPython import display as ipythondisplay

# display = Display(visible=0, size=(1400, 900))
# display.start()

# """
# Utility functions to enable video recording of gym environment
# and displaying it.
# To enable video, just do "env = wrap_env(env)""
# """

# def show_video():
#   mp4list = glob.glob('video/*.mp4')
#   if len(mp4list) > 0:
#     mp4 = mp4list[0]
#     video = io.open(mp4, 'r+b').read()
#     encoded = base64.b64encode(video)
#     ipythondisplay.display(HTML(data='''<video alt="test" autoplay
#                 loop controls style="height: 400px;">
#                 <source src="data:video/mp4;base64,{0}" type="video/mp4" />
#              </video>'''.format(encoded.decode('ascii'))))
#   else:
#     print("Could not find video")


# def wrap_env(env):
#   env = Monitor(env, './video', force=True)
#   return env

Before training: random-action baseline

In [None]:
# Directory reserved for gym logs; created up front so later writes cannot fail.
log_dir = "/tmp/gym/"
os.makedirs(log_dir, exist_ok=True)

# Create environment
env = gym.make("LunarLander-v2")

# On Kaggle, these two lines for visualization raise an error.
#env = wrap_env(gym.make("LunarLander-v2"))
#env = stable_baselines3.common.monitor.Monitor(env, log_dir )

# Baseline: play episodes with uniformly sampled actions and report the
# return of every tenth episode.
n_episodes = 100
for episode in range(n_episodes):
  observation = env.reset()
  total_reward = 0
  done = False
  # The loop condition alone ends the episode once the env reports `done`.
  while not done:
    # Visualize
    #env.render()
    # 0- Do nothing, 1- Fire left engine, 2- Fire bottom engine, 3- Fire right engine
    action = env.action_space.sample()
    observation_, reward, done, info = env.step(action)
    total_reward += reward
  if not episode % 10:
    print('Episode: {}, Total Reward: {:.2f}.'.format(episode, total_reward))
#env.close()
#show_video()

Policy network (fully connected; the earlier RNN attempt failed)

In [None]:
# Loss
# NOTE(review): `criterion` is not referenced by any cell visible here; it is
# kept so that other code relying on the name keeps working.
criterion = nn.CrossEntropyLoss()

class ActorCriticNetwork(nn.Module):
  """Fully connected actor-critic network with a shared two-layer trunk.

  The trunk feeds two linear heads: `pi` (one unnormalised score per action)
  and `v` (a single state-value estimate). The network owns its Adam
  optimizer and places itself on the device returned by `set_device()`.

  Args:
    lr: learning rate passed to Adam.
    input_dim: sequence holding the observation size; it is unpacked into
      the first `nn.Linear`, e.g. `[8]`.
    hidden_dim: width of both hidden layers.
    n_actions: number of outputs of the policy head.
    gamma: accepted for signature compatibility; not used by the network.
    n_layers: stored on the instance only; the fully connected layers do not
      read it. It used to be picked up from a module-level global, which
      raised NameError unless the hyperparameter cell had already run.
  """

  def __init__(self, lr, input_dim, hidden_dim, n_actions, gamma, n_layers=1):

    super(ActorCriticNetwork, self).__init__()
    self.lr = lr
    self.input_dim = input_dim
    self.hidden_dim = hidden_dim
    self.n_actions = n_actions
    self.n_layers = n_layers

    # Shared trunk
    self.layer1 = nn.Linear(*self.input_dim, self.hidden_dim)
    self.layer2 = nn.Linear(self.hidden_dim, self.hidden_dim)
    # A single LayerNorm module is applied after both hidden layers in
    # `forward`, so both positions share the same affine parameters.
    self.layer_norm = nn.LayerNorm(self.hidden_dim)

    # Heads
    self.pi = nn.Linear(self.hidden_dim, self.n_actions)
    self.v = nn.Linear(self.hidden_dim, 1)

    # Move the parameters to the target device first, then build the
    # optimizer over them.
    self.device = set_device()
    self.to(self.device)
    self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)

  def forward(self, state):
    """Run the trunk and both heads.

    Args:
      state: float tensor whose last dimension equals the observation size.

    Returns:
      Tuple `(pi, v)`: unnormalised action scores (the caller applies the
      softmax) and the state-value estimate.
    """
    x = F.relu(self.layer1(state))
    x = self.layer_norm(x)
    x = F.relu(self.layer2(x))
    x = self.layer_norm(x)
    pi = self.pi(x)
    v = self.v(x)

    return (pi, v)

Agent

In [None]:
class Agent():
    """One-step actor-critic agent wrapped around an `ActorCriticNetwork`.

    Args:
        lr: learning rate forwarded to the network's optimizer.
        input_dim: observation size specification forwarded to the network.
        hidden_dim: hidden width forwarded to the network.
        n_actions: number of discrete actions.
        gamma: discount factor applied to the bootstrapped next-state value.
    """

    def __init__(self, lr, input_dim, hidden_dim, n_actions, gamma):

        self.lr = lr
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.n_actions = n_actions
        self.gamma = gamma   # Discount factor
        self.network = ActorCriticNetwork(lr, input_dim, hidden_dim, n_actions, gamma)

    def _as_batch(self, value):
        """Return `value` as a float32 tensor with a leading batch axis on the network device."""
        # Going through one numpy array avoids building a tensor from a
        # Python list of arrays, which is slow and warns in recent torch.
        array = np.asarray(value, dtype=np.float32)
        return torch.tensor(array).unsqueeze(0).to(self.network.device)

    def action_selection(self, observation):
        """Sample an action for `observation` from the current policy.

        The log-probability of the sampled action is kept in
        `self.action_log_prob` so that the next `forward` call can build the
        actor loss from it.

        Returns:
            The sampled action as a Python int.
        """
        state = self._as_batch(observation)
        logits, _ = self.network.forward(state)
        # Passing logits lets Categorical normalise them itself (log-softmax)
        # instead of taking the log of already-softmaxed probabilities.
        action_dist = torch.distributions.Categorical(logits=logits)
        action = action_dist.sample()
        self.action_log_prob = action_dist.log_prob(action)

        return action.item()

    def forward(self, state, reward, state_, done, advantage):
        """Perform one gradient step from a single transition.

        `action_selection` must have been called for `state` beforehand,
        because the actor loss uses the stored `self.action_log_prob`.

        Args:
            state: observation the action was chosen in.
            reward: scalar reward received for that action.
            state_: observation reached after the action.
            done: whether `state_` is terminal.
            advantage: truthy selects the advantage formulation (the actor is
                fed a detached TD error); falsy selects the plain TD-error
                formulation.
        """
        self.network.optimizer.zero_grad()

        state = self._as_batch(state)
        state_ = self._as_batch(state_)
        reward = self._as_batch(reward)
        self.advantage = advantage

        _, value = self.network.forward(state)
        _, value_ = self.network.forward(state_)

        # At a terminal transition (done is True) this factor is 0, so the
        # bootstrapped next-state value is dropped from the target.
        not_terminal = 1 - int(done)

        if self.advantage:
            # The bootstrap target is treated as a constant.
            td_target = (reward + self.gamma * value_ * not_terminal).detach()

            # https://gitee.com/nidao/Deep-reinforcement-learning-with-pytorch/blob/master/Char04%20A2C/A2C.py
            td_error = td_target - value
            actor_loss = -(self.action_log_prob * td_error.detach()).mean()
            # Squared TD error. The previous code squared the integer
            # `advantage` flag, giving a constant loss that never trained
            # the critic.
            critic_loss = td_error.pow(2).mean()
            #loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy

        else:
            # delta: the estimation error for updating the actor.
            delta = reward + self.gamma * value_ * not_terminal - value
            actor_loss = -self.action_log_prob * delta
            critic_loss = delta ** 2

        (actor_loss + critic_loss).backward()
        self.network.optimizer.step()

Plotting function

In [None]:
def plot_runing_curve(x, total_reward, figure_file, window=100):
    """Plot the trailing running average of episode returns and save it.

    Args:
        x: x-axis values, one per entry of `total_reward`.
        total_reward: sequence of per-episode returns.
        figure_file: path handed to `plt.savefig`.
        window: number of most recent entries (including the current one)
            averaged at each position; fewer are used near the start.

    Raises:
        ValueError: if `window` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")

    running_avg = np.zeros(len(total_reward))
    for i in range(len(running_avg)):
        # `i - window + 1` keeps exactly `window` entries; the previous
        # `i - 100` start averaged 101 values under a "previous 100" title.
        start = max(0, i - window + 1)
        running_avg[i] = np.mean(total_reward[start:(i + 1)])
    plt.plot(x, running_avg)
    plt.title('Running average of previous {} total_reward'.format(window))
    plt.savefig(figure_file)

Hyperparameters

In [None]:
# --- Environment interface ---
input_dim = [8]      # observation size, wrapped in a list because the network unpacks it
n_actions = 4        # number of discrete actions

# --- Network ---
hidden_dim = 1024    # width of each hidden layer
n_layers = 1         # read by ActorCriticNetwork.__init__ and stored there

# --- Optimisation ---
lr = 1e-7            # Adam learning rate
gamma = 0.999        # discount factor

# --- Update rule ---
advantage = 1        # truthy: Agent.forward takes its advantage branch

In [None]:
if __name__ == '__main__':

  # Train the agent on LunarLander, one gradient step per environment step,
  # then save a running-average plot of the episode returns.
  env = gym.make("LunarLander-v2")

  n_episode = 1000
  agent = Agent(lr, input_dim, hidden_dim, n_actions, gamma)

  # Conventional terms used from https://gym.openai.com/docs/
  total_reward = []
  try:
      for episode in range(n_episode):
          done = False
          observation = env.reset()
          temp_reward = 0
          while not done:
              action = agent.action_selection(observation)
              # Bind the next observation to the name used below. It was
              # previously unpacked into `state_`, so the update and the
              # state hand-over read a stale `observation_` left behind by
              # an earlier cell.
              observation_, reward, done, info = env.step(action)
              temp_reward += reward
              agent.forward(observation, reward, observation_, done, advantage)
              observation = observation_
          total_reward.append(temp_reward)

          reward_avg = np.mean(total_reward[-100:])
          if episode % 100 == 0:
              print('episode ', episode, 'score %.1f' % temp_reward, 'average score %.1f' % reward_avg)
  finally:
      # Release the environment even if training is interrupted.
      env.close()

  x = [episode + 1 for episode in range(n_episode)]

  # Use this run's episode count (`n_episode`), not `n_episodes` from the
  # random-rollout cell.
  fname = 'Actor_Critic_on_Lunar_Lander' + str(agent.lr) + '_' + str(n_episode) + 'games'
  plot_dir = "/tmp/gym/plots"
  os.makedirs(plot_dir, exist_ok=True)
  figure_file = os.path.join(plot_dir, fname + '.png')
  plot_runing_curve(x, total_reward, figure_file)
  # show_video()