In [5]:
# Basic
import gym
import numpy as np
import pandas as pd
import os
import sys
import pickle
from time import sleep
from collections import namedtuple, OrderedDict
from typing import List, Dict, NoReturn, Tuple, Optional, Any

# Visualization pretty
from pprint import pprint
from tqdm.notebook import tqdm_notebook
from IPython.display import clear_output

# NNs
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

In [2]:
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

### Configue GPU

In [14]:
try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False

if IN_COLAB:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    n_gpu = torch.cuda.device_count()
    print(device, n_gpu)
    torch.cuda.get_device_name(0) 
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)

cpu


## Environment

**Description:** <br>
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity.
        
**Source:** <br>
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson
        
**Observation:** <br>
        Type: Box(4) <br>
        
| Num | Observation | Min | Max |
| --- | --- | --- | --- |
| 0 | Cart Position          |   -4.8                 |  4.8 |
| 1 | Cart Velocity          |  -Inf                  |  Inf |
| 2 | Pole Angle             |   -0.418 rad (-24 deg) |  0.418 rad (24 deg) |
| 3 | Pole Angular Velocity  |   -Inf                 |  Inf |
        
**Actions:** <br>
        Type: Discrete(2) <br>
        Num   Action <br>
* 0     Push cart to the left
* 1     Push cart to the right

> Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it
        
**Reward:** <br>
        Reward is 1 for every step taken, including the termination step
        
**Starting State:** <br>
        All observations are assigned a uniform random value in [-0.05..0.05]
        
**Episode Termination:** <br>
* Pole Angle is more than 12 degrees.
* Cart Position is more than 2.4 (center of the cart reaches the edge of the display).
* Episode length is greater than 200.
* Solved Requirements:
* Considered solved when the average return is greater than or equal to 195.0 over 100 consecutive trials.

In [3]:
# loading Cartpole environment from gym
env = gym.make('CartPole-v1')
env.seed(seed=11)
print(f"Action space: {env.action_space.n}")  
# Cart Position, Cart Velocity, Pole Angle, Pole Angular Velocity
print(f"Observation space: shape {env.observation_space.shape},\n{env.observation_space.low} to {env.observation_space.high}")

Action space: 2
Observation space: shape (4,),
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38] to [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]


In [4]:
# Show sime random episodes
try:
    env.reset()
    for _ in range(200):
        env.render(mode='human')
        clear_output(wait=True)
        env.step(env.action_space.sample()) # take a random action
finally:
    env.close()

## Agent

In [14]:
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])

In [26]:
class CEM_Agent(nn.Module):
    
    def __init__(self, env, device, h_size: int = 16):
        super(CEM_Agent, self).__init__()
        self._env = env
        # state, hidden layer, action sizes
        if len(env.observation_space.shape) == 0:
            self._state_size = env.observation_space.n
        else:
            self._state_size = env.observation_space.shape[0]
        self._hidd_size = h_size
        if len(env.action_space.shape) == 0:
            self._act_size = env.action_space.n
        else:
            self._act_size = env.action_space.shape[0]
        self._device = device
        # define net
        self._net = nn.Sequential(OrderedDict([
            ("fc1", nn.Linear(self._state_size, self._hidd_size)),
            ("relu", nn.ReLU()),
            ("fc2", nn.Linear(self._hidd_size, self._act_size))
        ])).to(self._device)
        self.__n_params = self.__count_parameters()
        
        
    def __count_parameters(self) -> int:
        return np.sum([c._parameters.get('weight').data.flatten().shape[0] +
                       c._parameters.get('bias').data.flatten().shape[0]
                       for i, c in self._net.named_children() 
                       if len(c._parameters.keys()) > 0])
                
        
    def set_weights(self, weights: np.ndarray):
        assert(weights.shape[0] == self.__n_params)
        layers = dict(net.named_children())
        fc1_end = self._state_size * self._hidd_size + self._hidd_size
        # fc1
        fc1_w = torch.from_numpy(weights[:self._state_size * self._hidd_size].reshape(self._state_size, 
                                                                                        self._hidd_size)).to(self._device)
        fc1_b = torch.from_numpy(weights[(self._state_size * self._hidd_size):
                                         fc1_end].reshape(self._state_size)).to(self._device)
        layers.get('fc1')._parameters.get('weight').data.copy_(fc1_w) # .view_as(layers.get('fc1')._parameters.get('weight').data)
        layers.get('fc1')._parameters.get('bias').data.copy_(fc1_b)
        # fc2
        fc2_w = torch.from_numpy(weights[fc1_end:self._act_size * self._hidd_size].reshape(self._act_size, 
                                                                                        self._hidd_size)).to(self._device)
        fc2_b = torch.from_numpy(weights[(self._act_size * self._hidd_size) 
                                         :].reshape(self._state_size)).to(self._device)
        layers.get('fc2')._parameters.get('weight').data.copy_(fc2_w) 
        layers.get('fc2')._parameters.get('bias').data.copy_(fc2_b)
        
        
    def forward(self, x) -> np.ndarray:
        return F.tanh(self._net(x.float().to(self._device))).cpu().data
    
    
    def evaluate(self, weights: np.ndarray, gamma: float = 1.0, 
                 max_iters: int = 1000) -> float:
        # set given weights
        self.set_weights(weights)
        # run replay
        episode_reward = 0.0
        state = self._env.reset()
        for t in range(max_iters):
            state = torch.from_numpy(state).float().to(self._device)
            action = self.forward(state)
            state, reward, done, _ = self._env.step(action)
            episode_reward += reward * np.power(gamma, t)
            if done:
                break
        return episode_reward

In [27]:
# Create Agent
agent = CEM_Agent(env, device=device, h_size=10)

In [28]:
agent

CEM_Agent(
  (_net): Sequential(
    (fc1): Linear(in_features=4, out_features=10, bias=True)
    (relu): ReLU()
    (fc2): Linear(in_features=10, out_features=2, bias=True)
  )
)