# Experiment 1

In [1]:
%%html
<style type='text/css'>
.CodeMirror{
font-family: JetBrains Mono;
}
</style>

In [2]:
import itertools
import random
import warnings
from collections import namedtuple, deque
from typing import Sequence

import gym # gym == 0.9.0
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F

# Suppress all Python warnings from this point on.
warnings.filterwarnings("ignore")

# Discount factor applied to the bootstrapped target Q-value.
GAMMA = 0.99
# Number of transitions drawn per replay-memory sample.
BATCH_SIZE = 64
# Maximum number of transitions kept in the replay memory.
BUFFER_SIZE = 10000
# Number of random-action transitions collected before training starts.
MIN_REPLAY_SIZE = 5000
# Epsilon-greedy schedule: initial value, floor, and the multiplicative
# decay applied each time an episode finishes.
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 0.995
# Modulus used to decide when the target network is re-synced from the
# policy network.
TARGET_UPDATE_FREQ = 5

## DQN

### Initialisation

In [3]:
# Create the LunarLander-v2 environment; later cells reach it through the
# global name `env`.
env = gym.make("LunarLander-v2")
# Start a first episode and keep its initial observation.
obs = env.reset()
# Running reward total for the episode in progress (accumulated by the
# training loop further down).
episode_reward = 0.0

[2022-12-27 21:44:11,582] Making new env: LunarLander-v2


In [4]:
#Class & Method Initialisation

# One environment step: (state, action, reward, done flag, next state).
Transition = namedtuple('Transition', ('states', 'actions', 'rewards', 'dones', 'next_states'))

class Replay_memory():
    """Fixed-capacity experience replay buffer.

    Attributes:
        env: environment used by `initialize` to collect random transitions.
        memory: deque of `Transition` tuples, capped at `fullsize` entries
            (oldest entries are dropped first).
        rewards: deque holding at most the 50 most recent episode returns.
        batchsize: number of transitions returned by `sample_batch`.
        minsize: number of transitions collected by `initialize`.
    """

    def __init__(self, env, fullsize, minsize, batchsize):
        self.env = env
        self.memory = deque(maxlen=fullsize)
        self.rewards = deque(maxlen=50)
        self.batchsize = batchsize
        self.minsize = minsize

    def append(self, transition):
        """Store one `Transition`, evicting the oldest one when full."""
        self.memory.append(transition)

    def sample_batch(self):
        """Draw `batchsize` transitions uniformly without replacement.

        Returns:
            Tuple `(states, actions, rewards, dones, next_states)` of tensors.
            `states`/`next_states` are float32 stacks of the stored
            observations; `actions` (int64), `rewards` (float32) and `dones`
            (bool) each get a trailing dimension of size 1.

        Raises:
            ValueError: if fewer than `batchsize` transitions are stored
                (raised by `random.sample`).
        """
        batch = random.sample(self.memory, self.batchsize)
        batch = Transition(*zip(*batch))
        states = torch.from_numpy(np.array(batch.states, dtype=np.float32))
        actions = torch.from_numpy(np.array(batch.actions, dtype=np.int64)).unsqueeze(1)
        rewards = torch.from_numpy(np.array(batch.rewards, dtype=np.float32)).unsqueeze(1)
        # np.bool_ is the canonical NumPy boolean type; the old `np.bool8`
        # alias is deprecated and absent from NumPy 2.x.
        dones = torch.from_numpy(np.array(batch.dones, dtype=np.bool_)).unsqueeze(1)
        next_states = torch.from_numpy(np.array(batch.next_states, dtype=np.float32))
        return states, actions, rewards, dones, next_states

    def initialize(self):
        """Pre-fill the buffer with `minsize` random-action transitions.

        Steps `self.env` (not a global environment) with actions sampled from
        its action space. Returns `self` so the call can be chained onto the
        constructor.
        """
        obs = self.env.reset()
        for _ in range(self.minsize):
            action = self.env.action_space.sample()
            new_obs, reward, done, info = self.env.step(action)
            self.append(Transition(obs, action, reward, done, new_obs))
            # When the episode ends, the next transition must start from the
            # freshly reset observation rather than the terminal state.
            obs = self.env.reset() if done else new_obs
        return self
    
class DQN(nn.Module):
    """Two-layer fully connected Q-network.

    Maps an input of `ninputs` features to `noutputs` values through a single
    tanh hidden layer.

    Args:
        ninputs: size of the input feature vector.
        noutputs: number of output values (one per action).
        hidden_size: width of the hidden layer (defaults to 64).
    """

    def __init__(self, ninputs, noutputs, hidden_size=64):
        super().__init__()
        self.a1 = nn.Linear(ninputs, hidden_size)
        self.a2 = nn.Linear(hidden_size, noutputs)

    def forward(self, X):
        """Return the network output for input tensor `X`."""
        hidden = torch.tanh(self.a1(X))
        return self.a2(hidden)

    # `__call__` is deliberately not overridden: nn.Module.__call__ already
    # dispatches to `forward` and additionally runs registered hooks, which a
    # custom override would skip.

In [5]:
#Initialise Replay Memory
# Build the buffer and pre-fill it with MIN_REPLAY_SIZE transitions gathered
# by stepping the environment with randomly sampled actions.
replay_memory = Replay_memory(
    env, BUFFER_SIZE,MIN_REPLAY_SIZE, BATCH_SIZE).initialize()

#Initialise DQN Policy & Target
# Both networks take an observation vector and output one value per
# discrete action of the environment.
dqn_policy = DQN(
    env.observation_space.shape[0], env.action_space.n)
dqn_target = DQN(
    env.observation_space.shape[0], env.action_space.n)
