## Import libraries

In [1]:
import numpy as np
import torch
import random
import torch.nn.functional as F

## About Environment
* `*` represents obstructions
* `M` represents the Man, who tries to reach the food while avoiding the obstructions
* `F` represents Food placed randomly

>> The `Man`, the `Food`, and the obstructions other than the boundary wall are placed randomly.

In [58]:
class env:
    """Square grid world where an agent ("Man") must reach the food and avoid walls.

    Cell codes stored in ``square``:
    0 = free, 1 = wall / obstruction, 2 = the agent's starting cell, 3 = food.
    ``square`` is only written during construction; ``step`` moves ``ag`` but
    does not repaint the grid.

    Parameters
    ----------
    N : int
        Side length of the grid. The outermost ring of cells is always wall.
    wall_points : int, optional
        Number of extra obstruction cells drawn uniformly at random. Draws may
        coincide or fall on the border, so fewer distinct obstructions can result.

    Raises
    ------
    ValueError
        If the grid is too small, or too crowded, to hold both the agent and
        the food on distinct free cells.
    """

    def __init__(self, N, wall_points = 3):
        if N < 3:
            raise ValueError("N must be at least 3 to leave room inside the boundary wall")
        self.square = np.zeros(shape = (N, N), dtype = np.int8)
        self.N = N

        # Boundary wall on all four sides.
        self.square[0, :] = 1
        self.square[-1, :] = 1
        self.square[:, 0] = 1
        self.square[:, -1] = 1

        # Random obstructions (anywhere on the grid, border included).
        for _ in range(wall_points):
            i, j = np.random.randint(0, N), np.random.randint(0, N)
            self.square[i][j] = 1

        # Agent and food are drawn *after* the walls, from distinct free cells,
        # so neither can start on a wall and the food cannot start under the agent.
        free_cells = np.argwhere(self.square == 0)
        if len(free_cells) < 2:
            raise ValueError("not enough free cells to place both the agent and the food")
        agent_idx, food_idx = np.random.choice(len(free_cells), size = 2, replace = False)

        # Stored as lists of plain ints: `step` mutates `ag` in place.
        self.ag = [int(v) for v in free_cells[agent_idx]]
        self.food = [int(v) for v in free_cells[food_idx]]
        self.square[tuple(self.ag)] = 2
        self.square[tuple(self.food)] = 3

    def showField(self):
        """Print the grid: ``*`` for walls, ``M`` for the agent, ``F`` for the food."""
        agent_pos, food_pos = tuple(self.ag), tuple(self.food)
        for i in range(self.N):
            for j in range(self.N):
                # Walls take precedence, so an agent standing on a wall is drawn as '*'.
                if self.square[i][j] == 1:
                    symbol = '*'
                elif agent_pos == (i, j):
                    symbol = 'M'
                elif food_pos == (i, j):
                    symbol = 'F'
                else:
                    symbol = ' '
                print(symbol, end = ' ')
            print()

    def _sense(self):
        """Return the 8 boolean features for the agent's current (non-wall) cell."""
        row, col = self.ag
        food_row, food_col = self.food
        return [
            bool(self.square[row - 1, col] == 1),   # threat_up
            bool(self.square[row, col + 1] == 1),   # threat_right
            bool(self.square[row + 1, col] == 1),   # threat_down
            bool(self.square[row, col - 1] == 1),   # threat_left
            row > food_row,                         # Is_food_up (row 0 is the top)
            food_col > col,                         # Is_food_right
            row < food_row,                         # Is_food_down
            food_col < col,                         # Is_food_left
        ]

    def getFeature(self):
        '''
        Describe the agent's current situation.

        Returns a tuple ``(features, done, reward)`` where
        features = [threat_up, threat_right, threat_down, threat_left,
                    Is_food_up, Is_food_right, Is_food_down, Is_food_left]

        * agent on the food cell -> done = 1, reward = 10
        * agent on a wall cell   -> all threats True, done = 1, reward = -1
        * otherwise              -> done = 0, reward = 0
        '''
        row, col = self.ag
        # A position outside the grid (possible only if `step` is called after
        # the episode already ended on the border) is treated like a wall hit.
        outside = not (0 <= row < self.N and 0 <= col < self.N)
        if outside or self.square[row, col] == 1:
            return [True, True, True, True, False, False, False, False], 1, -1
        if self.square[row, col] == 3:  # eaten the food
            return self._sense(), 1, 10
        return self._sense(), 0, 0

    def step(self, action):
        '''
        Move the agent one cell: 0 = up, 1 = right, 2 = down, 3 = left.

        Any other action value leaves the agent where it is. The move is not
        validated here; call ``getFeature`` afterwards to learn the outcome.
        '''
        if action == 0:    # up
            self.ag[0] -= 1
        elif action == 1:  # right
            self.ag[1] += 1
        elif action == 2:  # down
            self.ag[0] += 1
        elif action == 3:  # left
            self.ag[1] -= 1
        
        

