In [6]:
from enum import Enum
from envs.grid import Grid
from envs.constants import SQUARE_SIZE

# Fire Evacuation Planner MDP
This agent will implement a classic MDP with states, rewards and transition models
Extending the MDP to our use case could include:
 - Fire Spread algorithm:
   - Episode ends if stepping in fire state
   - Firefighter (MDP agent) receives a reward for stepping onto tiles that contain people who need to be rescued
   - Generate an environment with more sophisticated properties: walls laid out on the grid, doors, and so on
   - Default reward could be something like -0.04 to encourage efficiency
   - Pass arguments to the grid when defining the base environment (walls, starting fire, people)

## Compare Reinforcement Learning Methods (Q-learning, SARSA) to Classical Methods (Policy iteration, value iteration, linear programming)
The separate models will aim to answer whether classical or RL-based methods are better suited for simulating a real-world fire hazard on a building floor.

# Possible challenges of a classical MDP implementation
Since we are dealing with a classical MDP, we need to make sure that all processes are Markovian: the agent chooses its action based only on the current state and the possible rewards, never on the history of earlier states.

If we encode the fire so that it spreads independently of the state the agent observes, the process is no longer Markovian from the agent's point of view, and the agent would effectively be acting outside an MDP.

 - One way to solve this would be to include the fire status of every grid tile in the state, which quickly becomes a lot of states and calculations even for a simple grid (with $N$ tiles there are up to $2^N$ fire configurations for every agent position).

For small grids, such as the 3x4 examples, this is already a challenge; for bigger ones, Reinforcement Learning will almost definitely need to be adopted in order to manage the changing environment.

In [8]:
import numpy as np
from envs.grid import Grid
from envs.tiles.tile import Tile
from envs.constants import Action, GRID_SIZE # Import GRID_SIZE

class FireEvacuationAgentMDP:
    def __init__(self, np_random: np.random.RandomState):
        """
        Initializes the FireEvacuationAgentMDP.
        The Grid itself manages the agent's initial position and fire state.
        
        Args:
            np_random: A NumPy random state object for reproducibility within the MDP and Grid.
        """
        self.np_random = np_random
        self.grid = Grid(self.np_random) # Pass np_random to the Grid constructor
        
        # Grid dimensions are now derived from GRID_SIZE in constants.py
        self.rows = GRID_SIZE
        self.cols = GRID_SIZE
        
        # Actions are strings based on the Action enum
        self.actions = [action.name.lower() for action in Action if action.name.lower() in ['up', 'left', 'right', 'down']] 
        # Only include movement actions for now, as PickPerson, BreakDoor, PutOutFire require more complex logic
        # for a classic MDP state representation and will be added later if necessary.

        # Determine all possible agent positions based on traversable tiles in the grid
        self.possible_agent_positions = []
        for y in range(self.rows):
            for x in range(self.cols):
                if self.grid.tiles[x][y].is_traversable:
                    self.possible_agent_positions.append((x, y))

        # The agent's actual starting position is determined by the Grid's initialization
        self.start_state_position = tuple(self.grid.agent.pos) # Convert np.array to tuple
        self.current_state_position = tuple(self.grid.agent.pos) # Agent's current (x,y) position

        # The actual MDP state combines agent's position and fire configuration
        self.current_mdp_state = self._get_current_mdp_state()

    def _get_fire_config_tuple(self) -> tuple:
        """
        Returns a flattened tuple representing the fire status of all traversable tiles.
        """
        fire_status = []
        for y in range(self.rows):
            for x in range(self.cols):
                # Ensure we only check tiles that exist in the grid (e.g., not None if grid creation is partial)
                tile = self.grid.tiles[x][y]
                if tile and tile.is_traversable: # Only include traversable tiles in the state representation
                    fire_status.append(tile.is_on_fire)
        return tuple(fire_status)

    def _get_current_mdp_state(self) -> tuple:
        """
        Combines agent's position and fire configuration into the full MDP state.
        """
        return (self.current_state_position[0], self.current_state_position[1]) + self._get_fire_config_tuple()

    def reset(self) -> tuple:
        """
        Resets the MDP to a new initial state by recreating the Grid.
        Returns the new initial MDP state.
        """
        self.grid = Grid(self.np_random) # Recreate grid to reset agent, cat, and fire
        self.current_state_position = tuple(self.grid.agent.pos) # Get new agent position from reset grid
        self.start_state_position = tuple(self.grid.agent.pos) # Update start state as well
        self.current_mdp_state = self._get_current_mdp_state()
        return self.current_mdp_state

    def step(self, action_str: str) -> tuple:
        """
        Executes an action in the environment.
        
        Args:
            action_str (str): The string name of the action (e.g., 'up', 'down').
            
        Returns:
            tuple: (new_mdp_state, reward, is_terminal, info)
        """
        try:
            # Convert the incoming string action to its corresponding Action enum member
            action_enum = Action[action_str.upper()]
        except KeyError:
            raise ValueError(f"Invalid action: {action_str}. Must be one of {[a.name.lower() for a in Action]}")

        # Determine the intended next position based on the action
        x, y = self.current_state_position
        intended_next_position = None
        match action_enum:
            case Action.UP:
                intended_next_position = (x, y + 1)
            case Action.LEFT:
                intended_next_position = (x - 1, y)
            case Action.RIGHT:
                intended_next_position = (x + 1, y)
            case Action.DOWN:
                intended_next_position = (x, y - 1)
            case _: # Handle other actions if they were included in self.actions
                # For now, we only handle movement, others will be no-ops or raise error
                intended_next_position = (x, y) # Stay in place for non-movement actions or invalid ones in match

        # Call the Grid's update method, which handles agent movement and fire spread
        # The Grid's update method itself determines if the agent actually moved or stayed.
        # It returns True if agent moved, False if blocked.
        agent_moved = self.grid.update(intended_next_position)

        # Update the MDP's internal agent position based on the Grid's actual agent position
        self.current_state_position = tuple(self.grid.agent.pos)

        # Determine reward (placeholder for now)
        reward = 0

        # Determine if terminal state (placeholder for now)
        # Check if agent is on fire *after* the grid update
        is_terminal = self.grid.is_agent_dead() # Or if cat is rescued, etc.

        # Construct the full new MDP state after all environment dynamics
        self.current_mdp_state = self._get_current_mdp_state()

        info = {"agent_moved": agent_moved} # Provide some info about the move outcome
        return self.current_mdp_state, reward, is_terminal, info

    def get_possible_states(self) -> list[tuple]:
        """
        Returns a list of all possible (x,y) agent positions.
        Note: For a full MDP state, this would need to generate all
        (agent_pos, fire_config) combinations, which is generally intractable.
        This method refers to traversable (x,y) coordinates.
        """
        return self.possible_agent_positions

    def __str__(self) -> str:
        """
        Provides a string representation of the current MDP state.
        """
        agent_pos_str = f"Agent Pos: {self.current_state_position}"
        fire_config_str = f"Fire Config: {self._get_fire_config_tuple()}"
        return f"Current MDP State: ({agent_pos_str}, {fire_config_str})"

