<a href="https://colab.research.google.com/github/Arkajeet7/warehouse-robotics-using-reinforcement-learning-/blob/main/agent/agent_rl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import importlib.util
import pandas as pd

In [4]:
class WarehouseEnv(gym.Env):
    """
    Warehouse Environment for RL tasks.

    Grid representation:
    - 0: walkable path
    - 1: rack/obstacle
    - S: start (0.5)
    - I: intermediate (0.65)
    - G: goal (0.75)
    - A: agent (1.0)

    Actions:
    0: LEFT, 1: DOWN, 2: RIGHT, 3: UP
    """

    metadata = {'render_modes': ['human', 'ansi'], 'render_fps': 4}

    def __init__(self, rows=11, colm=10, render_mode=None):
        super(WarehouseEnv, self).__init__()

        self.rows = rows
        self.colm = colm
        self.render_mode = render_mode
        self.reached_intermediate = False
        self.episode_num = 0

        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(low=0, high=1, shape=(rows, colm), dtype=np.float32)

        self.warehouse_layout = self._create_default_layout()

        self.start_pos = self.random_choice()

        #checks if the intermediate and the goal are equal
        while True:
          self.intermediate_pos = self.random_choice()
          SI = self.manhattan(self.start_pos, self.intermediate_pos)
          if self.intermediate_pos!=self.start_pos and SI>3:
            break
        #checks if the goal position is not equal to start and intermediate
        while True:
          self.goal_pos = self.random_choice()
          IG = self.manhattan(self.intermediate_pos, self.goal_pos)
          if self.goal_pos!=self.intermediate_pos and IG>3 :
            break
        self.steps = 0
        self.agent_pos = self.start_pos
        self.max_steps = rows * colm * 32

    def random_choice(self):
      while True:
        r=random.randint(0,self.rows-1)
        c=random.randint(0,self.colm-1)
        if self.warehouse_layout[r,c]==0:
          return r,c

    def manhattan(self,a,b):
      return abs(a[0] - b[0]) + abs(a[1] - b[1])


    def calculate_distance_from_goal(self, position):
        goal_x, goal_y = self.goal_pos
        pos_x, pos_y = position
        distance = self. manhattan((goal_x, goal_y), (pos_x, pos_y))
        return distance

    def calculate_distance_from_intermediate(self, position):
        intermediate_x, intermediate_y = self.intermediate_pos
        pos_x, pos_y = position
        distance = self.manhattan((intermediate_x, intermediate_y), (pos_x, pos_y))
        return distance

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        # ---- RANDOMIZE EVERY NEW EPISODE ----
        self.start_pos = (2,4)
        self.intermediate_pos=(4,4)
        self.goal_pos=(6,4)
        """
        while True:
          self.intermediate_pos = self.random_choice()
          SI = self.manhattan(self.start_pos, self.intermediate_pos)
          if self.intermediate_pos != self.start_pos and SI>3:
            break

        while True:
          self.goal_pos = self.random_choice()
          IG = self.manhattan(self.intermediate_pos, self.goal_pos)
          if self.goal_pos != self.intermediate_pos and IG>3:
            break"""

        self.agent_pos = self.start_pos
        self.steps = 0
        self.reached_intermediate = False
        self.neg_step_counter=0

        observation = self._get_observation()
        info = {}
        return observation, info

    def _get_observation(self):
        obs = self.warehouse_layout.copy().astype(np.float32)
        r, c = self.agent_pos
        sr, sc = self.start_pos
        ir, ic = self.intermediate_pos
        gr, gc = self.goal_pos

        obs[sr, sc] = 0.5
        obs[ir, ic] = 0.65
        obs[gr, gc] = 0.75
        obs[r, c] = -1.0
        return obs

    def step(self, action):
      self.steps += 1
      prev_row, prev_col = self.agent_pos
      row, col = prev_row, prev_col
      reward = 0
      terminated=False

      # ============================================
      # >>>REWARD-1 : distance between goals reward
      # ============================================
      """
      IG = self.manhattan(self.intermediate_pos, self.goal_pos)
      SI = self.manhattan(self.start_pos, self.intermediate_pos)
      reward +=0.02*(IG+SI)"""


      # --- Movement logic ---
      if action == 0:   # LEFT
        col = max(0, col - 1)
      elif action == 1: # DOWN
        row = min(self.rows - 1, row + 1)
      elif action == 2: # RIGHT
        col = min(self.colm - 1, col + 1)
      elif action == 3: # UP
        row = max(0, row - 1)

      # --- Check movement validity ---
      # ðŸ§± Hit obstacle
      if 0 <= row < self.rows and 0 <= col < self.colm:
        # =======================================
        # >>>REWARD-2: step penalty
        # ========================================

        reward -= 0.01

        if self.warehouse_layout[row, col] == 1:
            row, col = prev_row, prev_col
            reward -= 5.0
            terminated = False

        else:
          #checking for intermediate reached
          if self.reached_intermediate==True:
            if (row,col)==self.goal_pos:
              # ==========================================
              # >>>REWARD-3: reward for reaching the goal
              # ==========================================

              reward =50
              terminated=True

            else:
              # ========================================================
              # >>>REWARD-4: distance between agent pos and goal reward
              # ========================================================

              prev_dist_1=self.calculate_distance_from_goal((prev_row,prev_col))
              new_dist_1=self.calculate_distance_from_goal((row,col))
              diff_1=prev_dist_1 - new_dist_1
              reward +=1*diff_1

           #checking for intermediate not reached

          if self.reached_intermediate==False:
            if (row,col)==self.intermediate_pos:

              # ============================================
              # >>>REWARD-5: for reaching intermediate goal
              # ============================================
              reward = 3
              terminated=False
              self.reached_intermediate = True

            else:
              # ================================================================
              # >>>REWARD-6: distance reward between agent pos and intermediate
              # ================================================================
              prev_dist_2=self.calculate_distance_from_intermediate((prev_row,prev_col))
              new_dist_2=self.calculate_distance_from_intermediate((row,col))
              diff_2= prev_dist_2 - new_dist_2
              reward += 0.1*diff_2
              terminated=False

            # =======================================================
            # >>>REWARD-7: damping reward (for escaping local maxima)
            # =======================================================
            """
          if reward<0:
             # increase counter for consecutive negative-movement steps
             self.neg_step_counter += 1

             # cap the counter at 6 (after 6 steps boost becomes 0)
             step_count = min(self.neg_step_counter, 6)

             # linear decay: goes from A â†’ 0 in exactly 6 steps
             A = 4  # maximum boost value, tuneable
             add_value = A * (1 - (step_count - 1) / 5)

             if add_value < 0:
              add_value = 0

             # apply correction
             reward = reward + add_value
          else:
            # reset counter because reward is not negative
            self.neg_step_counter = 0"""

      else:
        # ðŸš« Out of bounds
        # ======================================
        # >>>REWARD-8: out of boundary penalty
        # ======================================
        row, col = prev_row, prev_col
        reward = -3.0
        terminated = False

    # --- Truncation check ---
    # ================================
    # >>>REWARD-9: truncation penalty
    # ================================
      truncated = self.steps >= self.max_steps
      if truncated:
        reward -= 30.0

    # ===================================
    # >>>REWARD-10: clipping
    # ===================================


    # --- Update agent position ---
      self.agent_pos = (row, col)
      observation = self._get_observation()
      info = {}

      return observation, reward, terminated, truncated, info


    def render(self):
        if self.render_mode == "ansi":
            return self._render_text()
        else:
            raise NotImplementedError()

    def _render_text(self):
        grid = self.warehouse_layout.copy()
        r, c = self.agent_pos
        output = ""
        for i in range(self.rows):
            for j in range(self.colm):
                if (i, j) == self.agent_pos:
                    output += "A "
                elif (i, j) == self.goal_pos:
                    output += "G "
                elif (i, j) == self.intermediate_pos:
                    output += "I "
                elif (i, j) == self.start_pos:
                    output += "S "
                else:
                    output += "â–ˆ " if grid[i, j] == 1 else ". "
            output += "\n"
        return output

    def close(self):
        pass
