In [1]:
#!apt install -y xvfb xserver-xephyr tigervnc-standalone-server xfonts-base python-opengl ffmpeg > /dev/null 2>&1
#!apt update > /dev/null 2>&1
#!pip install Box2D  > /dev/null 2>&1
#!pip install gym[all] > /dev/null 2>&1
#!pip install tqdm > /dev/null 2>&1

In [2]:
#!pip install PyVirtualDisplay > /dev/null 2>&1

In [3]:
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from tqdm.notebook import tqdm
from torch.distributions import Normal, Categorical

import gym
import matplotlib.pyplot as plt
import numpy as np

# REINFORCE

In [4]:
class Actor(nn.Module):
    def act(self, state):
        distrib = self.get_action_distribution(state)
        return distrib.sample()

    def get_action_distribution(self, state):
        pass

    def get_logprob(self, state, action):
        pass

class ContinuousActor(Actor):
    def __init__(self, state_size, action_size, hidden_size=256):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 2 * action_size)
        )

    def get_action_distribution(self, state):
        mu, log_sigma = torch.chunk(self.model(state), 2, dim=-1)
        sigma = torch.exp(log_sigma)
        return Normal(mu, sigma) # batch_size x action_size

    def get_logprob(self, state, action):
        distrib = self.get_action_distribution(state)
        return distrib.log_prob(action).sum(-1)

class DiscreteActor(Actor):
    def __init__(self, state_size, action_size, hidden_size=256):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size)
        )

    def get_action_distribution(self, state):
        logits = self.model(state)
        return Categorical(logits=logits)

    def get_logprob(self, state, action):
        distrib = self.get_action_distribution(state)
        return distrib.log_prob(action)

In [5]:
class Reinforce:
    def __init__(self, actor: Actor, learning_rate=1e-4):
        self.actor = actor
        self.optim = optim.Adam(actor.parameters(), learning_rate)

    def update(self, state, action, score):
        loss = -(score.detach() * self.actor.get_logprob(state, action)).mean()
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

In [6]:
class Agent:
    def __init__(self, actor, device="cpu"):
        self.actor = actor
        self.device = device

    def act(self, state):
        with torch.no_grad():
            state = torch.tensor([state], dtype=torch.float32, device=self.device)
            action = self.actor.act(state).cpu().numpy()[0]
            return action

In [7]:
def sample_episode(env, agent, gamma=0.99, is_discrete=False):
    state = env.reset()
    done = False
    
    states = []
    actions = []
    rewards = []

    while not done:
        action = agent.act(state)

        states.append(state)
        actions.append(action)

        if not is_discrete:
            action = np.tanh(action)
        state, reward, done, _ = env.step(action)

        rewards.append(reward)
    
    scores = []
    last_score = 0.
    for r in reversed(rewards):
        last_score = last_score * gamma + r
        scores.append(last_score)
    scores = list(reversed(scores))

    return states, actions, scores, np.sum(rewards)

def plot_rewards(reward_means, reward_stds):
    xs = list(range(len(reward_means)))
    y_mean = np.array(reward_means)
    y_std = np.array(reward_stds)

    plt.figure(figsize=(9,6))
    plt.fill_between(xs, y_mean-y_std, y_mean+y_std, color=(0.8, 0.1, 0.1, 0.4))
    plt.plot(xs, y_mean, color=(0.8, 0.1, 0.1))
    plt.grid()
    plt.show()

In [8]:
def train_reinforce(env_name, lr=1e-3, iterations=100, episodes_per_iteration=10, 
                    is_discrete=False, device="cpu", gamma=0.99):
    
    env = gym.make(env_name)
    action_size = env.action_space.n if is_discrete else env.action_space.shape[0]
    state_size = env.observation_space.shape[0]
    
    actor = DiscreteActor(state_size, action_size) if is_discrete else ContinuousActor(state_size, action_size)
    actor.to(device)
    agent = Agent(actor, device)
    reinforce = Reinforce(actor, lr)

    rewards_means = []
    rewards_stds = []

    for i in tqdm(range(iterations+1)):
        states = []
        actions = []
        scores = []
        total_rewards = []
        for _ in range(episodes_per_iteration):
            s, a, dr, tr = sample_episode(env, agent, gamma, is_discrete)
            states.extend(s)
            actions.extend(a)
            scores.extend(dr)
            total_rewards.append(tr)
        
        states = torch.tensor(states, dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.float32, device=device)
        scores = torch.tensor(scores, dtype=torch.float32, device=device)

        reinforce.update(states, actions, scores)
        
        rewards_means.append(np.mean(total_rewards))
        rewards_stds.append(np.std(total_rewards))
    return rewards_means, rewards_stds

