In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import random
from IPython.display import display, clear_output
import time
import gymnasium
from gymnasium import spaces

$$
\begin{array}{|c|c|c|c|c|c|c|c|c|}
\hline
\text{Batman} & & & & & & & & \\
\hline
 & & &\text{Alfred} & & & & &\text{Bane} \\
\hline
 & & & &\text{Joker} & & & & \\
\hline
 &\text{Batmobile} & & & & \text{Alfred} & & & \\
\hline
& & &\text{Arkham} & & & &\text{Court Of Owls} & \text{Selina}\\
\hline
 & & & & & & & & \\
\hline
 & &\text{Scarecrow} &\text{Redbird} & & & & & \\
\hline
 & & & & & &\text{Alfred} & & \\
\hline
\text{Robin} & & & & & & & &\text{Alfred} \\
\hline
\end{array}
$$

In [None]:
from PIL import Image
import io

class GothamCityCooperative(gymnasium.Env):
    """Cooperative two-agent grid world: Batman and Robin must both reach Selina.

    Positions are ``(row, column)`` pairs on a 9x9 grid with row 0 at the top.
    Each agent picks one of four moves per step (0: Down, 1: Up, 2: Right,
    3: Left). The observation is the flattened grid in which Batman's cell is
    marked 2, Robin's cell 1 and Selina's cell 0.5. Robin's marker is written
    after Batman's and Selina's is written last, so a marker can be hidden by
    another one that shares its cell.
    """

    def __init__(self, max_timesteps):
        """Set up the grid, the actor positions and the sprite images.

        Args:
            max_timesteps (int): Number of steps after which an episode ends.
        """
        # Environment Details
        self.grid_size = (9, 9)

        ## Initializing the Observation Space and Action Space
        self.observation_space = spaces.Discrete(self.grid_size[0] * self.grid_size[1])
        self.action_space = spaces.Discrete(4)
        self.max_timesteps = max_timesteps
        self.time_step = 0

        ## Actor Positions (agents are lists because they move; Selina is fixed)
        self.bat_pos = [0, 0]
        self.robin_pos = [8, 0]
        self.selina_pos = (4, 8)

        ## Negative Actors
        self.joker_pos = (2, 4)
        self.arkham_pos = (4, 3)
        self.bane_pos = (1, 8)
        self.owl_pos = (4, 7)
        self.scarecrow_pos = (6, 2)

        ## Positive Actors
        self.batmobile_pos = (3, 1)
        self.redbird_pos = (6, 3)
        self.alfred_pos = ((1, 3), (3, 5), (7, 6), (8, 8))

        # Load Images (paths are relative to the notebook's working directory)
        self.batman_image = mpimg.imread('images/bat.png')
        self.alfred_image = mpimg.imread('images/alfred.jpg')
        self.robin_image = mpimg.imread('images/robin.jpg')
        self.batmobile_image = mpimg.imread('images/batmobile.png')
        self.redbird_image = mpimg.imread('images/red_bird.jpg')

        self.joker_image = mpimg.imread('images/joker.jpg')
        self.arkham_image = mpimg.imread('images/arkham_asylum.jpg')
        self.bane_image = mpimg.imread('images/bane.jpg')
        self.bane_action = mpimg.imread('images/bane_breaks_bats.jpg')
        self.owl_image = mpimg.imread('images/court_of_owls.jpg')
        self.owl_action = mpimg.imread('images/court_of_owls_attack.png')
        self.scarecrow_image = mpimg.imread('images/scare_crow.png')
        self.scarecrow_action = mpimg.imread('images/scarecrow_attack.jpeg')

        self.selina_image = mpimg.imread('images/selina.png')
        self.gotham_image = mpimg.imread('images/gotham.jpg')

        # Initializing the State of the environment.
        self.state = np.zeros(self.grid_size)
        self.state[tuple(self.bat_pos)] = 2
        self.state[tuple(self.robin_pos)] = 1
        self.state[tuple(self.selina_pos)] = 0.5

    def reset(self, **kwargs):
        """Put both agents back on their start cells and restart the step counter.

        Args:
            **kwargs: Accepted for call compatibility and ignored.

        Returns:
            tuple: ``(observation, info)``; the observation is the flattened grid.
        """
        self.bat_pos = [0, 0]
        self.robin_pos = [8, 0]
        self.time_step = 0

        self.state = np.zeros(self.grid_size)
        self.state[tuple(self.bat_pos)] = 2
        self.state[tuple(self.robin_pos)] = 1
        # Mark Selina too, so the first observation of an episode has the same
        # layout as the ones produced by __init__ and step.
        self.state[tuple(self.selina_pos)] = 0.5

        observation = self.state.flatten()

        info = {}
        info['Termination Message'] = 'Not Terminated'

        return observation, info

    def get_reward(self, agent_pos, agent_old_pos, agent_name):
        """Score the move an agent just made and apply any teleport it triggers.

        The checks are evaluated in order, so reaching Selina and bumping into
        a wall take precedence over everything else.

        Args:
            agent_pos (tuple): Cell the agent occupies after the move.
            agent_old_pos (list): Cell the agent occupied before the move.
            agent_name (str): 'batman' or 'robin'.

        Returns:
            int: Reward for the move. The vehicle branches return None when
            ``agent_name`` is neither 'batman' nor 'robin'.
        """
        if np.array_equal(agent_pos, self.selina_pos):
            # +20 for reaching Selina.
            return 20
        elif np.array_equal(agent_pos, agent_old_pos):
            # -1 when the move was blocked by the wall and the agent stayed put.
            return -1
        elif np.array_equal(agent_pos, self.joker_pos):
            # -10 for running into the Joker.
            return -10
        elif np.array_equal(agent_pos, self.owl_pos):
            # -7 for running into the Court of Owls.
            return -7
        elif np.array_equal(agent_pos, self.bane_pos):
            # -5 for running into Bane.
            return -5
        elif np.array_equal(agent_pos, self.scarecrow_pos):
            # -3 for running into Scarecrow.
            return -3
        elif agent_pos in self.alfred_pos:
            # +10 for meeting Alfred.
            return 10
        elif np.array_equal(agent_pos, self.arkham_pos):
            # -1 for entering Arkham Asylum; the agent is sent back to its start cell.
            if agent_name == 'batman':
                self.bat_pos = [0, 0]
            elif agent_name == 'robin':
                self.robin_pos = [8, 0]
            return -1
        elif np.array_equal(agent_pos, self.batmobile_pos):
            # Batmobile: Batman earns +10 and is moved to a random Alfred cell;
            # Robin gets nothing and stays where he is.
            if agent_name == 'batman':
                self.bat_pos = list(random.choice(self.alfred_pos))
                return 10
            elif agent_name == 'robin':
                return 0
        elif np.array_equal(agent_pos, self.redbird_pos):
            # Redbird: both agents are moved to a random Alfred cell, but only
            # Robin earns the +10.
            if agent_name == 'batman':
                self.bat_pos = list(random.choice(self.alfred_pos))
                return 0
            elif agent_name == 'robin':
                self.robin_pos = list(random.choice(self.alfred_pos))
                return 10
        else:
            return 0

    def _move(self, pos, action):
        """Apply ``action`` to ``pos`` in place, clamping the result to the grid."""
        if action == 0:
            # Take Down action.
            pos[0] += 1
        elif action == 1:
            # Take Up action.
            pos[0] -= 1
        elif action == 2:
            # Take Right action.
            pos[1] += 1
        elif action == 3:
            # Take Left action.
            pos[1] -= 1

        # Store plain ints so positions stay ordinary Python lists of ints.
        pos[0] = int(np.clip(pos[0], 0, self.grid_size[0] - 1))
        pos[1] = int(np.clip(pos[1], 0, self.grid_size[1] - 1))

    def _in_bounds(self, pos):
        """Return True when ``pos`` lies inside the grid."""
        rows, cols = self.grid_size
        return 0 <= pos[0] <= rows - 1 and 0 <= pos[1] <= cols - 1

    def step(self, bat_action, robin_action):
        """Advance the environment by one joint move.

        An agent that already stands on Selina's cell no longer moves.

        Args:
            bat_action (int): Batman's move (0: Down, 1: Up, 2: Right, 3: Left).
            robin_action (int): Robin's move, same encoding.

        Returns:
            tuple: ``(observation, reward_bat, reward_robin, terminated,
            bat_truncated, robin_truncated, info)``.
        """
        reward_bat = 0
        reward_robin = 0
        bat_truncated = False
        robin_truncated = False
        self.state = np.zeros(self.grid_size)

        if tuple(self.bat_pos) != self.selina_pos:
            bat_old_pos = self.bat_pos.copy()
            self._move(self.bat_pos, bat_action)
            # get_reward may teleport the agent, so read self.bat_pos afterwards.
            reward_bat = self.get_reward(tuple(self.bat_pos), bat_old_pos, 'batman')
            self.state[tuple(self.bat_pos)] = 2
            # NOTE(review): positions are clamped in _move, so this flag is True
            # on every step in which the agent moves. Confirm the intended
            # meaning of "truncated" before relying on it.
            bat_truncated = self._in_bounds(self.bat_pos)
        # NOTE(review): unlike Robin below, Batman receives 0 (not 20) while he
        # waits on Selina's cell. Kept as-is; confirm whether that is intended.

        if tuple(self.robin_pos) != self.selina_pos:
            robin_old_pos = self.robin_pos.copy()
            self._move(self.robin_pos, robin_action)
            reward_robin = self.get_reward(tuple(self.robin_pos), robin_old_pos, 'robin')
            self.state[tuple(self.robin_pos)] = 1
            robin_truncated = self._in_bounds(self.robin_pos)
        else:
            # Robin keeps collecting the goal reward while he waits on Selina's cell.
            reward_robin = 20

        self.state[tuple(self.selina_pos)] = 0.5
        observation = self.state.flatten()

        # Updating the time step.
        self.time_step += 1

        info = {}

        # Updating the episode termination status.
        both_at_goal = (np.array_equal(self.bat_pos, self.selina_pos)
                        and np.array_equal(self.robin_pos, self.selina_pos))
        if both_at_goal:
            # Goal position is reached.
            terminated = True
            info['Termination Message'] = 'Goal Position Reached !!!'
        elif self.time_step >= self.max_timesteps:
            # Maximum time steps reached.
            terminated = True
            info['Termination Message'] = 'Maximum Time Reached'
        else:
            # Episode not terminated.
            terminated = False
            info['Termination Message'] = 'Not Terminated'

        return observation, reward_bat, reward_robin, terminated, bat_truncated, robin_truncated, info

    def _cell_extent(self, pos):
        """Return the imshow extent ``[left, right, bottom, top]`` of a grid cell.

        Row 0 is drawn at the top, so the row index is flipped.
        """
        row, col = pos
        top = self.grid_size[0] - row
        return [col, col + 1, top - 1, top]

    def _is_occupied(self, pos):
        """Return True when Batman or Robin stands on ``pos``."""
        return np.array_equal(self.bat_pos, pos) or np.array_equal(self.robin_pos, pos)

    def render(self):
        """Draw the current grid and return it as an RGB image array.

        Villains that have an "attack" sprite switch to it while an agent
        stands on their cell.

        Returns:
            np.ndarray: RGB image of the rendered scene.
        """
        # Single figure, closed in the finally block so repeated calls do not
        # accumulate open figures.
        fig, ax = plt.subplots(figsize=(8, 8))
        try:
            def draw(image, pos):
                ax.imshow(image, extent=self._cell_extent(pos))

            ax.imshow(self.gotham_image, extent=[0, self.grid_size[1], 0, self.grid_size[0]])

            draw(self.batman_image, self.bat_pos)
            draw(self.robin_image, self.robin_pos)

            draw(self.batmobile_image, self.batmobile_pos)
            draw(self.redbird_image, self.redbird_pos)

            draw(self.joker_image, self.joker_pos)
            draw(self.arkham_image, self.arkham_pos)

            draw(self.bane_action if self._is_occupied(self.bane_pos) else self.bane_image,
                 self.bane_pos)
            draw(self.owl_action if self._is_occupied(self.owl_pos) else self.owl_image,
                 self.owl_pos)
            draw(self.scarecrow_action if self._is_occupied(self.scarecrow_pos) else self.scarecrow_image,
                 self.scarecrow_pos)

            draw(self.selina_image, self.selina_pos)

            for pos in self.alfred_pos:
                draw(self.alfred_image, pos)

            ax.set_xlim(0, self.grid_size[1])
            ax.set_ylim(0, self.grid_size[0])
            ax.axis('off')

            # Save rendered image directly to a buffer
            buf = io.BytesIO()
            fig.savefig(buf, format='png', dpi=fig.dpi, bbox_inches='tight', pad_inches=0)
            buf.seek(0)

            # Convert buffer to PIL Image and then to RGB array
            image = Image.open(buf)
            rgb_image = np.array(image.convert('RGB'))
        finally:
            # Close the figure to prevent it from being displayed
            plt.close(fig)
        return rgb_image

