# Solving Cartpole v0 by DQN

In [1]:
import random

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
env = gym.make("CartPole-v1")

## Test run

In [None]:
observation = env.reset()

for _ in range(100):
    env.render()
    action = env.action_space.sample() # your agent here (this takes random actions)
    observation, reward, done, info = env.step(action)

    if done:
        print("Episode finished")
        observation = env.reset()
env.close()

# Section 1 - Solving by using DQN with $\epsilon$-greedy policy

Checklist:
1. Objective function
2. Preprocess data
3. Samples generation

In [3]:
# Environment understanding
print("State space", env.observation_space.shape[0])
print("Action space", env.action_space.n)

State space 4
Action space 2


In [4]:
state_space = env.observation_space.shape[0]
action_space = env.action_space.n

## 1. The policy network
The network takes in the state of the game and decide what we should do. 

For simplicity, use a simple 2-layer NN that takes in the observations and then produce a single number indicating the probability of pushing LEFT or RIGHT. It is standard to use a stochastic policy, meaning that the NN will only produce a probability of each action. 

We are going to train our model with a single experience:
1. Let the model estimate Q values of the old state
2. Let the model estimate Q values of the new state
3. Calculate the new target Q value for the action, using the known reward
4. Train the model with input = (old state), output = (target Q values)

In [5]:
# Hyperparameters
BATCH_SIZE = 32
LEARNING_RATE = 1e-3
INITIAL_EPSILON = 0.95
EPSILON_DECAY_RATE = 0.995
MIN_EPSILON = 0.01
GAMMA = 0.95
C = 10  # Update the network parameters every C iteration
MEMORY_CAPACITY = 100000  # Capacity of experience replay memory

In [6]:
NUM_EPISODES = 5000

In [7]:
from collections import namedtuple, deque
from random import sample

Pseudo code
---
```
Initialise replay memory D to capacity N
Initialise action-value function Q with random weights
Initialise target action-value function Q_hat with weights_hat = weights

For episode = 1, M:
    Reset environment and get initial state
    Preprocess initial state phi1 = phi(s1)
    For t = 1, T:
        Use epsilon-greedy policy to select an action
        Execute action, observe states and rewards
        Store transition S, A, R, S' (Inside function step())
        Sample random minibatch of transitions from experience D (Inside function step())
        Calculate TD target and TD error (Inside function step())
        Perform a gradient descent step on TD error (Inside function step())
        For every C steps reset Q_hat = Q
    End For
End For
```
---

