<a href="https://colab.research.google.com/github/darshita27-cmd/Warehouse-Robot-Policy-Based-Models/blob/main/warehouse_robot_path_optimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch numpy matplotlib # PyTorch: neural networks and reinforcement learning. NumPy: multi-dimensional arrays. Matplotlib: visualization


Each grid cell shows one of:

⬜ = empty

📦 = item location (pickup)

🎯 = drop location

🤖 = robot

In [None]:
# Build a simple warehouse environment (grid world)
import numpy as np
import matplotlib.pyplot as plt
import random
# Warehouse grid environment
class WarehouseEnv:
  def __init__(self,size=5): # side length of the square grid (5x5 by default)
    self.size=size
    self.reset()

  def reset(self):
    self.grid=np.zeros((self.size,self.size)) # size x size array of zeros; 0 marks an empty cell
    self.robot_pos=[0,0] # the robot starts in the top-left corner, [row, column] = [0,0]
    self.pickup_pos=[self.size-1,0] # pickup cell in the bottom-left corner, [4,0] for size 5
    self.dropoff_pos=[0,self.size-1] # drop-off cell in the top-right corner, [0,4] for size 5
    self.has_item=False # the robot starts without an item
    self.steps=0 # number of actions taken so far in this episode
    return self._get_state()

  def _get_state(self):
    # State = [row, column, has_item], with has_item cast from bool to int (False -> 0, True -> 1).
    # Example: [0,0,1] means the robot is at [0,0] and is carrying an item.
    return np.array(self.robot_pos + [int(self.has_item)])

  def _is_valid(self, pos):
    # True if both the row pos[0] and the column pos[1] lie inside the grid, False otherwise.
    return 0 <= pos[0] < self.size and 0 <= pos[1] < self.size

  def step(self,action): # 0 = up, 1 = down, 2 = left, 3 = right
    move=[[-1,0],[1,0],[0,-1],[0,1]] # [row change, column change]: up decreases the row index, down increases it, left and right change the column
    # Example: at [2,2], action 0 (up) gives next_pos = [2+(-1), 2+0] = [1,2]
    next_pos=[self.robot_pos[0] + move[action][0], self.robot_pos[1] + move[action][1]]
    reward=-0.1 # small penalty per step, so that shorter solutions score higher

    if self._is_valid(next_pos): # a move that would leave the grid keeps the robot in place
      self.robot_pos=next_pos

    # pickup logic
    if self.robot_pos==self.pickup_pos and not self.has_item: # the robot is on the pickup cell and is not carrying an item
      self.has_item=True # pick up the item and set the carrying flag
      reward=+1.0 # reward for picking up the item

    # drop-off logic
    elif self.robot_pos==self.dropoff_pos and self.has_item: # the robot is on the drop-off cell and is carrying an item
      self.has_item=False # the item is delivered
      reward=+2.0 # reward for delivering the item

    self.steps+=1 # count the steps the agent has taken
    done=self.steps > 50 # end the episode once more than 50 steps have been taken, to avoid an infinite loop
    return self._get_state(),reward,done # new state after the action, the reward, and the episode-finished flag

  # text rendering of the grid
  def render(self):
    grid=np.full((self.size,self.size),"⬜")
    x,y=self.robot_pos
    grid[x][y]='🤖'
    # The pickup and drop-off markers are drawn after the robot, so they hide it while it stands on either cell.
    grid[self.pickup_pos[0]][self.pickup_pos[1]]='📦'
    grid[self.dropoff_pos[0]][self.dropoff_pos[1]]='🎯'
    for row in grid:
      print(''.join(row))
    print()

In [None]:
env=WarehouseEnv()
state=env.reset()
env.render()

🤖⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜
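
The reward scheme is easier to see on a hand-written route. The sketch below is a condensed, stand-alone restatement of the `step` logic for the 5x5 grid (the names `step`, `MOVES`, and `plan` are illustrative, not part of the notebook), driven along one delivery: four moves down to the pickup cell, then four up and four right to the drop-off cell.

```python
# Condensed restatement of WarehouseEnv.step for a 5x5 grid.
# Helper names are hypothetical; reward values are the ones used in the notebook.
SIZE = 5
PICKUP, DROPOFF = (SIZE - 1, 0), (0, SIZE - 1)
MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}  # up, down, left, right

def step(pos, has_item, action):
    """Return (new_pos, new_has_item, reward) for one action."""
    row, col = pos[0] + MOVES[action][0], pos[1] + MOVES[action][1]
    if 0 <= row < SIZE and 0 <= col < SIZE:  # off-grid moves leave the robot in place
        pos = (row, col)
    if pos == PICKUP and not has_item:
        return pos, True, 1.0    # picked up the item
    if pos == DROPOFF and has_item:
        return pos, False, 2.0   # delivered the item
    return pos, has_item, -0.1   # plain step penalty

# One delivery: 4x down to the pickup, then 4x up and 4x right to the drop-off.
plan = [1] * 4 + [0] * 4 + [3] * 4
pos, has_item, rewards = (0, 0), False, []
for action in plan:
    pos, has_item, reward = step(pos, has_item, action)
    rewards.append(reward)

total = sum(rewards)
print(rewards)
print(f"total reward after {len(plan)} steps: {total:.1f}")  # 2.0
```

Ten of the twelve steps cost 0.1 each, and the pickup and drop-off steps earn 1.0 and 2.0, so one clean delivery nets 2.0.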



In [None]:
# policy network (the agent)

import torch # tensor library similar to NumPy, with GPU acceleration and the automatic differentiation that backpropagation in neural networks relies on
import torch.nn as nn # neural-network layers and loss functions
import torch.optim as optim # optimization algorithms that update network parameters during training to minimize a loss function. Includes SGD (stochastic gradient descent), which updates parameters along the gradient of the loss, and Adam, which adapts the learning rate of each parameter using the first and second moments of its gradient
class PolicyNetwork(nn.Module):
  def __init__(self, input_dim,hidden_dim,output_dim): # input_dim: size of the input (state representation); hidden_dim: number of neurons in the hidden layer; output_dim: number of possible actions
    super(PolicyNetwork,self).__init__()
    self.fc1=nn.Linear(input_dim,hidden_dim) # first fully connected linear layer (input_dim -> hidden_dim)
    self.relu=nn.ReLU() # ReLU activation function, adds non-linearity
    self.fc2=nn.Linear(hidden_dim,output_dim) # second fully connected layer (hidden_dim -> output_dim)
    self.softmax=nn.Softmax(dim=-1) # softmax converts the action scores into probabilities that sum to 1. Example: for input [2,3,0] (robot at [2,3], no item) the output might be [0.1,0.6,0.2,0.1], i.e. a 60% chance of moving down

  def forward(self,x):
    x=self.relu(self.fc1(x)) # x is the current state (robot position and has_item flag); apply fc1 (input_dim -> hidden_dim), then ReLU
    y=self.softmax(self.fc2(x)) # apply fc2 (hidden_dim -> output_dim, the number of actions), then softmax to get action probabilities
    return y
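
To make the softmax comment concrete, here is a plain-Python sketch of what the final layer computes and of how an action is then drawn from the resulting probabilities. It is an illustration under assumptions rather than the notebook's PyTorch code: the `softmax` helper and the `scores` values are made up, and `random.choices` stands in for sampling from a categorical distribution.

```python
import math
import random

def softmax(scores):
    """Turn raw action scores into probabilities that sum to 1."""
    top = max(scores)                            # subtract the max for numerical stability
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical raw scores for the four actions (up, down, left, right).
scores = [0.0, 2.0, 1.0, 0.0]
probs = softmax(scores)

# Sampling (rather than always taking the most likely action) keeps some exploration.
random.seed(0)
action = random.choices(range(4), weights=probs, k=1)[0]
log_prob = math.log(probs[action])  # the log-probability of the sampled action

print([round(p, 3) for p in probs])
print(action, round(log_prob, 3))
```

The largest score gets the largest probability, but every action keeps a non-zero chance of being picked.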


In [None]:
# REINFORCE training setup
policy=PolicyNetwork(input_dim=3, hidden_dim=128, output_dim=4) # input_dim=3: three inputs (row, column, has_item). output_dim=4: four possible actions (up, down, left, right)
optimizer=optim.Adam(policy.parameters(),lr=0.01) # Adam updates the network weights; policy.parameters() hands it every trainable parameter of the policy network; lr is the learning rate, i.e. how drastically the weights change at each update
gamma=0.99 # discount factor for future rewards
def select_action(state): # state: the current state, taken as input
  state=torch.FloatTensor(state) # convert the state to a PyTorch float tensor, the input format the network requires
  probs=policy(state) # output probability of each action
  dist=torch.distributions.Categorical(probs) # categorical distribution built from those probabilities; it supports sampling and log-probabilities
  action=dist.sample() # sample an action at random in proportion to its probability, so likelier actions are chosen more often
  return action.item(),dist.log_prob(action)

def compute_returns(rewards,gamma): # rewards: list or array of the rewards received at each step of an episode. A higher gamma gives future rewards more weight
  returns=[] # stores the return for each time step
  G=0 # running discounted return: the sum of all rewards from the current time step onward, each discounted by gamma per step of delay
  for r in reversed(rewards): # iterate backwards, because the return at each step depends on the current reward and the return of the following step
    G=r+gamma*G # G_t = r_t + gamma * G_(t+1)
    returns.insert(0,G) # we iterated in reverse, so insert at position 0 to leave the returns in chronological order
  return returns
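
A small worked example of the recurrence G_t = r_t + gamma * G_(t+1) may help. The sketch below uses a hypothetical three-step episode and gamma = 0.5 so the numbers are easy to check by hand; `discounted_returns` is an illustrative variant that appends and reverses once instead of inserting at the front on every step, and should produce the same list as `compute_returns`.

```python
def discounted_returns(rewards, gamma):
    """Discounted return for every time step, G_t = r_t + gamma * G_(t+1)."""
    returns, G = [], 0.0
    for r in reversed(rewards):   # walk backwards from the last reward
        G = r + gamma * G
        returns.append(G)
    returns.reverse()             # restore chronological order
    return returns

rewards = [-0.1, -0.1, 1.0]       # two step penalties, then a pickup
returns = discounted_returns(rewards, gamma=0.5)
print([round(G, 3) for G in returns])  # [0.1, 0.4, 1.0]
```

Working backwards: G_2 = 1.0, G_1 = -0.1 + 0.5 * 1.0 = 0.4, and G_0 = -0.1 + 0.5 * 0.4 = 0.1. The earlier steps get credit for the later pickup, shrunk by gamma for each step of delay.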



In [None]:
# train the agent
num_episodes=500
for episodes in range(num_episodes):
  state=env.reset()
  log_probs=[]
  rewards=[]
  total_reward=0

  for t in range(100): # upper bound only; the environment sets done on step 51
    action,log_prob=select_action(state) # sample the action to take from the policy
    next_state,reward,done=env.step(action)
    log_probs.append(log_prob)
    rewards.append(reward)
    state=next_state
    total_reward += reward
    if done:
      break

  returns=compute_returns(rewards,gamma) # discounted cumulative reward from each time step onward
  returns=torch.tensor(returns) # convert the returns to a PyTorch tensor for the computations below
  returns=(returns-returns.mean())/(returns.std()+ 1e-9) # normalize the returns to zero mean and unit variance; this reduces the variance of the gradient estimate and helps prevent exploding gradients
  loss=0
  for log_prob,G in zip(log_probs,returns):
    loss -= log_prob * G # the policy-gradient formula maximizes log_prob * G, but PyTorch optimizers minimize a loss, so the sign is flipped. If G is high (good outcome) the gradient raises the probability of that action; if G is low or negative it lowers it

  optimizer.zero_grad() # PyTorch accumulates gradients by default, so zero them to start fresh each iteration
  loss.backward() # backpropagation: compute the gradient of the loss with respect to the model's parameters
  optimizer.step() # update the parameters using those gradients, moving them in the direction that reduces the loss
  if episodes % 50 == 0: # print progress whenever the episode index is a multiple of 50
    print(f'episode {episodes}, Total reward: {total_reward:.2f}')




episode 0, Total reward: -4.00
episode 50, Total reward: -4.00
episode 100, Total reward: -4.00
episode 150, Total reward: -4.00
episode 200, Total reward: -4.00
episode 250, Total reward: -4.00
episode 300, Total reward: -4.00
episode 350, Total reward: -4.00
episode 400, Total reward: -4.00
episode 450, Total reward: -4.00
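
The constant total of -4.00 can be reproduced by simple arithmetic. One reading that is consistent with the reward values in `step` (an inference, not something the notebook states) is that each logged episode contains exactly one pickup and no drop-off: the episode runs for 51 steps, one of them earns the pickup reward, and the other 50 earn only the step penalty.

```python
STEP_PENALTY, PICKUP_REWARD = -0.1, 1.0  # values from WarehouseEnv.step
MAX_STEPS = 51                           # done = steps > 50 first becomes True on step 51

# Assumed trajectory: one step lands on the pickup cell, the other 50 earn only the penalty.
total = PICKUP_REWARD + (MAX_STEPS - 1) * STEP_PENALTY
print(f"{total:.2f}")  # -4.00
```

Under that reading the policy reaches the pickup cell but never completes a delivery, which would be worth checking against the rollout frames.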


In [None]:
# test the trained policy and visualize each step
state=env.reset()
env.render()
for _ in range(20):
  action,_=select_action(state)
  state,_,done=env.step(action)
  env.render()
  if done:
    break

🤖⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
🤖⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
🤖⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
🤖⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

⬜⬜⬜⬜🎯
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
⬜⬜⬜⬜⬜
📦⬜⬜⬜⬜

