In [1]:
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import random
import time

## Gridworld Env for Testing & Experimentation Using Tabular Reinforcement Learning

### Building our environment class "Asteroid Mining Gridworld"

- Our asteroid-mining rover agent, named SAMAR, aims to collect as many precious metal deposits (PMDs; +ve reward) as possible, which are randomly located across the asteroid, and then navigate to the extraction point (+ve reward) before it runs out of fuel (-ve reward)



- Initial fuel is parameterizable



- Fuel usage can be thought of as a proxy for time spent: each action SAMAR takes costs one unit of fuel (-ve reward)



- On the surface of the asteroid there are randomly located 'sink holes', which cause SAMAR to get stuck and return a negative reward




- It must reach an extraction point (+ve reward) 




- Termination conditions of the environment are reaching the extraction point or running out of fuel (large -ve reward)



### State Space

- The state space is given by the Length x Width definition of the grid; the edges are an impenetrable boundary, meaning they cannot be occupied

### Action Space

- The action space is defined by the movement of SAMAR, [left, right, up, down], plus [retrive], which picks up the precious metal at the current cell (5 actions in total)

### Rewards

- Each action costs 1 fuel unit, (reward = -1)

- Landing on a sink hole costs 1 fuel unit and returns a random penalty (reward drawn from [-99, -1])

- Moving onto a PMD returns a reward of 20

- Collecting a PMD returns a reward of +100



### BUILDING ENVIRONMENT

>

In [2]:
class InputError1(Exception):
    """Raised when an action string is not one the environment understands.

    Attributes:
        action -- the offending action value that was supplied
        message -- explanation of the error (also used as the exception text)
    """

    def __init__(self, action, message='Action needs to be one of the following strings: up down left right retrive'):
        # Hand the text to the base class first so str(err) and err.args carry it,
        # then keep both pieces of context on the instance for handlers to inspect.
        Exception.__init__(self, message)
        self.message = message
        self.action = action


### HELPER FUNCTIONS

