In [4]:

# Required libraries (gymnasium, matplotlib, numpy, torch) must already be installed.
# Import required libraries
import math
import os
import pickle
import random
from collections import namedtuple, deque
from itertools import count

import gymnasium as gym
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gymnasium import spaces

# CUDA_LAUNCH_BLOCKING is only honoured as an *environment variable*, and only
# if it is set before the first CUDA call; assigning a Python variable with that
# name does nothing. The plain variable is kept so code reading it still works.
CUDA_LAUNCH_BLOCKING = 1
os.environ.setdefault("CUDA_LAUNCH_BLOCKING", str(CUDA_LAUNCH_BLOCKING))

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)

In [2]:
# Definition of the Grid Environment class.

class GridEnvironment(gym.Env):
    """Warehouse task on a 6x6 grid: fetch the package and deliver it to the goal.

    Positions are ``[row, col]`` pairs. Actions are 0 = down, 1 = up,
    2 = right, 3 = left, 4 = pick up, 5 = drop off.

    The observation is the flattened 6x6 grid (36 floats) whose cells are
    marked 50 (agent), 23 (goal), 5 (package), 12 (shelf) and 0 (empty).

    Rewards per step:
        * -1    default cost of any action
        * +40   picking up the package at its location while empty-handed
        * -100  picking up while already holding the package
        * -10   picking up anywhere else
        * +100  dropping the package at the goal (also ends the episode)
        * -100  dropping off while not holding the package
        * -10   dropping the package anywhere but the goal
        * -20   walking into a shelf (the agent stays where it was)
        * -25   walking off the grid (the agent stays where it was)

    NOTE(review): ``observation_space`` is declared as ``Discrete(36)`` although
    the observations returned are 36-element arrays - confirm which of the two
    the agents are meant to consume.
    """

    # Gymnasium looks up the supported render modes under "render_modes".
    metadata = {'render_modes': []}

    # Initialization function
    def __init__(self):
        self.observation_space = spaces.Discrete(36)
        self.action_space = spaces.Discrete(6)
        self.max_timesteps = 150

        self.timestep = 0
        self.agent_pos = [0, 0]
        self.goal_pos = [5, 5]
        self.package_pos = [3, 1]
        # Shelves
        self.wall_1 = [3, 2]
        self.wall_2 = [4, 2]
        self.wall_3 = [5, 2]
        self.wall_4 = [1, 4]
        self.wall_5 = [1, 5]

        self.package = 0  # 1 while the agent is carrying the package
        self.pickup = 0

        self.state = self._build_state()

    def _walls(self):
        """Return the shelf positions, read from the ``wall_N`` attributes."""
        return (self.wall_1, self.wall_2, self.wall_3, self.wall_4, self.wall_5)

    def _build_state(self):
        """Paint goal, package, shelves and agent into a fresh 6x6 grid."""
        grid = np.zeros((6, 6))
        grid[tuple(self.goal_pos)] = 23
        grid[tuple(self.package_pos)] = 5
        for wall in self._walls():
            grid[tuple(wall)] = 12
        # The agent is painted last so it stays visible on the goal/package cell.
        grid[tuple(self.agent_pos)] = 50
        return grid

    # Reset function
    def reset(self, **kwargs):
        """Start a new episode with the agent at ``[0, 0]`` and no package held.

        Args:
            **kwargs: accepted for Gymnasium compatibility; only ``seed`` is
                used, and it is forwarded to ``gym.Env.reset``.

        Returns:
            ``(observation, info)`` where ``observation`` is the flattened grid
            and ``info`` is an empty dict.
        """
        super().reset(seed=kwargs.get("seed"))

        self.agent_pos = [0, 0]
        self.timestep = 0
        self.package = 0
        self.state = self._build_state()

        observation = self.state.flatten()
        info = {}
        return observation, info

    # Step function: Contains the implementation for what happens when an
    # agent takes a step in the environment.
    def step(self, action):
        """Apply ``action`` and advance the episode by one timestep.

        Args:
            action: integer in ``[0, 5]`` (see the class docstring). Any other
                value only costs the default step penalty.

        Returns:
            ``(observation, reward, terminated, package)``. This is a 4-tuple
            rather than Gymnasium's 5-tuple; it is kept because the training
            code in this notebook unpacks exactly four values from ``step``.
            ``package`` is 1 while the agent carries the package, else 0.
        """
        prev_row, prev_col = self.agent_pos
        row, col = prev_row, prev_col
        if action == 0:    # down
            row += 1
        elif action == 1:  # up
            row -= 1
        elif action == 2:  # right
            col += 1
        elif action == 3:  # left
            col -= 1

        reward = -1
        if action == 4:  # Pick up
            if np.array_equal(self.agent_pos, self.package_pos) and self.package == 0:
                # Picked up, in right location
                self.package = 1
                reward = 40
            elif self.package:
                # Picked up while holding a package
                reward = -100
            else:
                # Picked up in wrong location
                reward = -10
        elif action == 5:  # Drop off
            if np.array_equal(self.agent_pos, self.goal_pos) and self.package == 1:
                # Dropped off in right location: success, end the episode.
                reward = 100
                self.package = 0
                self.timestep = self.max_timesteps
            elif self.package == 0:
                # Dropped off without holding a package
                reward = -100
            else:
                # Dropped off in wrong location (was a no-op `reward -10`).
                reward = -10

        # Shelves are impassable: penalise and stay on the previous cell.
        if any(np.array_equal([row, col], wall) for wall in self._walls()):
            reward = -20
            row, col = prev_row, prev_col

        # Leaving the grid is penalised and the agent is clamped back inside.
        if row > 5 or row < 0 or col > 5 or col < 0:
            reward = -25
        self.agent_pos = [min(max(row, 0), 5), min(max(col, 0), 5)]

        self.state = self._build_state()
        observation = self.state.flatten()

        self.timestep += 1

        # The episode ends on a successful delivery or when time runs out.
        terminated = self.timestep >= self.max_timesteps

        return observation, reward, terminated, self.package

    # Render function: Visualizes the environment
    def render(self):
        """Show the current grid as an image on the active matplotlib figure."""
        plt.title('Grid Environment')
        plt.imshow(self.state)

