In [9]:
import numpy as np
import math
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as functional

# Set up environment
import gym
# Single CartPole-v0 environment instance shared by the training and evaluation loops below.
# NOTE(review): those loops treat `env.reset()` as returning only the observation and
# unpack `env.step(...)` into four values — confirm the installed gym version matches.
env = gym.make("CartPole-v0")


In [20]:
# Training hyperparameters
batch_size = 64          # transitions drawn from the replay buffer per optimisation step
learning_rate = 0.001    # initial learning rate handed to the optimizer
hidden_layer_size = 256  # width of the network's hidden layer
episodes = 500           # number of training episodes

In [21]:
# Short aliases for the torch tensor types used in the rest of the notebook.
# `Tensor` is the default floating-point alias and is the same object as `FloatTensor`.
Tensor = FloatTensor = torch.FloatTensor
LongTensor = torch.LongTensor
ByteTensor = torch.ByteTensor

In [22]:
class replay_buffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    While fewer than ``capacity`` items are held, new items are appended.
    Once full, every new item overwrites the oldest one in place (ring
    buffer), which keeps ``push`` O(1) instead of shifting the whole list
    as ``del self.buffer[0]`` would. As a consequence ``self.buffer`` is not
    in chronological order after the buffer has wrapped around.

    Args:
        capacity: Maximum number of items retained. With a capacity of zero
            or less nothing is ever retained.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        # Slot holding the oldest item; only meaningful once the buffer is full.
        self.position = 0

    def push(self, transition):
        """Store ``transition``, evicting the oldest entry if the buffer is full."""
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
            return

        # Full from here on. A non-positive capacity can never hold anything.
        if self.capacity <= 0:
            return

        self.buffer[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored items chosen uniformly at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored items
                (raised by ``random.sample``).
        """
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        """Number of items currently stored."""
        return len(self.buffer)

In [23]:
class neural_net(nn.Module):
    """Fully connected network with one ReLU hidden layer.

    In this notebook it is used as the Q-network: the input is a state
    vector and the output holds one value per action.

    Args:
        input_size: Number of input features (default 4).
        hidden_size: Width of the hidden layer. When ``None`` the module-level
            ``hidden_layer_size`` setting is used, which is what a call with
            no arguments has always done.
        output_size: Number of outputs (default 2).
    """

    def __init__(self, input_size=4, hidden_size=None, output_size=2):
        super().__init__()
        # Resolve the global lazily so an explicit width never touches it.
        if hidden_size is None:
            hidden_size = hidden_layer_size
        self.l1 = nn.Linear(input_size, hidden_size)
        self.l2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the raw (un-activated) outputs of the second layer for ``x``."""
        hidden = functional.relu(self.l1(x))
        return self.l2(hidden)

In [24]:
# Define agent class
class cart_pole_agent:
    """Epsilon-greedy action selector.

    With probability ``epsilon`` a uniformly random action is returned
    (exploration); otherwise the action with the highest network output for
    the given state is returned (exploitation).

    Args:
        min_learning_rate: Stored as ``min_learning_rate`` and as the initial
            ``learning_rate``; not used by this class itself.
        min_epsilon: Stored as ``min_epsilon`` and as the initial ``epsilon``
            (the exploration probability).
        discount: Stored as ``discount``; not used by this class itself.
        decay_rate: Stored as ``decay_rate``; not used by this class itself.
        num_actions: Number of discrete actions to choose from when exploring.
    """

    # Constructor
    def __init__(self, min_learning_rate=0.1, min_epsilon=0.1, discount=0.8, decay_rate=25,
                 num_actions=2):
        self.min_learning_rate = min_learning_rate
        self.learning_rate = min_learning_rate
        self.min_epsilon = min_epsilon
        self.epsilon = min_epsilon
        self.discount = discount
        self.decay_rate = decay_rate
        self.num_actions = num_actions

    def choose_action(self, state, model=None):
        """Pick an action for ``state`` using the epsilon-greedy rule.

        Args:
            state: Tensor holding a single state as a batch of one row.
            model: Callable mapping ``state`` to per-action values. Defaults
                to the notebook-level ``nn_model``.

        Returns:
            A ``(1, 1)`` int64 tensor holding the chosen action index.
        """
        if model is None:
            model = nn_model

        # Explore with probability epsilon. The previous condition was inverted
        # and acted randomly with probability 1 - epsilon instead.
        if np.random.random() < self.epsilon:
            return torch.tensor([[random.randrange(self.num_actions)]], dtype=torch.long)

        # Exploit: no gradient is needed for action selection.
        with torch.no_grad():
            q_values = model(state.float())
        return q_values.max(1)[1].view(1, 1)

    

In [25]:
# Replay memory, Q-network and its optimizer, shared by the cells that follow.
buffer = replay_buffer(10000)
nn_model = neural_net()
optimizer = optim.Adam(nn_model.parameters(), lr=learning_rate)

In [26]:

agent = cart_pole_agent()

# Train the agent
for episode in range(episodes):
    # Get initial state from the environment
    current_state = env.reset()
    done = False

    while not done:
        action = agent.choose_action(FloatTensor([current_state]))
        new_state, reward, done, _ = env.step(action[0, 0].item())

        # The step that ends the episode is stored with a reward of -1.
        if done:
            reward = -1

        buffer.push((FloatTensor([current_state]), action, FloatTensor([new_state]), FloatTensor([reward])))

        # Advance the state BEFORE the early `continue` below. Previously the
        # update sat at the bottom of the loop and was skipped while the buffer
        # was filling, so every warm-up step was recorded from the initial state.
        current_state = new_state

        # No optimisation step until one full minibatch is available.
        if len(buffer) < batch_size:
            continue

        transitions = buffer.sample(batch_size)
        batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

        batch_state = torch.cat(batch_state)            # (batch, state_dim)
        batch_action = torch.cat(batch_action)          # (batch, 1)
        batch_reward = torch.cat(batch_reward)          # (batch,)
        batch_next_state = torch.cat(batch_next_state)  # (batch, state_dim)

        # Value of the action actually taken, flattened to (batch,) so it lines
        # up element-wise with the target instead of broadcasting to (batch, batch).
        current_q_values = nn_model(batch_state).gather(1, batch_action).squeeze(1)

        # The bootstrap term is a fixed target: keep it out of the autograd graph.
        with torch.no_grad():
            max_next_q_values = nn_model(batch_next_state).max(1)[0]

        # TD target uses the discount factor (agent.discount), not agent.decay_rate.
        # NOTE(review): terminal transitions still bootstrap from the next state
        # because the stored tuples carry no done flag; consider storing one and
        # zeroing the bootstrap term for them.
        expected_q_values = batch_reward + agent.discount * max_next_q_values

        loss = functional.smooth_l1_loss(current_q_values, expected_q_values)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

print("Training has finished!")



Training has finished!


In [27]:
# Run the trained agent

# Separate name so the training setting `episodes` is not overwritten; re-running
# the training cell afterwards would otherwise silently train for 10 episodes.
eval_episodes = 10

try:
    for episode in range(eval_episodes):
        done = False
        current_state = env.reset()

        while not done:
            env.render()

            # Evaluate the learned policy itself: always take the action with the
            # highest predicted value, with no exploration and no gradients.
            with torch.no_grad():
                action = nn_model(FloatTensor([current_state])).max(1)[1].item()

            # Evaluation transitions are deliberately not pushed to the replay
            # buffer: nothing trains on them here, and their rewards would not
            # follow the -1-on-termination convention used during training.
            current_state, reward, done, _ = env.step(action)
finally:
    # Release the render window even if the cell is interrupted.
    env.close()