In [2]:
%%writefile GridWorld_Environment.py
import numpy as np
class AstroidGrid():
    """Grid-world in which the rover SAMAR mines deposits and heads for the exit.

    The grid is a ``length`` x ``width`` float array whose cells hold one of:

    * ``-1`` -- impenetrable boundary (the outer ring of the grid)
    * ``0``  -- free space
    * ``2``  -- precious metal deposit (PMD)
    * ``3``  -- sink hole
    * ``5``  -- extraction point (exit), fixed at ``[length-2, width-2]``

    SAMAR always starts at ``[1, 1]``. An episode ends when the exit is reached
    or when the remaining fuel drops below 1. ``reset_env`` must be called
    before the first ``take_action``.
    """

    # Cell codes stored in ``self.environment``.
    _BOUNDARY = -1
    _FREE = 0
    _DEPOSIT = 2
    _SINKHOLE = 3
    _EXIT = 5

    # (row, column) offset applied by each movement action.
    _MOVES = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}

    def __init__(self,length,width,ndeposits,nsinkholes,rand_locs,fuel):
        """Store the configuration; the grid itself is built by ``reset_env``.

        Args:
            length: number of grid rows, boundary included (cast to int).
            width: number of grid columns, boundary included (cast to int).
            ndeposits: number of deposit placements attempted per reset.
            nsinkholes: number of sink-hole placements attempted per reset.
            rand_locs: if ``False`` the deposit/sink-hole layout is identical
                on every reset of this instance; otherwise it is re-drawn.
            fuel: fuel available at the start of every episode.
        """
        # Per-instance seeds that pin the layout when rand_locs is False.
        self.randomseed1 = np.random.randint(1000)
        self.randomseed2 = np.random.randint(1000)

        self.length    = int(length)
        self.width     = int(width)
        self.ndeposits = int(ndeposits)
        self.nsinkholes= int(nsinkholes)
        self.rand_locs = rand_locs

        # Built from the int-cast dimensions so it is always usable as an index.
        self.exit_pos    = np.array([self.length-2,self.width-2])

        self.init_fuel   = fuel
        self.fuel_left   = fuel
        self.reward_cum  = int(0)

        # Populated by reset_env().
        self.environment = None
        self.samar_pos  = None
        self.next_pos   = None

        self.done = False

        self.action_list = ['up','down','left','right','retrive']

    def _scatter(self, environment, count, cell_code, seed):
        """Write ``cell_code`` into ``count`` randomly drawn interior cells.

        For a fixed layout a private ``RandomState(seed)`` is used, which yields
        the same draws as seeding the global generator with ``seed`` but leaves
        the caller's global random state untouched. Draws may repeat, so fewer
        than ``count`` distinct cells can end up marked.
        """
        # `== False` (not `not ...`) keeps the original truthiness rule for rand_locs.
        if self.rand_locs == False:
            rng = np.random.RandomState(seed)
        else:
            rng = np.random
        for _ in range(count):
            row,col = rng.randint(1,self.length-1),rng.randint(1,self.width-1)
            environment[row,col] = cell_code

    def reset_env(self):
        """Rebuild the grid and restore agent position, fuel, reward and done flag.

        Sink holes are placed after deposits and overwrite them on collision;
        the start cell is then forced to free space and the exit cell to the
        exit code, so neither can be hidden by a deposit or sink hole.
        """
        environment = np.zeros([self.length,self.width])

        # Outer ring is the impenetrable boundary.
        environment[:,-1] = self._BOUNDARY
        environment[:,0]  = self._BOUNDARY
        environment[0,:]  = self._BOUNDARY
        environment[-1,:] = self._BOUNDARY

        self._scatter(environment, self.ndeposits, self._DEPOSIT, self.randomseed1)
        self._scatter(environment, self.nsinkholes, self._SINKHOLE, self.randomseed2)

        self.samar_pos = np.array([1,1]) # Start position is hard coded
        environment[self.samar_pos[0],self.samar_pos[1]] = self._FREE
        environment[self.exit_pos[0],self.exit_pos[1]]   = self._EXIT
        self.next_pos  = self.samar_pos
        self.environment = environment
        self.done = False
        self.reward_cum  = int(0)
        self.fuel_left   = self.init_fuel

    def take_action(self,action):
        """Apply one action and return ``(reward, reward_cum, samar_pos, fuel_left, done)``.

        Movement actions are replaced by a uniformly random movement with
        probability 0.1. Outcomes:

        * ``'retrive'`` on a deposit: reward 100, the cell becomes free space.
        * ``'retrive'`` anywhere else: reward -10.
        * move into the boundary: reward -2, position unchanged.
        * move onto free space: reward -1.
        * move onto a deposit: reward 20.
        * move onto a sink hole: random reward in [-99, -1].
        * move onto the exit: reward 20 and the episode ends; this step costs
          no fuel and the reported position stays on the cell moved from.

        Every other outcome costs exactly one fuel unit. If fuel drops below 1
        the episode ends and the step's reward is replaced by -10.

        Raises:
            ValueError: if ``action`` is not in ``self.action_list``.
            RuntimeError: if ``reset_env`` has not been called yet.
        """
        if action not in self.action_list:
            raise ValueError('Action needs to be one of the following strings: up down left right retrive')
        if self.environment is None:
            raise RuntimeError('reset_env() must be called before take_action()')

        if action == 'retrive':
            # Retrieval is resolved on its own so it can never also be scored
            # as a move onto the current cell.
            self.next_pos = self.samar_pos
            self.fuel_left += -1
            row,col = self.samar_pos[0],self.samar_pos[1]
            if self.environment[row,col] == self._DEPOSIT:
                reward = 100
                # Deposit has been collected.
                self.environment[row,col] = self._FREE
            else:
                reward = -10
        else:
            # Slip: 10% of the time the requested move is swapped for a random one.
            random_action = np.random.choice([0,1],p = [0.9,0.1])
            if random_action:
                action = np.random.choice(['up','down','left','right'])

            d_row,d_col = self._MOVES[action]
            self.next_pos = np.array([self.samar_pos[0] + d_row,self.samar_pos[1] + d_col])
            cell = self.environment[self.next_pos[0],self.next_pos[1]]

            if cell == self._EXIT:
                reward = 20
                self.done = True
            else:
                self.fuel_left += -1
                if cell == self._BOUNDARY:
                    # Bumped into the wall: stay put.
                    reward = -2
                else:
                    self.samar_pos = self.next_pos
                    if cell == self._DEPOSIT:
                        reward = 20
                    elif cell == self._SINKHOLE:
                        reward = -1*np.random.randint(1,100)
                    else:
                        reward = -1

        if self.fuel_left <1:
            self.done = True
            reward = -10

        self.reward_cum += reward

        return reward,self.reward_cum,self.samar_pos,self.fuel_left,self.done

    def calc_obs(self):
        """Return a copy of the 3x3 window of the grid centred on SAMAR.

        A copy is returned so callers cannot modify the environment through
        the observation.
        """
        row,col = self.samar_pos[0],self.samar_pos[1]
        obs = self.environment[row-1:row+2,col-1:col+2].copy()

        return obs
        

Overwriting GridWorld_Environment.py