In [11]:
import numpy as np
# FireEvacuationAgentMDP is defined in the notebook cell above, so only the shared
# constants need to be imported here.
from envs.constants import Action, GRID_SIZE # Import GRID_SIZE for context

# 1. Seeded NumPy random state, handed to the MDP (which forwards it to its Grid)
seed = 42
np_random_instance = np.random.RandomState(seed)

# 2. Build the MDP; the Grid is created inside FireEvacuationAgentMDP.__init__
print(f"Initializing FireEvacuationAgentMDP for a {GRID_SIZE}x{GRID_SIZE} grid...")
mdp = FireEvacuationAgentMDP(np_random=np_random_instance)
print("FireEvacuationAgentMDP initialized.")

# Show where the episode starts and which actions are available
print(f"\nInitial MDP state: {mdp.current_mdp_state}")
print(f"Agent's starting position (from grid): {mdp.current_state_position}")
print(f"Possible actions: {mdp.actions}")

# 3. Exercise a few steps
print("\n--- Testing Agent Steps ---")

# Steps 1 and 2: move right, then down, reporting the outcome of each step
for action_to_take in (Action.RIGHT.name.lower(), Action.DOWN.name.lower()):
    print(f"\nAgent takes action: {action_to_take}")
    new_state, reward, is_terminal, info = mdp.step(action_to_take)
    print(f"New MDP state: {new_state}")
    print(f"Agent position after move: {mdp.current_state_position}")
    print(f"Reward: {reward}, Terminal: {is_terminal}")
    print(f"Info (Agent moved): {info['agent_moved']}")

# Step 3: boundary / obstacle probe.
# The starting position comes from the Grid, so this 'left' move is only blocked when the
# agent happens to sit on the left edge or next to a non-traversable tile; a deterministic
# check would need a fixed start position (TODO).
print("\nAttempting a boundary/wall test:")
current_agent_pos_for_test = mdp.current_state_position
print(f"Agent starts this test at: {current_agent_pos_for_test}")

action_to_take_wall = Action.LEFT.name.lower()
print(f"Agent attempts action: {action_to_take_wall}")
new_state, reward, is_terminal, info = mdp.step(action_to_take_wall)
print(f"New MDP state: {new_state}")
print(f"Agent position after attempt: {mdp.current_state_position}")
print(f"Info (Agent moved): {info['agent_moved']}")
if not info["agent_moved"]:
    print("Agent was blocked (boundary or obstacle) and stayed in place.")

# 4. Watch the fire flags evolve over a handful of steps
print("\n--- Observing Fire Dynamics ---")
# Start from a freshly created Grid; everything after the first two state entries
# (agent x, agent y) is the fire configuration.
initial_mdp_state_after_reset = mdp.reset()
initial_fire_config = initial_mdp_state_after_reset[2:]
print(f"\nInitial state after reset: {initial_mdp_state_after_reset}")
print(f"Initial fire configuration after reset: {initial_fire_config}")

# Every mdp.step() call goes through grid.update(), so repeatedly stepping right is
# enough to observe the fire flags; if the agent is blocked its position simply stays put.
print("\nTaking 5 steps to observe fire spread...")
for i in range(5):
    action_to_take_fire_obs = Action.RIGHT.name.lower()
    new_state, reward, is_terminal, info = mdp.step(action_to_take_fire_obs)

    current_fire_config = new_state[2:]
    # Compared against the previous step's configuration, not the one right after reset
    fire_changed = "NO" if current_fire_config == initial_fire_config else "YES"
    print(f"Step {i+1}: Agent Pos: {mdp.current_state_position}, Fire changed: {fire_changed}, New fire config: {current_fire_config}")
    initial_fire_config = current_fire_config  # becomes the baseline for the next step

    if is_terminal:
        print(f"Agent became terminal at step {i+1} (e.g., died in fire).")
        break  # no point stepping past a terminal state

Initializing FireEvacuationAgentMDP for a 6x6 grid...


AttributeError: 'FireFighter' object has no attribute 'pos'