Import Statements and Seed

In [1]:
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
import torch.nn as nn
from torchvision import datasets, transforms
import numpy as np
import random
from sklearn.neighbors import BallTree
from PIL import Image, ImageDraw
import tkinter as tk
#plotting and visualization library
import matplotlib.pyplot as plt
#Display on the notebook
%matplotlib inline

from tqdm.auto import tqdm
import logging
logging.getLogger("tensorflow").setLevel(logging.WARNING)

Environment Class

In [2]:
#environmental parameters
n_obs = 4

n_sides = 7

pps = 50

n_rays = 360*2

#CONSTANTS
SIN5 = np.sin(5*np.pi/180)
COS5 = np.cos(5*np.pi/180)

COS = np.cos(np.linspace(0, 2*np.pi, n_rays,False))[None].T
SIN = np.sin(np.linspace(0, 2*np.pi, n_rays,False))[None].T

class Environment:

    def __init__(self, batch_size, size):
        #number of instances of environments
        self.batch_size = batch_size

        #size of the environemnts
        self.size = size

        #pursuer, target coordinates and velocities
        self.p = np.random.randint(0,self.size-1,(self.batch_size,2))*1.0

        self.t = np.random.randint(0,self.size-1,(self.batch_size,2))*1.0

        self.th = 2*np.pi*np.random.rand(self.batch_size)
        self.pv = np.zeros((self.batch_size,2),'float64')
        self.pv[:,0] = 0.9*self.size*np.cos(self.th)/200
        self.pv[:,1] = 0.9*self.size*np.sin(self.th)/200

        self.tv = np.zeros_like(self.pv)
        
        #generating the target point cloud
        self.point_cloud = np.zeros((self.batch_size,pps,2))
        th = np.linspace(0,2*np.pi,pps,False)[None].T
        r = self.size/60

        self.point_cloud[:,:,0] = (self.t[:,0] + r*np.cos(th)).T
        self.point_cloud[:,:,1] = (self.t[:,1] + r*np.sin(th)).T

        self.index = [BallTree(i) for i in self.point_cloud]

        #other environment variables
        self.episode_steps = np.zeros((self.batch_size,))

        self.MOVE_REWARD = -1
        self.CAPTURE_REWARD = 350
        self.COLLISION_REWARD = -200
    
    def step(self, delTh):
        #increase the steps
        self.episode_steps += 1

        #move the pursuer and change its direction
        self.p += self.pv

        mask = delTh!=0
        temp = self.pv[:,0][mask]
        self.pv[:,0][mask] = temp*COS5 - (delTh*self.pv[:,1]*SIN5)[mask]
        self.pv[:,1][mask] = self.pv[:,1][mask]*COS5 + delTh[mask]*temp*SIN5
        
        '''
            check for terminal conditions i.e.

                1. If the pursuer is out of bounds
                2. If the pursuer has captured the target
                3. If the pursuer has exausted episode steps
            
            take the logical or of all these and return reward
        '''
        #check for condition 1
        isOutOfBounds = (self.p>self.size) + (self.p<0)
        isOutOfBounds = isOutOfBounds[:,0] + isOutOfBounds[:,1]

        #check for condition 2
        hasCaptured = np.sum((self.p-self.t)**2,axis=1)<100

        #check for condition 3
        isOutOfSteps = self.episode_steps>300

        #calculate reward and return everything
        reward = self.MOVE_REWARD + self.CAPTURE_REWARD*hasCaptured + self.COLLISION_REWARD*isOutOfBounds

        done = isOutOfBounds + hasCaptured + isOutOfSteps

        state = self.getState(20)
        
        #reset the environments that are terminated
        self.reset(done)

        if(hasCaptured.sum()>0):
            print("CAPTURE")
        if(isOutOfBounds.sum()>0):
            print("COLLISION")

        return state, reward, done
    
    def reset(self, which):
        n = which.sum()
        if(n==0):
            return False
        
        self.p[which] = np.random.randint(0,self.size-1,(n,2))

        self.t[which] = np.random.randint(0,self.size-1,(n,2))

        self.th = 2*np.pi*np.random.rand(n)
        self.pv[:,0][which] = 0.9*self.size*np.cos(self.th)/200
        self.pv[:,1][which] = 0.9*self.size*np.sin(self.th)/200

        self.tv[which] = np.zeros_like(self.pv[which])
        
        #generating the target point cloud
        self.point_cloud = np.zeros((self.batch_size,pps,2))
        th = np.linspace(0,2*np.pi,pps,False)[None].T
        r = self.size/60

        self.point_cloud[:,:,0] = (self.t[:,0] + r*np.cos(th)).T
        self.point_cloud[:,:,1] = (self.t[:,1] + r*np.sin(th)).T

        self.index = [BallTree(i) for i in self.point_cloud]

        self.episode_steps[which] = 0
    
    def query(self, points):
        out = np.zeros((points.shape[0],points.shape[1]))

        for i in range(points.shape[0]):
            dist, ind = self.index[i].query(points[i])
            out[i] = dist.T[0]
        
        return out
    
    def getState(self,steps:int):
        #unit rays staring from the velocity vector in all directions
        unit_rays = np.zeros((self.batch_size,n_rays,2))
        unit_rays[:,:,0] = (self.pv[:,0]*COS - self.pv[:,1]*SIN).T/2.7
        unit_rays[:,:,1] = (self.pv[:,1]*COS + self.pv[:,0]*SIN).T/2.7

        #the source of rays i.e. the pursuer
        unit_step = np.expand_dims(self.p,1) + unit_rays

        #global minimum distance of the pursuer from the environment
        globalMinD = self.query(unit_step)

        #ray marching algorithm
        for _ in range(steps):
            currentMinD = self.query(unit_step+np.expand_dims(globalMinD,2)*unit_rays)
            currentMinD[currentMinD<0.6] = 0
            currentMinD[currentMinD>600] = 0
            globalMinD += currentMinD
        
        endPoints = np.expand_dims(globalMinD,2)*unit_rays
        outPoints = np.expand_dims(self.p,1) + endPoints

        #slope of all the rays and padding
        m = endPoints[:,:,1]/endPoints[:,:,0]
        pad = 3

        #top -> y > HIEGHT-padding
        mask = outPoints[:,:,1]>self.size-pad

        outPoints[mask,0] = (self.p[:,0][None].T + (self.size-pad-self.p[:,1][None].T)/m)[mask]
        outPoints[mask,1] = self.size-pad

        #bottom -> y < 0+padding
        mask = outPoints[:,:,1]<pad

        outPoints[mask,0] = (self.p[:,0][None].T + (pad-self.p[:,1][None].T)/m)[mask]
        outPoints[mask,1] = pad

        #left -> x < 0+padding
        mask = outPoints[:,:,0]<pad

        outPoints[mask,1] = (self.p[:,1][None].T + (pad-self.p[:,0][None].T)*m)[mask]
        outPoints[mask,0] = pad
        
        #right -> x > WIDTH+padding
        mask = outPoints[:,:,0]>self.size-pad

        outPoints[mask,1] = (self.p[:,1][None].T + (self.size-pad-self.p[:,0][None].T)*m)[mask]
        outPoints[mask,0] = self.size-pad

        #get the distance vector state
        out = np.sqrt(np.sum((outPoints-np.expand_dims(self.p,1))**2,axis=2))
        out = np.expand_dims(out,1)

        return out


    def renderState(self):
        #BEFORE RUNNING CHANGE RETURN IN GETSTATE TO OUTPOINTS
        img = Image.fromarray(np.zeros((self.size, self.size, 3), dtype=np.uint8), 'RGB')
        draw = ImageDraw.Draw(img)

        draw.ellipse((self.p[0][0]-8,self.p[0][1]-8,self.p[0][0]+8,self.p[0][1]+8), fill=(255,0,0), outline=(200,200,200))
        draw.line((self.p[0][0],self.p[0][1],self.p[0][0]+self.pv[0][0]*20,self.p[0][1]+self.pv[0][1]*20),width=3)

        for p in self.getState(20)[0]:
            draw.point((p[0],p[1]),fill=(255,255,255))
        
        return img