In [1]:

# TODO: modify this to fit the current environment
# One stored experience: the fields are filled positionally by Memory.push.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class Memory:
    """Fixed-capacity replay memory holding ``Transition`` records."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` drops its oldest entry once it is full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        record = Transition(*args)
        self.memory.append(record)

    def batch(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        ``random.sample`` raises ``ValueError`` when fewer than ``batch_size``
        transitions are stored.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)
class DQN(nn.Module):
    """Two-layer Q-network over one-hot encoded discrete state indices.

    Args:
        obs: number of discrete states, i.e. the width of the one-hot input.
            Defaults to 36 (the 6x6 grid).
        actions: number of actions, i.e. Q-values produced per state.
            Defaults to 6.
    """

    def __init__(self, obs=36, actions=6):
        # The sizes are explicit parameters: the constructor used to read the
        # globals `obs`/`actions` while its caller passed them as arguments.
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(obs, 64)
        self.fc2 = nn.Linear(64, actions)

    def forward(self, input):
        """Return Q-values of shape ``(batch, actions)``.

        Args:
            input: a single state index (int), a sequence of indices, or an
                integer tensor of indices of any shape; it is flattened to
                one index per batch row. Every index must be smaller than the
                ``obs`` given to the constructor.
        """
        indices = input if isinstance(input, torch.Tensor) else torch.tensor(input)
        # Flatten so a scalar becomes a batch of one and (B, 1) becomes (B,).
        indices = indices.long().reshape(-1)
        encoded = F.one_hot(indices, self.fc1.in_features).float()
        hidden = F.relu(self.fc1(encoded))
        return self.fc2(hidden)
    
class agent():
    """Epsilon-greedy DQN agent with a replay memory and a target network.

    Args:
        obs: number of discrete states, forwarded to the Q-networks.
        actions: number of available actions.
    """

    def __init__(self, obs, actions):
        self.actions = actions
        self.batch = 128              # minibatch size used by replay()
        self.discount_factor = 0.99   # gamma in the Bellman target
        self.eps = 1                  # exploration probability
        self.eps_decay = 0.9954
        # NOTE(review): the optimizer below is built with lr=0.01, not with
        # this value - confirm which learning rate is intended.
        self.learning_rate = 0.1
        self.memory = Memory(5000)  # replay memory
        self.policy_net = DQN(obs, actions)  # action value function
        self.target_net = DQN(obs, actions)  # target action value function
        # Both networks must start from the same weights.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.SGD(self.policy_net.parameters(), lr=0.01)
        self.Q_table = self.print_table()  # not working yet: always None

    def select_action(self, state):
        """Pick an action epsilon-greedily.

        Returns:
            A ``(1, 1)`` long tensor holding the chosen action index.
        """
        if random.random() < self.eps:
            # Explore: uniform random action. Sampled from self.actions so the
            # method does not depend on a global `env`.
            choice = random.randrange(int(self.actions))
            return torch.tensor([[choice]], dtype=torch.long)
        # Exploit: action with the largest predicted Q-value.
        with torch.no_grad():
            return self.policy_net(state).max(1).indices.view(1, 1)

    def update(self, timestep, episode, terminated):
        """Run the periodic training and target-network synchronisation.

        Args:
            timestep: zero-based step index within the episode.
            episode: zero-based episode index.
            terminated: whether the episode has just ended.
        """
        # Train the policy network every 5 steps.
        if (timestep + 1) % 5 == 0:
            self.replay()
        # Sync the target network every 5 episodes, or when the episode ends.
        if (episode + 1) % 5 == 0 or terminated:
            self.target_net.load_state_dict(self.policy_net.state_dict())
        return

    def replay(self):
        """Sample a minibatch and take one gradient step on the policy network.

        Does nothing until at least ``self.batch`` transitions are stored.
        A ``next_state`` of ``None`` marks a final transition, whose target is
        the reward alone.

        Assumes each stored state and reward is a 1-D tensor and each action a
        ``(1, 1)`` long tensor, so that ``torch.cat`` yields batches of shape
        ``(B,)`` and ``(B, 1)`` - TODO confirm against the training loop.
        """
        # Was `len(agent.memory)`, which only worked through the global name.
        if len(self.memory) < self.batch:
            return

        sample = self.memory.batch(self.batch)
        batch = Transition(*zip(*sample))

        # The networks are never moved off the CPU, so every tensor here is
        # created on the CPU as well (the mask used to be placed on `device`).
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state], dtype=torch.bool
        )
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        # Q(s, a) for the actions that were actually taken.
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # max_a' Q_target(s', a'); stays 0 for final states.
        next_state_values = torch.zeros(self.batch)
        if non_final_mask.any():  # torch.cat fails on an empty list
            non_final_next_states = torch.cat(
                [s for s in batch.next_state if s is not None]
            )
            with torch.no_grad():
                next_state_values[non_final_mask] = (
                    self.target_net(non_final_next_states).max(1).values
                )

        # Bellman target r + gamma * max Q'. The discount factor belongs here;
        # the learning rate was used by mistake.
        expected_state_action_values = (
            reward_batch + self.discount_factor * next_state_values
        ).to(state_action_values.dtype)

        # The loss must be built outside torch.no_grad() so it can be
        # back-propagated, and MSELoss has to be instantiated before the call.
        criterion = nn.MSELoss()
        loss = criterion(state_action_values,
                         expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return

    def print_table(self):  # not working yet
        """Placeholder for a Q-table dump; currently returns None."""
        return



# Build the environment and take one observation so the networks can be sized.
env = GridEnvironment()
state, info = env.reset()

# obs: length of the flattened observation; actions: number of discrete actions.
obs, actions = len(state), env.action_space.n
# NOTE: this rebinds the name `agent` from the class to its instance, so the
# class definition has to be re-run before this line can be executed again.
agent = agent(obs, actions)
        
    

In [3]:


class DQN:
    """Skeleton of a Q-table based DQN-style trainer for the warehouse grid.

    The learning update itself (targets, gradient step, target sync every
    ``C`` steps) is not implemented yet; ``train`` currently only collects
    transitions into the replay buffer.

    Args:
        N: capacity of the replay buffer.
        Qtable: dict-like mapping ``(position, holding_package, action)`` to
            an estimated reward; position is 0..35, holding_package 0/1,
            action 0..5.
        C: number of steps between target resets (stored, not used yet).
    """

    # initialize values
    def __init__(self, N, Qtable, C):
        # initialize environment
        self.env = GridEnvironment()
        # Replay memory of capacity N, used as a ring buffer through
        # self.pointer. (np.full(N, (0,0,0,0)) cannot hold transition tuples
        # and fails to broadcast for most N.)
        self.replay = [None] * N
        self.pointer = 0

        # initialize action-value function Q with random weights theta
        self.action_value = None

        # TODO: initialize target action-value function with theta' = theta

        self.Qtable = Qtable
        self.C = C

    def _store(self, transition):
        """Write ``transition`` at the ring-buffer position and advance it."""
        if not self.replay:  # capacity 0: nothing can be stored
            return
        self.replay[self.pointer] = transition
        # Wrap around so the oldest entry is overwritten once the buffer is full.
        self.pointer = (self.pointer + 1) % len(self.replay)

    # Main training function
    def train(self, episodes, steps, epsilon, gamma, discount, action_function):
        """Run episodes in the environment and record the transitions.

        Args:
            episodes: number of episodes (int) or an iterable to loop over.
            steps: maximum steps per episode (int) or a re-iterable of steps.
            epsilon: exploration probability handed to ``action_function``.
            gamma: only self-multiplied each step; not used for learning yet.
            discount: currently unused.
            action_function: callable ``(epsilon, pos, package)`` returning
                ``(action, key)``, e.g. ``self.warehouse_action``.
        """
        int_types = (int, np.integer)
        episode_iter = range(episodes) if isinstance(episodes, int_types) else episodes
        for _ in episode_iter:
            # Sequence of visited (pos, package, action) keys; a list indexed
            # with seq[S+1] would raise IndexError, so keys are appended.
            seq = [(0, 0, 0)]
            terminated = False
            pos = package = 0
            self.env.reset()
            step_iter = range(steps) if isinstance(steps, int_types) else steps
            for _ in step_iter:
                # Select action
                action, key = action_function(epsilon, pos, package)
                seq.append(key)

                # Execute action and observe reward
                position, reward, terminated, package = self.env.step(action)
                # Flat index of the agent: the cell marked 50 in the observation.
                pos = np.where(position == 50)[0][0]

                # store transition in replay buffer
                transition = (seq[-2], action, reward, seq[-1])
                self._store(transition)

                # Sample mini batches from the replay buffer
                self.batching(transition)
                if terminated:
                    # TODO: set Yj to reward
                    break  # the episode is over; do not step a finished env
                # TODO: otherwise set Yj to the Q-value approximation

                # NOTE(review): this squares gamma on every step - confirm the
                # intended decay schedule.
                gamma *= gamma
                # TODO: gradient descent step
                # TODO: every C steps reset Q' = Q

            # NOTE(review): this squares epsilon after every episode (1 stays
            # 1 forever) - confirm a separate decay rate is not intended.
            epsilon *= epsilon

    # Determine the action for the warehouse environment
    def warehouse_action(self, epsilon, pos, package):
        """Choose an action epsilon-greedily from the Q-table.

        Returns:
            ``(action, key)`` where ``key`` is ``(pos, package, action)``.
            Entries missing from the Q-table count as 0; ties go to the
            lowest action index.
        """
        # The action count is `.n`; the space object itself is not an int.
        n_actions = self.env.action_space.n
        if np.random.rand() < epsilon:
            action = int(np.random.randint(n_actions))
        else:
            # select max(Q)
            q_values = [self.Qtable.get((pos, package, a), 0)
                        for a in range(n_actions)]
            action = q_values.index(max(q_values))

        return action, (pos, package, action)

    # Replay function for batching
    def batching(self, transition):
        """Placeholder for minibatch sampling; not implemented yet."""
        pass

    # Save the current weights
    def save(self, filename):
        """Pickle the Q-table to ``pickles/<filename>``, creating the folder."""
        path = "pickles/" + filename
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as file:
            pickle.dump(self.Qtable, file)