# Start the target network as an exact copy of the policy network's weights.
dqn_target.load_state_dict(dqn_policy.state_dict())
# Put the target network in evaluation mode; as the last expression of the
# cell, the returned module is also what the notebook displays.
dqn_target.eval()

DQN(
  (a1): Linear(in_features=8, out_features=64, bias=True)
  (a2): Linear(in_features=64, out_features=4, bias=True)
)

In [6]:
def epsilon_greedy_policy(epsilon, obs):
    """Choose an action for `obs` with an epsilon-greedy rule.

    A uniform random number is drawn; when it is <= `epsilon` the action is
    sampled from the global `env`'s action space. Otherwise the global
    `dqn_policy` network is evaluated on `obs` (without gradient tracking)
    and the index of its largest output is returned as a Python int.
    """
    explore = random.random() <= epsilon
    if explore:
        return env.action_space.sample()

    with torch.no_grad():
        q_values = dqn_policy(torch.Tensor(obs))
        return int(q_values.argmax())

### Training

In [7]:
# Optimisation setup: Smooth L1 loss between predicted and target Q-values,
# Adam over the policy network's parameters.
loss_fn = nn.SmoothL1Loss()
learning_rate = 0.01
optimizer = torch.optim.Adam(dqn_policy.parameters(),
                             lr=learning_rate)
history = []  # (episode, rolling average reward), one entry every 10 episodes

# Rolling-average reward at which training stops.
# NOTE(review): 195 looks carried over from CartPole; LunarLander-v2 is
# usually quoted as solved at 200 -- confirm the intended threshold.
SOLVED_AVG_REWARD = 195
# Safety cap so the cell terminates under "Run All" even if the threshold is
# never reached; raise it for longer experiments.
MAX_EPISODES = 2000

obs = env.reset()
eps_threshold = EPS_START
episode = 0           # number of completed episodes
episode_reward = 0.0  # initialised here so re-running the cell starts clean

for step in itertools.count():
    #Get action using Epsilon-Greedy Policy
    action = epsilon_greedy_policy(eps_threshold, obs)

    #Get the new observation and reward.
    new_obs, reward, done, _ = env.step(action)

    #Append to Replay Memory
    replay_memory.append(
        Transition(obs, action, reward, done, new_obs))
    episode_reward += reward
    obs = new_obs

    #If the episode is finished
    if done:
        episode += 1

        # Decay exploration once per episode, never below EPS_END.
        eps_threshold = max(eps_threshold*EPS_DECAY, EPS_END)
        replay_memory.rewards.append(episode_reward)
        episode_reward = 0.0
        obs = env.reset()

        # Mean over the most recent episodes (the deque keeps at most 50).
        avg_res = float(np.mean(replay_memory.rewards))
        window_full = (len(replay_memory.rewards)
                       == replay_memory.rewards.maxlen)

        if episode % 10 == 0:
            # Fixed-width fields so that the carriage-return overwrite never
            # leaves stale characters from a longer previous line.
            print(f'Episode: {episode:<6d} '
                  f'Avg Results: {avg_res:<10.3f} '
                  f'Epsilon: {eps_threshold:<6.3f}', end='\r')

            history.append((episode, avg_res))

        # Only declare success once the averaging window is full, so a few
        # lucky early episodes cannot end training.
        if window_full and avg_res >= SOLVED_AVG_REWARD:
            print(f'Solved at episode: {episode} '
                  f'Avg Results: {avg_res:.3f}')
            break

        if episode >= MAX_EPISODES:
            print(f'Stopped at episode: {episode} '
                  f'Avg Results: {avg_res:.3f}')
            break

        # Sync the target network every TARGET_UPDATE_FREQ episodes. This
        # check runs only at episode end, so it is keyed on the episode
        # counter; keying it on the global step made the sync depend on
        # whether an episode happened to end on a matching step.
        if episode % TARGET_UPDATE_FREQ == 0:
            dqn_target.load_state_dict(dqn_policy.state_dict())

    #Sample from the Replay Memory
    b_states, b_actions, b_rewards,\
    b_dones, b_next_states = replay_memory.sample_batch()

    #Get Q-Values of every state-action pair from
    #the Replay Memory Sample
    qvalues = dqn_policy(b_states).gather(1, b_actions)

    #Build the one-step TD target from the target network; terminal
    #transitions contribute only their immediate reward.
    with torch.no_grad():
        max_target_qvalues = dqn_target(b_next_states).max(
            dim=1, keepdim=True).values
        not_done = 1.0 - b_dones.float()
        expected_qvalues = b_rewards + GAMMA*not_done*max_target_qvalues

    loss = loss_fn(qvalues, expected_qvalues)
    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1] before the optimiser step.
    nn.utils.clip_grad_value_(dqn_policy.parameters(), 1.0)
    optimizer.step()
    

Episode: 910                    Avg Results: 7.48                    Epsilon: 0.05.0564

KeyboardInterrupt: 

In [None]:
# Turn the (episode, average reward) pairs logged during training into a
# DataFrame. Re-running the cell is safe: building a DataFrame from an
# existing DataFrame with the same columns yields the same table.
history = pd.DataFrame(
    history, columns=['Episode', 'Avg Result'])

fig, ax = plt.subplots(1, 1)
ax.plot(history['Episode'], history['Avg Result'])
# Label the figure so it can be read on its own when the notebook is skimmed.
ax.set_xlabel('Episode')
ax.set_ylabel('Average reward (up to last 50 episodes)')
ax.set_title('DQN on LunarLander-v2: rolling average reward')

plt.show()

## Policy Based Algorithm