# Load Libraries

In [5]:
!pip install -qq gym==0.23.0
from typing import Tuple, Dict, Optional, Iterable
import numpy as np
import matplotlib
from matplotlib import animation
from IPython.display import HTML
from IPython import display
import gym
from gym import spaces
from gym.error import DependencyNotInstalled
import pygame
from pygame import gfxdraw
import matplotlib.pyplot as plt
from pickle import FALSE

In [6]:
# Mount Google Drive at /content/drive; the Q-table CSV files loaded further
# down are read from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Testing environment

In [7]:
class Maze(gym.Env):

    def __init__(self, agent1, agent2, agent3, exploring_starts: bool = False, shaped_rewards: bool = False, size: int = 10) -> None:
        """Set up one shared maze with three agents, each with an object and a goal.

        Every agent has a two-phase task: reach its object cell first, then
        (once its pickup flag is set) reach its goal cell.

        Args:
            agent1: starting (row, col) cell of agent 1.
            agent2: starting (row, col) cell of agent 2.
            agent3: starting (row, col) cell of agent 3.
            exploring_starts: stored on the instance; no method of this class
                reads it.
            shaped_rewards: when True the reward functions return a negative
                normalised distance to the current target instead of a flat
                per-step penalty.
            size: side length of the grid handed to `_create_maze` and used
                for the observation space.
        """
        super().__init__()
        self.shaped_rewards = shaped_rewards
        self.exploring_starts = exploring_starts

        # Current positions of the three agents.
        self.agent1, self.agent2, self.agent3 = agent1, agent2, agent3

        # Fixed object (phase 1 target) and goal (phase 2 target) per agent.
        self.obj1, self.goal1 = (6, 4), (9, 9)
        self.obj2, self.goal2 = (7, 0), (2, 3)
        self.obj3, self.goal3 = (5, 9), (9, 0)

        # Nobody carries an object at construction time.
        self.pickup1 = self.pickup2 = self.pickup3 = False

        self.maze = self._create_maze(size=size)

        # distancesN_1: steps to agent N's object; distancesN_2: steps to its goal.
        layout = self.maze
        self.distances1_1 = self._compute_distances(self.obj1, layout)
        self.distances1_2 = self._compute_distances(self.goal1, layout)
        self.distances2_1 = self._compute_distances(self.obj2, layout)
        self.distances2_2 = self._compute_distances(self.goal2, layout)
        self.distances3_1 = self._compute_distances(self.obj3, layout)
        self.distances3_2 = self._compute_distances(self.goal3, layout)

        # Four discrete moves; observations are (row, col) pairs.
        self.action_space = spaces.Discrete(n=4)
        self.action_space.action_meanings = {0: 'UP', 1: 'RIGHT', 2: 'DOWN', 3: "LEFT"}
        self.observation_space = spaces.MultiDiscrete([size, size])

        # Rendering state; the surface is created lazily by `render`.
        self.screen = None
        self.agent_transform = None

    # Step for agent1
    def reset1(self, id: int) -> Tuple[int, int]:
        """Put agent 1 into one of its two task phases and return its position.

        Args:
            id: 0 selects the pick-up phase: agent 1's pickup flag is cleared
                and the agent stays where it currently is. Any other value
                selects the delivery phase: the flag is set and the agent is
                placed on cell (1, 2).

        Returns:
            Agent 1's (row, col) position after the reset.
        """
        if id == 0:
            # Clear agent 1's own flag. This branch used to clear `pickup2`,
            # which left agent 1 in delivery mode and wiped agent 2's state.
            self.pickup1 = False
        else:
            self.pickup1 = True
            self.agent1 = (1, 2)
        return self.agent1

    def step1(self, action: int) -> Tuple[Tuple[int, int], float, bool, Dict]:
        """Move agent 1 by one action and report the transition.

        The reward is evaluated from the position held before the move. The
        episode flag compares the new position with the object cell while
        nothing is carried, and with the goal cell afterwards.

        Returns:
            (new position, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward1(self.agent1, action)
        self.agent1 = self._get_next_state(self.agent1, action)

        target = self.goal1 if self.pickup1 else self.obj1
        return self.agent1, reward, self.agent1 == target, {}

    def isPickup1(self) -> bool:
        """Report whether agent 1 is standing on its object cell."""
        return bool(self.agent1 == self.obj1)

    def compute_reward1(self, state: Tuple[int, int], action: int) -> float:
        """Reward for agent 1 taking `action` in `state`.

        The target is the object cell until pickup and the goal cell after.
        With shaped rewards the value is minus the normalised distance from
        the resulting cell to the target; otherwise it is -1.0 unless `state`
        (the cell before the move) already is the target.
        """
        # Resolved up front so an unsupported action raises in every mode.
        next_state = self._get_next_state(state, action)

        if self.pickup1:
            target, distances = self.goal1, self.distances1_2
        else:
            target, distances = self.obj1, self.distances1_1

        if not self.shaped_rewards:
            return - float(state != target)
        return - (distances[next_state] / distances.max())

    def simulate_step1(self, state: Tuple[int, int], action: int):
        """Preview agent 1's transition from `state` without moving the agent.

        Returns:
            (next state, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward1(state, action)
        next_state = self._get_next_state(state, action)

        target = self.goal1 if self.pickup1 else self.obj1
        return next_state, reward, next_state == target, {}

    # For agent 2
    def reset2(self, id: int) -> Tuple[int, int]:
        """Put agent 2 into one of its two task phases and return its position.

        `id == 0` clears the pickup flag and leaves the agent where it is;
        any other value sets the flag and places the agent on cell (1, 1).
        """
        self.pickup2 = not (id == 0)
        if self.pickup2:
            self.agent2 = (1, 1)
        return self.agent2

    def step2(self, action: int) -> Tuple[Tuple[int, int], float, bool, Dict]:
        """Move agent 2 by one action and report the transition.

        The reward is evaluated from the position held before the move. The
        episode flag compares the new position with the object cell while
        nothing is carried, and with the goal cell afterwards.

        Returns:
            (new position, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward2(self.agent2, action)
        self.agent2 = self._get_next_state(self.agent2, action)

        target = self.goal2 if self.pickup2 else self.obj2
        return self.agent2, reward, self.agent2 == target, {}

    def isPickup2(self) -> bool:
        """Report whether agent 2 is standing on its object cell."""
        return bool(self.agent2 == self.obj2)

    def compute_reward2(self, state: Tuple[int, int], action: int) -> float:
        """Reward for agent 2 taking `action` in `state`.

        The target is the object cell until pickup and the goal cell after.
        With shaped rewards the value is minus the normalised distance from
        the resulting cell to the target; otherwise it is -1.0 unless `state`
        (the cell before the move) already is the target.
        """
        # Resolved up front so an unsupported action raises in every mode.
        next_state = self._get_next_state(state, action)

        if self.pickup2:
            target, distances = self.goal2, self.distances2_2
        else:
            target, distances = self.obj2, self.distances2_1

        if not self.shaped_rewards:
            return - float(state != target)
        return - (distances[next_state] / distances.max())

    def simulate_step2(self, state: Tuple[int, int], action: int):
        """Preview agent 2's transition from `state` without moving the agent.

        Returns:
            (next state, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward2(state, action)
        next_state = self._get_next_state(state, action)

        target = self.goal2 if self.pickup2 else self.obj2
        return next_state, reward, next_state == target, {}

#######################################################################################
    # For agent 3
    def reset3(self, id: int) -> Tuple[int, int]:
        """Put agent 3 into one of its two task phases and return its position.

        `id == 0` clears the pickup flag and leaves the agent where it is;
        any other value sets the flag and places the agent on cell (8, 9).
        """
        self.pickup3 = not (id == 0)
        if self.pickup3:
            self.agent3 = (8, 9)
        return self.agent3

    def step3(self, action: int) -> Tuple[Tuple[int, int], float, bool, Dict]:
        """Move agent 3 by one action and report the transition.

        The reward is evaluated from the position held before the move. The
        episode flag compares the new position with the object cell while
        nothing is carried, and with the goal cell afterwards.

        Returns:
            (new position, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward3(self.agent3, action)
        self.agent3 = self._get_next_state(self.agent3, action)

        target = self.goal3 if self.pickup3 else self.obj3
        return self.agent3, reward, self.agent3 == target, {}

    def isPickup3(self) -> bool:
        """Report whether agent 3 is standing on its object cell."""
        return bool(self.agent3 == self.obj3)

    def compute_reward3(self, state: Tuple[int, int], action: int) -> float:
        """Reward for agent 3 taking `action` in `state`.

        The target is the object cell until pickup and the goal cell after.
        With shaped rewards the value is minus the normalised distance from
        the resulting cell to the target; otherwise it is -1.0 unless `state`
        (the cell before the move) already is the target.
        """
        # Resolved up front so an unsupported action raises in every mode.
        next_state = self._get_next_state(state, action)

        if self.pickup3:
            target, distances = self.goal3, self.distances3_2
        else:
            target, distances = self.obj3, self.distances3_1

        if not self.shaped_rewards:
            return - float(state != target)
        return - (distances[next_state] / distances.max())


    def simulate_step3(self, state: Tuple[int, int], action: int):
        """Preview agent 3's transition from `state` without moving the agent.

        Returns:
            (next state, reward, done, info) where info is an empty dict.
        """
        reward = self.compute_reward3(state, action)
        next_state = self._get_next_state(state, action)

        target = self.goal3 if self.pickup3 else self.obj3
        return next_state, reward, next_state == target, {}


# For Overall functions

    def _get_next_state(self, state: Tuple[int, int], action: int) -> Tuple[int, int]:
        """Return the cell reached from `state` by `action`.

        Actions 0..3 shift the cell by (-1, 0), (0, +1), (+1, 0) and (0, -1)
        in (row, col) terms. If the maze does not list the shifted cell as a
        neighbour of `state` (wall or border), the agent stays put.

        Raises:
            ValueError: if `action` is not one of 0, 1, 2, 3.
        """
        # Index in this tuple == action code.
        offsets = ((-1, 0), (0, 1), (1, 0), (0, -1))
        for code, (d_row, d_col) in enumerate(offsets):
            if action == code:
                candidate = (state[0] + d_row, state[1] + d_col)
                break
        else:
            raise ValueError("Action value not supported:", action)

        return candidate if candidate in self.maze[state] else state

    @staticmethod
    def _create_maze(size: int) -> Dict[Tuple[int, int], Iterable[Tuple[int, int]]]:
        """Build the maze as an adjacency map.

        Every (row, col) cell of a size x size grid maps to the list of cells
        that can be entered from it. The map starts fully connected; the outer
        border and the interior walls listed below are then cut out in both
        directions.

        The wall list names cells with indices up to 9, and each wall is
        looked up in the map, so a `size` below 10 fails with KeyError.
        """
        # Start with every cell linked to its four neighbours, including
        # off-grid ones that the border pass removes again.
        maze = {}
        for row in range(size):
            for col in range(size):
                maze[(row, col)] = [(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]

        # One [inside cell, outside cell] pair per border segment.
        last = size - 1
        borders = [
            edge
            for k in range(size)
            for edge in (
                [(0, k), (-1, k)],        # top
                [(last, k), (size, k)],   # bottom
                [(k, 0), (k, -1)],        # left
                [(k, last), (k, size)],   # right
            )
        ]

        # Interior walls, each given as the pair of cells it separates.
        walls = [
            [(1,0),(1,1)],[(2,0),(2,1)],[(3,0),(3,1)],[(0,1),(1,1)],[(0,2),(1,2)],[(0,2),(0,3)],
            [(1,3),(1,4)],[(1,4),(2,4)],[(0,5),(1,5)],[(0,5),(0,6)],[(1,5),(1,6)],
            [(0,7),(1,7)],[(0,8),(1,8)],[(0,9),(1,9)],
            [(2,6),(2,7)],[(2,7),(2,8)],[(2,8),(2,9)],[(2,7),(3,7)],[(1,8),(2,8)],
            [(3,2),(3,3)],[(4,2),(4,3)],[(5,2),(5,3)],[(2,3),(3,3)],[(2,4),(3,4)],
            [(7,0),(8,0)],[(7,0),(7,1)],[(6,0),(6,1)],[(5,1),(6,1)],[(6,1),(6,2)],[(7,1),(7,2)],[(8,1),(8,2)],[(8,1),(9,1)],
            [(5,4),(5,5)],[(6,4),(6,5)],[(6,4),(7,4)],
            [(5,5),(5,6)],[(4,5),(4,6)],[(3,6),(4,6)],[(3,7),(4,7)],[(3,8),(4,8)],[(3,9),(4,9)],
            [(9,8),(9,9)],[(8,8),(8,9)],
            [(8,4),(9,4)],[(8,5),(9,5)],[(8,6),(9,6)],[(8,7),(9,7)],[(8,6),(8,7)],[(7,6),(7,7)],
            [(5,7),(5,8)],[(5,8),(5,9)],[(7,8),(6,8)],[(6,8),(6,9)],[(5,9),(6,9)],
        ]

        for cell, blocked in borders + walls:
            maze[cell].remove(blocked)
            # Off-grid cells have no entry, so only real cells get the reverse cut.
            if blocked in maze:
                maze[blocked].remove(cell)

        return maze

    @staticmethod
    def _compute_distances(goal: Tuple[int, int],
                           maze: Dict[Tuple[int, int], Iterable[Tuple[int, int]]]) -> np.ndarray:
        """Compute the number of moves from `goal` to every cell of the maze.

        Every move costs one step, so a breadth-first sweep outwards from
        `goal` yields the shortest path lengths.

        Args:
            goal: (row, col) cell that gets distance 0; must be a key of `maze`.
            maze: adjacency map from each cell to the cells reachable from it.

        Returns:
            A float array indexed by (row, col). Its shape is taken from the
            largest row and column index present, so it is no longer tied to a
            10x10 grid. Cells that cannot be reached keep the value `np.inf`.

        Raises:
            KeyError: if `goal` is not a cell of `maze`.
        """
        cells = list(maze) + [goal]
        n_rows = max(row for row, _ in cells) + 1
        n_cols = max(col for _, col in cells) + 1

        distances = np.full((n_rows, n_cols), np.inf)
        distances[goal] = 0.

        # Expand one ring of cells per pass; a cell still at inf has not been
        # reached yet, so its first visit is along a shortest path.
        frontier = [goal]
        while frontier:
            next_frontier = []
            for cell in frontier:
                for neighbour in maze[cell]:
                    if np.isinf(distances[neighbour]):
                        distances[neighbour] = distances[cell] + 1
                        next_frontier.append(neighbour)
            frontier = next_frontier
        return distances

# Rendering Function
    def render(self, mode: str = 'human') -> Optional[np.ndarray]:
        """Draw the maze, the three agents, their goals and their objects.

        Besides drawing, this method latches the pickup flags: an agent that
        stands on its object cell while its flag is False gets the flag set,
        and from then on the object is drawn on top of the agent. The step
        and reward methods read those flags, so callers that rely on the
        two-phase task must render after each step.

        Args:
            mode: 'rgb_array' returns the frame as an array; 'human' calls
                `pygame.display.flip()` and returns None.

        Returns:
            For 'rgb_array', the screen pixels transposed to
            (row, column, channel) order; otherwise None.
        """
        assert mode in ['human', 'rgb_array']

        screen_size = 600
        # Grid side length taken from the maze instead of a literal 10.
        grid_size = max(row for row, _ in self.maze) + 1
        scale = screen_size / grid_size

        if self.screen is None:
            pygame.init()
            self.screen = pygame.Surface((screen_size, screen_size))

        surf = pygame.Surface((screen_size, screen_size))
        surf.fill((22, 36, 71))

        # Draw borders and walls: a thin white bar on every side of a cell
        # whose neighbour in that direction is not connected to it.
        for row in range(grid_size):
            for col in range(grid_size):
                state = (row, col)
                for next_state in [(row + 1, col), (row - 1, col), (row, col + 1), (row, col - 1)]:
                    if next_state not in self.maze[state]:
                        row_diff, col_diff = np.subtract(next_state, state)
                        left = (col + (col_diff > 0)) * scale - 2 * (col_diff != 0)
                        right = ((col + 1) - (col_diff < 0)) * scale + 2 * (col_diff != 0)
                        top = (grid_size - (row + (row_diff > 0))) * scale - 2 * (row_diff != 0)
                        bottom = (grid_size - ((row + 1) - (row_diff < 0))) * scale + 2 * (row_diff != 0)

                        gfxdraw.filled_polygon(surf, [(left, bottom), (left, top), (right, top), (right, bottom)], (255, 255, 255))

        # Agent 1: red circle, teal goal, orange object.
        if not self.pickup1:
            self.pickup1 = self.isPickup1()
        self._draw_agent_goal_object(
            surf, scale, screen_size, self.agent1, self.goal1, self.obj1, self.pickup1,
            (228, 63, 90), (40, 199, 172), (255, 165, 0))

        # Agent 2: white circle, pink goal, purple object.
        if not self.pickup2:
            self.pickup2 = self.isPickup2()
        self._draw_agent_goal_object(
            surf, scale, screen_size, self.agent2, self.goal2, self.obj2, self.pickup2,
            (255, 255, 255), (255, 192, 203), (128, 0, 128))

        # Agent 3: aqua circle, light-cyan goal, brown object.
        if not self.pickup3:
            self.pickup3 = self.isPickup3()
        self._draw_agent_goal_object(
            surf, scale, screen_size, self.agent3, self.goal3, self.obj3, self.pickup3,
            (0, 255, 255), (224, 255, 255), (150, 75, 0))

        # Everything above was drawn with row 0 at the bottom of the surface;
        # the vertical flip puts row 0 at the top of the final image.
        surf = pygame.transform.flip(surf, False, True)
        self.screen.blit(surf, (0, 0))

        if mode == 'human':
            # NOTE(review): `self.screen` is an off-screen Surface and this
            # method never opens a display window, so whether this flip shows
            # anything depends on setup outside this class - confirm.
            pygame.display.flip()
        elif mode == 'rgb_array':
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2))

    @staticmethod
    def _draw_agent_goal_object(surf, scale, screen_size, agent, goal, obj, carrying,
                                agent_color, goal_color, object_color) -> None:
        """Draw one agent's circle, goal rectangle and object square on `surf`.

        The object square is centred on the agent when `carrying` is true and
        on the object's own cell otherwise.
        """
        # Agent: filled circle centred in its cell.
        agent_y = int(screen_size - scale * (agent[0] + 0.5))
        agent_x = int(scale * (agent[1] + 0.5))
        gfxdraw.filled_circle(surf, agent_x, agent_y, int(scale * 0.6 / 2), agent_color)

        # Goal: rectangle inset 10 pixels from the edges of its cell.
        goal_row, goal_col = goal
        goal_left = int(goal_col * scale + 10)
        goal_right = int((goal_col + 1) * scale - 10)
        goal_top = int(screen_size - (goal_row + 1) * scale + 10)
        goal_bottom = int(screen_size - goal_row * scale - 10)
        gfxdraw.filled_polygon(
            surf,
            [(goal_left, goal_bottom), (goal_left, goal_top), (goal_right, goal_top), (goal_right, goal_bottom)],
            goal_color
        )

        # Object: small square, either carried by the agent or on its own cell.
        object_size = int(scale * 0.4)
        if carrying:
            center_x, center_y = agent_x, agent_y
        else:
            center_y = int(screen_size - scale * (obj[0] + 0.5))
            center_x = int(scale * (obj[1] + 0.5))
        gfxdraw.box(
            surf,
            (center_x - object_size // 2, center_y - object_size // 2, object_size, object_size),
            object_color
        )

    def close(self) -> None:
        """Shut pygame down and drop the render surface, if one was created."""
        if self.screen is None:
            return
        pygame.display.quit()
        pygame.quit()
        self.screen = None

In [8]:
def display_video(frames):
    """Turn a sequence of image frames into an inline HTML5 video.

    Args:
        frames: non-empty sequence of images accepted by `Axes.imshow`; the
            first frame initialises the image and every frame is shown for
            50 ms.

    Returns:
        An `IPython.display.HTML` object wrapping the encoded video.
    """
    # Copied from: https://colab.research.google.com/github/deepmind/dm_control/blob/master/tutorial.ipynb
    orig_backend = matplotlib.get_backend()
    matplotlib.use('Agg')  # build the figure on the non-interactive Agg backend
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(orig_backend)  # restore whatever backend was active before
    try:
        ax.set_axis_off()
        ax.set_aspect('equal')
        ax.set_position([0, 0, 1, 1])
        im = ax.imshow(frames[0])

        def update(frame):
            im.set_data(frame)
            return [im]

        anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                       interval=50, blit=True, repeat=False)
        return HTML(anim.to_html5_video())
    finally:
        # pyplot keeps every figure alive until it is closed; release this one
        # once the video has been encoded (or encoding failed).
        plt.close(fig)

# Load Q-Value table

In [9]:
import numpy as np

# Each agent has two action-value tables stored as flat CSV files:
#   *_O -> passed to test_agent as the table for the first task phase,
#   *_G -> passed as the table for the second task phase.
# The *_A variants are the same data viewed as (row, col, action) = (10, 10, 4).

# Agent 1
agent1_O = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent1_O_reshaped.csv', delimiter=',')
agent1_G = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent1_G_reshaped.csv', delimiter=',')
agent1_O_A = np.reshape(agent1_O, (10, 10, 4))
agent1_G_A = np.reshape(agent1_G, (10, 10, 4))

# Agent 2
agent2_O = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent2_O_reshaped.csv', delimiter=',')
agent2_G = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent2_G_reshaped.csv', delimiter=',')
agent2_O_A = np.reshape(agent2_O, (10, 10, 4))
agent2_G_A = np.reshape(agent2_G, (10, 10, 4))

# Agent 3
agent3_O = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent3_O_reshaped.csv', delimiter=',')
agent3_G = np.loadtxt(fname='/content/drive/MyDrive/RL Project/Final defence code/agent3_G_reshaped.csv', delimiter=',')
agent3_O_A = np.reshape(agent3_O, (10, 10, 4))
agent3_G_A = np.reshape(agent3_G, (10, 10, 4))

print("All action value tables loaded successfully!")


All action value tables loaded successfully!


# Testing Phase

# Defining policy

In [10]:
def target_policy(state: tuple, action_values: np.ndarray):
    """Pick a greedy action for `state`, breaking ties uniformly at random.

    Args:
        state: index into `action_values` selecting one row of action values.
        action_values: array whose entry at `state` holds one value per action.

    Returns:
        The index of a maximal action value, drawn with `np.random.choice`
        from all indices that share the maximum.
    """
    q_row = action_values[state]
    best_value = q_row.max()
    candidates = np.flatnonzero(q_row == best_value)
    return np.random.choice(candidates)

## Defining order

In [11]:
def define_order(sequence):
    """Expand a sequence of agent numbers into the ordered list of task names.

    Each agent number (1, 2 or 3) contributes its two task names in order,
    e.g. 2 becomes "A2_1" followed by "A2_2".

    Raises:
        KeyError: if `sequence` contains a number other than 1, 2 or 3.
    """
    tasks_by_agent = {
        1: ["A1_1", "A1_2"],
        2: ["A2_1", "A2_2"],
        3: ["A3_1", "A3_2"],
    }
    return [task for number in sequence for task in tasks_by_agent[number]]

## Test Agent

In [12]:
def test_agent(environment: 'Maze', policy: 'function', order: list, *action_values):
    """Run the given tasks one after another and return a video of the run.

    Args:
        environment: the maze to act in. All calls go to this object; the
            function used to ignore it and read the global `env` instead.
        policy: callable taking (state, action value table) and returning an
            action.
        order: task names to execute in sequence, e.g. ["A1_1", "A1_2", ...].
            "An_1" and "An_2" are the first and second task of agent n.
        *action_values: six action-value tables in the order
            A1_1, A1_2, A2_1, A2_2, A3_1, A3_2.

    Returns:
        Whatever `display_video` returns for the collected frames.
    """
    frames = []
    states = {
        "A1": environment.reset1(0),
        "A2": environment.reset2(0),
        "A3": environment.reset3(0),
    }

    done_flags = {
        "A1_1": False, "A1_2": False,
        "A2_1": False, "A2_2": False,
        "A3_1": False, "A3_2": False
    }

    # Task name -> (step method of the owning agent, index into action_values).
    actions = {
        "A1_1": (environment.step1, 0), "A1_2": (environment.step1, 1),
        "A2_1": (environment.step2, 2), "A2_2": (environment.step2, 3),
        "A3_1": (environment.step3, 4), "A3_2": (environment.step3, 5)
    }

    frames.append(environment.render(mode="rgb_array"))

    for task in order:
        print(f"Executing {task}")
        step_function, action_idx = actions[task]
        print(step_function, action_idx)

        agent_key = task[:2]  # "A1_2" -> "A1"
        while not done_flags[task]:
            action = policy(states[agent_key], action_values[action_idx])

            next_state, reward, done_flags[task], _ = step_function(action)
            print(f"Action for {task}: Next_state={next_state}, Reward={reward}, Done={done_flags[task]}")

            # Rendering after every step is required, not cosmetic: Maze.render
            # sets the pickup flag that switches an agent to its second task.
            frames.append(environment.render(mode="rgb_array"))
            states[agent_key] = next_state

        print(f"{task} : DONE")

    return display_video(frames)


# Output Function

In [14]:
# Enter the agent states
# Read the start cell of every agent, typed as "row col".
agent1 = tuple(int(token) for token in input("Enter the agent1 state (space-separated numbers): ").split())
agent2 = tuple(int(token) for token in input("Enter the agent2 state (space-separated numbers): ").split())
agent3 = tuple(int(token) for token in input("Enter the agent3 state (space-separated numbers): ").split())

# Build the environment with those start cells.
env = Maze(agent1=agent1, agent2=agent2, agent3=agent3)

# Read the order in which the agents act and expand it into task names.
sequence = tuple(int(token) for token in input("Enter a sequence of numbers (e.g., 2 1 3): ").split())
order = define_order(sequence)

print(f"agent1:{agent1},agent2:{agent2},agent3: {agent3}, order: {order}")

# Run the greedy policies; kept as the cell's last expression so the notebook
# displays the returned video.
test_agent(env, target_policy, order, agent1_O_A, agent1_G_A, agent2_O_A, agent2_G_A, agent3_O_A, agent3_G_A)


Enter the agent1 state (space-separated numbers): 0 0
Enter the agent2 state (space-separated numbers):  0 0
Enter the agent3 state (space-separated numbers): 0 0
Enter a sequence of numbers (e.g., 2 1 3): 1 2 3
agent1:(0, 0),agent2:(0, 0),agent3: (0, 0), order: ['A1_1', 'A1_2', 'A2_1', 'A2_2', 'A3_1', 'A3_2']
Executing A1_1
<bound method Maze.step1 of <__main__.Maze object at 0x792e18da3b90>> 0
Action for A1_1: Next_state=(1, 0), Reward=-1.0, Done=False
Action for A1_1: Next_state=(2, 0), Reward=-1.0, Done=False
Action for A1_1: Next_state=(3, 0), Reward=-1.0, Done=False
Action for A1_1: Next_state=(4, 0), Reward=-1.0, Done=False
Action for A1_1: Next_state=(5, 0), Reward=-1.0, Done=False
Action for A1_1: Next_state=(5, 1), Reward=-1.0, Done=False
Action for A1_1: Next_state=(5, 2), Reward=-1.0, Done=False
Action for A1_1: Next_state=(6, 2), Reward=-1.0, Done=False
Action for A1_1: Next_state=(6, 3), Reward=-1.0, Done=False
Action for A1_1: Next_state=(6, 4), Reward=-1.0, Done=True
A1