In [1]:
"""
Multi-Agent Dynamic Grid World Environment
Created by: Ardianto Wibowo
"""

import numpy as np
import sys

# Add the path to the 'env' folder to sys.path
sys.path.append('env')

from ma_gridworld import Env

class QLearningAgent:
    def __init__(self, num_actions, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.num_actions = num_actions
        self.q_table = {}  # Dictionary to store Q-values for each (x, y) coordinate
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon

    def select_action(self, state):
        """Select an action using epsilon-greedy policy."""
        # Initialize Q-values if state is not in the q_table
        if state not in self.q_table:
            self.q_table[state] = [0] * self.num_actions
        
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.num_actions)  # Explore
        else:
            return np.argmax(self.q_table[state])  # Exploit

    def update_q_table(self, state, action, reward, next_state):
        """Update the Q-value for the given state-action pair."""
        # Initialize Q-values if state or next_state is not in the q_table
        if state not in self.q_table:
            self.q_table[state] = [0] * self.num_actions
        if next_state not in self.q_table:
            self.q_table[next_state] = [0] * self.num_actions
        
        best_next_action = np.argmax(self.q_table[next_state])
        self.q_table[state][action] += self.alpha * (
            reward + self.gamma * self.q_table[next_state][best_next_action] - self.q_table[state][action]
        )

    def decay_epsilon(self, epsilon_decay, epsilon_min):
        """Decay epsilon after each episode."""
        self.epsilon = max(self.epsilon * epsilon_decay, epsilon_min)


def get_action(agent_id, observation, num_actions, agents, env):
    """
    This method provides an action chosen by each agent's Q-table:
    1: up, 2: down, 3: left, 4: right, 0: stay
    """
    coordinate_observation = tuple(observation[0])  # Keep observation as (x, y) tuple

    #optional observation data may be used, depend on the agent needs.
    win_state_observation = observation[1]
    sensor_data_observation = observation[2]
    comm_observation = observation[3]

    physical_action = agents[agent_id].select_action(coordinate_observation)

    if env.is_agent_silent:
        comm_action = []  # No communication if agent is silent
    else:
        comm_action = np.random.choice(num_actions)  # Example random communication action

    return physical_action, comm_action


def run(num_episodes, max_steps_per_episode, agents, num_actions, env, epsilon_decay=0.95, epsilon_min=0.01):
    for episode in range(num_episodes):
        print(f"Starting episode {episode + 1}")
        
        observations = env.reset()  # Reset the environment at the start of each episode
        done = [False] * env.num_agents
        step_count = 0

        while not all(done) and step_count < max_steps_per_episode:
            actions = []
            next_observations = []

            for agent_id in range(env.num_agents):
                observation = observations[agent_id]
                action = get_action(agent_id, observation, num_actions, agents, env)
                
                actions.append(action)
                next_observations.append(observation)

            next_observations, rewards, done = env.step(actions)
            step_count += 1

            # Update Q-tables for each agent
            for agent_id in range(env.num_agents):
                observation = observations[agent_id]
                coordinate_observation = tuple(observation[0])  # Use (x,y) tuple as observation data

                reward = rewards[agent_id]

                action = actions[agent_id]
                physical_action = action[0]
                
                next_observation = next_observations[agent_id]
                coordinate_next_observation = tuple(next_observation[0])  # Use (x,y) tuple as next observation data
                
                #optional next observation data may be used, depend on the agent needs.
                win_state_next_observation = next_observation[1]
                sensor_data_next_observation = next_observation[2]
                comm_next_observation = next_observation[3]


                agents[agent_id].update_q_table(coordinate_observation, physical_action, reward, coordinate_next_observation)

            observations = next_observations
            
            # Render the environment
            env.render()

            print(f"Step {step_count}:")
            for agent_id in range(env.num_agents):
                print(f"  Agent {agent_id}: Observation: {observations}, Action: {actions[agent_id]}, Reward: {rewards[agent_id]}, Done: {done[agent_id]}")

        print(f"Episode {episode + 1} finished after {step_count} steps.\n")

        # Decay epsilon for each agent
        for agent in agents:
            agent.decay_epsilon(epsilon_decay, epsilon_min)