In [9]:
rm, rs = train_reinforce("MountainCarContinuous-v0", is_discrete=False, iterations=500, device="cuda")
plot_rewards(rm, rs)

  0%|          | 0/501 [00:00<?, ?it/s]

  state = torch.tensor([state], dtype=torch.float32, device=self.device)


KeyboardInterrupt: 

In [None]:
rm, rs = train_reinforce("LunarLanderContinuous-v2", is_discrete=False, iterations=500, device="cuda")
plot_rewards(rm, rs)

In [None]:
rm, rs = train_reinforce("LunarLander-v2", is_discrete=True, iterations=200, device="cuda")
plot_rewards(rm, rs)

# A2C

In [None]:
class Critic(nn.Module):
    def __init__(self, state_size, hidden_size=256):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def get_v(self, state):
        return self.model(state).squeeze(-1)

In [None]:
class AdvantageActorCritic:
    def __init__(self, actor: Actor, critic: Critic, learning_rate=1e-4, gamma=0.99):
        self.actor = actor
        self.critic = critic
        self.optim = optim.Adam(list(actor.parameters()) 
                                + list(critic.parameters()), learning_rate)
        self.gamma = gamma

    def update(self, states, action, next_states, reward, dones):
        value = self.critic.get_v(states)
        with torch.no_grad():
            next_value = self.critic.get_v(next_states)
            target_value = reward + self.gamma * (1 - dones) * next_value
            advantage = target_value - value.detach()
        
        actor_loss = -(advantage * self.actor.get_logprob(states, action)).mean()
        critic_loss = F.mse_loss(value, target_value)
        entropy_loss = -self.actor.get_action_distribution(state).entropy()

        loss = actor_loss + critic_loss + 0.01 * entropy_loss
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

In [None]:
def sample_episode(env, agent, is_discrete=False):
    state = env.reset()
    done = False
    
    states = []
    next_states = []
    actions = []
    rewards = []

    while not done:
        action = agent.act(state)

        states.append(state)
        actions.append(action)

        if not is_discrete:
            action = np.tanh(action)
        state, reward, done, _ = env.step(action)

        next_states.append(state)
        rewards.append(reward)
    return states, actions, next_states, rewards, np.sum(rewards)

In [None]:
def train_a2c(env_name, lr=1e-3, iterations=100, episodes_per_iteration=10, 
                    is_discrete=False, device="cpu", gamma=0.99):
    
    env = gym.make(env_name)
    action_size = env.action_space.n if is_discrete else env.action_space.shape[0]
    state_size = env.observation_space.shape[0]
    
    actor = DiscreteActor(state_size, action_size) if is_discrete else ContinuousActor(state_size, action_size)
    actor.to(device)
    critic = Critic(state_size)
    critic.to(device)
    agent = Agent(actor, device)
    a2c = AdvantageActorCritic(actor, critic, lr, gamma)

    rewards_means = []
    rewards_stds = []

    for i in tqdm(range(iterations+1)):
        states = []
        actions = []
        next_states = []
        rewards = []
        dones = []
        total_rewards = []
        for _ in range(episodes_per_iteration):
            s, a, ns, dr, tr = sample_episode(env, agent, is_discrete)
            states.extend(s)
            next_states.extend(ns)
            actions.extend(a)
            rewards.extend(dr)
            dones.extend([0.] * (len(s) - 1) + [1.])

            total_rewards.append(tr)
        
        states = torch.tensor(states, dtype=torch.float32, device=device)
        next_states = torch.tensor(next_states, dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.float32, device=device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
        dones = torch.tensor(dones, dtype=torch.float32, device=device)

        a2c.update(states, actions, next_states, rewards, dones)
        
        rewards_means.append(np.mean(total_rewards))
        rewards_stds.append(np.std(total_rewards))
    return rewards_means, rewards_stds

In [None]:
rm, rs = train_a2c("MountainCarContinuous-v0", is_discrete=False, iterations=200)
plot_rewards(rm, rs)

In [None]:
rm, rs = train_a2c("LunarLanderContinuous-v2", is_discrete=False, iterations=200)
plot_rewards(rm, rs)

In [None]:
rm, rs = train_a2c("LunarLander-v2", is_discrete=True, iterations=200)
plot_rewards(rm, rs)