In [1]:
import random

import gym
import torch

seed = 666                    # one seed shared by every random number generator used in the notebook
env = gym.make('CartPole-v0') # creates an OpenAI Gym environment, an object you interact with to step through the game
epsilon = .01                 # probability of taking a random action instead of the greedy one
gamma = .99                   # reward discount factor
tau = .995                    # fraction of the old target-network weights kept at each soft update (closer to 1 = slower tracking)
batch_size = 128              # how much to sample from the replay buffer at a time
max_ep = 500                  # number of games to play

# Seeding only Python's `random` leaves the run non-reproducible: the environment
# dynamics, the action-space sampler used for exploration, and PyTorch's weight
# initialisation each draw from their own generator, so seed all of them.
random.seed(seed)
torch.manual_seed(seed)
env.seed(seed)
env.action_space.seed(seed)

In [2]:
import torch
import torch.nn as nn

class Q(nn.Module):
    """Small MLP that maps an observation to one Q-value per discrete action.

    Args:
        env: object exposing ``observation_space.shape`` (the first entry is used
            as the input width) and ``action_space.n`` (used as the output width).
    """

    def __init__(self, env):
        super(Q, self).__init__()

        self.main = nn.Sequential(
            nn.Linear(env.observation_space.shape[0], 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, env.action_space.n)
        )

    def forward(self, s):
        """Return the Q-values for state(s) ``s``.

        ``s`` may be a NumPy array, a (nested) list of numbers, or a tensor of
        any numeric dtype; it is converted to float32 before the forward pass.
        The legacy ``torch.FloatTensor(s)`` constructor used previously rejects
        tensors whose dtype is not already float32.
        """
        return self.main(torch.as_tensor(s, dtype=torch.float32))

In [3]:
import random
from collections import deque

class ReplayBuffer():
    """Fixed-capacity FIFO store of ``(s, a, r, s2, mask)`` transitions.

    ``mask`` is ``1 - done``: 0 for a transition that ended the episode and 1
    otherwise. Once the capacity is reached, the oldest transition is dropped
    for every new one stored.

    Args:
        size: maximum number of transitions to keep; floats such as ``1e6``
            are accepted and truncated to an int.
    """

    def __init__(self, size):
        self.maxSize = int(size)
        self.buffer = deque(maxlen=self.maxSize)

    def __len__(self):
        """Number of transitions currently stored (enables ``len(rb)``)."""
        return len(self.buffer)

    @property
    def len(self):
        """Number of transitions currently stored.

        Previously an instance attribute named ``len`` shadowed a method of the
        same name, so ``rb.len()`` raised ``TypeError``. Exposing the size as a
        read-only property keeps ``rb.len`` working and derives the value from
        the deque itself instead of a separately maintained counter.
        """
        return len(self.buffer)

    def sample(self, count):
        """Draw up to ``count`` distinct transitions uniformly at random.

        Returns:
            Tuple ``(s, a, r, s2, mask)`` of float32 tensors. ``a``, ``r`` and
            ``mask`` carry a trailing dimension of size 1; ``s`` and ``s2`` are
            the stacked per-transition arrays.
        """
        count = min(count, len(self.buffer))
        batch = random.sample(self.buffer, count)

        def column(i):
            # Stack field ``i`` of every sampled transition into one float32 tensor.
            stacked = np.array([transition[i] for transition in batch])
            return torch.as_tensor(stacked, dtype=torch.float32)

        s_arr, a_arr, r_arr, s2_arr, m_arr = (column(i) for i in range(5))
        return s_arr, a_arr.unsqueeze(1), r_arr.unsqueeze(1), s2_arr, m_arr.unsqueeze(1)

    def store(self, s, a, r, s2, d):
        """Append one transition, storing the done flag ``d`` as the mask ``1 - d``."""
        data = [s, np.array(a, dtype=np.float64), r, s2, 1 - d]
        # np.asarray leaves existing ndarrays untouched and wraps everything else.
        self.buffer.append(tuple(np.asarray(x) for x in data))

In [4]:
import numpy as np
import torch.nn.functional as F
from copy import deepcopy

In [5]:
def explore(timestep):
    """Fill the replay buffer ``rb`` with transitions from uniformly random play.

    Whole episodes are played on the global ``env`` until at least ``timestep``
    transitions have been stored; the episode in progress when the target is
    reached is always finished, so slightly more may be collected.
    """
    collected = 0
    while collected < timestep:
        state = env.reset()
        finished = False
        while not finished:
            action = env.action_space.sample()
            next_state, reward, finished, _ = env.step(int(action))
            rb.store(state, action, reward, next_state, finished)
            collected += 1
            state = next_state