### Random Agent

In [None]:
# Short-horizon environment (episodes end after 10 steps) for the random-agent demo.
env = GothamCityCooperative(10)

In [None]:
class RandomAgent:
  """Baseline policy that ignores the observation and picks both moves uniformly at random."""

  def __init__(self, env):
    """Remember the environment and its spaces.

    Args:
        env: Environment exposing ``observation_space`` and ``action_space``.
    """
    self.env = env
    self.observation_space = env.observation_space
    self.action_space = env.action_space

  def step(self: 'RandomAgent', state: np.ndarray) -> 'tuple[int, int]':
    """Pick one random action per hero.

    Args:
        state (np.ndarray): Current observation; unused by this policy.

    Returns:
        tuple[int, int]: ``(batman_action, robin_action)``, each drawn
        uniformly from ``range(action_space.n)``.
    """
    n_actions = self.action_space.n
    # Convert the NumPy scalars to plain ints so the result matches the annotation.
    return int(np.random.choice(n_actions)), int(np.random.choice(n_actions))

In [None]:
# Random baseline bound to the short-horizon environment created above.
agent = RandomAgent(env)

In [None]:
# Start a fresh episode; reset returns (observation, info) and the info is discarded.
obs, _ = env.reset()
# NOTE(review): `truncated` is initialised here but the loop below never reads it.
terminated, truncated = False, False

# NOTE(review): render() returns an RGB array that is discarded here, and the
# call is not the last expression of the cell, so nothing is displayed.
env.render()

def get_action(action):
  """Translate an action number into its direction name.

  Args:
      action (int): Action number; 0: Down, 1: Up, 2: Right, 3: Left.

  Returns:
      str: 'Down', 'Up', 'Right' or 'Left'.

  Raises:
      ValueError: If ``action`` is not one of the four known action numbers.
  """
  # Mapping action number to action
  action_names = {0: 'Down', 1: 'Up', 2: 'Right', 3: 'Left'}
  try:
    return action_names[action]
  except (KeyError, TypeError):
    # Fail with a clear message instead of an unbound-local error.
    raise ValueError(f"Unknown action {action!r}; expected one of 0, 1, 2, 3") from None

# Continue through the episode until we reach the termination state.
# Each iteration: pick two random actions, step the environment, show the
# rendered frame, then clear the cell output so the next frame replaces it.
while not terminated:
  # Agent deciding on what action to choose (first value is passed as Batman's
  # action, second as Robin's).
  action1, action2 = agent.step(obs)
  
  
  obs, reward_bat, reward_robin, terminated, truncated_bat, truncated_robin, info = env.step(action1, action2)
  rgb_array = env.render()
  plt.figure(figsize=(8, 8))
  plt.imshow(rgb_array)
  plt.axis('off')
  plt.show()
    
  # Brief pause before the frame is cleared; wait=True delays the clearing
  # until new output is available.
  time.sleep(0.0001)
  clear_output(wait=True)

## Decentralized Q-Learning

In [None]:
import pickle