if __name__ == "__main__":

    gsize=15 #grid size (square)
    gpixels=30 #grid cell size in pixels

    is_sensor_active = True #True:  Activate the sensory observation data
    sensory_size = 3 #'is_sensor_active' must be True. The value must be odd, if event will be converted to one level odd number above
    
    num_agents = 3 #the number of agents will be run in paralel
    num_obstacles = 10 #the number of obstacles
    is_single_target = False #True: all agents have a single target, False: each agent has their own target
    num_targets_per_agent = 2 #'is_single_target' must be False to have an effect
    
    is_agent_silent = True #True: communication among agents is allowed

    num_episodes=150 #the number of episode will be run
    max_steps_per_episode=400 #each episode will be stopped when max_step is reached

    eps_moving_targets = 151 #set this value greater than 'num_episodes' to keep the targets in a stationary position
    eps_moving_obstacles = 151 #set this value greater than 'num_episodes' to keep the obstacles in a stationary position

    render = True #True: render the animation into the screen (so far, it is still can not be deactivated)

    min_obstacle_distance_from_target = 1 #min grid distance of each obstacles relative to targets
    max_obstacle_distance_from_target = 5 #max grid distance of each obstacles relative to targets
    min_obstacle_distance_from_agents = 1 #min grid distance of each obstacles relative to agents

    reward_normal = -1 #reward value of normal steps
    reward_obstacle = -5 #reward value when hit an obstacle
    reward_target = 50 #reward value when reach the target

    is_totally_random = False #True: target and obstacles initial as well as movement position is always random on each call, False: only random at the beginning. 
    animation_speed = 0.0000001 #smaller is faster 
    is_destroy_environment = True #True: automatically close the animation after all episodes end.  

    # Initialize environment
    env = Env(
        num_agents=num_agents, num_targets_per_agent=num_targets_per_agent, num_obstacles=num_obstacles,
        eps_moving_obstacles=eps_moving_obstacles, eps_moving_targets=eps_moving_targets,
        is_agent_silent=is_agent_silent, is_single_target=is_single_target, sensory_size=sensory_size,
        gpixels=gpixels, gheight=gsize, gwidth=gsize, is_sensor_active=is_sensor_active,
        min_obstacle_distance_from_target=min_obstacle_distance_from_target,
        max_obstacle_distance_from_target=max_obstacle_distance_from_target,
        min_obstacle_distance_from_agents=min_obstacle_distance_from_agents,
        is_totally_random=is_totally_random, animation_speed=animation_speed,
        reward_normal=reward_normal, reward_obstacle=reward_obstacle, reward_target=reward_target
    )

    alpha=0.1
    gamma=0.9
    epsilon=0.1
    epsilon_decay = 0.95 
    epsilon_min = 0.01
    num_actions = len(env.action_space)

    # Initialize Q-learning agents
    agents = [QLearningAgent(num_actions, alpha, gamma, epsilon) for _ in range(num_agents)]

    # Run episodes
    run(num_episodes, max_steps_per_episode, agents, num_actions, env, epsilon_decay, epsilon_min)

    # Destroy environment if needed
    if is_destroy_environment:
        env.destroy_environment()


Starting episode 1
Step 1:
  Agent 0: Observation: [[[0, 0], False, [[None, None, None], [None, 'agent', 'empty'], [None, 'empty', 'empty']], []], [[14, 0], False, [[None, None, None], ['empty', 'agent', None], ['empty', 'empty', None]], []], [[13, 14], False, [['empty', 'empty', 'empty'], ['empty', 'empty', 'agent'], [None, None, None]], []]], Action: (0, []), Reward: -1, Done: False
  Agent 1: Observation: [[[0, 0], False, [[None, None, None], [None, 'agent', 'empty'], [None, 'empty', 'empty']], []], [[14, 0], False, [[None, None, None], ['empty', 'agent', None], ['empty', 'empty', None]], []], [[13, 14], False, [['empty', 'empty', 'empty'], ['empty', 'empty', 'agent'], [None, None, None]], []]], Action: (0, []), Reward: -1, Done: False
  Agent 2: Observation: [[[0, 0], False, [[None, None, None], [None, 'agent', 'empty'], [None, 'empty', 'empty']], []], [[14, 0], False, [[None, None, None], ['empty', 'agent', None], ['empty', 'empty', None]], []], [[13, 14], False, [['empty', 'empty

KeyboardInterrupt: 