# Cart Pole

Adapted from: https://github.com/seungeunrho/minimalRL

## Install dependencies (exact version)

In [1]:
!pip install gym==0.26.2

[0m

## Import libraries

In [2]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

## Set hyper-parameters

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
learning_rate = 0.0002
discount_factor = 0.98

# Actor_critic special hyper-parameters
n_rollout = 10


# PPO special hyper-parameters
lmbda = 0.95
eps_clip = 0.1
K_epoch = 3
T_horizon = 20

## Define the model

In [4]:
from enum import Enum

# ReinforcementLearning
class RL(Enum):
  POLICY = 1
  ACTOR_CRITIC = 2
  PPO = 3

In [5]:
class Policy(nn.Module):

    def __init__(self, learning_rate = 0.0002, discount_factor = 0.98):
        super(Policy, self).__init__()
        self.data = []
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor #gamma

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        return x

    def add_data(self, item):
        self.data.append(item)

    def train(self):
        cummulative_reward = 0
        self.optimizer.zero_grad()
        for reward, probability in self.data[::-1]:
            cummulative_reward = reward + self.discount_factor * cummulative_reward
            loss = -torch.log(probability) * cummulative_reward
            loss.backward()
        self.optimizer.step()
        self.data = []


In [6]:
class ActorCritic(nn.Module):
    def __init__(self, learning_rate = 0.0002, discount_factor = 0.98):
        super(ActorCritic, self).__init__()
        self.data = []
        self.fc1 = nn.Linear(4, 256)
        self.fc_policy = nn.Linear(256, 2)
        self.fc_value = nn.Linear(256, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor #gamma

    def policy(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        x = self.fc_policy(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def value(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_value(x)
        return v

    def add_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, next_state_lst, done_lst = [], [], [], [], []
        for transition in self.data:
            s, a, r, next_state, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r/100.0])
            next_state_lst.append(next_state)
            done_mask = 0.0 if done else 1.0
            done_lst.append([done_mask])

        """
        s_batch = torch.tensor(s_lst, dtype=torch.float).to(device)
        a_batch = torch.tensor(a_lst).to(device)
        r_batch = torch.tensor(r_lst, dtype=torch.float).to(device)
        next_state_batch = torch.tensor(next_state_lst, dtype=torch.float).to(device)
        done_batch = torch.tensor(done_lst, dtype=torch.float).to(device)
        """

        #s_batch = torch.stack([ torch.from_numpy(s) for s in s_lst]).to(device)
        #s_batch = torch.from_numpy(s_lst).to(device)
        s_batch = torch.stack([ torch.tensor(s, dtype=torch.float).detach().clone()  for s in s_lst]).to(device)
        a_batch = torch.stack([ torch.tensor(a).detach().clone()                     for a in a_lst]).to(device)
        r_batch = torch.stack([ torch.tensor(r, dtype=torch.float).detach().clone()  for r in r_lst]).to(device)
        next_state_batch = torch.stack([ torch.tensor(next_state, dtype=torch.float).detach().clone() for next_state in next_state_lst]).to(device)
        done_batch = torch.stack([ torch.tensor(done, dtype=torch.float).detach().clone()             for done in done_lst]).to(device)

        self.data = []
        return s_batch, a_batch, r_batch, next_state_batch, done_batch

    def train(self):
        s, a, r, next_state, done = self.make_batch()
        td_target = r + self.discount_factor * self.value(next_state) * done
        delta = td_target - self.value(s)

        policy = self.policy(s, softmax_dim=1)
        policy_a = policy.gather(1,a)
        loss = -torch.log(policy_a) * delta.detach() + F.smooth_l1_loss(self.value(s), td_target.detach())

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()


In [7]:
class PPO(nn.Module):
  def __init__(self, learning_rate = 0.0002, discount_factor = 0.98):
    super(PPO, self).__init__()
    self.data = []
    self.fc1 = nn.Linear(4, 256)
    self.fc_policy = nn.Linear(256, 2)
    self.fc_value = nn.Linear(256, 1)
    self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
    self.discount_factor = discount_factor #gamma

  def policy(self, x, softmax_dim = 0):
      x = F.relu(self.fc1(x))
      x = self.fc_policy(x)
      prob = F.softmax(x, dim=softmax_dim)
      return prob

  def value(self, x):
      x = F.relu(self.fc1(x))
      v = self.fc_value(x)
      return v

  def add_data(self, transition):
      self.data.append(transition)

  def make_batch(self):
      s_lst, a_lst, r_lst, next_state_lst, prob_a_lst, done_lst = [], [], [], [], [], []
      for transition in self.data:
          s, a, r, next_state, prob_a, done = transition
          s_lst.append(s)
          a_lst.append([a])
          r_lst.append([r])
          next_state_lst.append(next_state)
          prob_a_lst.append([prob_a])
          done_mask = 0 if done else 1
          done_lst.append([done_mask])

      """
      s_batch = torch.tensor(s_lst, dtype=torch.float).to(device)
      a_batch = torch.tensor(a_lst).to(device)
      r_batch = torch.tensor(r_lst).to(device)
      prob_a_batch = torch.tensor(prob_a_lst).to(device)
      next_state_batch = torch.tensor(next_state_lst, dtype=torch.float).to(device)
      done_batch = torch.tensor(done_lst, dtype=torch.float).to(device)
      """
      s_batch = torch.stack([ torch.tensor(s, dtype=torch.float).detach().clone() for s in s_lst]).to(device)
      a_batch = torch.stack([ torch.tensor(a).detach().clone()                    for a in a_lst]).to(device)
      r_batch = torch.stack([ torch.tensor(r).detach().clone()                    for r in r_lst]).to(device)
      prob_a_batch = torch.stack([ torch.tensor(prob_a).detach().clone()          for prob_a in prob_a_lst]).to(device)
      next_state_batch = torch.stack([ torch.tensor(next_state, dtype=torch.float).detach().clone() for next_state in next_state_lst]).to(device)
      done_batch = torch.stack([ torch.tensor(done, dtype=torch.float).detach().clone()             for done in done_lst]).to(device)

      self.data = []
      return s_batch, a_batch, r_batch, next_state_batch, done_batch, prob_a_batch

  def train(self):
      s, a, r, next_state, done, prob_a = self.make_batch()

      for i in range(K_epoch):
        td_target = r + self.discount_factor * self.value(next_state) * done
        delta = td_target - self.value(s)
        delta = delta.detach().cpu().numpy()

        advantage_lst = []
        advantage = 0.0
        for delta_t in delta[::-1]:
            advantage = self.discount_factor * lmbda * advantage + delta_t[0]
            advantage_lst.append([advantage])
        advantage_lst.reverse()
        advantage = torch.tensor(advantage_lst, dtype=torch.float).to(device)

        policy = self.policy(s, softmax_dim=1)
        policy_a = policy.gather(1, a)
        ratio = torch.exp(torch.log(policy_a) - torch.log(prob_a))  # a/b == exp(log(a)-log(b))

        surr1 = ratio * advantage
        surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
        loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.value(s) , td_target.detach())

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()


## Define trains

In [8]:
#Defs

def game_with_policy(episodes_range):
  print('Training game with Policy')
  game = 'CartPole-v1'
  env = gym.make(game)
  model = Policy().to(device)
  score = 0.0
  print_interval = 50

  for n_episodes in range(episodes_range):
      state, _ = env.reset()
      done = False

      while not done: # CartPole-v1 forced to terminates at 500 step.
          state = torch.from_numpy(state).float().to(device)
          probabilities = model(state)
          distribution = Categorical(probabilities)
          action = distribution.sample()
          next_state, reward, done, truncated, info = env.step(action.item())
          model.add_data((reward, probabilities[action]))
          state = next_state
          score += reward

      model.train()

      if n_episodes % print_interval == 0 and n_episodes != 0:
          print("# of episode :{}, avg score : {}".format(n_episodes, score/print_interval))
          score = 0.0

  env.close()
  return game, model

def game_with_actor_critic(episodes_range):
  print('Training game with ActorCritic')
  game = 'CartPole-v1'
  env = gym.make(game)
  model = ActorCritic(learning_rate=learning_rate, discount_factor = discount_factor).to(device)
  score = 0.0
  print_interval = 50

  for n_episodes in range(episodes_range):
      state, _ = env.reset()
      done = False

      while not done: # CartPole-v1 forced to terminates at 500 step.
        for t in range(n_rollout):
          state = torch.from_numpy(state).float().to(device)
          probabilities = model.policy(state)
          distribution = Categorical(probabilities)
          action = distribution.sample().item()
          next_state, reward, done, truncated, info = env.step(action)
          model.add_data((state, action, reward, next_state, done))
          state = next_state
          score += reward

          if done:
            break

      model.train()

      if n_episodes % print_interval == 0 and n_episodes != 0:
          print("# of episode :{}, avg score : {}".format(n_episodes, score/print_interval))
          score = 0.0

  env.close()
  return game, model

def game_with_ppo(episodes_range):
  print('Training game with PPO')
  game = 'CartPole-v1'
  env = gym.make(game)
  model = PPO(learning_rate=learning_rate, discount_factor = discount_factor).to(device)
  score = 0.0
  print_interval = 50

  for n_episodes in range(episodes_range):
      state, _ = env.reset()
      done = False

      while not done: # CartPole-v1 forced to terminates at 500 step.
        for t in range(T_horizon):
          state = torch.from_numpy(state).float().to(device)
          probabilities = model.policy(state)
          distribution = Categorical(probabilities)
          action = distribution.sample().item()
          next_state, reward, done, truncated, info = env.step(action)
          model.add_data((state, action, reward/100.0, next_state, probabilities[action].item(), done))
          state = next_state
          score += reward

          if done:
              break

      model.train()

      if n_episodes % print_interval == 0 and n_episodes != 0:
          print("# of episode :{}, avg score : {}".format(n_episodes, score/print_interval))
          score = 0.0

  env.close()
  return game, model


In [9]:
training_game_mapper = {
    RL.POLICY.name:       lambda episodes_range: game_with_policy(episodes_range),
    RL.ACTOR_CRITIC.name: lambda episodes_range: game_with_actor_critic(episodes_range),
    RL.PPO.name:          lambda episodes_range: game_with_ppo(episodes_range),
}

# Perform Training

In [10]:
episodes_range = 505

# policy | actor_critic | ppo
rl_model = 'ppo'


rl_model_type = rl_model.upper()
training_game = training_game_mapper[rl_model_type]
game, model = training_game(episodes_range)

Training game with PPO


  s_batch = torch.stack([ torch.tensor(s, dtype=torch.float).detach().clone() for s in s_lst]).to(device)


# of episode :50, avg score : 24.06
# of episode :100, avg score : 43.7
# of episode :150, avg score : 75.0
# of episode :200, avg score : 96.78
# of episode :250, avg score : 138.42
# of episode :300, avg score : 174.7
# of episode :350, avg score : 209.44
# of episode :400, avg score : 212.28
# of episode :450, avg score : 279.46
# of episode :500, avg score : 267.4


## Record a video of a sample playthrough

Adapted from: https://www.anyscale.com/blog/an-introduction-to-reinforcement-learning-with-openai-gym-rllib-and-google

In [11]:
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from base64 import b64encode
from IPython.display import HTML

def render_mp4(videopath):
  mp4 = open(videopath, 'rb').read()
  base64_encoded_mp4 = b64encode(mp4).decode()
  return f'<video width=400 controls><source src="data:video/mp4;' \
         f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'

def play_policy(env, video, model_policy):
  print('Play with Policy model')
  state, _ = env.reset()
  done = False
  with torch.no_grad():
    while not done:
        env.render()
        video.capture_frame()
        state = torch.from_numpy(state).float().to(device)
        probabilities = model_policy(state)
        distribution = Categorical(probabilities)
        action = distribution.sample()
        state, reward, done, truncated, info = env.step(action.item())
  return env, video


def play_actor_critic(env, video, model_actor_critic):
  print('Play with Actor Critic model')
  state, _ = env.reset()
  done = False
  with torch.no_grad():
    while not done:
      env.render()
      video.capture_frame()
      state = torch.from_numpy(state).float().to(device)
      probabilities = model_actor_critic.policy(state)
      distribution = Categorical(probabilities)
      action = distribution.sample().item()
      state, r, done, truncated, info = env.step(action)
  return env, video


def play_ppo(env, video, model_ppo):
  print('Play with PPO model')
  state, _ = env.reset()
  done = False
  with torch.no_grad():
    while not done: # could add a T_horizon in loop condition
      env.render()
      video.capture_frame()
      state = torch.from_numpy(state).float().to(device)
      probabilities = model_ppo.policy(state)
      distribution = Categorical(probabilities)
      action = distribution.sample().item()
      state, r, done, truncated, info = env.step(action)
  return env, video

get_play_by_models = {
    RL.POLICY.name:       lambda env, video, model: play_policy(env, video, model),
    RL.ACTOR_CRITIC.name: lambda env, video, model: play_actor_critic(env, video, model),
    RL.PPO.name:          lambda env, video, model: play_ppo(env, video, model),
}

def play(game, model):
  env = gym.make(game, render_mode="rgb_array")
  videopath = "video.mp4"
  video = VideoRecorder(env, videopath)
  video, env = get_play_by_models[rl_model_type](env, video, model)
  video.close()
  env.close()
  return render_mp4(videopath)

HTML(play(game, model))

Play with PPO model
Moviepy - Building video video.mp4.
Moviepy - Writing video video.mp4



                                                               

Moviepy - Done !
Moviepy - video ready video.mp4




# TODOs:

1. [X] Extend the code with Actor-Critic.
2. [X] Extend the code with PPO (+ Actor-Critic).