In [5]:

# Required packages (install before running this cell): gymnasium, matplotlib, numpy, tensorflow, keras
# Import required libraries
import random
import math
import gymnasium as gym
from gymnasium import spaces
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pickle
from collections import namedtuple, deque
from itertools import count
import tensorflow as tf
import keras
# print("Num CPUs Available: ", tf.config.experimental.list_physical_devices('CPU'))

# device = torch.device(
#     "cuda" if torch.cuda.is_available() else
#     "cpu"
# )

# print(device)
# CUDA_LAUNCH_BLOCKING=1

In [6]:
# Basic outline of Environment here 

# grid = np.zeros((6,6))
# print(grid)
# Robot_start_pos = [0,0]
# package_start_pos = [6,6]
# destination_start_pos = [1,6]
# grid[tuple(Robot_start_pos)] = 1
# plt.imshow(grid)


# Implementation of the warehouse robot grid environment
class GridEnvironment(gym.Env):
    """Deterministic 6x6 warehouse grid with one robot, one package and shelves.

    The robot starts at (0, 0), the package at (5, 5) and the drop-off cell is
    (5, 0). Positions are (row, column) pairs. Six shelf cells block movement.

    Actions:
        0 down, 1 up, 2 right, 3 left, 4 pick up, 5 drop off.

    Rewards (per step):
        -1   default step cost,
        -20  walking into a shelf or a wall, or an invalid pick-up / drop-off,
        +20  picking up the package, or dropping it on the drop-off cell.

    Observation:
        The 6x6 grid flattened to 36 floats, where 1 marks the agent, 0.5 the
        package, 0.25 the drop-off cell, 0.1 a shelf and 0 an empty cell.

    NOTE(review): ``observation_space`` is declared as ``Discrete(36)`` even
    though observations are 36-element float vectors. It is left unchanged
    because code outside this cell may read ``observation_space.n``.
    """
    metadata = {'render.modes': []}

    def __init__(self):
        self.observation_space = spaces.Discrete(36)
        self.action_space = spaces.Discrete(6)
        self.max_timesteps = 200
        self._reset_layout()

    def _reset_layout(self):
        """Restore every per-episode attribute to its starting value."""
        self.timestep = 0
        self.has_package = 0
        self.goal = False
        # Set to True on a successful delivery; cleared here so the flag never
        # leaks from one episode into the next.
        self.complete = False

        self.agent_pos = [0, 0]
        self.package_start_pos = [5, 5]
        self.dropoff_pos = [5, 0]
        self.shelf_pos = [[0, 1], [1, 1], [1, 5], [2, 3], [4, 3], [3, 3]]
        self._draw_state()

    def _draw_state(self):
        """Rebuild ``self.state`` from the current entity positions."""
        grid = np.zeros((6, 6))
        # Later writes win when two entities share a cell, so the agent marker
        # hides the package or drop-off marker underneath it.
        grid[tuple(self.package_start_pos)] = 0.5
        grid[tuple(self.dropoff_pos)] = 0.25
        grid[tuple(self.agent_pos)] = 1
        for shelf in self.shelf_pos:
            grid[tuple(shelf)] = 0.1
        self.state = grid

    def _try_move(self, d_row, d_col):
        """Shift the agent by (d_row, d_col) unless the target cell is a shelf.

        The target may lie outside the grid; ``step`` clips it afterwards.

        Returns:
            True if the agent moved, False if a shelf blocked the move.
        """
        target = [self.agent_pos[0] + d_row, self.agent_pos[1] + d_col]
        if any(np.array_equal(target, shelf) for shelf in self.shelf_pos):
            return False
        self.agent_pos = target
        return True

    def reset(self, **kwargs):
        """Start a new episode.

        Keyword arguments are accepted and ignored.

        Returns:
            (observation, info): the flattened start grid and an empty dict.
        """
        self._reset_layout()
        observation = self.state.flatten()
        info = {}
        return observation, info

    def step(self, action):
        """Apply one action and advance the episode by one timestep.

        Args:
            action: Integer in 0..5 (see the class docstring).

        Returns:
            A 6-tuple ``(observation, reward, terminated, truncated, info, goal)``.
            ``terminated`` is True once the package has been delivered or when
            ``max_timesteps`` steps have elapsed; ``truncated`` is always False;
            ``goal`` is True only after a successful delivery.

            NOTE(review): the Gymnasium API specifies a 5-tuple and reports
            time limits through ``truncated``. The extra ``goal`` element and
            the timeout-as-``terminated`` behaviour are kept so existing
            callers that unpack six values keep working.
        """
        reward = -1  # default step cost

        moved = True
        if action == 0:      # down
            moved = self._try_move(1, 0)
        elif action == 1:    # up
            moved = self._try_move(-1, 0)
        elif action == 2:    # right
            moved = self._try_move(0, 1)
        elif action == 3:    # left
            moved = self._try_move(0, -1)
        elif action == 4:    # pick up
            at_package = np.array_equal(self.agent_pos, self.package_start_pos)
            if self.has_package == 0 and at_package:
                self.has_package = 1
                reward = 20
            else:
                reward = -20
        elif action == 5:    # drop off
            at_dropoff = np.array_equal(self.agent_pos, self.dropoff_pos)
            if self.has_package == 1 and at_dropoff:
                # Successful delivery ends the episode.
                reward = 20
                self.complete = True
                self.goal = True
                self.has_package = 0
            else:
                reward = -20

        if not moved:
            reward = -20  # bumped into a shelf

        # A move past the grid edge is clipped back inside and penalised.
        unclipped = self.agent_pos
        self.agent_pos = np.clip(unclipped, 0, 5)
        if not np.array_equal(self.agent_pos, unclipped):
            reward = -20

        # While carried, the package travels with the agent. np.clip returns a
        # new array, so the two positions never share storage.
        if self.has_package == 1:
            self.package_start_pos = self.agent_pos
        self.package_start_pos = np.clip(self.package_start_pos, 0, 5)

        self._draw_state()
        observation = self.state.flatten()

        self.timestep += 1
        timed_out = self.timestep >= self.max_timesteps
        terminated = True if timed_out else self.goal
        truncated = False

        info = {}
        return observation, reward, terminated, truncated, info, self.goal

    def render(self):
        """Draw the current grid with matplotlib."""
        plt.title('Grid environment')
        plt.imshow(self.state)


