In [1]:
import os

import gym
import torch as T
import numpy as np

from PIL import Image

In [2]:
# Run on the first CUDA GPU when PyTorch can see one; otherwise stay on the CPU.
device = T.device("cuda:0") if T.cuda.is_available() else T.device("cpu")

In [3]:
class ReplayBuffer:
    """Plain container of parallel per-step lists collected during a rollout.

    The lists are filled by external code; this class only creates them and
    empties them again.
    """

    def __init__(self):
        # One list per recorded quantity, all starting empty.
        self.memory_actions, self.memory_states = [], []
        self.memory_log_probs, self.memory_rewards = [], []
        self.is_terminals = []

    def clear_memory(self):
        """Empty every list in place, so outside references see the cleared lists."""
        for buffer in (
            self.memory_actions,
            self.memory_states,
            self.memory_log_probs,
            self.memory_rewards,
            self.is_terminals,
        ):
            buffer.clear()

In [4]:
class ActorCritic(T.nn.Module):
    """Actor and critic networks for a discrete-action policy.

    ``action_layer`` maps a state to a probability vector over the actions
    (softmax over the last axis); ``value_layer`` maps a state to a single
    scalar value estimate. The two networks share no parameters.

    Args:
        state_dimension: Size of the state vector fed to both networks.
        action_dimension: Number of discrete actions (size of the softmax output).
        nb_latent_variables: Width of the two hidden layers in each network.
    """

    def __init__(self, state_dimension, action_dimension, nb_latent_variables):
        super(ActorCritic, self).__init__()

        # Actor: state -> action probabilities.
        self.action_layer = T.nn.Sequential(
            T.nn.Linear(state_dimension, nb_latent_variables),
            T.nn.Tanh(),
            T.nn.Linear(nb_latent_variables, nb_latent_variables),
            T.nn.Tanh(),
            T.nn.Linear(nb_latent_variables, action_dimension),
            T.nn.Softmax(dim=-1)
        )

        # Critic: state -> scalar value estimate.
        self.value_layer = T.nn.Sequential(
            T.nn.Linear(state_dimension, nb_latent_variables),
            T.nn.Tanh(),
            T.nn.Linear(nb_latent_variables, nb_latent_variables),
            T.nn.Tanh(),
            T.nn.Linear(nb_latent_variables, 1)
        )

    def act(self, state, memory):
        """Sample an action for ``state`` and record the step in ``memory``.

        Args:
            state: Numpy array holding one state (converted with
                ``T.from_numpy`` and cast to float32).
            memory: Object exposing ``memory_states``, ``memory_actions`` and
                ``memory_log_probs`` lists; the state tensor, the sampled
                action tensor and its log-probability are appended to them.

        Returns:
            The sampled action as a Python scalar (``action.item()``).
        """
        # Move the input to wherever this module's weights live rather than
        # relying on a notebook-level ``device`` global; the two coincide when
        # the module was created with ``.to(device)``.
        module_device = next(self.parameters()).device
        state = T.from_numpy(state).float().to(module_device)
        action_probs = self.action_layer(state)
        dist = T.distributions.Categorical(action_probs)
        action = dist.sample()

        memory.memory_states.append(state)
        memory.memory_actions.append(action)
        memory.memory_log_probs.append(dist.log_prob(action))

        return action.item()

    def evaluate(self, state, action):
        """Score ``action`` under the current policy and estimate state values.

        Args:
            state: State tensor accepted by both networks.
            action: Tensor of action indices to evaluate.

        Returns:
            Tuple ``(action_log_probs, state_values, dist_entropy)``. The
            trailing size-1 axis of the critic output is removed, so a batch
            of N states yields N values -- including N == 1.
        """
        action_probs = self.action_layer(state)
        dist = T.distributions.Categorical(action_probs)

        action_log_probs = dist.log_prob(action)
        dist_entropy = dist.entropy()

        state_value = self.value_layer(state)

        # Squeeze only the last axis: a bare T.squeeze() would also drop the
        # batch axis when the batch contains a single state.
        return action_log_probs, state_value.squeeze(-1), dist_entropy

