# Cross-Entropy Method

---

This notebook shows a sample implementation of the Cross-Entropy Method with OpenAI Gym's MountainCarContinuous environment.

In [None]:
import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

# Show which PyTorch version this kernel has imported.
print(torch.__version__)

## Environment Description

State space: Continuous    
Action Space: Continuous

In [None]:
# Create the MountainCarContinuous task, then seed the environment and
# NumPy's global random generator with the same fixed value.
env = gym.make('MountainCarContinuous-v0')
for seed_fn in (env.seed, np.random.seed):
    seed_fn(101)

# Report the observation and action spaces, one "label value" line each.
for label, value in (
    ('observation space:', env.observation_space),
    ('observation space shape:', env.observation_space.shape),
    ('action space:', env.action_space),
    ('  - low:', env.action_space.low),
    ('  - high:', env.action_space.high),
    ('action space size:', env.action_space.shape[0]),
):
    print(label, value)


### 2. Agent Definition

In [None]:
from typing import List

class WeightModifier:
    """
    Mixin that maps a flat weight vector onto the parameters of a list of
    ``nn.Linear`` layers, and back.

    The flat vector is laid out layer by layer, in the order the layers were
    passed to ``_set_layers()``: each layer contributes its weight matrix
    flattened in row-major order, immediately followed by its bias vector.

    ``_set_layers()`` must be called before any other method. The total number
    of entries the flat vector must have is returned by ``get_weights_dim()``.
    """
    def _set_layers(self, layers: List[nn.Linear]):
        """
        Remember ``layers`` and pre-compute, for each of them, where its weights
        and biases live inside the flat vector.

        ``self.w_indices[k]`` is the tuple ``(start, weight_end, bias_end)`` of
        layer ``k``: its weights occupy ``[start, weight_end)`` and its biases
        occupy ``[weight_end, bias_end)``.
        """
        self.layers = layers
        self.w_indices = []
        idx_start = 0
        for layer in self.layers:
            weight_end, bias_end = self._w_b_indices(layer, idx_start)
            self.w_indices.append((idx_start, weight_end, bias_end))
            # the next layer begins right after this layer's biases
            idx_start = bias_end

    def _w_b_indices(self, layer, start):
        """
        Return ``(weight_end, bias_end)``: the exclusive end offsets, inside the
        flat vector, of ``layer``'s weights and biases when that layer's
        segment begins at offset ``start``.
        """
        # e.g. a layer of 4 units with 2 inputs has a (4, 2) weight matrix -> 8 entries
        n_weights = int(np.prod(layer.weight.shape))
        # one bias per unit of the layer
        n_biases = int(layer.bias.shape[0])
        return start + n_weights, start + n_weights + n_biases

    def _set_layer_weights_(self, layer, weights, layer_no):
        """
        Copy the segment of ``weights`` that belongs to layer number
        ``layer_no`` into ``layer`` (in place) and return the offset at which
        the next layer's segment starts.
        """
        start, layer_w, layer_b = self.w_indices[layer_no]

        # Pick this layer's weights and biases out of the flat vector. Making the
        # slices contiguous lets strided inputs (for example a column of a 2-D
        # population matrix) be reshaped without error.
        w = np.ascontiguousarray(weights[start:layer_w])
        b = np.ascontiguousarray(weights[layer_w:layer_b])

        # overwrite the layer's parameters; copy_() converts to the parameter's dtype/device
        layer.weight.data.copy_(torch.from_numpy(w).reshape(layer.weight.shape))
        layer.bias.data.copy_(torch.from_numpy(b).reshape(layer.bias.shape))

        # returns the index where next layer's weights will start from
        return layer_b

    def set_weights(self, weights):
        """
        Overwrite the parameters of every registered layer from ``weights``.

        Params
        ======
            weights (array-like): exactly ``get_weights_dim()`` values, laid out
                as described in the class docstring

        Returns ``self`` so the call can be chained.

        Raises ``ValueError`` when the number of values does not match
        ``get_weights_dim()``; a longer vector is rejected as well, instead of
        having its tail silently ignored.
        """
        flat = np.ascontiguousarray(weights).reshape(-1)
        expected = self.get_weights_dim()
        if flat.size != expected:
            raise ValueError(
                f'expected {expected} weights, got {flat.size}'
            )
        for idx, layer in enumerate(self.layers):
            self._set_layer_weights_(layer, flat, idx)
        return self

    def get_weights(self):
        """
        Return the current parameters of all registered layers as one flat
        float64 NumPy array, using the same layout ``set_weights()`` expects.
        """
        flat = np.zeros(self.get_weights_dim())
        for (start, weight_end, bias_end), layer in zip(self.w_indices, self.layers):
            flat[start:weight_end] = layer.weight.detach().cpu().numpy().reshape(-1)
            flat[weight_end:bias_end] = layer.bias.detach().cpu().numpy().reshape(-1)
        return flat

    def get_weights_dim(self):
        """Return the length of the flat weight vector (0 when no layers are registered)."""
        if not self.w_indices:
            return 0
        return self.w_indices[-1][2]

    def gen_random(self, sigma = 1.):
        """
        Return a random flat weight vector drawn from a zero-mean normal
        distribution with standard deviation ``sigma``.

        NOTE: this previously used ``np.random.rand`` (uniform on [0, 1)), which
        only ever produced non-negative weights and for which ``sigma`` was not
        a standard deviation.
        """
        return sigma * np.random.randn(self.get_weights_dim())

