In [14]:
# Use the cross-entropy method to find an optimal policy for the Mountain Car environment

import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

In [24]:
# Build the continuous-action Mountain Car task.
env = gym.make('MountainCarContinuous-v0')

# Report both spaces and their bounds, one labelled line per entry.
print('\n'.join(
    '{} {}'.format(label, value)
    for label, value in (
        ('Observation Space: ', env.observation_space),
        ('Action Space: ', env.action_space),
        ('Observation Space Lower limits: ', env.observation_space.low),
        ('Observation Space Higher limits: ', env.observation_space.high),
        ('Action Space Lower limits: ', env.action_space.low),
        ('Action Space Higher limits: ', env.action_space.high),
    )
))

# Seed the environment and NumPy's global generator so runs are repeatable.
env.seed(1)
np.random.seed(1)

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
Observation Space:  Box(2,)
Action Space:  Box(1,)
Observation Space Lower limits:  [-1.2  -0.07]
Observation Space Higher limits:  [0.6  0.07]
Action Space Lower limits:  [-1.]
Action Space Higher limits:  [1.]


In [26]:
# Run on the first CUDA device when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

class Agent(nn.Module):
    """Small feed-forward policy whose parameters are loaded from a flat vector.

    An observation is passed through one hidden ReLU layer and a tanh output
    layer to produce an action. The parameters are overwritten with
    ``set_weights`` and a candidate vector is scored with ``evaluate``.

    Args:
        env: Environment exposing ``observation_space.shape``,
            ``action_space.shape``, ``reset()`` and ``step(action)``.
        h_size: Number of units in the hidden layer.
    """

    def __init__(self, env, h_size=16):
        super(Agent, self).__init__()
        self.env = env
        self.s_size = env.observation_space.shape[0]
        self.h_size = h_size
        self.a_size = env.action_space.shape[0]

        # Layer definition
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)

    def set_weights(self, weights):
        """Load every layer parameter from one flat vector.

        The vector is consumed in the order fc1 weight, fc1 bias, fc2 weight,
        fc2 bias. Each chunk fills its parameter in row-major order.

        Args:
            weights: Array-like holding exactly ``get_weights_dim()`` values.

        Raises:
            ValueError: If ``weights`` does not hold the expected number of values.
        """
        flat = np.ascontiguousarray(weights).reshape(-1)
        expected = self.get_weights_dim()
        if flat.size != expected:
            raise ValueError(
                "expected {} weights, got {}".format(expected, flat.size)
            )

        # Walk the flat vector once, handing each parameter its own slice.
        # nn.Linear names its matrix `weight` (singular).
        offset = 0
        for param in (self.fc1.weight, self.fc1.bias, self.fc2.weight, self.fc2.bias):
            count = param.numel()
            chunk = torch.from_numpy(flat[offset:offset + count])
            param.data.copy_(chunk.view_as(param.data))
            offset += count

    def get_weights_dim(self):
        """Return the total number of scalar parameters in both layers."""
        return (self.s_size + 1) * self.h_size + (self.h_size + 1) * self.a_size

    def forward(self, x):
        """Map an observation tensor to an action tensor.

        Args:
            x: Observation tensor located on the same device as the module.

        Returns:
            A detached CPU tensor with every component in [-1, 1].
        """
        x = F.relu(self.fc1(x))
        # tanh squashes the output into [-1, 1].
        x = torch.tanh(self.fc2(x))
        return x.detach().cpu()

    def evaluate(self, weights, gamma=1.0, max_t=5000):
        """Run one episode with the given weights and return its discounted return.

        Args:
            weights: Flat parameter vector, forwarded to ``set_weights``.
            gamma: Discount factor applied as ``gamma ** t`` at step ``t``.
            max_t: Maximum number of environment steps.

        Returns:
            Sum of discounted rewards collected until the environment reports
            ``done`` or ``max_t`` steps have been taken.
        """
        self.set_weights(weights)
        # Use the device the parameters actually live on rather than a global.
        model_device = self.fc1.weight.device
        episode_return = 0.0
        state = self.env.reset()
        for t in range(max_t):
            state = torch.from_numpy(state).float().to(model_device)
            # No gradients are needed: the result is detached anyway.
            with torch.no_grad():
                action = self.forward(state)
            state, reward, done, _ = self.env.step(action)
            episode_return += reward * math.pow(gamma, t)  # summing up the discounted rewards
            if done:
                break
        return episode_return
    
# Build the policy for `env` and move its parameters onto `device`.
agent = Agent(env).to(device)
    
    