In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributions import Categorical, Normal
from torchsummary import summary
import pickle
import os
import gym
from gym.wrappers import Monitor
from tqdm import tqdm
import numpy as np
import time
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay
from pathlib import Path
from IPython.display import clear_output
import random, os.path, math, glob, csv, base64, itertools, sys




In [2]:
def to_tensor(x):
    """Turn a list (or anything ``np.array`` accepts) or an ``np.ndarray``
    into a ``torch.Tensor`` cast with ``.float()``.

    Inputs that are already ``np.ndarray`` instances are passed to
    ``torch.from_numpy`` as they are; everything else is wrapped with
    ``np.array`` first.
    """
    array = x if isinstance(x, np.ndarray) else np.array(x)
    return torch.from_numpy(array).float()

def to_numpy(x):
    """Helper function, which converts a torch.Tensor to an np.ndarray.

    The tensor is detached from the autograd graph and moved to the CPU
    before conversion, because ``Tensor.numpy()`` raises for tensors that
    require grad or that live on another device. For a plain CPU tensor the
    returned array still shares memory with the tensor, as before.
    """
    return x.detach().cpu().numpy()

# Create and start a pyvirtualdisplay screen (visible=0, 1400x900).
# NOTE(review): presumably needed so the gym Monitor wrapper can render
# frames without a physical display - confirm for the target machine.
# The trailing ';' suppresses the cell's output of the start() return value.
display = Display(visible=0, size=(1400, 900))
display.start();

In [3]:
class ActorCritic(nn.Module):
    """Actor-critic network: a shared ReLU MLP trunk feeding two linear heads.

    Args:
        observation_space: Size of the input feature vector.
        action_space: Number of outputs of the policy head.
        hidden_layers: Sizes of the trunk's hidden layers (at least one).
            Every entry produces one ``Linear`` + ``ReLU`` pair; the default
            ``[512, 256]`` yields the same module layout (and the same
            ``state_dict`` keys) as a hand-written two-layer trunk.
        policy_activation: Optional factory; when truthy it is called as
            ``policy_activation(action_space)`` and the returned callable is
            applied to the policy head output.

    Raises:
        ValueError: If ``hidden_layers`` is empty.
    """

    def __init__(self, observation_space, action_space, hidden_layers=[512,256], policy_activation=None):
        super().__init__()

        # Copy the sizes so neither the shared default list nor a caller's
        # list is aliased by this instance.
        hidden_layers = list(hidden_layers)
        if not hidden_layers:
            raise ValueError("hidden_layers must contain at least one layer size")

        # Write down hidden layers dimensions
        self.hidden_layers_dimensions = hidden_layers
        self.policy_activation_ = policy_activation

        # Initialize the common model: Linear -> ReLU for every hidden size
        trunk = []
        in_features = observation_space
        for out_features in hidden_layers:
            trunk.append(nn.Linear(in_features, out_features))
            trunk.append(nn.ReLU())
            in_features = out_features
        self.model = nn.Sequential(*trunk)

        # Endpoint for policy (fed by the last hidden layer)
        self.policy = nn.Linear(in_features, action_space)
        if self.policy_activation_:
            self.policy_activation = self.policy_activation_(action_space)
        # Endpoint for values
        self.value = nn.Linear(in_features, 1)

    def forward(self, x):
        """Run the trunk and both heads.

        Args:
            x: Tensor whose last dimension equals ``observation_space``.

        Returns:
            Tuple ``(value, policy)``: the value head output (last dimension
            1) and the policy head output (last dimension ``action_space``),
            the latter passed through the policy activation when configured.
        """
        # Apply common model
        features = self.model(x)
        # Get value output
        value = self.value(features)
        # Get policy output
        policy = self.policy(features)
        if self.policy_activation_:
            policy = self.policy_activation(policy)
        return value, policy