class DecentralizedQLearningAgent:
  """
  Two tabular Q-learners (Batman and Robin) driven by one epsilon-greedy schedule.

  Each hero has its own Q-table: ``q_table[0]`` belongs to Batman and
  ``q_table[1]`` to Robin. Q-values are updated with the Q-Learning rule.
  """
  def __init__(self: 'DecentralizedQLearningAgent', env: 'gymnasium.Env', learning_rate: 'float', discount_factor: 'float') -> None:
    """Initializing Epsilon Greedy Agent

    Args:
        env (gymnasium.Env): object of Grid Environment.
        learning_rate (float): Learning rate used in the Q-Learning update.
        discount_factor (float): Discount factor to quantify the importance of future reward.
    """
    self.env = env
    self.observation_space = env.observation_space
    self.action_space = env.action_space
    self.learning_rate = learning_rate
    self.discount_factor = discount_factor

    # Initiating the Q-table with all Zeros: (agent, state, action).
    self.q_table = np.zeros((2, self.observation_space.n, self.action_space.n))

  def step(self: 'DecentralizedQLearningAgent', state_bat: int, state_robin: int, epsilon: float) -> 'tuple[int, int]':
    """Choose one action per hero with an epsilon-greedy rule.

    A single random draw decides for both heroes at once: either both
    explore or both act greedily on their own Q-table.

    Args:
        state_bat (int): Batman's current state index.
        state_robin (int): Robin's current state index.
        epsilon (float): Probability of taking random actions.

    Returns:
        tuple[int, int]: ``(batman_action, robin_action)``; each action is a
        number (0: Down, 1: Up, 2: Right, 3: Left).
    """
    # Epsilon-Greedy Action Selection
    random_number = np.random.rand()

    if random_number <= epsilon:
      action1 = self.env.action_space.sample()
      action2 = self.env.action_space.sample()
    else:
      action1 = np.argmax(self.q_table[0, state_bat, :])
      action2 = np.argmax(self.q_table[1, state_robin, :])

    return action1, action2

  def update_qvalue(self: 'DecentralizedQLearningAgent', agent_name: str, current_state: int, current_action: int,
                    reward: int, future_state: int) -> None:
    """Update one hero's Q value based on the Q-Learning algorithm

    Args:
        agent_name (str): 'batman' or 'robin'; selects which Q-table is updated.
        current_state (int): Current State represented as integer
        current_action (int): Current action represented as number; (0: Down, 1: Up, 2: Right, 3: Left)
        reward (int): Immediate reward that was received after taking the current action.
        future_state (int): Future State represented as integer

    Raises:
        ValueError: If ``agent_name`` is neither 'batman' nor 'robin'.
    """
    agent_ids = {'batman': 0, 'robin': 1}
    if agent_name not in agent_ids:
      raise ValueError(f"Unknown agent name {agent_name!r}; expected 'batman' or 'robin'")
    agent_id = agent_ids[agent_name]

    # Q-Learning update: Q(s, a) += lr * (r + gamma * max_a' Q(s', a') - Q(s, a))
    old_value = self.q_table[agent_id, current_state, current_action]
    best_future_value = np.max(self.q_table[agent_id, future_state, :])
    td_target = reward + self.discount_factor * best_future_value
    self.q_table[agent_id, current_state, current_action] = old_value + self.learning_rate * (td_target - old_value)


In [None]:
def process_state(observation, agent_id):
    """
    Find where an agent's marker sits in the observation.

    Args:
        observation (np.array): The observation array from the environment.
        agent_id (int): Marker value to look for (e.g., 2 for batman, 1 for robin).

    Returns:
        The first index along the leading axis whose entry equals ``agent_id``,
        or None when the marker does not occur in the observation.
    """
    matches = np.nonzero(observation == agent_id)[0]
    return matches[0] if len(matches) != 0 else None

def is_valid_state(state):
    """
    Tell whether a state lookup produced a usable value.

    Args:
        state: The state to check.

    Returns:
        bool: False when ``state`` is None, True for anything else.
    """
    if state is None:
        return False
    return True

def handle_invalid_state():
    """
    Supply the state index to use when the real one could not be determined.

    Returns:
        state: The fallback state index, which is always 0.
    """
    return 0

In [None]:
def decentralized_q_learning_learning_loop(env: 'gymnasium.Env',learning_rate: float, discount_factor: float, episodes: int,
                        min_epsilon_allowed: float, initial_epsilon_value: float) -> 'tuple[DecentralizedQLearningAgent, list, list, list]':
  """Train Batman and Robin to reach the GOAL state using decentralized Q-Learning.

  Epsilon decays geometrically from ``initial_epsilon_value`` to
  ``min_epsilon_allowed`` over the given number of episodes.

  Args:
      env (gymnasium.Env): object of Grid Environment.
      learning_rate (float): Learning rate used in the Q-Learning update.
      discount_factor (float): Discount factor to quantify the importance of future reward.
      episodes (int): Number of episodes we should train.
      min_epsilon_allowed (float): Minimum epsilon that we should reach by the end of the training.
      initial_epsilon_value (float): Initial epsilon that we should use while starting the learning.

  Returns:
      tuple[DecentralizedQLearningAgent, list, list, list]: the trained agent,
          Batman's cumulative reward per episode,
          Robin's cumulative reward per episode,
          and the epsilon used in each episode.
  """

  def _locate(observation, marker):
    # Integer state index of the cell carrying `marker` (2: batman, 1: robin).
    # NOTE(review): a hero's marker is missing from the observation whenever
    # another marker is written over its cell; the fallback state is used then.
    state = process_state(observation, marker)
    return state if is_valid_state(state) else handle_invalid_state()

  # Agent.
  agent = DecentralizedQLearningAgent(env, learning_rate=learning_rate, discount_factor = discount_factor)

  # Initiating epsilon values.
  epsilon = initial_epsilon_value

  # Calculating Epsilon Decay factor.
  epsilon_decay_factor = np.power(min_epsilon_allowed/epsilon, 1/episodes)

  # Initiating list to store rewards and epsilons we use across episodes
  reward_bat_across_episodes = []
  reward_robin_across_episodes = []

  epsilons_across_episodes = []

  # Iterating over Episodes.
  for _ in range(episodes):
    # Resetting the environment.
    obs, _info = env.reset()
    terminated = False

    # Fetching Current State and Current Action details (as integer indices).
    current_state_bat = _locate(obs, 2)
    current_state_robin = _locate(obs, 1)
    current_action_bat, current_action_robin = agent.step(current_state_bat, current_state_robin, epsilon)

    reward_per_episode_bat = 0
    reward_per_episode_robin = 0

    epsilons_across_episodes.append(epsilon)

    # Iterating over an episode until termination status is reached.
    while not terminated:
      # The two per-agent flags returned by the environment are not used here.
      obs, reward_bat, reward_robin, terminated, _flag_bat, _flag_robin, _info = env.step(current_action_bat, current_action_robin)

      # Process new observations to get future states
      future_state_bat = _locate(obs, 2)  # 2 represents batman
      future_state_robin = _locate(obs, 1)  # 1 represents robin

      # Updating cumulative rewards for the episode
      reward_per_episode_bat += reward_bat
      reward_per_episode_robin += reward_robin

      # Decide the next action based on the future state
      future_action_bat, future_action_robin = agent.step(future_state_bat, future_state_robin, epsilon)

      # Updating Q values
      agent.update_qvalue("batman", current_state_bat, current_action_bat, reward_bat, future_state_bat)
      agent.update_qvalue("robin", current_state_robin, current_action_robin, reward_robin, future_state_robin)

      # Updating current states and actions for the next iteration
      current_state_bat = future_state_bat
      current_action_bat = future_action_bat
      current_state_robin = future_state_robin
      current_action_robin = future_action_robin

    # Decaying Epsilon
    epsilon = epsilon_decay_factor*epsilon
    reward_bat_across_episodes.append(reward_per_episode_bat)
    reward_robin_across_episodes.append(reward_per_episode_robin)

  return agent, reward_bat_across_episodes, reward_robin_across_episodes, epsilons_across_episodes

In [None]:
# Training environment: episodes end after at most 100 steps.
env = GothamCityCooperative(100)

In [None]:
# Train the decentralized Q-learning agent for 1000 episodes, decaying epsilon from 1 to 0.01.
# NOTE(review): this rebinds `agent`, which earlier held the RandomAgent.
agent, reward_bat_across_episodes, reward_robin_across_episodes, epsilons_across_episodes = decentralized_q_learning_learning_loop(env,learning_rate = 0.5, discount_factor = 0.99, episodes = 1000, min_epsilon_allowed = 0.01, initial_epsilon_value = 1)

# Persist the trained agent. The pickle contains the whole agent object,
# including its reference to the environment.
with open('decentralized_qlearning.pkl', 'wb') as f:
    pickle.dump(agent, f)

## Evaluation:

In [None]:

import numpy as np
import matplotlib.pyplot as plt

