Here the agent learns from two continuous state variables: the car's position and its velocity. For any given state, the agent outputs a single continuous action, the force applied by the engine: negative values push the car to the left, positive values push it to the right, and zero leaves the engine unused.

In [1]:
import gym
import math
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional
from collections import deque

In [2]:
# Run on the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Environment whose observations and actions are both continuous-valued.
env = gym.make('MountainCarContinuous-v0')

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [3]:
# Seed every random source from a single constant so a fresh run is repeatable.
SEED = 101
env.seed(SEED)
np.random.seed(SEED)
# torch has its own generator (used when nn.Linear layers are initialised).
torch.manual_seed(SEED)

In [4]:
class Agent(nn.Module):
    """Two-layer policy network whose parameters are loaded from a flat vector.

    An observation passes through one hidden ReLU layer and a tanh output
    layer, so every component of the produced action lies in [-1, 1].
    """

    def __init__(self, env, hiddenLayers=16):
        """Size the layers from the environment's observation and action spaces.

        Args:
            env: Environment exposing ``observation_space.shape``,
                ``action_space.shape``, ``reset()`` and ``step(action)``.
            hiddenLayers: Number of units in the single hidden layer.
        """
        super(Agent, self).__init__()
        self.env = env
        self.stateSize = env.observation_space.shape[0]
        self.hiddenLayerSize = hiddenLayers
        self.actionSize = env.action_space.shape[0]
        # define layers
        self.fc1 = nn.Linear(self.stateSize, self.hiddenLayerSize)
        self.fc2 = nn.Linear(self.hiddenLayerSize, self.actionSize)

    def initWeights(self, weights):
        """Copy a flat weight vector into the two linear layers.

        The vector is consumed in this order: fc1 weights, fc1 biases,
        fc2 weights, fc2 biases. Each chunk fills its parameter in row-major
        order of that parameter's own shape.

        Args:
            weights: Array-like holding exactly ``getWeights()`` values.

        Raises:
            ValueError: If ``weights`` does not hold exactly ``getWeights()`` values.
        """
        flat = np.asarray(weights).reshape(-1)
        expected = self.getWeights()
        if flat.size != expected:
            raise ValueError(f'expected {expected} weights, got {flat.size}')

        offset = 0
        for param in (self.fc1.weight, self.fc1.bias, self.fc2.weight, self.fc2.bias):
            count = param.numel()
            chunk = torch.from_numpy(flat[offset:offset + count])
            # copy_ converts dtype and device to match the destination parameter.
            param.data.copy_(chunk.view_as(param.data))
            offset += count

    def evaluate(self, weights, gamma=1.0, max_t=5000):
        """Run one episode with the given weights and return its discounted return.

        Args:
            weights: Flat weight vector, loaded via ``initWeights``.
            gamma: Discount factor; the reward at step ``t`` is scaled by ``gamma ** t``.
            max_t: Maximum number of environment steps.

        Returns:
            The sum of discounted rewards collected until the episode ends or
            ``max_t`` steps have been taken.
        """
        self.initWeights(weights)
        # Use the device the parameters actually live on rather than a notebook global.
        model_device = next(self.parameters()).device
        episode_return = 0.0
        state = self.env.reset()
        # Rollouts never backpropagate, so skip autograd bookkeeping.
        with torch.no_grad():
            for t in range(max_t):
                state = torch.from_numpy(state).float().to(model_device)
                action = self.forward(state)
                state, reward, done, _ = self.env.step(action)
                episode_return += reward * math.pow(gamma, t)      # discount the reward
                if done:
                    break
        return episode_return

    def getWeights(self):
        """Return the total number of scalar parameters (weights and biases) in both layers."""
        return (self.stateSize + 1) * self.hiddenLayerSize + (self.hiddenLayerSize + 1) * self.actionSize

    def forward(self, x):
        """Map an observation tensor to an action tensor.

        Returns:
            A CPU tensor detached from the autograd graph, with values in [-1, 1].
        """
        x = torch.relu(self.fc1(x))
        # torch.tanh replaces the deprecated torch.nn.functional.tanh.
        x = torch.tanh(self.fc2(x))
        return x.detach().cpu()

In [5]:
# Build the policy network for this environment, then move its parameters to the chosen device.
agent = Agent(env)
agent = agent.to(device)