In [5]:
class Agent:
    """PPO agent trained with a clipped surrogate objective.

    Holds two actor-critic networks: ``policy`` is the one being optimized,
    and ``policy_old`` is a copy that is re-synchronized from ``policy`` at
    the end of every ``update`` call.
    """

    def __init__(
        self, state_dimension, action_dimension, nb_latent_variables,
        lr, betas, gamma, K_epochs, eps_clip):
        """Build both networks, the Adam optimizer and the value loss.

        Args:
            state_dimension: Size of the state vector.
            action_dimension: Number of discrete actions.
            nb_latent_variables: Hidden-layer width passed to ``ActorCritic``.
            lr: Adam learning rate.
            betas: Adam ``betas`` pair.
            gamma: Discount factor used for the returns.
            K_epochs: Number of optimization passes per ``update`` call.
            eps_clip: Clipping range; the ratio is clamped to
                ``[1 - eps_clip, 1 + eps_clip]``.
        """
        self.lr = lr
        self.betas = betas
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs

        self.policy = ActorCritic(
            state_dimension,
            action_dimension,
            nb_latent_variables).to(device)
        self.optimizer = T.optim.Adam(
            self.policy.parameters(), lr=lr, betas=betas)
        self.policy_old = ActorCritic(
            state_dimension,
            action_dimension,
            nb_latent_variables).to(device)
        # Start both networks from identical weights.
        self.policy_old.load_state_dict(self.policy.state_dict())

        self.MseLoss = T.nn.MSELoss()

    def _discounted_returns(self, rewards, is_terminals):
        """Return the discounted return of every step, oldest step first.

        Walks the rollout backwards and restarts the running return at every
        step flagged terminal, so returns never leak across episodes.
        """
        returns = []
        running_return = 0.0
        for reward, is_terminal in zip(reversed(rewards), reversed(is_terminals)):
            if is_terminal:
                running_return = 0.0
            running_return = reward + self.gamma * running_return
            returns.append(running_return)
        # Built newest-first with O(1) appends; flip once instead of paying
        # for insert(0, ...) on every step.
        returns.reverse()
        return returns

    def update(self, memory):
        """Run ``K_epochs`` PPO optimization passes over the stored rollout.

        Args:
            memory: Object exposing the lists ``memory_states``,
                ``memory_actions``, ``memory_log_probs``, ``memory_rewards``
                and ``is_terminals``.

        Does nothing when no rewards have been recorded. Otherwise optimizes
        ``policy`` and then copies its weights into ``policy_old``.
        """
        if not memory.memory_rewards:
            # No return signal recorded: nothing to optimize.
            return

        # Use the device the weights actually live on instead of a
        # notebook-level global.
        param_device = next(self.policy.parameters()).device

        # Monte Carlo estimate of the returns. The dtype is pinned to float32
        # so that float64 rewards (e.g. numpy scalars) still match the
        # float32 critic output in the value loss.
        returns = self._discounted_returns(
            memory.memory_rewards, memory.is_terminals)
        rewards = T.tensor(returns, dtype=T.float32, device=param_device)

        # Normalize. The standard deviation of a single sample is NaN and
        # would poison every weight, so a one-step rollout is left as is.
        if rewards.numel() > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

        # Stack the stored steps into batch tensors, cut off from any graph.
        old_states = T.stack(memory.memory_states).to(param_device).detach()
        old_actions = T.stack(memory.memory_actions).to(param_device).detach()
        old_log_probs = T.stack(
            memory.memory_log_probs).to(param_device).detach()

        # Policy optimization
        for _ in range(self.K_epochs):
            log_probs, state_values, dist_entropy = self.policy.evaluate(
                old_states, old_actions)

            # Probability ratio pi_theta / pi_theta_old
            ratios = T.exp(log_probs - old_log_probs)

            # Clipped surrogate loss + value loss - entropy bonus
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = T.clamp(
                ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            loss = -T.min(surr1, surr2) + \
                0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

            # Backpropagation
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()

        # New weights to old policy
        self.policy_old.load_state_dict(self.policy.state_dict())

In [6]:
SEED = 0

env = gym.make("LunarLander-v2")

# Seed every source of randomness the run touches, not just numpy: actions
# are sampled through torch, and the environment keeps its own generator.
np.random.seed(SEED)
T.manual_seed(SEED)
# Legacy gym seeding call, matching the pre-0.26 reset()/step() API used below.
env.seed(SEED)

In [7]:
# When True, the evaluation loop draws every step on screen and also saves
# the frame as a JPEG under ./gif.
render = True

In [8]:
# Rollout storage filled by the policy's act() on every step.
memory = ReplayBuffer()

# Both network sizes are read from the environment instead of hard-coding the
# action count, so the cell stays correct if the environment is swapped.
agent = Agent(
    state_dimension=env.observation_space.shape[0],
    action_dimension=env.action_space.n,
    nb_latent_variables=64,
    lr=0.002,
    betas=(0.9, 0.999),
    gamma=0.99,
    K_epochs=4,
    eps_clip=0.2)

In [9]:
# map_location remaps tensors saved on another device (e.g. a GPU) onto the
# device chosen for this session, so the checkpoint also loads on a CPU-only
# machine.
# NOTE(review): T.load unpickles the file -- only load checkpoints you trust.
agent.policy_old.load_state_dict(
    T.load("../Exercise11.03/PPO_LunarLander-v2.pth", map_location=device))

<All keys matched successfully>

In [10]:
N_EPISODES = 5
MAX_STEPS = 300  # hard cap on the number of steps per episode

image_dir = "./gif"
if render:
    # Create the frame directory once instead of checking on every step.
    os.makedirs(image_dir, exist_ok=True)

try:
    for ep in range(N_EPISODES):
        ep_reward = 0
        state = env.reset()

        for t in range(MAX_STEPS):
            # Pure inference: do not build an autograd graph for each step.
            with T.no_grad():
                action = agent.policy_old.act(state, memory)
            state, reward, done, _ = env.step(action)

            ep_reward += reward

            if render:
                env.render()

                # NOTE(review): frames are named by step index only, so each
                # episode overwrites the frames of the previous one -- confirm
                # this is what the GIF-building step expects.
                img = env.render(mode = "rgb_array")
                img = Image.fromarray(img)
                img.save(os.path.join(image_dir, "{}.jpg".format(t)))
            if done:
                break

        print("Episode: {}, Reward: {}".format(ep, int(ep_reward)))

        # act() appends to the buffer on every step; nothing is trained here,
        # so drop the data instead of letting it grow across episodes.
        memory.clear_memory()
finally:
    # Close the environment once, after the last episode (or on error),
    # rather than after every episode.
    env.close()

Episode: 0, Reward: 272
Episode: 1, Reward: 148
Episode: 2, Reward: 249
Episode: 3, Reward: 169
Episode: 4, Reward: 35
