Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import random
from collections import namedtuple, deque
import math
import random

import matplotlib
import matplotlib.pyplot as plt

import Simulator
import Simulator.simulator

In [18]:
SEED = 6  # single seed shared by every RNG below, for consistent runs

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Seed torch as well as `random`: torch's generator drives the nn.Linear weight
# initialisation, so seeding `random` alone does not make runs repeatable.
torch.manual_seed(SEED)
random.seed(SEED)  # kept last: returns None, so the cell shows no extra output

cpu


Q-Network & Replay Memory

In [28]:
class DQN(nn.Module):
    """Fully connected Q-network: maps a state vector to one Q-value per action.

    Args:
        state_len: number of input features (width of a state vector).
        action_len: number of discrete actions (width of the output).
    """

    def __init__(self,state_len,action_len):
        super(DQN,self).__init__()
        self.layer1 = nn.Linear(state_len, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, action_len)

    def forward(self,x):
        """Return the Q-values for ``x``; the last dimension has ``action_len`` entries."""
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        # The output layer is left linear on purpose. A ReLU here would clamp
        # every Q-value to >= 0 (negative returns could never be represented)
        # and would zero the gradient whenever the pre-activation is negative.
        return self.layer3(x)

In [11]:
# One replay-buffer record. Fields are positional, in the order listed below:
#   Transition(1, 2, 3, 4).state  -> 1
#   Transition(1, 2, 3, 4).reward -> 4
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])

In [13]:
class ReplayMemory(object):
    """Bounded FIFO buffer of ``Transition`` records.

    Args:
        capacity: maximum number of transitions kept; once full, each new
            transition evicts the oldest one.
    """

    def __init__(self,capacity):
        # A deque with maxlen drops the oldest entry automatically when full.
        self.memory = deque([],maxlen=capacity)

    def push(self,*args):
        """Store one transition; ``args`` are forwarded to ``Transition``
        in the order (state, action, next_state, reward)."""
        # deque has no .push(); .append() adds on the right (newest) end.
        self.memory.append(Transition(*args))

    def sample(self,batch_size):
        """Return ``batch_size`` distinct stored transitions, chosen at random.

        Raises:
            ValueError: if ``batch_size`` is larger than ``len(self)``.
        """
        return random.sample(self.memory,batch_size)

    # Alias kept so code written against the previous method name still works.
    __sample__ = sample

    def __len__(self):
        """Number of transitions currently stored (enables ``len(memory)``)."""
        return len(self.memory)

Epsilon Greedy Policy Algorithm

In [19]:
def choose_action(policy_net,state,steps,n_actions=2):
    """Choose an action with an epsilon-greedy policy whose epsilon decays with ``steps``.

    Reads the module-level ``EPS_START``, ``EPS_END`` and ``EPS_DECAY``.

    Args:
        policy_net: callable returning the Q-values for ``state``.
        state: input passed to ``policy_net`` unchanged.
        steps: number of steps taken so far; drives the decay of epsilon.
        n_actions: size of the action space used for random exploration
            (defaults to 2: JUMP or FALL).

    Returns:
        int: index of the chosen action.
    """
    # Exponential decay: eps == EPS_START at steps == 0 and approaches EPS_END
    # as steps grows, so the agent explores early and exploits later.
    eps = EPS_END + (EPS_START - EPS_END)*math.exp(-steps/EPS_DECAY)

    if random.random() < eps : #Exploration
        return random.randrange(n_actions)

    #Exploitation
    with torch.no_grad():
        # argmax over the flattened output; this assumes policy_net(state)
        # holds the Q-values of a single state - TODO confirm against caller
        return torch.argmax(policy_net(state)).item() #Greedy Action

Training Step