In [6]:
def crossEntropyCalculator(trainEpisodes=501, maxSteps=1000, gamma=1.0, popSize=52, eliteFrac=0.2, sigma=0.5,
                           checkpointPath='checkpoint.pth', targetScore=90):
    """Train the module-level ``agent`` with the cross-entropy method.

    Each iteration samples ``popSize`` weight vectors around the current best
    vector, evaluates each one for an episode, and replaces the best vector by
    the mean of the highest-scoring ("elite") candidates.

    Args:
        trainEpisodes: Maximum number of training iterations.
        maxSteps: Step limit for each candidate's evaluation episode.
        gamma: Discount factor used when scoring candidates.
        popSize: Number of candidate weight vectors sampled per iteration.
        eliteFrac: Fraction of the population kept as elite.
        sigma: Standard deviation of the Gaussian noise added to the best vector.
        checkpointPath: File the agent's state dict is saved to after every iteration.
        targetScore: Training stops once the mean of the last 100 scores reaches this value.

    Returns:
        List with the undiscounted return of the mean-elite policy for each iteration.

    Raises:
        ValueError: If ``popSize`` is smaller than 1.
    """
    if popSize < 1:
        raise ValueError('popSize must be at least 1')
    # int() truncates; without the floor of 1, nElite == 0 would make the
    # slice [-0:] below select the entire population as "elite".
    nElite = max(1, int(popSize * eliteFrac))
    nWeights = agent.getWeights()
    scoresDeque = deque(maxlen=100)
    scores = list()
    bestWeight = sigma * np.random.randn(nWeights)

    for episode in range(1, trainEpisodes + 1):
        weights_pop = [bestWeight + (sigma * np.random.randn(nWeights)) for _ in range(popSize)]
        rewards = np.array([agent.evaluate(weights, gamma, maxSteps) for weights in weights_pop])

        # argsort is ascending, so the last nElite indices are the best candidates.
        eliteIdxs = rewards.argsort()[-nElite:]
        eliteWeights = [weights_pop[i] for i in eliteIdxs]
        bestWeight = np.array(eliteWeights).mean(axis=0)

        # Score the averaged policy without discounting; this call also leaves
        # bestWeight loaded in the agent, which is what gets checkpointed below.
        reward = agent.evaluate(bestWeight, gamma=1.0)
        scoresDeque.append(reward)
        scores.append(reward)

        torch.save(agent.state_dict(), checkpointPath)
        averageScore = np.mean(scoresDeque)
        print(f'Episode {episode}\t Average Score: {averageScore:.2f}')

        if averageScore >= targetScore:
            # Clamp at zero so an early success never reports a negative count.
            print(f'\nEpisodes needed to learn a good policy {max(episode - 100, 0)} episodes\t')
            break
    return scores

In [7]:
# Train with the cross-entropy method; the agent's state dict is written to
# 'checkpoint.pth' after every iteration.
scores = crossEntropyCalculator()
# Loading weights from file. map_location remaps the stored tensors onto the
# active device, so a checkpoint written on a GPU still loads on a CPU-only machine.
agent.load_state_dict(torch.load('checkpoint.pth', map_location=device))
state = env.reset()



Episode 1	 Average Score: -5.67
Episode 2	 Average Score: -3.07
Episode 3	 Average Score: -2.13
Episode 4	 Average Score: -2.07
Episode 5	 Average Score: -1.70
Episode 6	 Average Score: -1.60
Episode 7	 Average Score: -1.68
Episode 8	 Average Score: -1.56
Episode 9	 Average Score: -1.43
Episode 10	 Average Score: -1.30
Episode 11	 Average Score: -1.25
Episode 12	 Average Score: -1.17
Episode 13	 Average Score: -1.10
Episode 14	 Average Score: -1.04
Episode 15	 Average Score: -0.98
Episode 16	 Average Score: -0.93
Episode 17	 Average Score: -1.21
Episode 18	 Average Score: -1.44
Episode 19	 Average Score: -1.39
Episode 20	 Average Score: -1.32
Episode 21	 Average Score: -1.29
Episode 22	 Average Score: -1.28
Episode 23	 Average Score: -1.28
Episode 24	 Average Score: -1.25
Episode 25	 Average Score: -1.48
Episode 26	 Average Score: -1.45
Episode 27	 Average Score: -1.44
Episode 28	 Average Score: -1.61
Episode 29	 Average Score: -1.57
Episode 30	 Average Score: -1.71
Episode 31	 Average

In [8]:
# Start from a fresh episode so this cell can be re-run on its own instead of
# depending on a `state` left behind by an earlier cell.
state = env.reset()
done = False
while not done:
    state = torch.from_numpy(state).float().to(device)
    # Inference only: no gradients are needed to pick an action.
    with torch.no_grad():
        action = agent(state)
    env.render()
    next_state, reward, done, _ = env.step(action)
    state = next_state



In [9]:
# Shut the environment down once the rendered episode has finished.
env.close()