In [3]:
# Build a 15x15 environment, draw it, and show the initial
# (features, done, reward) tuple as the cell's output.
MyEnv = env(15)
MyEnv.showField()
MyEnv.getFeature()


* * * * * * * * * * * * * * * 
*                           * 
*                           * 
*                           * 
*                           * 
*                           * 
*                           * 
*                   M       * 
*                           * 
*                       *   * 
*                           * 
*                           * 
*                           * 
*     F             *       * 
* * * * * * * * * * * * * * * 


([False, False, False, False, False, False, True, True], 0, 0)

## Features :
`features = [threat_up, threat_right, threat_down, threat_left, Is_food_up, Is_food_right,Is_food_down, Is_food_left ]`

In [5]:
from IPython.display import display, clear_output
import time

# Preview five freshly generated random environments, one per second.
for i in range(5):
    # wait=True delays clearing until new output arrives, which avoids flicker
    clear_output(wait=True)
    
    # Build a new random 15x15 field and draw it
    MyEnv = env(15)
    MyEnv.showField()
    
    # Keep each field on screen for one second
    time.sleep(1)
    
    # NOTE(review): display() is called without any object to render, so it
    # looks removable -- confirm before deleting.
    display()


* * * * * * * * * * * * * * * 
*                           * 
*         M                 * 
*                           * 
*                           * 
*                           * 
*                           * 
*     F                     * 
*                           * 
*                           * 
*       *   *         *     * 
*                           * 
*                           * 
*                           * 
* * * * * * * * * * * * * * * 


## Buffer space

>> We use a replay buffer of size 10K that stores experience tuples `<S, A, R, S', DONE>`.


In [7]:
from collections import deque
# Experience replay memory: once 10,000 transitions are stored, the oldest are dropped.
replay_buffer = deque(maxlen=10_000)

# Network dimensions: 8 boolean features in, one Q-value per move (4) out.
NUM_INPUTS = 8
HIDDEN_UNITS = 20
OUTPUTS = 4

# Learning hyper-parameters.
BATCH_SIZE = 64   # transitions sampled per replay step
GAMMA = 0.95      # discount factor applied to the bootstrapped next-state value

In [8]:
import torch.nn as nn