In [21]:
def optimize_model(policy_net,target_net,memory,optimizer):
    """Run one DQN gradient step on a minibatch drawn from the replay memory.

    Reads the module-level ``BATCH_SIZE``, ``GAMMA`` and ``device``. Returns
    ``None`` without touching the networks while ``memory`` holds fewer than
    ``BATCH_SIZE`` transitions.

    Args:
        policy_net: network being trained; evaluated on the batched states.
        target_net: network used, without gradients, to value the next states.
        memory: replay buffer supporting ``len()`` and ``sample(batch_size)``
            and yielding ``Transition`` records.
        optimizer: optimizer over the parameters of ``policy_net``.

    Note:
        The ``torch.cat`` / ``gather`` calls below assume each stored state and
        next_state is a (1, state_len) tensor, each action a (1, 1) int64 tensor
        and each reward a (1,) tensor - TODO confirm against the training loop.
    """
    if len(memory) < BATCH_SIZE :
        return

    transitions = memory.sample(BATCH_SIZE) #List of Transitions i.e (State Action Next_state Reward)
    batch = Transition(*zip(*transitions)) #Transition of tuples i.e Transition[State = (s1,s2,..) Action = (a1,a2,..) ... Reward = (r1,r2,..)]

    #From tuples of tensors to single batched tensors
    state_batch  = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    #policy_net gives [[Q(s1,action1),Q(s1,action2),...],[Q(s2,action1),...],...]
    #gather(1, action_batch) keeps, per row, only the Q-value of the action that
    #was actually taken: [Q(s1,a1),Q(s2,a2),Q(s3,a3),...]
    state_action_values = policy_net(state_batch).gather(1,action_batch)

    #next_state is None when the transition ended an episode; such transitions
    #contribute 0 as their next-state value.
    non_final_mask = torch.tensor(tuple(map(lambda s : s is not None,batch.next_state)),device=device,dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    next_state_values = torch.zeros(BATCH_SIZE,device=device)
    #torch.cat raises on an empty list, so skip the target network entirely
    #when every sampled transition is terminal (all values then stay 0).
    if non_final_next_states:
        with torch.no_grad():
            #.max(1).values -> the best Q-value of each non-final next state
            next_state_values[non_final_mask] = target_net(torch.cat(non_final_next_states)).max(1).values

    #TD Estimate
    expected_state_action_values = (GAMMA*next_state_values) + reward_batch

    #Standard Backpropagation
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Hyper Parameters

In [23]:
# --- Optimisation ---
LR = 1e-3          # learning rate handed to the optimizer
BATCH_SIZE = 128   # transitions sampled from replay memory per training step
GAMMA = 0.99       # discount factor; a higher value favours long-term reward
TAU = 0.005        # soft-update factor for moving target_net towards policy_net

# --- Epsilon-greedy exploration schedule ---
EPS_START = 0.99   # starting value: exploration is prioritised
EPS_END = 0.01     # ending value: exploitation is prioritised
EPS_DECAY = 1000   # controls how fast exploration is phased out

# --- Run length ---
NUM_EPS = 50       # number of episodes (levels); each one is meant to be tougher

Training

In [24]:
# State features: pipe coords, player coords, space between pipes,
#                 player fall speed, player jump speed.
# Actions: JUMP or FALL.
state_len, action_len = 5, 2

In [29]:
# Two Q-networks with the same architecture, built in order (policy first,
# then target). The target network starts as an exact copy of the policy
# network's weights and is intended to lag a few updates behind it.
policy_net, target_net = (DQN(state_len, action_len).to(device) for _ in range(2))
target_net.load_state_dict(policy_net.state_dict())

# Replay buffer and the optimizer; only policy_net's parameters are trained.
memory = ReplayMemory(capacity=10000)
optimizer = optim.AdamW(params=policy_net.parameters(), lr=LR, amsgrad=True)

In [None]:
# Placeholder episode loop: iterates NUM_EPS times but does no work yet.
# TODO: run the simulator here, push transitions into `memory`, and call
# optimize_model(...) each step - none of that is implemented in this cell.
for _ in range(NUM_EPS):
    pass