In [8]:
class QNetwork(nn.Module):
    """
    Policy Network
    """
    
    def __init__(self, state_size, action_size, fc1_units=24, fc2_units=24):
        super().__init__()
        
        self.fc1 = nn.Linear(state_size, fc1_units)
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.fc3 = nn.Linear(fc2_units, action_size)
        
    def forward(self, x):
        """
        Forward pass
        Essentially, the forward pass return the Q value
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x
    
def loss_fn(output, labels):
    """
    Compute the loss given outputs and labels
    
    Args:
        outputs (Variable)
        labels (Variable)
    """
    return torch.nn.MSELoss(reduction="sum")(output, labels)

def some_measurement(outputs, labels):
    """
    Compute the performance measurement, given the outputs and labels for all images
    """
    pass

In [9]:
print(env.action_space.sample())

1


In [10]:
Experience = namedtuple("Experience", "s a r s_ done")

class DQNAgent:
    """
    A Deep Q learning agent
    https://github.com/udacity/deep-reinforcement-learning/blob/master/dqn/exercise/dqn_agent.py
    https://towardsdatascience.com/reinforcement-learning-tutorial-part-3-basic-deep-q-learning-186164c3bf4
    https://morvanzhou.github.io/tutorials/machine-learning/torch/4-05-DQN/
    """
    
    def __init__(self, env, loss_fcn, learning_rate=LEARNING_RATE, gamma=GAMMA):
        
        # Environment parameters
        self.env = env
        self.n_actions = env.action_space.n
        self.n_state = env.observation_space.shape[0]
        
        # NN - Q function with random parameters
        # The main_net is used for training at every step - weights is theta
        # The target_net is used for prediction at every step - weights is theta^neg
        self.main_net, self.target_net = self.create_model(), self.create_model()
        self.loss_fcn = loss_fcn
        self.optimizer = torch.optim.Adam(self.main_net.parameters(), lr=learning_rate)
        
        # Set the weights of target_net equals to main_net
        self.target_net.load_state_dict(self.main_net.state_dict())
        
        # Experience replay
        self.experience_memory = deque(maxlen=MEMORY_CAPACITY)
        
        # Other parameters
        self.learning_rate = learning_rate
        self.gamma = gamma
        
        # Counters
        self.target_update_counter = 0
    
    def create_model(self):
        return QNetwork(self.n_state, self.n_actions)
    
    def act(self, states, epsilon):
        """
        Epsilon Greedy Policy
        if eps = 0 -> Greedy policy
        
        Args:
            x (torch.Variable): features
        """
        states_torch = torch.FloatTensor(states)
        if random.uniform(0, 1) < epsilon:
            return self.env.action_space.sample()
        else:
            action_value = self.main_net.forward(states_torch)
            action = torch.argmax(action_value).item()
            return action
    
    def step(self, s, a, r, s_pi, done):
        """
        Something that we should do for each step
        """
        # Save the experience in replay memory
        self.experience_memory.append(Experience(s, a, r, s_pi, done))
        
        # Start learning when there are enough samples
        if len(self.experience_memory) > BATCH_SIZE:
            sample_experiences = sample(self.experience_memory, BATCH_SIZE)
            self.learn(sample_experiences, GAMMA)
    
    def learn(self, experiences, gamma):
        """
        Update value parameters using given batch of experience tuples
        
        Args:
            experiences (list of Experience tuple)
            gamma (float)
        """
        
        # Sample the data from experience memory
        states, actions, rewards, next_states, dones = zip(*experiences)
        
        states_torch = torch.FloatTensor(states)
        actions_torch = torch.FloatTensor(actions)
        rewards_torch = torch.FloatTensor(rewards)
        next_states_torch = torch.FloatTensor(next_states)
        dones_torch = torch.FloatTensor(dones)
        
        # Calculate the new Q value
        q_main = self.main_net(states_torch).gather(1, actions_torch.long().view(-1, 1))
        q_target_next = self.target_net(next_states_torch).detach().max(dim=1)[0].view(-1, 1)
        
        # Calculate TD target
        td_target = rewards_torch.view(-1, 1) + gamma * q_target_next * (1 - dones_torch.view(-1, 1))
#         print(td_target)
        
        loss = self.loss_fcn(td_target, q_main)
        # Gradient descent
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
    def update_target_net_weights(self):
        main_net_state_dict = self.main_net.state_dict()
        self.target_net.load_state_dict(main_net_state_dict)


In [11]:
# Initialise the agent
agent = DQNAgent(env, loss_fcn=loss_fn)

scores = []
scores_window = deque(maxlen=100)

epsilon = INITIAL_EPSILON

for i in range(NUM_EPISODES):
    state = env.reset()
    done = False
    score = 0
    t = 0
    while not done:
        action = agent.act(state, epsilon)
        next_state, reward, done, info = env.step(action)
        agent.step(state, action, reward, next_state, done)
        # For every C time, update Q
        if t % C:
#             print("Update weights")
            agent.update_target_net_weights()
        
        t += 1
        score += reward
        state = next_state
        
        if done:
            break
    
    # Epsilon decay
    epsilon = max(MIN_EPSILON, EPSILON_DECAY_RATE * epsilon)
    scores.append(score)
    scores_window.append(scores)
    
    print("\rEpisode %s \t Average Score: %s \t Epsilon: %s" % (i, np.mean(scores_window), epsilon), end="")
    
    if i % 100 == 0:
        print("\rEpisode %s \t Average Score: %s \t Epsilon: %s" % (i, np.mean(scores_window), epsilon))
        
    if np.mean(scores_window) >= 200:
        print("\nEnvironment solved in %s episodes! \tAverage Score: %s" % (i, np.mean(scores_window)))
        torch.save(agent.main_net.state_dict(), "checkpoint.pth")
        break

Episode 0 	 Average Score: 38.0 	 Epsilon: 0.9452499999999999
Episode 100 	 Average Score: 32.7029702970297 	 Epsilon: 0.57260450509286038
Episode 200 	 Average Score: 101.68656716417911 	 Epsilon: 0.34686688098665924
Episode 300 	 Average Score: 119.98338870431894 	 Epsilon: 0.21012170189946602
Episode 400 	 Average Score: 127.45386533665835 	 Epsilon: 0.12728551507581423
Episode 500 	 Average Score: 140.09780439121755 	 Epsilon: 0.07710580202642309
Episode 600 	 Average Score: 152.01497504159732 	 Epsilon: 0.046708415349514534
Episode 700 	 Average Score: 159.98430813124108 	 Epsilon: 0.028294577154065315
Episode 800 	 Average Score: 166.8676654182272 	 Epsilon: 0.0171400183529387232
Episode 900 	 Average Score: 172.42397336293007 	 Epsilon: 0.010382916399118782
Episode 1000 	 Average Score: 177.0 	 Epsilon: 0.011silon: 0.010024920157445967
Episode 1100 	 Average Score: 180.16167120799273 	 Epsilon: 0.01
Episode 1200 	 Average Score: 186.27810158201498 	 Epsilon: 0.01
Episode 1300 	 

# Solve the cartpole

In [12]:
trained_agent = DQNAgent(env, loss_fcn=loss_fn)
trained_agent.main_net.load_state_dict(torch.load("./checkpoint.pth"))

In [13]:
observation = env.reset()

done = False
total_return = 0
timestep = 0

while not done:
    env.render()
    action = trained_agent.act(observation, epsilon=0) # your agent here (this takes random actions)
    next_observation, reward, done, info = env.step(action)
    
    total_return += reward
    timestep += 1

    if done:
        print("Episode finished")
        observation = env.reset()
        
    observation = next_observation 
        
env.close()

Episode finished