def evaluate_agents(env, q_tables, num_episodes=10):
    """Run greedy evaluation episodes and collect each hero's total reward.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step(bat_action, robin_action)`` returns a tuple starting
            with ``(observation, reward_bat, reward_robin, terminated)``.
        q_tables: Indexable pair of Q-tables; ``q_tables[0]`` is Batman's and
            ``q_tables[1]`` is Robin's, each indexed by state.
        num_episodes (int): Number of evaluation episodes.

    Returns:
        tuple[list, list]: Batman's and Robin's total reward per episode.
    """
    def _state_of(observation, marker):
        # Flat index of the cell carrying `marker`, or None when it is hidden.
        cells = np.flatnonzero(np.asarray(observation).ravel() == marker)
        return int(cells[0]) if cells.size else None

    total_rewards_bat_eval = []
    total_rewards_robin_eval = []

    for episode in range(num_episodes):
        obs, _info = env.reset()
        states = [_state_of(obs, 2), _state_of(obs, 1)]  # 2 and 1 are IDs for Batman and Robin
        done = False
        total_reward_bat = 0
        total_reward_robin = 0

        while not done:
            # Choose actions greedily; fall back to a random action when the
            # hero's position cannot be read from the observation.
            actions = [
                int(np.argmax(q_tables[i][state])) if state is not None else env.action_space.sample()
                for i, state in enumerate(states)
            ]

            # The environment returns seven values; only the first four are needed.
            obs, reward_bat, reward_robin, done = env.step(*actions)[:4]

            # Update states
            states = [_state_of(obs, 2), _state_of(obs, 1)]

            total_reward_bat += reward_bat
            total_reward_robin += reward_robin

        total_rewards_bat_eval.append(total_reward_bat)
        total_rewards_robin_eval.append(total_reward_robin)

    return total_rewards_bat_eval, total_rewards_robin_eval

# Evaluate the trained decentralized Q-learning agent: agent.q_table holds one
# Q-table per hero (index 0 is Batman, index 1 is Robin).
eval_rewards_bat, eval_rewards_robin = evaluate_agents(env, agent.q_table, num_episodes=10)

# Plotting the evaluation results
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(eval_rewards_bat, label='Batman')
plt.plot(eval_rewards_robin, label='Robin')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Total Rewards per Episode')
plt.legend()

plt.subplot(1, 2, 2)
plt.bar(range(len(eval_rewards_bat)), eval_rewards_bat, label='Batman', alpha=0.6)
plt.bar(range(len(eval_rewards_robin)), eval_rewards_robin, label='Robin', alpha=0.6)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Rewards Comparison per Episode')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
import numpy as np

# Scratch check: a hand-written flattened 9x9 observation with the markers
# 2, 0.5 and 1 at flat indices 0, 44 and 63.
# Define the NumPy array
numpy_array = np.array([2., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                        0., 0., 0., 0., 0., 0., 0., 0., 0.5, 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                        0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
                        0., 0., 0., 0., 0., 0., 0., 0., 0.])

# Look up the flat index of the 0.5 marker (index 44 in the array above).
np.where(numpy_array == 0.5)

### Correlated Q-Learning:

In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt

def initialize_q_tables(env, num_agents):
    """Build one zero-filled Q-table of shape (n_states, n_actions) per agent.

    Args:
        env: Environment exposing ``observation_space.n`` and ``action_space.n``.
        num_agents (int): Number of independent tables to create.

    Returns:
        list: ``num_agents`` separate NumPy arrays, all zeros.
    """
    tables = []
    for _ in range(num_agents):
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        tables.append(np.zeros((n_states, n_actions)))
    return tables

def choose_action(q_table, state, epsilon, action_space=None):
    """Pick an action epsilon-greedily from one agent's Q-table.

    Args:
        q_table (np.ndarray): Q-table indexed as ``q_table[state, action]``.
        state: Either an integer state index, or an observation array whose
            largest entry marks the agent's cell. A dict, None or a negative
            index means the state is unknown.
        epsilon (float): Probability of taking a random action.
        action_space: Object with a ``sample()`` method used for random
            actions. Defaults to the notebook-level ``env.action_space``.

    Returns:
        int: The chosen action.
    """
    def _random_action():
        # Look up the global environment lazily so greedy calls never need it.
        space = env.action_space if action_space is None else action_space
        return space.sample()

    if random.uniform(0, 1) < epsilon:
        return _random_action()

    if state is None or isinstance(state, dict):
        return _random_action()

    if np.ndim(state) == 0:
        # Integer state index: use it directly. (np.argmax of a scalar is
        # always 0, which would pin every lookup to row 0.)
        state_idx = int(state)
        if state_idx < 0:
            return _random_action()
    else:
        state_idx = int(np.argmax(state))

    return int(np.argmax(q_table[state_idx]))

def update_q_values(q_tables, states, actions, rewards, next_states, alpha, gamma):
    """Apply one Q-learning update to each agent's own Q-table, in place.

    Args:
        q_tables (list): One Q-table per agent, indexed ``[state, action]``.
        states (list): Per-agent current state: an integer index, or an
            observation array whose largest entry marks the agent's cell.
        actions (list): Per-agent action taken.
        rewards (list): Per-agent reward received.
        next_states (list): Per-agent next state, same encoding as ``states``.
        alpha (float): Learning rate.
        gamma (float): Discount factor.

    An agent whose current state is unknown (dict, None or negative index) is
    skipped. An unknown next state contributes a bootstrap value of 0.
    """
    def _row(state):
        # Q-table row for `state`, or None when the state is unknown.
        if state is None or isinstance(state, dict):
            return None
        if np.ndim(state) == 0:
            # Integer state index: use it directly (np.argmax of a scalar is
            # always 0, which would send every update to row 0).
            idx = int(state)
            return idx if idx >= 0 else None
        return int(np.argmax(state))

    for q_table, state, action, reward, next_state in zip(q_tables, states, actions, rewards, next_states):
        state_idx = _row(state)
        if state_idx is None:
            # Nothing sensible to update; do not write to an arbitrary row.
            continue

        next_idx = _row(next_state)
        future_value = np.max(q_table[next_idx]) if next_idx is not None else 0.0

        action_idx = int(action)
        td_target = reward + gamma * future_value
        q_table[state_idx, action_idx] += alpha * (td_target - q_table[state_idx, action_idx])
        
def process_state(observation, agent_id):
    """
    Convert the observation to a unique integer state index.

    Accepts either a bare observation array (as returned by ``env.step``) or
    a tuple such as the ``(observation, info)`` pair returned by
    ``env.reset``; tuple members without a ``flatten`` method are ignored.

    Note: a helper with the same name is defined earlier in the notebook and
    returns None when the marker is missing; this definition returns -1.

    Args:
        observation: Observation array, or a tuple containing one.
        agent_id: Marker value to look for (2 for Batman, 1 for Robin).

    Returns:
        int: Flat index of the first cell equal to ``agent_id``, or -1 when
        the marker is not found.
    """
    candidates = observation if isinstance(observation, tuple) else (observation,)

    for obs in candidates:
        if hasattr(obs, 'flatten'):
            agent_indices = np.where(obs.flatten() == agent_id)[0]
            if agent_indices.size > 0:
                return int(agent_indices[0])

    return -1
    
# Hyper-parameters and bookkeeping for the training run.
max_timesteps = 100
env = GothamCityCooperative(max_timesteps)
num_agents = 2
q_tables = initialize_q_tables(env, num_agents)
alpha = 0.1  # Learning rate
gamma = 0.9  # Discount factor
epsilon = 1.0  # Initial epsilon
episodes = 1000

total_rewards_bat = []
total_rewards_robin = []
epsilon_values = []

# Marker values used for the two heroes in the observation grid.
batman_id = 2
robin_id = 1

for episode in range(episodes):
    # Reset once per episode; reset returns (observation, info).
    obs, info = env.reset()
    # process_state scans the members of a tuple for arrays, so the bare
    # observation is wrapped in a 1-tuple.
    states = [process_state((obs,), batman_id), process_state((obs,), robin_id)]
    done = False
    total_reward_bat = 0
    total_reward_robin = 0

    while not done:
        actions = [choose_action(q_table, state, epsilon) for q_table, state in zip(q_tables, states)]

        # The environment returns seven values; only the first four are needed.
        obs, reward_bat, reward_robin, done = env.step(*actions)[:4]

        next_states = [process_state((obs,), batman_id), process_state((obs,), robin_id)]
        update_q_values(q_tables, states, actions, [reward_bat, reward_robin], next_states, alpha, gamma)
        states = next_states

        total_reward_bat += reward_bat
        total_reward_robin += reward_robin

    total_rewards_bat.append(total_reward_bat)
    total_rewards_robin.append(total_reward_robin)
    epsilon_values.append(epsilon)
    # Multiplicative epsilon decay with a floor of 0.01.
    epsilon = max(epsilon * 0.99, 0.01)

# Plotting the results
# Left panel: per-episode total reward of each hero; right panel: epsilon schedule.
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(total_rewards_bat, label='Batman')
plt.plot(total_rewards_robin, label='Robin')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Total Rewards per Episode')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epsilon_values)
plt.xlabel('Episode')
plt.ylabel('Epsilon')
plt.title('Epsilon Decay over Episodes')
plt.tight_layout()
plt.show()