class CaveNet(nn.Module):
    """Small fully connected Q-network: NUM_INPUTS features -> OUTPUTS action values.

    NOTE(review): ``forward`` applies ``hiddenLayer`` twice, so both hidden
    stages share a single set of weights. Confirm whether that weight sharing
    is intended before adding a second layer, since doing so would change the
    keys of the saved ``state_dict``.
    """

    def __init__(self):
        super().__init__()
        self.inputLayer = nn.Linear(NUM_INPUTS, HIDDEN_UNITS)
        self.hiddenLayer = nn.Linear(HIDDEN_UNITS, HIDDEN_UNITS)
        self.outputLayer = nn.Linear(HIDDEN_UNITS, OUTPUTS)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a feature tensor to one raw (unnormalised) value per action."""
        hidden = self.relu(self.inputLayer(x))
        # Two passes through the same hidden layer, each followed by ReLU.
        for _ in range(2):
            hidden = self.relu(self.hiddenLayer(hidden))
        return self.outputLayer(hidden)
    

### Copy model
>> The target model provides stability during training and is updated only periodically, after a fixed number of episodes.

In [9]:
import copy
# Online network: the one that is trained and used to pick actions.
model = CaveNet()
# Target network: starts as an exact copy of the online network (same weights),
# rather than as a second, independently initialised network.
target_model = copy.deepcopy(model)

In [72]:
@torch.no_grad()
def epsilon_greedy_action_selection(model, epsilon, observation, sharpening_factor = 1):
    """Pick an action index for ``observation``.

    A uniform random number is drawn first. If it does not exceed ``epsilon``,
    a uniformly random action in ``{0, 1, 2, 3}`` is returned. Otherwise the
    action is *sampled* from the softmax of ``model(observation) *
    sharpening_factor`` (larger factors concentrate probability on the
    highest-valued action).

    The softmax is taken over dim 0, so ``model`` is expected to return a 1-D
    tensor for a single observation.
    """
    explore = not (np.random.random() > epsilon)
    if explore:
        return np.random.randint(0, 4)

    obs_tensor = torch.tensor(observation, dtype = torch.float)
    action_values = model(obs_tensor)
    action_probs = F.softmax(action_values * sharpening_factor, dim = 0)
    return torch.multinomial(action_probs, num_samples = 1).item()

### Training the model

In [41]:
# `train` looks up a global named `optimizer`, so the SGD instance must be
# bound to that exact name (it was previously bound only to `optimize`).
optimizer = torch.optim.SGD(params = model.parameters(), lr = 0.1)
optimize = optimizer  # legacy alias, kept for any cell that still uses the old name
# NOTE: a later cell rebinds `loss_fn` to MSELoss before training is launched.
loss_fn = torch.nn.L1Loss()

In [42]:
def train(states, target, model = model, EPOCHS = 1, batch_size = BATCH_SIZE):
    """Run ``EPOCHS`` full-batch gradient steps fitting ``model(states)`` to ``target``.

    Uses the module-level ``loss_fn`` and ``optimizer``. ``batch_size`` is
    accepted but not used by the body: the whole ``states`` tensor is fed in
    one pass.
    """
    for _ in range(EPOCHS):
        model.train()            # make sure the network is in training mode
        optimizer.zero_grad()    # drop gradients left over from the previous step
        predictions = model(states)
        step_loss = loss_fn(predictions, target)
        step_loss.backward()     # back-propagate the loss
        optimizer.step()         # apply the parameter update

## Replay memory

In [59]:
def replay(replay_buffer,batch_size = BATCH_SIZE, model = model, target_model = target_model ):
    """Sample a minibatch from ``replay_buffer`` and do one Q-learning update.

    Each buffer entry is a ``(state, action, reward, new_state, done)`` tuple.
    Nothing happens while the buffer holds fewer than ``batch_size`` entries.

    Target construction:
      * start from the online ``model``'s current predictions for ``states``,
        so actions that were not taken contribute zero error;
      * for the action actually taken, overwrite the entry with
        ``reward`` if the transition ended the episode, otherwise with
        ``reward + GAMMA * max_a target_model(new_state)[a]``.

    The bootstrapped next-state value comes from the slowly updated
    ``target_model``, which is what keeps the regression target stable.
    """
    if len(replay_buffer) < batch_size:
        return

    samples = random.sample(replay_buffer, batch_size)
    states, actions, rewards, new_states, dones = zip(*samples)

    states = torch.tensor(np.array(states), dtype = torch.float)
    new_states = torch.tensor(np.array(new_states), dtype = torch.float)
    actions = torch.tensor(actions, dtype = torch.long)
    rewards = torch.tensor(rewards, dtype = torch.float)
    # 1.0 for transitions that continue, 0.0 for terminal ones.
    not_done = 1.0 - torch.tensor(dones, dtype = torch.float)

    # no_grad (rather than inference_mode) so the resulting tensors can be
    # handed to the loss function as ordinary targets.
    with torch.no_grad():
        target_batch = model(states).clone()
        next_q_max = target_model(new_states).max(dim = 1).values

    # Terminal transitions get the bare reward because not_done zeroes the bootstrap term.
    td_targets = rewards + GAMMA * next_q_max * not_done
    target_batch[torch.arange(batch_size), actions] = td_targets

    # Train the same network the targets were built for. `train` still steps
    # the module-level `optimizer`.
    train(states, target_batch, model = model)

In [44]:
# Optimiser and loss picked up (as globals) by the `train` step.
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loss_fn = torch.nn.MSELoss()

# Exploration schedule: start fully random, multiply epsilon by 0.9 after every episode.
EPSILON, EPSILON_REDUCE = 1.0, 0.9

In [62]:
def training(model = model, target_model = target_model,EPSILON = EPSILON, EPSILON_REDUCE = EPSILON_REDUCE,EPOCHS = 1,
             grid_size = 15, max_steps = 100, sync_every = 500):
    """Play ``EPOCHS`` episodes, filling the replay buffer and training after each one.

    Parameters
    ----------
    model, target_model
        Online and target networks. Both are forwarded to ``replay`` so the
        networks passed here are the ones actually used for the update.
    EPSILON : float
        Initial exploration rate; multiplied by ``EPSILON_REDUCE`` after every episode.
    EPSILON_REDUCE : float
        Multiplicative decay applied to ``EPSILON``.
    EPOCHS : int
        Number of episodes to play.
    grid_size : int
        Side length of the environment created for each episode.
    max_steps : int
        An episode is cut off once this many steps have been taken.
    sync_every : int
        Every ``sync_every`` episodes (including episode 0) the target network
        is overwritten with the online weights and progress is printed.

    ``DONES`` in the progress line counts steps on which the food was reached
    (reward 10) since the start of this call.
    """
    num_done = 0
    for epoch in range(EPOCHS):
        MyEnv = env(grid_size)  # fresh random environment for every episode
        state, done, reward = MyEnv.getFeature()
        num_simulation = 0      # steps taken so far in this episode
        while not done:
            # choose an action
            action = epsilon_greedy_action_selection(model, EPSILON, state)
            # perform the action and observe the outcome
            MyEnv.step(action)
            new_state, done, reward = MyEnv.getFeature()
            replay_buffer.append((state, action, reward, new_state, done))
            state = new_state
            if reward == 10:
                num_done += 1
            num_simulation += 1
            if num_simulation >= max_steps:
                break

        # One minibatch update per episode, on the networks given to this call.
        replay(replay_buffer, model = model, target_model = target_model)

        EPSILON *= EPSILON_REDUCE

        if epoch % sync_every == 0:
            target_model.load_state_dict(model.state_dict())
            print(f" {epoch} : DONES = {num_done}")

* Training the model for 100k epochs 

In [63]:
# Launch training: 100,000 episodes, starting from the module-level EPSILON / EPSILON_REDUCE values.
training(model, target_model,EPSILON,EPSILON_REDUCE,EPOCHS = 100000)

 0 : DONES = 1
 500 : DONES = 258
 1000 : DONES = 583
 1500 : DONES = 936
 2000 : DONES = 1318
 2500 : DONES = 1716
 3000 : DONES = 2132
 3500 : DONES = 2557
 4000 : DONES = 2983
 4500 : DONES = 3401
 5000 : DONES = 3823
 5500 : DONES = 4256
 6000 : DONES = 4676
 6500 : DONES = 5115
 7000 : DONES = 5552
 7500 : DONES = 5977
 8000 : DONES = 6412
 8500 : DONES = 6846
 9000 : DONES = 7282
 9500 : DONES = 7715
 10000 : DONES = 8151
 10500 : DONES = 8585
 11000 : DONES = 9005
 11500 : DONES = 9432
 12000 : DONES = 9873
 12500 : DONES = 10285
 13000 : DONES = 10715
 13500 : DONES = 11148
 14000 : DONES = 11568
 14500 : DONES = 11975
 15000 : DONES = 12394
 15500 : DONES = 12799
 16000 : DONES = 13193
 16500 : DONES = 13605
 17000 : DONES = 14015
 17500 : DONES = 14407
 18000 : DONES = 14839
 18500 : DONES = 15241
 19000 : DONES = 15625
 19500 : DONES = 16020
 20000 : DONES = 16409
 20500 : DONES = 16825
 21000 : DONES = 17216
 21500 : DONES = 17614
 22000 : DONES = 18020
 22500 : DONES = 184

### Save the model

In [74]:
import os
# Persist the trained weights (state_dict only) to ./model/model_4.pth.
model_folder_path = './model'
file_name = "model_4.pth"
# exist_ok=True creates the folder when missing and is a no-op when it already
# exists, without a separate check-then-create step.
os.makedirs(model_folder_path, exist_ok=True)
file_name = os.path.join(model_folder_path, file_name)  # now the full path
torch.save(model.state_dict(), file_name)

## Let's play a game and see how it behaves

In [76]:
# Play one episode with the trained model and animate it in place.
MAX_PLAY_STEPS = 100  # same per-episode cap the training loop uses; prevents an endless game

MyEnv = env(15) # initialization of environment
state, done, reward = MyEnv.getFeature()
num_done = 0
steps_taken = 0
while not done and steps_taken < MAX_PLAY_STEPS:
    # Clear the previous frame (wait=True avoids flicker)
    clear_output(wait=True)
    # epsilon = 0 disables random exploration; sharpening_factor = 3 makes the
    # softmax sampling favour the highest-valued action more strongly
    action = epsilon_greedy_action_selection(model, epsilon = 0, observation = state, sharpening_factor=3)
    # perform action and get next state
    MyEnv.step(action)
    new_state, done, reward = MyEnv.getFeature()

    state = new_state
    steps_taken += 1
    MyEnv.showField()

    # Pause so each frame stays visible
    time.sleep(1)

    # Display the updated content
    display()


* * * * * * * * * * * * * * * 
*     M                     * 
*                           * 
*                           * 
*                           * 
*                           * 
*                           * 
*                           * 
*             *             * 
*                           * 
*     *                     * 
*                           * 
*                           * 
*                           * 
* * * * * * * * * * * * * * * 