In [None]:
class Agent(nn.Module, WeightModifier):
    """
    Deterministic policy network: a 16-unit ReLU hidden layer followed by a
    tanh output layer with one unit per action dimension.

    Both layers are registered with ``WeightModifier`` so that all parameters
    can be read and overwritten as a single flat vector.
    """
    def __init__(self, env):
        """
        Params
        ======
            env: environment whose ``observation_space.shape[0]`` and
                ``action_space.shape[0]`` give the input and output sizes
        """
        super().__init__()
        self.env = env
        s_size = env.observation_space.shape[0]
        a_size = env.action_space.shape[0]

        self.fc1 = nn.Linear(s_size, 16)
        self.fc2 = nn.Linear(16, a_size)

        # register the layers, in forward order, for flat-vector access
        self._set_layers([self.fc1, self.fc2])

    def forward(self, x):
        """
        Map a state to an action.

        Params
        ======
            x: state as a NumPy array or a torch tensor (on any device); it is
                converted to float32 on the device holding the network

        Returns the action as a CPU tensor that carries no gradient history,
        with every component in [-1, 1] because of the final tanh.
        """
        # No gradients are taken anywhere in this class, so skip building the
        # autograd graph. as_tensor accepts both ndarrays and tensors, whereas
        # from_numpy raised TypeError when handed a tensor.
        with torch.no_grad():
            x_tensor = torch.as_tensor(x, dtype=torch.float32, device=self.fc1.weight.device)
            hidden = F.relu(self.fc1(x_tensor))
            output = torch.tanh(self.fc2(hidden))
        return output.cpu()

    def act(self, x):
        """Return the action for state ``x``; identical to ``forward(x)``."""
        return self.forward(x)
    
# Use the first CUDA device when torch can see one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f'Using device: {device}')

# Build the policy for this environment and move its parameters to that device.
agent = Agent(env)
agent = agent.to(device)

In [None]:
def run_episode(agent, env, gamma=1.0, max_t=5000):
    """
    Roll out a single episode of ``env`` under ``agent`` and total its rewards.

    Params
    ======
        agent: object whose ``act(state)`` returns the action for a state
        env: environment providing ``reset()`` and a ``step(action)`` that
            returns ``(state, reward, done, info)``
        gamma (float): discount rate applied once per timestep
        max_t (int): maximum number of timesteps before the rollout is cut off

    Returns
    =======
        (score, ep_reward): the plain sum of rewards and the gamma-discounted
        sum of rewards collected during the episode
    """
    plain_total, discounted_total = 0, 0.0
    discount = 1
    observation = env.reset()

    for _step in range(max_t):
        chosen_action = agent.act(observation)
        observation, reward, finished, _info = env.step(chosen_action)

        plain_total += reward
        discounted_total += reward * discount
        # the next reward is worth gamma times less than this one
        discount *= gamma

        if finished:
            break

    return plain_total, discounted_total