In [None]:
def DQN(obs, actions):
    """Build and compile a small fully connected Q-network.

    Args:
        obs: Length of the flat observation vector fed to the network.
        actions: Number of discrete actions; the network has one linear
            output per action.

    Returns:
        A compiled Keras ``Sequential`` model with two 32-unit ReLU hidden
        layers and a linear output layer, using the 'adam' optimizer with
        Keras' default settings, MSE loss and an MAE metric.
    """
    # An explicit Input replaces the legacy `input_dim=` argument on the first
    # Dense layer, which newer Keras versions warn about.
    model = keras.models.Sequential([
        keras.Input(shape=(obs,)),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(actions, activation='linear'),
    ])
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

class Agent:
    """DQN agent with an epsilon-greedy policy and a circular replay buffer.

    Holds two networks built by ``DQN``: ``policy_net`` (trained in ``replay``)
    and ``target_net`` (used only to evaluate next-state values).
    """

    def __init__(self, obs, actions):
        """Create the networks and an empty replay buffer.

        Args:
            obs: Length of the flat observation vector.
            actions: Number of discrete actions.
        """
        self.actions = actions
        self.discount_factor = 0.99  # gamma
        self.epsilon = 1
        # epsilon_decay / epsilon_min are not applied anywhere in this class;
        # presumably the training loop decays epsilon -- verify against caller.
        self.epsilon_decay = 0.9954
        self.epsilon_min = 0.01
        self.batch_size = 128
        self.size = 5000  # replay buffer capacity

        self.state_mem = np.zeros((self.size, obs))
        self.next_state_mem = np.zeros((self.size, obs))
        self.action_mem = np.zeros(self.size, dtype=int)
        self.reward_mem = np.zeros(self.size)
        self.done_mem = np.zeros(self.size, dtype=np.float32)
        self.pointer = 0  # total number of transitions ever stored

        self.policy_net = DQN(obs, actions)
        self.target_net = DQN(obs, actions)
        # Start the target network as an exact copy of the policy network
        # instead of leaving it with its own unrelated random initialisation.
        self.target_net.set_weights(self.policy_net.get_weights())

    def append(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest once the buffer is full."""
        slot = self.pointer % self.size
        self.state_mem[slot] = state
        self.next_state_mem[slot] = next_state
        self.action_mem[slot] = action
        self.reward_mem[slot] = reward
        self.done_mem[slot] = done
        self.pointer += 1

    def sample(self, batch):
        """Draw ``batch`` stored transitions uniformly at random, with replacement.

        Returns:
            (states, actions, rewards, next_states, done) arrays, each with
            ``batch`` rows.
        """
        filled = min(self.pointer, self.size)  # only sample slots already written
        picked = np.random.choice(filled, batch)
        return (
            self.state_mem[picked],
            self.action_mem[picked],
            self.reward_mem[picked],
            self.next_state_mem[picked],
            self.done_mem[picked],
        )

    def select_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest ``policy_net`` output.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.actions)  # explore
        # Add a leading batch axis; asarray also lets plain sequences through.
        state = np.asarray(state)[np.newaxis, :]
        q_values = self.policy_net.predict(state, verbose=0)
        return np.argmax(q_values)  # exploit

    def replay(self):
        """Fit ``policy_net`` on one sampled batch.

        Does nothing until at least ``batch_size`` transitions were stored.
        """
        if self.pointer < self.batch_size:
            return
        state, action, reward, next_state, done = self.sample(self.batch_size)

        # Current estimates from the policy net, next-state values from the
        # target net.
        q_eval = self.policy_net.predict(state, verbose=0)
        q_next = self.target_net.predict(next_state, verbose=0)

        # Only the entry of the action actually taken gets a new target; the
        # other outputs keep their own prediction and contribute zero error.
        q_target = q_eval.copy()
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        # NOTE(review): the bootstrap term is multiplied by `done` as stored by
        # append(). This is only correct if callers store a not-done mask
        # (1 while the episode continues, 0 at the end). If they store the raw
        # terminal flag, this should be `(1 - done)` -- confirm with the
        # training loop.
        q_target[batch_index, action] = (
            reward + self.discount_factor * np.max(q_next, axis=1) * done
        )

        self.policy_net.fit(state, q_target, verbose=0)
        return
        
