In [57]:
import gym
import numpy as np
import torch
from torch import nn
import matplotlib.pyplot as plt
from collections import namedtuple
import random

In [58]:
env = gym.make('MountainCar-v0', render_mode=None)

In [59]:
#Env Constants
#3 actions, 0, 1, or 2
N_ACTIONS = env.action_space.n
#A state is defined by the x position and velocity
STATE_VECTOR_DIM = env.observation_space.shape[0]

In [60]:
#Hyperparameters

num_episodes = 50

timesteps_per_episode = 500

batch_size = 64

capacity = 1000

gamma = 0.97

starting_epsilon = 0.05
epsilon_decay_value = 0.995

lr = 0.01

In [61]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory():
    def __init__(self, capacity=1000):
        self.capacity = capacity
        self.memory = []
        
    def push(self, state, action, reward, next_state):
        
        if len(self) >= self.capacity:
            self.memory.pop(0)
        
        self.memory.append(Transition(state, action, next_state, reward))
    
    def sample(self, batch_size):
        current_size = len(self.memory)
        if batch_size > current_size:
            batch_size = current_size
        
        indices = random.sample(range(current_size), batch_size)
        batch = Transition(*zip(*[self.memory[i] for i in indices]))
        return {
            'states': np.array(batch.state),
            'actions': np.array(batch.action),
            'next_states': np.array(batch.next_state),
            'rewards': np.array(batch.reward),
        }

    def __len__(self):
        return len(self.memory)

In [62]:
class DQN(nn.Module):
    def __init__(self, state_vector_dim, n_actions, n_neurons=24):
        super().__init__()
        self.layer1 = nn.Linear(state_vector_dim, n_neurons)
        self.layer2 = nn.Linear(n_neurons, n_neurons)
        self.layer3 = nn.Linear(n_neurons, n_actions)
        
    def forward(self, state_vector):
        state_vector = torch.relu(self.layer1(state_vector))
        state_vector = torch.relu(self.layer2(state_vector))
        state_vector = self.layer3(state_vector)
        return state_vector

In [63]:
class EpsilonGreedyPolicy():
    def __init__(self, starting_epsilon, decay_value, n_actions):
        self.epsilon = starting_epsilon
        self.decay_value = decay_value
        self.n_actions = n_actions
    
    def act(self, q_network, state):
        rnd_num = np.random.random()
        if rnd_num < self.epsilon:
            action = np.random.randint(0, self.n_actions)
        else:
            with torch.no_grad():
                action = torch.argmax(q_network(state)).item()
                
        self._decay()
        return action
    
    def _decay(self):
        self.epsilon *= self.decay_value

In [64]:
def custom_reward_function(next_state, terminated, timestep, timesteps_per_episode):
    positive_velocity_scaling = 100
    negative_velocity_scaling = 1
    success_scaling = 500
    time_scaling = 2
    
    time_penalty = time_scaling * timestep / timesteps_per_episode
    
    success_bonus = terminated * success_scaling
    
    velocity_scaling = positive_velocity_scaling if next_state[1].item() > 0 else negative_velocity_scaling
    
    reward = velocity_scaling * next_state[1].item() + success_bonus - time_penalty
    
    return reward

In [65]:
def train_network(q_network, loss_fn, optimizer, predictions, targets):
    
    for param in q_network.parameters():
        param.grad = None
    
    loss = loss_fn(predictions, targets)
        
    loss.backward()
    
    optimizer.step()

In [66]:
memory = ReplayMemory(capacity)

q_network = DQN(STATE_VECTOR_DIM, N_ACTIONS)

optimizer = torch.optim.Adam(q_network.parameters(), lr=lr)
loss_fn = torch.nn.SmoothL1Loss()

policy = EpsilonGreedyPolicy(starting_epsilon, epsilon_decay_value, N_ACTIONS)

In [None]:
history = {
    "episode_returns": [],
    "timesteps": [],
}

for ep_number in range(num_episodes):
    
    state, info = env.reset()
    state = torch.tensor(state).float()
    
    episode_return = 0
    timestep = 0
    while True:
        
        action = policy.act(q_network, state)
                
        next_state, _, terminated, truncated, info = env.step(action)
        reward = custom_reward_function(next_state, terminated, timestep, timesteps_per_episode)
        
        next_state = torch.tensor(next_state, dtype=torch.float32)
        episode_return += reward
        
        
        memory.push(state, action, reward, next_state)
                
        batch_transitions = memory.sample(batch_size)
        
        batch_states = torch.tensor(batch_transitions["states"], dtype=torch.float32)
        batch_actions = torch.tensor(batch_transitions["actions"], dtype=torch.int64)
        batch_next_states = torch.tensor(batch_transitions["next_states"], dtype=torch.float32)
        batch_rewards = torch.tensor(batch_transitions["rewards"], dtype=torch.float32)
        
        current_q_values = q_network(batch_states).gather(1, batch_actions.unsqueeze(1)).squeeze(1)
        
        with torch.no_grad():
            max_next_q_values = q_network(batch_next_states).max(1)[0]
            target_q_values = batch_rewards + gamma * max_next_q_values * (1 - terminated)
        
        train_network(q_network, loss_fn, optimizer, current_q_values, target_q_values)
                
        state = next_state
        
        if terminated or timestep >= timesteps_per_episode:
            print(f"Episode {ep_number}, Success: {terminated}")
            break
        
        timestep += 1
            
    history['episode_returns'].append(episode_return)
    history['timesteps'].append(timestep)
    print(f"Episode {ep_number}, Return: {episode_return}")

env.close()

Episode 0, Success: True
Episode 0, Return: 539.7670902597799
Episode 1, Success: True
Episode 1, Return: 726.5854521340278
Episode 2, Success: False
Episode 2, Return: -94.26990122487766
Episode 3, Success: False
Episode 3, Return: -23.24365687379758
Episode 4, Success: False
Episode 4, Return: -222.30191045554415
Episode 5, Success: True
Episode 5, Return: 620.1102958235933
Episode 6, Success: True
Episode 6, Return: 700.0672634952597
Episode 7, Success: True
Episode 7, Return: 655.5948144703517
Episode 8, Success: True
Episode 8, Return: 612.4374440038456
Episode 9, Success: True
Episode 9, Return: 561.293872854685
Episode 10, Success: True
Episode 10, Return: 594.3336353123124
Episode 11, Success: True
Episode 11, Return: 581.4685155325915
Episode 12, Success: True
Episode 12, Return: 618.1217045527161
Episode 13, Success: True
Episode 13, Return: 677.9230325782336
Episode 14, Success: True
Episode 14, Return: 694.2019342170058
Episode 15, Success: True
Episode 15, Return: 706.2418

In [None]:
plt.plot(history['timesteps'])
plt.xlabel("Episode")
plt.ylabel("Timesteps Spent")
plt.show()

In [None]:
plt.plot(history['episode_returns'])
plt.xlabel("Episode")
plt.ylabel("Return")
plt.show()

In [None]:
#Watch the agent play
env = gym.make('MountainCar-v0', render_mode='human')
while True:
    state, info = env.reset()
    state = torch.tensor(state).float()
    timestep = 0
    while True:
        
        action = policy.act(q_network, state)
                
        next_state, _, terminated, truncated, info = env.step(action)
        reward = custom_reward_function(next_state, terminated, timestep, timesteps_per_episode)
        
        next_state = torch.tensor(next_state, dtype=torch.float32)
        episode_return += reward
                
        state = next_state
        
        if terminated or timestep >= timesteps_per_episode:
            if terminated:
                print("Success")
            else:
                print("Failure")
            break
        
        timestep += 1

env.close()