In [6]:
def update():
    """Run one gradient step on ``q`` and one soft update of ``q_target``.

    Reads the globals ``rb``, ``batch_size``, ``gamma``, ``tau``, ``q``,
    ``q_target`` and ``optimizer``.
    """
    # Batch of states, actions, rewards, next states and masks. The buffer
    # stores the mask as ``1 - done``, so it is 0 where the game ended and 1
    # elsewhere, which removes the bootstrap term for terminal transitions.
    states, actions, rewards, next_states, masks = rb.sample(batch_size)

    # The TD target comes from the target network; no gradients are tracked for it.
    with torch.no_grad():
        next_values = q_target(next_states).max(dim=1, keepdim=True)[0]
        targets = rewards + masks * gamma * next_values

    # Q-value of the action that was actually taken in each sampled state.
    chosen_q = q(states).gather(1, actions.long())
    loss = (chosen_q - targets).pow(2).mean()  # mean squared TD error

    # Gradient step on the online network.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Move the target network toward the online one by weighted averaging.
    for online, lagged in zip(q.parameters(), q_target.parameters()):
        lagged.data = lagged.data * tau + online.data * (1 - tau)

In [7]:
# init the Q network and target network, a copy that lags behind to smooth training
q = Q(env)
q_target = deepcopy(q)

# init the optimizer, which uses the loss to update the weights of the network
optimizer = torch.optim.Adam(q.parameters(), lr=1e-3)

# init the replay buffer and do some random exploration to fill it
# (integer capacity, so the buffer's size bookkeeping never turns into a float)
rb = ReplayBuffer(int(1e6))
explore(10000)

# training loop
ep = 0
while ep < max_ep: # loop through some number of episodes, aka complete games
    s = env.reset() # reset the environment to the start of a game
    ep_r = 0
    done = False
    while not done: # loop through a game frame by frame
        # epsilon-greedy exploration
        if random.random() < epsilon: # some portion of the time, pick a random action to explore
            a = env.action_space.sample()
        else: # the rest of the time, take the action recommended by the network
            with torch.no_grad():
                a = int(q(s).argmax())

        # take a step in the environment and get the resulting next state, reward, and done boolean
        s2, r, done, info = env.step(int(a))

        # An episode cut off by the environment's step limit is not a failure state,
        # so it must still bootstrap from s2. Gym's TimeLimit wrapper marks that case
        # in `info`; when the key is absent this falls back to the plain `done` flag.
        timed_out = info.get('TimeLimit.truncated', False)
        rb.store(s, a, r, s2, bool(done) and not timed_out) # store in the replay buffer
        ep_r += r # update episode reward
        s = s2

        if not done: # no gradient step is taken on the step that ends the game
            update() # compute loss and update the network

    # the game has ended: report every 10th episode, then begin the next one
    if ep % 10 == 0:
        print(f"Episode {ep} Reward: {ep_r}")
    ep += 1

Episode 0 Reward: 10.0
Episode 10 Reward: 10.0
Episode 20 Reward: 10.0
Episode 30 Reward: 10.0
Episode 40 Reward: 10.0
Episode 50 Reward: 11.0
Episode 60 Reward: 9.0
Episode 70 Reward: 150.0
Episode 80 Reward: 200.0
Episode 90 Reward: 200.0
Episode 100 Reward: 200.0
Episode 110 Reward: 157.0
Episode 120 Reward: 200.0
Episode 130 Reward: 176.0
Episode 140 Reward: 189.0
Episode 150 Reward: 200.0
Episode 160 Reward: 134.0
Episode 170 Reward: 118.0
Episode 180 Reward: 183.0
Episode 190 Reward: 175.0
Episode 200 Reward: 136.0
Episode 210 Reward: 200.0
Episode 220 Reward: 169.0
Episode 230 Reward: 127.0
Episode 240 Reward: 200.0
Episode 250 Reward: 200.0
Episode 260 Reward: 200.0
Episode 270 Reward: 200.0
Episode 280 Reward: 200.0
Episode 290 Reward: 200.0
Episode 300 Reward: 200.0
Episode 310 Reward: 173.0
Episode 320 Reward: 169.0
Episode 330 Reward: 200.0
Episode 340 Reward: 200.0
Episode 350 Reward: 200.0
Episode 360 Reward: 200.0
Episode 370 Reward: 200.0
Episode 380 Reward: 200.0
Episo