### 3. Train Agent with the Cross-Entropy Method

The following cell trains the agent using the cross-entropy method.

In [None]:
def cem(agent, n_episodes=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """PyTorch implementation of the cross-entropy method.

    Every iteration samples ``pop_size`` candidate weight vectors around the
    current mean, scores each candidate with one episode, and replaces the mean
    by the average of the best-scoring candidates. The mean-weight policy is
    then evaluated once (undiscounted), its score is recorded, and the agent's
    state dict is saved to 'checkpoint2.pth'. Training stops early once the
    average recorded score over the last 100 iterations reaches 90.0.

    Episodes are run in the module-level ``env``.

    Params
    ======
        agent: policy providing get_weights_dim(), set_weights(), act() and state_dict()
        n_episodes (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate used when scoring the candidates
        print_every (int): how often to print average score (over last 100 iterations)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise

    Returns
    =======
        scores (list): evaluation score of the mean-weight policy, one per iteration

    Raises
    ======
        ValueError: if ``pop_size`` is smaller than 1
    """
    if pop_size < 1:
        raise ValueError(f'pop_size must be at least 1, got {pop_size}')

    # Keep at least one elite: with n_elite == 0 the slice [-0:] used below
    # would select the whole population instead of an empty set.
    n_elite = max(1, int(pop_size * elite_frac))

    scores_100 = deque(maxlen=100)
    scores = []
    dim = agent.get_weights_dim()
    best_weight = sigma * np.random.randn(dim)

    for ep_no in range(1, n_episodes + 1):
        # generate pop_size candidates by adding gaussian noise
        # to the best weights that we have with us
        weights_pop = best_weight + sigma * np.random.randn(pop_size, dim)

        # run each candidate through the agent and collect its discounted reward
        rewards = []
        for i in range(pop_size):
            print(f'\rEp: {ep_no} Population: {i}', end='')
            agent.set_weights(weights_pop[i])
            _, r = run_episode(agent, env, gamma=gamma, max_t=max_t)
            rewards.append(r)

        # argsort is ascending, so the last n_elite indices are the best candidates
        pop_best_idx = np.array(rewards).argsort()[-n_elite:]
        pop_best_weights = weights_pop[pop_best_idx]

        # the new mean is the average of the elite candidates
        best_weight = pop_best_weights.mean(axis=0)

        # evaluate the new weights (undiscounted) and keep their scores in the last-100 window
        agent.set_weights(best_weight)
        _, reward = run_episode(agent, env, gamma=1.0, max_t=max_t)
        scores_100.append(reward)
        scores.append(reward)

        torch.save(agent.state_dict(), 'checkpoint2.pth')

        avg_score = np.mean(scores_100)

        if ep_no % print_every == 0:
            print(f'\rEpisode {ep_no}\tAverage Score: {avg_score:.2f}', end='')

        if avg_score >= 90.0:
            print(f'\nEnv solved in {ep_no:d} iterations!\tAverage Score: {avg_score:.2f}')
            break

    return scores

# Train the module-level agent for at most 500 iterations and keep the per-iteration scores.
scores = cem(agent=agent, n_episodes=500)
print('\nFinished')


In [None]:

# plot the recorded score of each training iteration, numbered from 1
fig, ax = plt.subplots()
ax.plot(np.arange(1, len(scores) + 1), scores)
ax.set_ylabel('Score')
ax.set_xlabel('Episode #')
plt.show()

### 4. Watch a Smart Agent!

In the next code cell, you will load the trained weights from file to watch a smart agent!

In [None]:
# load the weights written by the training cell ('checkpoint2.pth'); map_location
# lets a checkpoint saved on one device be loaded on another
agent.load_state_dict(torch.load('checkpoint2.pth', map_location=device))

state = env.reset()
try:
    while True:
        # the agent takes the raw NumPy observation and does the tensor
        # conversion and device placement itself; no gradients are needed here
        with torch.no_grad():
            action = agent.act(state)
        env.render()
        state, reward, done, _ = env.step(action)
        if done:
            break
finally:
    # close the environment even if the rollout raises
    env.close()