In [4]:
class Agent:
    """One-step actor-critic agent bound to a gym environment.

    The agent owns the environment, a ``Monitor``-wrapped copy used for video
    recording, an ``ActorCritic`` network and its Adam optimizer. ``run``
    drives episodes (optionally training after every step) and remembers the
    path of the best-scoring checkpoint in ``self.best``.
    """

    def __init__(self, lr=0.0003, gamma=1, environment="Pendulum-v1", beta=0, discrete=False, sample_std=0.0001):
        """Create the environment, the network and the optimizer.

        Args:
            lr: Learning rate for Adam.
            gamma: Discount factor applied to the next-state value.
            environment: Environment id passed to ``gym.make``.
            beta: Weight of the entropy bonus in the actor loss.
            discrete: True for a discrete action space (Categorical policy),
                False for a continuous one (Normal policy).
            sample_std: Standard deviation of the Normal policy; a falsy
                value falls back to 0.04.
        """
        self.beta = beta                                  # Beta for entropy (actor loss)
        self.lr = lr                                      # Learning rate
        self.gamma = gamma                                # Gamma for discounting late actions
        self.env = gym.make(environment)                  # Init env
        self.discrete = discrete                          # Discrete env
        self.best = None                                  # Path to best model (highest reward)

        self.sample_std = sample_std if sample_std else 0.04         # STD for sampling action from policy distribution
        self.observation_space = self.env.observation_space.shape[0] # Env output vector size
        # Actuator vector size
        self.n_actions = self.env.action_space.n if self.discrete else self.env.action_space.shape[0]
        # Monitor wrapper to see the env (records every episode into ./videos)
        self.monitor_env = Monitor(self.env, "./videos", force=True, video_callable=lambda episode: True)

        # Initialize actor-critic and its optimizer
        self.actor_critic = ActorCritic(self.observation_space, self.n_actions)
        self.optimizer = optim.Adam(self.actor_critic.parameters(), self.lr)

    @classmethod
    def unpickle_self(cls, path):
        '''Load a pickled agent from ``path`` and return it.

        Works both as ``Agent.unpickle_self(path)`` and on an instance.

        Args:
            path: File previously written by ``save_model(selfpickle=True)``.

        Returns:
            The unpickled object.
        '''
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file - only load pickles that come from a trusted source.
        with open(path, "rb") as pfile:
            return pickle.load(pfile)

    def load_best(self):
        '''Loads weights of best NN this object experienced.

        Raises:
            ValueError: If no best checkpoint has been recorded yet.
        '''
        if self.best is None:
            raise ValueError("No best model recorded yet; call run() before load_best()")
        self.load_weights(self.best)

    def load_weights(self, path):
        '''Load weights from provided path.

        A fresh ``ActorCritic`` is created, the stored state dict is loaded
        into it and the optimizer is rebuilt for the new parameters.
        '''
        # Load weights
        weights_new = torch.load(path)
        # Initialize new model, load its new weights and reinitialize optimizer
        self.actor_critic = ActorCritic(self.observation_space, self.n_actions)
        self.actor_critic.load_state_dict(weights_new)
        self.optimizer = optim.Adam(self.actor_critic.parameters(), self.lr)

    def save_model(self, path="basic", directory="models", selfpickle=False):
        '''Save model (as NN params or pickle of Agent).

        Args:
            path: File name (without directory) of the checkpoint.
            directory: Target directory; created when missing.
            selfpickle: When True, pickle the whole agent to ``<path>.p``;
                otherwise store only the network's state dict under ``path``.

        Returns:
            Path of the written file.
        '''
        # Create topdir
        os.makedirs(directory, exist_ok=True)
        if selfpickle:       # Pickle object
            target = os.path.join(directory, path + ".p")
            with open(target, "wb") as pfile:
                pickle.dump(self, pfile)
            return target
        # Pickle neural network
        target = os.path.join(directory, path)
        torch.save(self.actor_critic.state_dict(), target)
        return target

    def save_video(self):
        '''Run one evaluation episode through the Monitor wrapper, print its
        reward and embed every mp4 found in ./videos into the notebook.'''
        self.run(epochs=1,train=False,visualize=True)
        print(f"Reward: {self.history['score'][-1]}")
        html = []
        for mp4 in Path('./videos').glob("*.mp4"):
            video_b64 = base64.b64encode(mp4.read_bytes())
            html.append('''<video alt="{}" autoplay 
                        loop controls style="height: 400px;">
                        <source src="data:video/mp4;base64,{}" type="video/mp4" />
                    </video>'''.format(mp4, video_b64.decode('ascii')))
        ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

    def _policy_distribution(self, policy):
        '''Build the action distribution for a policy-head output.'''
        if self.discrete:
            # The network is built without a policy activation, so the head
            # emits unnormalized scores: they are logits, not probabilities.
            return Categorical(logits=policy)
        return Normal(policy, self.sample_std)

    def choose_action(self, observation):
        '''Choose next action based on observation.

        The sampled action tensor is kept in ``self.action`` so that the
        following ``learn`` call can evaluate its log-probability.

        Returns:
            The sampled action as a numpy array with the leading batch
            dimension removed.
        '''
        state = to_tensor(observation.reshape(-1, 1, self.observation_space))
        # No gradient is needed for acting; learn() re-runs the network.
        with torch.no_grad():
            _, policy = self.actor_critic(state)
        # Sample from distribution and store the action
        action = self._policy_distribution(policy).sample()
        self.action = action
        # Reduce dimensions
        return action.numpy()[0]

    def learn(self, state, state_, reward, done):
        '''Perform single learning step (one-step TD actor-critic).

        Args:
            state: Observation the last action was chosen for.
            state_: Observation returned by the environment afterwards.
            reward: Reward received for the transition.
            done: Whether the episode ended with this transition.
        '''
        # Adjust types and sizes
        state, state_, reward = to_tensor(state), to_tensor(state_), to_tensor(reward)
        state = state.reshape(-1, 1, self.observation_space)
        state_ = state_.reshape(-1, 1, self.observation_space)

        # Inference NN and reduce dimensions of its result
        state_val, policy = self.actor_critic(state)
        state_val = state_val[0]
        # The bootstrap target is treated as a constant (semi-gradient TD),
        # so the next-state value is computed without tracking gradients.
        with torch.no_grad():
            state_val_, _ = self.actor_critic(state_)
            state_val_ = state_val_[0]

        # Handle discrete and continuous
        policy_distribution = self._policy_distribution(policy)
        log_policy = policy_distribution.log_prob(self.action)
        if not self.discrete:
            # Joint log-probability of the independent action dimensions;
            # also keeps the loss scalar when there is more than one.
            log_policy = log_policy.sum(dim=-1)
        entropy = policy_distribution.entropy().mean()

        # Calculate loss/gradients
        delta = reward + self.gamma * state_val_ * (1 - int(done)) - state_val
        # The TD error only weights the policy gradient; detaching it keeps
        # the actor term from pushing gradients into the value head.
        actor_loss = - log_policy * delta.detach() - entropy * self.beta
        critic_loss = delta ** 2
        total_loss = (actor_loss + critic_loss).sum()

        # Apply gradients
        self.optimizer.zero_grad()
        total_loss.backward()
        # Restrain gradients
        nn.utils.clip_grad_norm_(self.actor_critic.parameters(), 0.5)
        self.optimizer.step()

    def run(self, epochs=200, max_steps=128, train=True, visualize=False):
        '''Run inference in loops - evaluation or training.

        Args:
            epochs: Number of episodes to play.
            max_steps: Currently unused; every episode runs until the
                environment reports ``done``.
            train: When True, call ``learn`` after every environment step.
            visualize: When True, step the Monitor-wrapped environment so the
                episode is recorded.

        Returns:
            ``self.history``: dict with key ``"score"`` holding the
            cumulative reward of every episode of this call.
        '''
        # MEMORY (reset on every call)
        self.history = {
            "score": [],
        }
        max_reward = float('-inf')
        # Visualization wrapper
        env = self.monitor_env if visualize else self.env

        for epoch in tqdm(range(epochs)):
            observation = env.reset()    # For each epoch - reset env
            done = False                 # Agent not done
            rewards = 0                  # Cumulative rewards per epoch
            while not done:
                # Choose action to perform -> step the env
                action = self.choose_action(observation)
                observation_, reward, done, info = env.step(action[0])
                rewards += reward
                if train:    # Apply training if required
                    self.learn(observation, observation_, reward, done)
                observation = observation_
            # Keep track of best reward agent
            if rewards > max_reward:
                max_reward = rewards
                self.best = self.save_model(path=f"e{epoch}_r{rewards}", selfpickle=False)
            self.history["score"].append(rewards)
        env.close()
        return self.history