Deep Q-Learning Network

In [3]:
class DQN(nn.Module):

    def __init__(self, in_space, out_space):
        super(DQN, self).__init__()

        #create a conv block
        self.conv = nn.Sequential(
            nn.Conv1d(1, 32, 18),
            nn.ReLU(),
            nn.Conv1d(32, 48, 6),
            nn.ReLU(),
            nn.Conv1d(48, 64, 3),
            nn.ReLU()
        )

        #get the conv out size
        size = self.get_conv_out_size((1,in_space))

        #create a dense block
        self.fc = nn.Sequential(
            nn.Linear(size, 512),
            nn.ReLU(),
            nn.Linear(512, out_space)
        )

    def get_conv_out_size(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))
    
    def forward(self, x):
        out = self.conv(x).view(x.shape[0],-1)
        return self.fc(out)

Training Setup

In [4]:
# Environment settings
EPISODES = 15_000

#episodic batch size
BATCH_SIZE = 32

# Exploration settings
EPSILON = 0.96
EPSILON_DECAY = 0.999
MIN_EPSILON = 0.01

#Discount factor
DISCOUNT = 0.99

#target update value
COUNTER_MAX = 8

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

env = Environment(32,600)

#instantiate models and copy dict
policyDQN = DQN(n_rays, 3).to(device)
targetDQN = DQN(n_rays, 3).to(device)
targetDQN.load_state_dict(policyDQN.state_dict())

optimizer = optim.AdamW(policyDQN.parameters(),1e-4,amsgrad=True)

criterion = nn.SmoothL1Loss()

Training Loop

In [None]:
for i in tqdm(range(1, EPISODES + 1), ascii=True, unit='steps', position=0, leave=True):

    #ping policy net for current Q and action space
    state = torch.Tensor(env.getState(20)).to(device)
    Qcs = policyDQN(state)
    action = Qcs.max(1)[1]-1

    #apply epsilon randomization
    random_action = torch.randint(-1,2,action.shape).to(device)
    mask = np.random.rand(32)<EPSILON
    action[mask] = random_action[mask]

    EPSILON *= EPSILON_DECAY
    EPSILON = max(EPSILON,MIN_EPSILON)
    
    #get the new state after taking action
    new_state, reward, done = env.step(action.cpu().numpy())

    #get the new Q values
    new_state = torch.Tensor(new_state).to(device)
    with torch.no_grad():
        Qns = targetDQN(new_state)

    #get the maximum Q for current state using action
    Qcs_max = Qcs.gather(1, action.unsqueeze(1)+1)

    #get the maximum Q for new states
    Qns_max = torch.zeros(BATCH_SIZE,1).to(device)
    Qns_max[np.invert(done)] = Qns.max(1)[0][np.invert(done)][None].T

    #calculate the expected value and backward the loss
    Q_expected = Qns_max*DISCOUNT + torch.Tensor(reward[None].T).to(device)

    loss = criterion(Qcs_max, Q_expected)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if i%COUNTER_MAX==0:
        targetDQN.load_state_dict(policyDQN.state_dict())



  0%|          | 0/15000 [00:00<?, ?steps/s]

COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
CAPTURE
COLLISION
CAPTURE
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
CAPTURE
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLISION
COLLIS

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#save the model
torch.save(policyDQN.state_dict(),'/content/drive/MyDrive/DLOpsTermProject.pt')