# Cross-Entropy Method

---

In this notebook, we will train the Cross-Entropy Method with OpenAI Gym's MountainCarContinuous environment.

### 1. Import the Necessary Packages

In [None]:
import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

### 2. Instantiate the Environment and Agent

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
env = gym.make('BipedalWalker-v3') # need a score of 300 to be solved
# env = gym.make('MountainCarContinuous-v0')
env.seed(101)
np.random.seed(101)

print('observation space:', env.observation_space)
print('action space:', env.action_space)
print('  - low:', env.action_space.low)
print('  - high:', env.action_space.high)

class Agent(nn.Module):
    def __init__(self, env, h1_size=16, h2_size=32, h3_size=16):
        super(Agent, self).__init__()
        self.env = env
        # state, hidden layer, action sizes
        self.s_size = env.observation_space.shape[0]
        self.h1_size = h1_size
        self.h2_size = h2_size
        self.h3_size = h3_size
        self.a_size = env.action_space.shape[0]
        # define layers
        self.fc1 = nn.Linear(self.s_size, self.h1_size)
        self.fc2 = nn.Linear(self.h1_size, self.h2_size)
        self.fc3 = nn.Linear(self.h2_size, self.h3_size)
        self.fc4 = nn.Linear(self.h3_size, self.a_size)
        
    def set_weights(self, weights):
        s_size = self.s_size
        h1_size = self.h1_size
        h2_size = self.h2_size
        h3_size = self.h3_size
        a_size = self.a_size
        
        # separate the weights for each layer
        # get the amount of weights between the imput layer and the hidden layer + the bias
        fc1_end = (s_size * h1_size) + h1_size
        fc2_end = (h1_size * h2_size) + h2_size + fc1_end
        fc3_end = (h2_size * h3_size) + h3_size + fc2_end
        # get the weights between the input and the firs hidden layer
        fc1_W = torch.from_numpy(weights[:s_size * h1_size].reshape(s_size, h1_size))
        # get the bias of the first hidden layer
        fc1_b = torch.from_numpy(weights[s_size*h1_size:fc1_end])
        
        # get the weights between the input and the firs hidden layer
        fc2_W = torch.from_numpy(weights[fc1_end:fc1_end + (h1_size * h2_size)].reshape(h1_size, h2_size))
        # get the bias of the first hidden layer
        fc2_b = torch.from_numpy(weights[fc1_end + (h1_size * h2_size):fc2_end])
        # get the weights between the input and the firs hidden layer
        fc3_W = torch.from_numpy(weights[fc2_end:fc2_end + (h2_size * h3_size)].reshape(h2_size, h3_size))
        # get the bias of the first hidden layer
        fc3_b = torch.from_numpy(weights[fc2_end + (h2_size * h3_size):fc3_end])
        
        
        # get the weights between the hidden and the output layer
        fc4_W = torch.from_numpy(weights[fc3_end:fc3_end+(h3_size*a_size)].reshape(h3_size, a_size))
        # get the bias of the output layer
        fc4_b = torch.from_numpy(weights[fc3_end+(h3_size*a_size):])
        
        # set the weights for each layer
        self.fc1.weight.data.copy_(fc1_W.view_as(self.fc1.weight.data))
        self.fc1.bias.data.copy_(fc1_b.view_as(self.fc1.bias.data))
        
        self.fc2.weight.data.copy_(fc2_W.view_as(self.fc2.weight.data))
        self.fc2.bias.data.copy_(fc2_b.view_as(self.fc2.bias.data))
        self.fc3.weight.data.copy_(fc3_W.view_as(self.fc3.weight.data))
        self.fc3.bias.data.copy_(fc3_b.view_as(self.fc3.bias.data))
        
        self.fc4.weight.data.copy_(fc4_W.view_as(self.fc4.weight.data))
        self.fc4.bias.data.copy_(fc4_b.view_as(self.fc4.bias.data))
    
    # get the amount / dimension of the weights -> +1 is for the bias
    def get_weights_dim(self):
        return (self.s_size+1)*self.h1_size + (self.h1_size+1)*self.h2_size + (self.h2_size+1)*self.h3_size + (self.h3_size+1)*self.a_size
    
    # forward pass through the network
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = torch.tanh(self.fc4(x))
        return x.cpu().data
    
    # evaluate the calculated weights
    def evaluate(self, weights, gamma=1.0, max_t=5000):
        # put the calculated weights into the network
        self.set_weights(weights)
        episode_return = 0.0
        # reset the environment
        state = self.env.reset()
        for t in range(max_t):
            state = torch.from_numpy(state).float().to(device)
            # get the action dependent on the state (and the weights)
            action = self.forward(state)
            # act according to the action and get the next state, the reward, etc.
            state, reward, done, _ = self.env.step(action)
            # accumulate the reward (discounted by the discount factor) 
            episode_return += reward * math.pow(gamma, t)
            if done:
                break
        return episode_return
    
agent = Agent(env).to(device)

### 3. Train the Agent with the Cross-Entropy Method

Run the code cell below to train the agent from scratch.  Alternatively, you can skip to the next code cell to load the pre-trained weights from file.

In [None]:
def cem(n_iterations=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """PyTorch implementation of the cross-entropy method.
        
    Params
    ======
        n_iterations (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate
        print_every (int): how often to print average score (over last 100 episodes)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise
    """
    # number of best estimations
    n_elite=int(pop_size*elite_frac)

    scores_deque = deque(maxlen=100)
    scores = []
    # get initial weights
    best_weight = sigma * np.random.randn(agent.get_weights_dim())
    
    for i_iteration in range(1, n_iterations+1):
        # get pop_size additional estimations (their weights)
        weights_pop = [best_weight + (sigma*np.random.randn(agent.get_weights_dim())) for i in range(pop_size)]
        # get the rewards for each of the pop_size additional estimations and store them in array 'rewards'
        rewards = np.array([agent.evaluate(weights, gamma, max_t) for weights in weights_pop])

        # sort the rewards from smallest to largest (argsort) and return the largest n_elite element indicies
        elite_idxs = rewards.argsort()[-n_elite:]
        # get the weights of the n_elite best estimations
        elite_weights = [weights_pop[i] for i in elite_idxs]
        
        # calculate the mean weights out of the n_elite best estimations
        best_weight = np.array(elite_weights).mean(axis=0)
        
        # get the reward for this (mean) best estimate
        reward = agent.evaluate(best_weight, gamma=1.0)
        
        # store the reward
        scores_deque.append(reward)
        scores.append(reward)
        
        # save the state of the agent
        torch.save(agent.state_dict(), 'checkpoint_bipedal.pth')
        
        if i_iteration % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))
        
        # exit the agent if the score has reaches a defines threshold
        if np.mean(scores_deque)>=180.0:
            print('\nEnvironment solved in {:d} iterations!\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))
            break
    return scores

scores = cem()

# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(1, len(scores)+1), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()

### 4. Watch a Smart Agent!

In the next code cell, you will load the trained weights from file to watch a smart agent!

In [None]:
import time
# load the weights from file
agent.load_state_dict(torch.load('checkpoint_bipedal.pth'))

state = env.reset()
while True:
    state = torch.from_numpy(state).float().to(device)
    with torch.no_grad():
        action = agent(state)
    env.render()
    time.sleep(0.02)
    next_state, reward, done, _ = env.step(action)
    state = next_state
    if done:
        break

# env.close()

In [None]:
env.close()