In [1]:
import numpy as np
import gym
import tqdm
import matplotlib.pyplot as plt   

import torch 
from torch import nn, optim
import torch.nn.functional as F

In [2]:
debug = True

In [3]:
if debug:
    env = gym.make('CartPole-v0')

In [4]:
config = {
    'obs_space' : env.observation_space.shape[0],
    'action_space' : env.action_space.n,
    'hidden_dims' : [24, 48],
    'discount_factor' : 1,
    'min_exp' : 128,
    'max_exp' : 1024,
    'batch_size' : 64,
    'lr' : 0.02
}

episodes = 10000
epsilon_decay = 0.995
epsilon = 1
min_epsilon = 0.01
copy_step = 10
weight_decay = 0.01

In [5]:
class DeepModel(nn.Module):
    def __init__(self, obs_space, hidden_dims, action_space):
        super(DeepModel, self).__init__()
        self.input_layer = nn.Linear(obs_space, hidden_dims[0])
        hidden_layers = []
        for idx, h_dim in enumerate(hidden_dims[:-1]):
            hidden_layers.append(nn.Linear(h_dim, hidden_dims[idx + 1]))
            hidden_layers.append(nn.Tanh())
        self.hidden_layers = nn.Sequential(*hidden_layers)
        self.output_layer = nn.Linear(hidden_dims[-1], action_space)
    
    def forward(self, xb):
        h = self.input_layer(xb)
        h = self.hidden_layers(h)
        out = self.output_layer(h)
        return out

if debug:
    model = DeepModel(env.observation_space.shape[0], [32,32], env.action_space.n)
    preds = model(torch.atleast_2d(torch.tensor(env.observation_space.sample())))
    print(preds)


tensor([[-0.2348, -0.5895]], grad_fn=<AddmmBackward>)


In [6]:
class DQN:
    def __init__(self, obs_space, action_space, hidden_dims, discount_factor, min_exp, max_exp, batch_size, lr):
        self.action_space = action_space
        self.discount_factor = discount_factor
        self.min_exp = min_exp
        self.max_exp = max_exp
        self.batch_size = batch_size
        self.lr = lr
        self.model = DeepModel(obs_space, hidden_dims, action_space)
        self.optimizer = optim.Adam(model.parameters(), lr, weight_decay=weight_decay)
        self.experience = {
            's' : [],
            'a' : [],
            'r' : [],
            's2' : [],
            'done' : []
        }
    
    def predict(self, inputs):
        self.model.eval()
        return self.model(torch.tensor(np.atleast_2d(inputs.astype('float32'))))

    def train(self, TargetNet):
        if len(self.experience['s']) < self.min_exp:
            return
        
        ids = np.random.randint(low = 0, high=len(self.experience['s']), size=self.batch_size)

        states = np.asarray(self.experience['s'][ids])
        actions = np.asarray(self.experience['a'][ids])
        rewards = np.asarray(self.experience['r'][ids])

        states_next = np.asarray(self.experience['s2'][ids])
        dones = np.asarray(self.experience['done'][ids])
        value_next = TargetNet.predict(states_next)
        actual_values = np.where(dones, rewards, rewards + self.discount_factor * value_next * np.one_hot(actions,num_classes=self.action_space))

        self.model.train()
        self.optimizer.zero_grad()
        selected_action_values = self.predict(states)
        loss = nn.MSELoss(rewards, torch.tensor(actual_values))
        loss.backward()
        self.optimizer.step()

    def get_action(self, state, epsilon):
        if np.random.normal() < epsilon:
            return np.random.randint(self.action_space)
        else:
            predictions = self.predict(state).detach().cpu().numpy()
            return np.argmax(predictions)
    
    def add_experience(self, exp):
        if len(self.experience['s']) > self.max_exp:
            for key in self.experience.keys():
                self.experience[key].pop(0)
                self.experience[key].append(exp[key])
    
    def copy_weights(self, TargetNet : nn.Module):
        self.model.load_state_dict(TargetNet.model.state_dict())
    
if debug:
    agent = DQN(**config)

In [7]:
def play_game(env, TrainNet : DQN, TargetNet : DQN, epsilon, render=False):
    rewards = 0
    iter = 0
    done = False
    observation = env.reset()

    while not done:
        # if render:
        #     env.render()
        iter += 1

        action = TrainNet.get_action(observation, epsilon)
        prev_observation = observation
        observation, reward, done, _ = env.step(action)
        if iter == 195:
            reward = 100
            done = True
        else:
            reward = iter/200
        rewards += reward

        exp = {
            's' : prev_observation,
            'a' : action,
            's2' : observation,
            'r' : reward,
            'done' : done
        }

        TrainNet.add_experience(exp)
        
    return rewards

In [8]:
env = gym.make('CartPole-v0')
TrainNet = DQN(**config)
TargetNet = DQN(**config)

running_reward = np.empty(episodes)

In [9]:
pbar = tqdm.tqdm(range(episodes))
for n in pbar:
    if n > 500 and n%10 == 0:
        running_reward[n] = play_game(env, TrainNet, TargetNet, epsilon, render=True)
    else:
        running_reward[n] = play_game(env, TrainNet, TargetNet, epsilon)
    TrainNet.train(TargetNet)

    if n % copy_step == 0:
        TrainNet.copy_weights(TargetNet)
    pbar.set_postfix({
        'episode reward' : running_reward[n],
        'avg (100 last) reward' : running_reward[max(0, n - 100) : n].mean(),
        'epsilon' : epsilon
    })
    epsilon *= epsilon_decay
    epsilon = max(epsilon, min_epsilon)

  'avg (100 last) reward' : running_reward[max(0, n - 100) : n].mean(),
  ret = ret.dtype.type(ret / rcount)
 13%|█▎        | 1295/10000 [00:15<02:14, 64.77it/s, episode reward=6.12, avg (100 last) reward=14.1, epsilon=0.01]