In [None]:
# Build an agent for Pendulum-v1 with a continuous (Normal) policy and train
# it for 5000 epochs; run() trains by default and returns the score history.
agent = Agent(environment="Pendulum-v1", discrete=False)
agent.run(epochs=5000)

  2%|▊                                     | 109/5000 [01:09<1:11:20,  1.14it/s]

In [51]:
# Restore the checkpoint stored for the best-scoring epoch of the last run(),
# then play one non-training episode, print its reward and embed the video.
agent.load_best()
agent.save_video()

100%|█████████████████████████████████████████████| 1/1 [00:00<00:00, 12.57it/s]

Reward: -1663.0107221133637





In [None]:
class ActorCritic(nn.Module):
    """Actor-critic network with a configurable trunk depth and activation.

    NOTE: this redefinition shadows the ``ActorCritic`` class defined earlier
    in the notebook once the cell is executed. The trunk is stored as an
    ``nn.ModuleList`` of ``Linear`` layers, so its ``state_dict`` keys differ
    from a ``Sequential`` trunk that interleaves activation modules.

    Args:
        observation_space: Size of the input feature vector.
        action_space: Number of outputs of the policy head.
        hidden_layers: Sizes of the trunk's hidden layers (at least one).
        policy_activation: Optional factory; when truthy it is called as
            ``policy_activation(action_space)`` and the returned callable is
            applied to the policy head output.
        activations: Activation applied after every trunk layer. Either a
            class (instantiated once with no arguments, e.g. ``nn.ReLU``) or
            an already usable callable (e.g. ``torch.relu``).

    Raises:
        ValueError: If ``hidden_layers`` is empty.
    """

    def __init__(self, observation_space, action_space, hidden_layers=[512,256],
                 policy_activation=None, activations=nn.ReLU):
        super().__init__()

        # Copy the sizes so neither the shared default list nor a caller's
        # list is aliased by this instance.
        hidden_layers = list(hidden_layers)
        if not hidden_layers:
            raise ValueError("hidden_layers must contain at least one layer size")

        # Write down hidden layers dimensions
        self.hidden_layers_dimensions = hidden_layers
        self.policy_activation_ = policy_activation
        self.activations = activations
        # A class such as nn.ReLU must be instantiated before it can be
        # applied to a tensor; plain callables are used as they are.
        self.activation_fn = activations() if isinstance(activations, type) else activations

        # Initialize the common model: one Linear per consecutive size pair.
        # The trunk stops at the last hidden size - the heads below map it to
        # the action and value outputs.
        self.model = nn.ModuleList()
        layers = [observation_space] + hidden_layers
        for in_features, out_features in zip(layers[:-1], layers[1:]):
            self.model.append(nn.Linear(in_features, out_features))

        # Endpoint for policy (fed by the last hidden layer)
        self.policy = nn.Linear(hidden_layers[-1], action_space)
        if self.policy_activation_:
            self.policy_activation = self.policy_activation_(action_space)
        # Endpoint for values
        self.value = nn.Linear(hidden_layers[-1], 1)

    def forward(self, x):
        """Run the trunk and both heads.

        Args:
            x: Tensor whose last dimension equals ``observation_space``.

        Returns:
            Tuple ``(value, policy)``: the value head output (last dimension
            1) and the policy head output (last dimension ``action_space``),
            the latter passed through the policy activation when configured.
        """
        # Apply common model
        for layer in self.model:
            x = self.activation_fn(layer(x))
        # Get value output
        value = self.value(x)
        # Get policy output
        policy = self.policy(x)
        if self.policy_activation_:
            policy = self.policy_activation(policy)
        return value, policy