## Evaluation:

In [None]:
def evaluate_agents(env, q_tables, num_episodes=10):
    """Play evaluation episodes with greedy policies and total up the rewards.

    A hero whose state index is negative (position unknown) takes a random
    action instead of a greedy one.

    Args:
        env: Environment to evaluate in.
        q_tables: One Q-table per hero, Batman's first and Robin's second.
        num_episodes (int): How many episodes to play.

    Returns:
        tuple[list, list]: Per-episode total rewards for Batman and for Robin.
    """
    def _hero_states(observation):
        # State index of Batman, then of Robin, in the given observation.
        return [process_state(observation, batman_id), process_state(observation, robin_id)]

    bat_totals = []
    robin_totals = []

    for _episode in range(num_episodes):
        states = _hero_states(env.reset())
        done = None
        bat_sum = 0
        robin_sum = 0

        while not done:
            actions = []
            for q_table, state in zip(q_tables, states):
                if state >= 0:
                    actions.append(np.argmax(q_table[state]))
                else:
                    actions.append(env.action_space.sample())

            obs, reward_bat, reward_robin, done = env.step(*actions)[:4]
            states = _hero_states(obs)

            bat_sum += reward_bat
            robin_sum += reward_robin

        bat_totals.append(bat_sum)
        robin_totals.append(robin_sum)

    return bat_totals, robin_totals

# Evaluate the Q-tables trained above over 10 episodes.
eval_rewards_bat, eval_rewards_robin = evaluate_agents(env, q_tables, num_episodes=10)

# Plotting the evaluation results
# Left panel: line plot of total reward per episode; right panel: the same
# totals as overlaid, semi-transparent bars.
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(eval_rewards_bat, label='Batman')
plt.plot(eval_rewards_robin, label='Robin')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Total Rewards per Episode')
plt.legend()

plt.subplot(1, 2, 2)
# range(10) matches num_episodes=10 in the evaluate_agents call above.
plt.bar(range(10), eval_rewards_bat, label='Batman', alpha=0.6)
plt.bar(range(10), eval_rewards_robin, label='Robin', alpha=0.6)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Rewards Comparison per Episode')
plt.legend()

plt.tight_layout()
plt.show()


## REINFORCE:

In [None]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.distributions import Categorical
import numpy as np
import pickle

class PolicyNetwork(nn.Module):
    """Single-hidden-layer MLP that turns a state vector into action probabilities."""

    def __init__(self, input_dim, hidden_dim, action_dim):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept stable.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=action_dim)

    def forward(self, x):
        """Return a probability distribution over actions along the last axis."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden).softmax(dim=-1)

def reinforce(env, policy_batman, policy_robin, episodes, gamma=0.99):
    """Train one REINFORCE policy per agent on a shared two-agent environment.

    Both policies receive the same flattened environment state. After each
    episode every policy is updated from its own log-probabilities and rewards
    via ``update_policy``; when training ends the per-episode totals are
    handed to ``plot_rewards``.

    Args:
        env: Environment with ``reset() -> (state, info)`` and
            ``step(bat_action, robin_action) -> (next_state, reward_batman,
            reward_robin, done, ...)``.
        policy_batman: Module mapping a ``(1, state_dim)`` tensor to action
            probabilities for Batman.
        policy_robin: Same, for Robin.
        episodes: Number of training episodes.
        gamma: Discount factor forwarded to ``update_policy``.
    """
    optimizer_batman = torch.optim.Adam(policy_batman.parameters(), lr=0.01)
    optimizer_robin = torch.optim.Adam(policy_robin.parameters(), lr=0.01)

    total_rewards_batman = []
    total_rewards_robin = []

    for episode in range(episodes):
        state, _ = env.reset()
        state_tensor = torch.FloatTensor(np.array(state).flatten()).unsqueeze(0)

        log_probs_batman = []
        log_probs_robin = []
        rewards_batman = []
        rewards_robin = []
        done = False

        while not done:
            distribution_batman = Categorical(policy_batman(state_tensor))
            distribution_robin = Categorical(policy_robin(state_tensor))

            action_batman = distribution_batman.sample()
            action_robin = distribution_robin.sample()

            # Step the environment exactly once per sampled joint action.
            # Stepping twice would apply the action two times, throw away the
            # first transition's rewards and pair the log-prob with the wrong
            # reward.
            next_state, reward_batman, reward_robin, done, *_ = env.step(
                action_batman.item(), action_robin.item()
            )
            state_tensor = torch.FloatTensor(np.array(next_state).flatten()).unsqueeze(0)

            log_probs_batman.append(distribution_batman.log_prob(action_batman))
            log_probs_robin.append(distribution_robin.log_prob(action_robin))
            rewards_batman.append(reward_batman)
            rewards_robin.append(reward_robin)

        update_policy(optimizer_batman, log_probs_batman, rewards_batman, gamma)
        update_policy(optimizer_robin, log_probs_robin, rewards_robin, gamma)

        total_rewards_batman.append(sum(rewards_batman))
        total_rewards_robin.append(sum(rewards_robin))

        print(f"Episode {episode} finished with total reward Batman: {sum(rewards_batman):.2f}, Robin: {sum(rewards_robin):.2f}")

    plot_rewards(total_rewards_batman, total_rewards_robin)


def update_policy(optimizer, log_probs, rewards, gamma):
    """Apply one REINFORCE gradient step for a finished episode.

    Each action's log-probability is weighted by the discounted return that
    followed it, ``G_t = r_t + gamma * r_{t+1} + gamma**2 * r_{t+2} + ...``.

    Args:
        optimizer: Optimizer over the parameters that produced ``log_probs``.
        log_probs: One log-probability tensor per time step.
        rewards: One scalar reward per time step, aligned with ``log_probs``.
        gamma: Discount factor.

    Returns:
        None. An empty episode is a no-op.
    """
    if not log_probs:
        return

    # Build the returns-to-go back to front: G_t = r_t + gamma * G_{t+1}.
    returns = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    # Shape the log-probs as (T, B) and the returns as (T, 1) so the product is
    # per time step. Multiplying a (T, 1) stack by a (T,) vector would instead
    # broadcast to (T, T) and weight every step by the same total.
    stacked = torch.stack(log_probs).reshape(len(log_probs), -1)
    returns_tensor = torch.tensor(returns, dtype=stacked.dtype, device=stacked.device).unsqueeze(1)

    policy_loss = -(stacked * returns_tensor).sum()
    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

def plot_rewards(total_rewards_batman, total_rewards_robin):
    """Plot both agents' per-episode totals in the left panel of a 1x2 figure.

    Args:
        total_rewards_batman: Batman's total reward for each episode.
        total_rewards_robin: Robin's total reward for each episode.
    """
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    # One curve per agent, Batman first so the legend order is stable.
    for series, agent_label in ((total_rewards_batman, 'Batman'), (total_rewards_robin, 'Robin')):
        plt.plot(series, label=agent_label)
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Total Rewards per Episode')
    plt.legend()

# Create the environment first so that the network sizes are read from the
# environment that is actually trained on, not from an `env` left over from an
# earlier cell.
env = GothamCityCooperative(max_timesteps=100)
input_dim = env.observation_space.n   # one input per grid cell
hidden_dim = 128
output_dim = env.action_space.n
action_dim = output_dim  # alias kept because this cell has always defined it

policy_batman = PolicyNetwork(input_dim, hidden_dim, output_dim)
policy_robin = PolicyNetwork(input_dim, hidden_dim, output_dim)

def init_weights(m):
    """Kaiming-uniform initialise every ``nn.Linear`` weight; set its bias to 0.01."""
    if isinstance(m, nn.Linear):
        torch.nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        m.bias.data.fill_(0.01)

policy_batman.apply(init_weights)
policy_robin.apply(init_weights)

reinforce(env, policy_batman, policy_robin, episodes=1000)

## Evaluation:

In [None]:
import torch
import matplotlib.pyplot as plt
from torch.distributions import Categorical

def evaluate_agents(env, policy_batman, policy_robin, num_episodes=10):
    """Roll out both stochastic policies without gradients and collect returns.

    Actions are sampled from each policy's output distribution (not arg-maxed).

    Args:
        env: Environment whose ``reset()`` returns either a state or a tuple
            starting with the state, and whose ``step(bat_action,
            robin_action)`` returns ``(next_state, reward_bat, reward_robin,
            done, ...)``.
        policy_batman: Callable mapping a ``(1, state_dim)`` tensor to action
            probabilities for Batman.
        policy_robin: Same, for Robin.
        num_episodes: Number of evaluation episodes.

    Returns:
        Tuple ``(batman_returns, robin_returns)`` with one total per episode.
    """
    def _as_batch(raw_state):
        # Flatten whatever the environment returned and add a batch axis.
        return torch.FloatTensor(np.array(raw_state).flatten()).unsqueeze(0)

    bat_returns = []
    robin_returns = []

    for _ in range(num_episodes):
        reset_result = env.reset()
        if isinstance(reset_result, tuple):
            first_state = reset_result[0]
        else:
            first_state = reset_result
        state_tensor = _as_batch(first_state)

        bat_total = 0
        robin_total = 0
        done = False

        while not done:
            with torch.no_grad():
                probs_batman = policy_batman(state_tensor)
                probs_robin = policy_robin(state_tensor)

            # Batman is sampled before Robin so the random stream is consumed
            # in a fixed order.
            action_batman = Categorical(probs_batman).sample()
            action_robin = Categorical(probs_robin).sample()

            next_state, reward_bat, reward_robin, done, *_ = env.step(action_batman.item(), action_robin.item())
            state_tensor = _as_batch(next_state)

            bat_total += reward_bat
            robin_total += reward_robin

        bat_returns.append(bat_total)
        robin_returns.append(robin_total)

    return bat_returns, robin_returns

eval_rewards_bat, eval_rewards_robin = evaluate_agents(env, policy_batman, policy_robin, num_episodes=10)

# Plotting the evaluation results
plt.figure(figsize=(12, 5))

# Left panel: per-episode return of each agent as a line plot.
plt.subplot(1, 2, 1)
plt.plot(eval_rewards_bat, label='Batman')
plt.plot(eval_rewards_robin, label='Robin')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Total Rewards per Episode')
plt.legend()

# Right panel: the same data as overlaid bars. The x positions are derived
# from the result lengths so the cell keeps working when num_episodes changes.
plt.subplot(1, 2, 2)
plt.bar(range(len(eval_rewards_bat)), eval_rewards_bat, label='Batman', alpha=0.6)
plt.bar(range(len(eval_rewards_robin)), eval_rewards_robin, label='Robin', alpha=0.6)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation: Rewards Comparison per Episode')
plt.legend()

plt.tight_layout()
plt.show()


# MADDQN

In [None]:
from PIL import Image
import io

class GothamCityCooperative(gymnasium.Env):
    """Two-agent 9x9 grid world in which Batman and Robin must both reach Selina.

    Positions are ``[row, column]`` pairs with ``(0, 0)`` in the top-left
    corner. ``self.state`` is a grid of markers (2 = Batman, 1 = Robin,
    0.5 = Selina) that is rebuilt on every step.

    The API is multi-agent and differs from the single-agent gymnasium
    signature:

    * ``reset()`` returns ``(bat_pos, robin_pos, info)``.
    * ``step(bat_action, robin_action)`` returns ``(bat_pos, robin_pos,
      reward_bat, reward_robin, terminated, bat_truncated, robin_truncated,
      info)``.

    Returned positions are copies, so callers may store them safely.

    NOTE(review): ``observation_space`` is declared as ``Discrete(81)`` but the
    observations returned here are position pairs.
    """

    # (row, column) displacement for each discrete action id:
    # 0 = down, 1 = up, 2 = right, 3 = left.
    _ACTION_DELTAS = {0: (1, 0), 1: (-1, 0), 2: (0, 1), 3: (0, -1)}

    def __init__(self, max_timesteps):
        # Environment Details
        self.grid_size = (9,9)

        ## Initializing the Observation Space and Action Space
        self.observation_space = spaces.Discrete(self.grid_size[0]* self.grid_size[1])
        self.action_space = spaces.Discrete(4)
        self.max_timesteps = max_timesteps
        self.time_step = 0

        ## Actor Positions (agents are lists because step() moves them in place)
        self.bat_pos = [0,0]
        self.robin_pos = [8,0]
        self.selina_pos = (4,8)

        ## Negative Actors
        self.joker_pos = (2,4)
        self.arkham_pos = (4,3)
        self.bane_pos = (1,8)
        self.owl_pos = (4,7)
        self.scarecrow_pos = (6,2)

        ## Positive Actors
        self.batmobile_pos = (3,1)
        self.redbird_pos = (6,3)
        self.alfred_pos = ((1,3), (3,5), (7,6), (8,8))

        # Load Images
        self.batman_image = mpimg.imread('images/bat.png')
        self.alfred_image = mpimg.imread('images/alfred.jpg')
        self.robin_image = mpimg.imread('images/robin.jpg')
        self.batmobile_image = mpimg.imread('images/batmobile.png')
        self.redbird_image = mpimg.imread('images/red_bird.jpg')

        self.joker_image = mpimg.imread('images/joker.jpg')
        self.arkham_image = mpimg.imread('images/arkham_asylum.jpg')
        self.bane_image = mpimg.imread('images/bane.jpg')
        self.bane_action = mpimg.imread('images/bane_breaks_bats.jpg')
        self.owl_image = mpimg.imread('images/court_of_owls.jpg')
        self.owl_action = mpimg.imread('images/court_of_owls_attack.png')
        self.scarecrow_image = mpimg.imread('images/scare_crow.png')
        self.scarecrow_action = mpimg.imread('images/scarecrow_attack.jpeg')

        self.selina_image = mpimg.imread('images/selina.png')
        self.gotham_image = mpimg.imread('images/gotham.jpg')

        # Initializing the State of the environment.
        self.state = np.zeros(self.grid_size)
        self.state[tuple(self.bat_pos)] = 2
        self.state[tuple(self.robin_pos)] = 1
        self.state[tuple(self.selina_pos)] = 0.5

    def reset(self, **kwargs):
        """Put both agents back on their start cells and restart the clock.

        Returns:
            ``(bat_pos, robin_pos, info)`` where the positions are fresh
            ``[row, col]`` lists.
        """
        self.bat_pos = [0,0]
        self.robin_pos = [8,0]
        self.time_step = 0

        self.state = np.zeros(self.grid_size)
        self.state[tuple(self.bat_pos)] = 2
        self.state[tuple(self.robin_pos)] = 1
        # Keep the goal marker on the grid, as __init__ and step() do.
        self.state[tuple(self.selina_pos)] = 0.5

        info = {}
        info['Termination Message'] = 'Not Terminated'

        # Return copies: step() mutates the internal lists in place.
        return list(self.bat_pos), list(self.robin_pos), info

    def get_reward(self, agent_pos, agent_old_pos, agent_name):
        """Return the reward for ``agent_name`` arriving at ``agent_pos``.

        Some cells also relocate the agent by rebinding ``self.bat_pos`` or
        ``self.robin_pos``. Checks are made in the order listed, so the first
        matching rule wins:

        ==============  ======  ==============================================
        Cell            Reward  Side effect
        ==============  ======  ==============================================
        Selina          +20     -
        did not move    -1      -
        Joker           -10     -
        Court of Owls   -7      -
        Bane            -5      -
        Scarecrow       -3      -
        Alfred          +10     -
        Arkham          -1      agent is sent back to its start cell
        Batmobile       +10/0   Batman jumps to a random Alfred cell (+10);
                                nothing happens to Robin (0)
        Redbird         0/+10   Batman jumps to a random Alfred cell (0);
                                Robin jumps to a random Alfred cell (+10)
        anything else   0       -
        ==============  ======  ==============================================

        Args:
            agent_pos: Position after the move.
            agent_old_pos: Position before the move.
            agent_name: ``'batman'`` or ``'robin'``.
        """
        if np.array_equal(agent_pos, self.selina_pos):
            # +20 for reaching Selina, the goal cell.
            return 20
        elif np.array_equal(agent_pos, agent_old_pos):
            # -1 when the action left the agent where it was (blocked by the border).
            return -1
        elif np.array_equal(agent_pos, self.joker_pos):
            # -10 for stepping onto the Joker.
            return -10
        elif np.array_equal(agent_pos, self.owl_pos):
            # -7 for stepping onto the Court of Owls.
            return -7
        elif np.array_equal(agent_pos, self.bane_pos):
            # -5 for stepping onto Bane.
            return -5
        elif np.array_equal(agent_pos, self.scarecrow_pos):
            # -3 for stepping onto Scarecrow.
            return -3
        elif agent_pos in self.alfred_pos:
            # +10 for reaching any Alfred cell.
            return 10
        elif np.array_equal(agent_pos, self.arkham_pos):
            # -1 for entering Arkham Asylum; the agent restarts from its start cell.
            if agent_name == 'batman':
                self.bat_pos = [0,0]
            elif agent_name == 'robin':
                self.robin_pos = [8,0]
            return -1
        elif np.array_equal(agent_pos, self.batmobile_pos):
            # Batmobile: carries Batman to a random Alfred cell for +10.
            if agent_name == 'batman':
                self.bat_pos = list(random.choice(self.alfred_pos))
                return 10
            # Robin (or any other name) gets nothing here instead of None.
            return 0
        elif np.array_equal(agent_pos, self.redbird_pos):
            # Redbird: carries Robin to a random Alfred cell for +10.
            # NOTE(review): Batman is also relocated here (for 0 reward), while
            # Robin is not relocated by the Batmobile - confirm this asymmetry
            # is intended.
            if agent_name == 'batman':
                self.bat_pos = list(random.choice(self.alfred_pos))
                return 0
            elif agent_name == 'robin':
                self.robin_pos = list(random.choice(self.alfred_pos))
                return 10
            return 0
        else:
            return 0

    def _advance(self, pos, action):
        """Move ``pos`` in place by one cell for ``action``, clamped to the grid."""
        d_row, d_col = self._ACTION_DELTAS.get(action, (0, 0))
        pos[0] = min(max(pos[0] + d_row, 0), self.grid_size[0] - 1)
        pos[1] = min(max(pos[1] + d_col, 0), self.grid_size[1] - 1)

    def _in_bounds(self, pos):
        """Return True when ``pos`` lies inside the grid."""
        return bool(0 <= pos[0] <= self.grid_size[0] - 1 and 0 <= pos[1] <= self.grid_size[1] - 1)

    def step(self, bat_action, robin_action):
        """Apply one action per agent and advance the clock by one tick.

        An agent already standing on Selina's cell does not move any more.

        Args:
            bat_action: Batman's action id (0 down, 1 up, 2 right, 3 left).
            robin_action: Robin's action id, same encoding.

        Returns:
            ``(bat_pos, robin_pos, reward_bat, reward_robin, terminated,
            bat_truncated, robin_truncated, info)``. Positions are copies and
            Batman comes first, matching ``reset()``. ``terminated`` is True
            when both agents are on Selina's cell or ``max_timesteps`` is
            reached.
        """
        reward_bat = 0
        reward_robin = 0
        bat_truncated = False
        robin_truncated = False
        self.state = np.zeros(self.grid_size)

        if tuple(self.bat_pos) != self.selina_pos:
            bat_old_pos = self.bat_pos.copy()
            self._advance(self.bat_pos, bat_action)

            # get_reward may relocate Batman, so the grid marker is written afterwards.
            reward_bat = self.get_reward(tuple(self.bat_pos), bat_old_pos, 'batman')
            self.state[tuple(self.bat_pos)] = 2

            # NOTE(review): positions are clamped above, so this flag is True
            # whenever the agent acted this step - confirm the intended meaning.
            bat_truncated = self._in_bounds(self.bat_pos)
        # NOTE(review): unlike Robin below, Batman gets 0 (not 20) while
        # waiting on Selina's cell - confirm whether that is intended.

        if tuple(self.robin_pos) != self.selina_pos:
            robin_old_pos = self.robin_pos.copy()
            self._advance(self.robin_pos, robin_action)

            reward_robin = self.get_reward(tuple(self.robin_pos), robin_old_pos, 'robin')
            self.state[tuple(self.robin_pos)] = 1

            robin_truncated = self._in_bounds(self.robin_pos)
        else:
            # Robin keeps collecting the goal reward while waiting at Selina.
            reward_robin = 20

        self.state[tuple(self.selina_pos)] = 0.5

        # Updating the time step.
        self.time_step += 1

        info = {}

        # Updating the episode termination status.
        both_at_goal = (np.array_equal(self.bat_pos, self.selina_pos)
                        and np.array_equal(self.robin_pos, self.selina_pos))
        if both_at_goal:
            # Goal position is reached.
            terminated = True
            info['Termination Message'] = 'Goal Position Reached !!!'
        elif self.time_step >= self.max_timesteps:
            # Maximum time steps reached.
            terminated = True
            info['Termination Message'] = 'Maximum Time Reached'
        else:
            # Episode not terminated.
            terminated = False
            info['Termination Message'] = 'Not Terminated'

        # Batman first, Robin second (the same order as reset()), and as
        # copies so stored transitions are not changed by later steps.
        return list(self.bat_pos), list(self.robin_pos), reward_bat, reward_robin, terminated, bat_truncated, robin_truncated, info

    def _draw(self, ax, image, pos):
        """Paint ``image`` over the grid cell at ``pos``; row 0 is drawn at the top."""
        row, col = pos[0], pos[1]
        top = self.grid_size[0] - row
        ax.imshow(image, extent=[col, col + 1, top - 1, top])

    def render(self):
        """Draw the current board and return it as an RGB numpy array.

        A villain's "action" image replaces its idle image while either agent
        stands on that villain's cell.
        """
        # Only one figure is created; a second, unused figure would never be closed.
        fig, ax = plt.subplots()

        ax.imshow(self.gotham_image, extent=[0, self.grid_size[1], 0, self.grid_size[0]])

        self._draw(ax, self.batman_image, self.bat_pos)
        self._draw(ax, self.robin_image, self.robin_pos)

        self._draw(ax, self.batmobile_image, self.batmobile_pos)
        self._draw(ax, self.redbird_image, self.redbird_pos)

        self._draw(ax, self.joker_image, self.joker_pos)
        self._draw(ax, self.arkham_image, self.arkham_pos)

        # (cell, idle image, image shown while an agent is on the cell)
        villains = (
            (self.bane_pos, self.bane_image, self.bane_action),
            (self.owl_pos, self.owl_image, self.owl_action),
            (self.scarecrow_pos, self.scarecrow_image, self.scarecrow_action),
        )
        for cell, idle_image, action_image in villains:
            occupied = np.array_equal(self.bat_pos, cell) or np.array_equal(self.robin_pos, cell)
            self._draw(ax, action_image if occupied else idle_image, cell)

        self._draw(ax, self.selina_image, self.selina_pos)

        for pos in self.alfred_pos:
            self._draw(ax, self.alfred_image, pos)

        # Every imshow call rescales the axes, so the limits are fixed last.
        ax.set_xlim(0, self.grid_size[1])
        ax.set_ylim(0, self.grid_size[0])

        ax.axis('off')
        # Save rendered image directly to a buffer
        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi=fig.dpi, bbox_inches='tight', pad_inches=0)
        buf.seek(0)

        # Convert buffer to PIL Image and then to RGB array
        image = Image.open(buf)
        rgb_image = np.array(image.convert('RGB'))

        plt.close(fig)  # Close the figure to prevent it from being displayed
        return rgb_image

In [None]:
# NOTE(review): scratch cell - the sampled value is not assigned or used; candidate for removal.
np.random.choice(2)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import namedtuple, deque

# Define neural network architecture for Q-network
class QNetwork(nn.Module):
    """Fully connected network that maps a state vector to one Q-value per action."""

    def __init__(self, input_size, output_size):
        super().__init__()
        # Layer widths: input -> 128 -> 400 -> 64 -> output.
        # Attribute names are the state_dict keys used by the saved checkpoints.
        self.fc1 = nn.Linear(in_features=input_size, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=400)
        self.fc3 = nn.Linear(in_features=400, out_features=64)
        self.fc4 = nn.Linear(in_features=64, out_features=output_size)

    def forward(self, x):
        """Return raw (unbounded) Q-values; only the hidden layers use ReLU."""
        for hidden_layer in (self.fc1, self.fc2, self.fc3):
            x = hidden_layer(x).relu()
        return self.fc4(x)

# Define replay buffer
class ReplayBuffer:
    """Fixed-capacity FIFO store of experience tuples with uniform random sampling."""

    def __init__(self, buffer_size):
        # A bounded deque silently drops its oldest entry once it is full.
        self.buffer = deque([], buffer_size)

    def add(self, experience):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct experiences and return them column-wise.

        Returns:
            Five tuples: ``(states, actions, rewards, next_states, dones)``.
        """
        columns = tuple(zip(*random.sample(self.buffer, batch_size)))
        states, actions, rewards, next_states, dones = columns
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Number of experiences currently stored."""
        return len(self.buffer)

# Define MADDQN agent
class MADDQNAgent:
    """Independent Double-DQN learner for a single agent.

    Each agent owns an online Q-network, a target Q-network, an Adam optimizer
    and a replay buffer. Exploration is epsilon-greedy; epsilon is multiplied
    by ``epsilon_decay`` after every gradient step until it reaches
    ``epsilon_min``.

    Args:
        input_size: Length of the state vector.
        output_size: Number of discrete actions.
        buffer_size: Replay-buffer capacity.
        lr: Adam learning rate.
        gamma: Discount factor.
        epsilon: Initial exploration rate.
        epsilon_min: Lower bound for the exploration rate.
        epsilon_decay: Multiplicative decay applied after each gradient step.
        batch_size: Mini-batch size. When left as ``None`` the module-level
            ``BATCH_SIZE`` is looked up at training time.
    """

    def __init__(self, input_size, output_size, buffer_size, lr=0.001, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, batch_size=None):
        self.input_size = input_size
        self.output_size = output_size
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.q_network = QNetwork(input_size, output_size).to(self.device)
        self.target_q_network = QNetwork(input_size, output_size).to(self.device)
        # Start with identical online and target weights.
        self.target_q_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.loss = nn.MSELoss()

        self.replay_buffer = ReplayBuffer(buffer_size)

    def _greedy_action(self, state):
        """Return the action with the highest online Q-value for ``state``."""
        with torch.no_grad():
            state = torch.FloatTensor(state).to(self.device)
            return torch.argmax(self.q_network(state)).item()

    def select_action(self,state,train):
        """Pick an action: epsilon-greedy when ``train`` is true, greedy otherwise."""
        if train and np.random.rand() < self.epsilon:
            return random.randrange(self.output_size)
        return self._greedy_action(state)

    def train(self, state, action, reward, next_state, done):
        """Store one transition and, once enough are buffered, take one gradient step.

        States are copied into the buffer so that later in-place changes to the
        caller's objects cannot alter stored transitions.
        """
        self.replay_buffer.add((
            np.array(state, dtype=np.float32),
            action,
            reward,
            np.array(next_state, dtype=np.float32),
            done,
        ))

        batch_size = BATCH_SIZE if self.batch_size is None else self.batch_size
        if len(self.replay_buffer) <= batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        states = torch.as_tensor(np.stack(states), dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.long, device=self.device)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(np.stack(next_states), dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(dones, dtype=torch.float32, device=self.device)

        # Q(s, a) of the actions that were actually taken.
        q_value = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        with torch.no_grad():
            # Double DQN: the online network chooses the next action and the
            # target network evaluates it.
            best_next_actions = self.q_network(next_states).argmax(dim=1, keepdim=True)
            next_q = self.target_q_network(next_states).gather(1, best_next_actions).squeeze(1)
            # Terminal transitions (done == 1) do not bootstrap.
            target_q_values = rewards + (1 - dones) * self.gamma * next_q

        loss = self.loss(q_value, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_q_network.load_state_dict(self.q_network.state_dict())

In [None]:
# Define environment and hyperparameters
env = GothamCityCooperative(1000)
BUFFER_SIZE = 10000
BATCH_SIZE = 64
INPUT_SIZE = 2  # each agent is fed a (row, column) position
OUTPUT_SIZE = env.action_space.n  # Assuming 4 actions (up, down, left, right)

# Create two agents: agent1 controls Batman, agent2 controls Robin
agent1 = MADDQNAgent(INPUT_SIZE, OUTPUT_SIZE, BUFFER_SIZE)
agent2 = MADDQNAgent(INPUT_SIZE, OUTPUT_SIZE, BUFFER_SIZE)

total_bat_reward = []
total_robin_reward = []

NUM_EPISODES = 1000
# Training loop
for episode in range(NUM_EPISODES):
    # Initialize environment and other variables
    done = False
    bat_truncated = False
    robin_truncated = False
    episode_reward1 = 0
    episode_reward2 = 0
    state1,state2,_ = env.reset()
    # Snapshot the positions in case the environment hands back lists that it
    # keeps mutating in place on later steps.
    state1, state2 = list(state1), list(state2)
    step = 0

    while not done:
        # Agent 1 selects action
        action1 = agent1.select_action(state1,train=True)

        # Agent 2 selects action
        action2 = agent2.select_action(state2,train=True)

        next_state1,next_state2,bat_reward,robin_reward,done,bat_truncated,robin_truncated,_ = env.step(action1, action2)
        next_state1, next_state2 = list(next_state1), list(next_state2)

        # Train Agent 1
        agent1.train(state1, action1,bat_reward,next_state1, done)

        # Train Agent 2 on its own successor state (not agent 1's)
        agent2.train(state2, action2,robin_reward, next_state2, done)

        # Update states
        state1 = next_state1
        state2 = next_state2

        # Update episode rewards
        episode_reward1 += bat_reward
        episode_reward2 += robin_reward

        # Sync the target networks with the online networks every 4 steps
        if step%4==0:
            agent1.update_target_network()
            agent2.update_target_network()

        step += 1

    total_bat_reward.append(episode_reward1)
    total_robin_reward.append(episode_reward2)

    print('Episode: ',episode,'Agent-1(Batman)reward:',episode_reward1,'Agent-2(Robin)reward:',episode_reward2)

In [None]:
# Training curves: one line per agent, Batman first.
for _series, _agent_label in ((total_bat_reward, 'Batman'), (total_robin_reward, 'Robin')):
    plt.plot(_series, label=_agent_label)
plt.title('Rewards gained during training')
plt.xlabel('Episodes')
plt.ylabel('Rewards')
plt.legend()

In [None]:
# Persist each agent's target-network weights to its own checkpoint file.
for _agent, _checkpoint_path in ((agent1, 'Agent-1_DDQN.pth'), (agent2, 'Agent-2_DDQN.pth')):
    torch.save(_agent.target_q_network.state_dict(), _checkpoint_path)

In [None]:
# Restore the saved weights into the online Q-networks of the existing agents
# (agent1 / agent2 must already be defined by an earlier cell).
_bat_weights = torch.load('Agent-1_DDQN.pth')
agent1.q_network.load_state_dict(_bat_weights)
_robin_weights = torch.load('Agent-2_DDQN.pth')
agent2.q_network.load_state_dict(_robin_weights)

In [None]:
# Evaluate the trained agents greedily (no exploration) for 10 episodes.
agent1 = MADDQNAgent(INPUT_SIZE,OUTPUT_SIZE,BUFFER_SIZE)
agent2 = MADDQNAgent(INPUT_SIZE,OUTPUT_SIZE,BUFFER_SIZE)
# Freshly built agents have random weights, so the trained checkpoints are
# restored here; otherwise this cell would evaluate untrained networks.
agent1.q_network.load_state_dict(torch.load('Agent-1_DDQN.pth', map_location=agent1.device))
agent2.q_network.load_state_dict(torch.load('Agent-2_DDQN.pth', map_location=agent2.device))
env = GothamCityCooperative(1000)
test_bat_reward = []
test_robin_reward = []
for i in range(10):
    state1,state2,_ = env.reset()
    # Snapshot the positions in case the environment returns lists it mutates later.
    state1, state2 = list(state1), list(state2)
    done = False
    episode_bat_reward = 0
    episode_robin_reward = 0
    while not done:
        # Agent 1 selects action
        action1 = agent1.select_action(state1,train=False)

        # Agent 2 selects action
        action2 = agent2.select_action(state2,train=False)

        next_state1,next_state2,bat_reward,robin_reward,done,bat_truncated,robin_truncated,_ = env.step(action1, action2)
        state1 = list(next_state1)
        state2 = list(next_state2)
        episode_bat_reward += bat_reward
        episode_robin_reward += robin_reward

    test_bat_reward.append(episode_bat_reward)
    test_robin_reward.append(episode_robin_reward)


In [None]:
plt.figure(figsize=(12, 5))

# Left panel: Batman's total reward per test episode.
plt.subplot(1, 2, 1)
plt.plot(test_bat_reward,label='Batman')
plt.title('Testing MADDQN on Multi-agent Grid World environment')
plt.xlabel('Episodes')
plt.ylabel('Rewards')
plt.legend()

# Right panel: Robin's total reward per test episode.
plt.subplot(1,2,2)
plt.plot(test_robin_reward,label='Robin')
plt.title('Testing MADDQN on Multi-agent Grid World environment')
plt.xlabel('Episodes')
plt.ylabel('Rewards')
plt.legend()

# Same finishing calls as the other evaluation cells: keep the two long
# titles from overlapping and render the figure explicitly.
plt.tight_layout()
plt.show()