## Prerequisites

In [13]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from gymnasium.wrappers import RecordVideo
import pygame
import time

## Custom Maze Env

In [14]:
# This class creates a 5x5 (by default) maze environment with random placements for the goal, the mines and the agent. It is a stochastic environment: the agent moves in the
# intended direction with probability 70% and in each of the two perpendicular directions with probability 15%.

class GridMazeEnv(gym.Env):
    """Stochastic square grid maze with one goal, two mines and one agent.

    Observations are the agent's ``(row, col)`` position. Actions are
    0: Up, 1: Right, 2: Down, 3: Left. The chosen action is executed with
    probability 0.7; with probability 0.15 each, the agent slips to one of
    the two perpendicular directions instead. Moves that would leave the
    grid are clipped, i.e. the agent stays on the border cell.

    Rewards: +10 for reaching the goal, -50 for stepping on a mine (both end
    the episode) and -0.1 for every other step.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    # intended action -> (intended, perpendicular, perpendicular) actual actions,
    # sampled with the probabilities in _SLIP_PROBS (same order).
    _SLIP_OUTCOMES = {
        0: (0, 3, 1),  # Up    -> Up, Left, Right
        1: (1, 0, 2),  # Right -> Right, Up, Down
        2: (2, 1, 3),  # Down  -> Down, Right, Left
        3: (3, 2, 0),  # Left  -> Left, Down, Up
    }
    _SLIP_PROBS = (0.7, 0.15, 0.15)

    def __init__(self, grid_size=5, render_mode="rgb_array"):
        """Create the environment.

        Args:
            grid_size: Number of rows (and columns) of the square grid.
            render_mode: ``"human"`` to draw in a window, ``"rgb_array"`` to
                return frames from :meth:`render`.

        Raises:
            ValueError: If ``grid_size`` is smaller than 2 (there would be no
                room for a goal, two mines and the agent).
        """
        super().__init__()

        if grid_size < 2:
            raise ValueError(f"grid_size must be at least 2, got {grid_size}")

        # Grid geometry (cells and pixels)
        self.grid_size = grid_size
        self.cell_size = 100
        self.width = self.grid_size * self.cell_size
        self.height = self.grid_size * self.cell_size
        self.render_mode = render_mode

        # 4 discrete actions (0:Up, 1:Right, 2:Down, 3:Left) as (d_row, d_col)
        self.action_space = spaces.Discrete(4)
        self.action_to_direction = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1)}
        self.observation_space = spaces.MultiDiscrete([grid_size, grid_size])

        # PyGame objects are created lazily on the first render() call
        self.screen = None
        self.clock = None

    def reset(self, seed=None, options=None):
        """Randomly place the goal, two non-adjacent mines and the agent.

        Args:
            seed: Seed for the environment RNG (``self.np_random``), which also
                drives the stochastic transitions in :meth:`step`.
            options: Optional dict with ``"layout_seed"`` (fixes goal and mine
                positions) and/or ``"agent_seed"`` (fixes the agent start).
                Whichever is missing is drawn from ``self.np_random`` so that
                ``seed`` alone is enough to reproduce an episode.

        Returns:
            ``(observation, info)`` where observation is the agent position.

        Raises:
            RuntimeError: If no cell non-adjacent to the first mine is free.
        """
        # Seeds self.np_random when a seed is given
        super().reset(seed=seed)

        options = options or {}
        layout_seed = options.get('layout_seed', None)
        agent_seed = options.get('agent_seed', None)

        # Separate RNGs so the layout can stay fixed while the start varies
        layout_rng = np.random.default_rng(layout_seed) if layout_seed is not None else self.np_random
        agent_rng = np.random.default_rng(agent_seed) if agent_seed is not None else self.np_random

        # --- Layout: goal and two non-adjacent mines ---
        all_cells = [(r, c) for r in range(self.grid_size) for c in range(self.grid_size)]
        layout_rng.shuffle(all_cells)  # random order, then draw without replacement

        self.goal_pos = all_cells.pop()
        mine1_pos = all_cells.pop()

        # Try the remaining cells in random order until one is not adjacent
        # (Manhattan distance 1) to the first mine.
        candidate_order = list(range(len(all_cells)))
        layout_rng.shuffle(candidate_order)
        for idx in candidate_order:
            cand_row, cand_col = all_cells[idx]
            if abs(cand_row - mine1_pos[0]) + abs(cand_col - mine1_pos[1]) != 1:
                mine2_pos = all_cells.pop(idx)  # also removes it from the free cells
                break
        else:
            # Safety check (shouldn't happen in a 5x5 grid)
            raise RuntimeError("Could not place two non-adjacent mines. Grid might be too small or logic error.")

        self.mines = [mine1_pos, mine2_pos]

        # --- Agent start: any cell that is still free ---
        agent_idx_in_list = agent_rng.choice(len(all_cells))
        self.agent_pos = np.array(all_cells[agent_idx_in_list])

        # --- Debug prints ---
        print("goal pos = ", self.goal_pos)
        print("mines pos = ", self.mines)
        print("agent pos = ", self.agent_pos)

        return self.agent_pos, {}

    def step(self, action):
        """Apply ``action`` with slip noise and return the transition.

        Args:
            action: Intended action (0:Up, 1:Right, 2:Down, 3:Left).

        Returns:
            ``(observation, reward, terminated, truncated, info)``;
            ``truncated`` is always ``False`` (no time limit).

        Raises:
            ValueError: If ``action`` is not one of the four valid actions.
        """
        try:
            outcomes = self._SLIP_OUTCOMES[int(action)]
        except KeyError:
            raise ValueError(f"Invalid action {action!r}; expected 0, 1, 2 or 3") from None

        # Sample from the environment RNG (not the global one) so that
        # reset(seed=...) makes the whole episode reproducible.
        actual_action = int(self.np_random.choice(outcomes, p=self._SLIP_PROBS))
        direction = self.action_to_direction[actual_action]

        # Move, clipping so the agent stays inside the grid
        self.agent_pos = np.clip(self.agent_pos + direction, 0, self.grid_size - 1)

        terminated = False
        if np.array_equal(self.agent_pos, self.goal_pos):
            reward = 10.0
            terminated = True
        elif tuple(self.agent_pos) in self.mines:
            reward = -50.0
            terminated = True
        else:
            # Small step penalty to encourage reaching the goal quickly
            reward = -0.1

        truncated = False  # No time limit in this environment
        info = {}

        return self.agent_pos, reward, terminated, truncated, info

    def render(self):
        """Draw the grid; return an RGB frame in ``"rgb_array"`` mode.

        Returns:
            In ``"rgb_array"`` mode an array of shape (height, width, 3);
            otherwise ``None``.
        """
        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                self.screen = pygame.display.set_mode((self.width, self.height))
                pygame.display.set_caption("Grid Maze")
            else:  # rgb_array mode: draw on an off-screen surface
                self.screen = pygame.Surface((self.width, self.height))
            self.clock = pygame.time.Clock()

        # Background
        self.screen.fill((255, 255, 255))

        # Goal (green). Positions are (row, col), pygame wants (x, y) = (col, row).
        goal_rect = pygame.Rect(self.goal_pos[1] * self.cell_size, self.goal_pos[0] * self.cell_size, self.cell_size, self.cell_size)
        pygame.draw.rect(self.screen, (0, 255, 0), goal_rect)

        # Mines (red)
        for mine in self.mines:
            mine_rect = pygame.Rect(mine[1] * self.cell_size, mine[0] * self.cell_size, self.cell_size, self.cell_size)
            pygame.draw.rect(self.screen, (255, 0, 0), mine_rect)

        # Agent (blue circle)
        center_x = self.agent_pos[1] * self.cell_size + self.cell_size // 2
        center_y = self.agent_pos[0] * self.cell_size + self.cell_size // 2
        pygame.draw.circle(self.screen, (0, 0, 255), (center_x, center_y), self.cell_size // 3)

        # Grid lines
        for x in range(0, self.width, self.cell_size):
            pygame.draw.line(self.screen, (0, 0, 0), (x, 0), (x, self.height))
        for y in range(0, self.height, self.cell_size):
            pygame.draw.line(self.screen, (0, 0, 0), (0, y), (self.width, y))

        if self.render_mode == "human":
            # Process window events so the OS does not flag the window as unresponsive
            pygame.event.pump()
            pygame.display.flip()
            self.clock.tick(self.metadata["render_fps"])
        elif self.render_mode == "rgb_array":
            # surfarray is (width, height, 3); transpose to (height, width, 3)
            return np.transpose(pygame.surfarray.array3d(self.screen), (1, 0, 2))

    def close(self):
        """Shut down pygame if rendering was ever started."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            self.clock = None

## Policy iteration


In [15]:
# This class implements policy iteration with its two phases: policy evaluation and policy improvement.

class PolicyIterationAgent:
    """Tabular policy iteration (policy evaluation + greedy improvement).

    The agent builds an explicit model ``P(s' | s, a)`` / ``R(s, a, s')`` from
    the environment's layout and then alternates evaluation and improvement
    until the policy stops changing.

    The environment must expose ``grid_size``, ``action_space.n``,
    ``action_to_direction``, ``goal_pos`` and ``mines``.
    """

    # intended action -> ((probability, actual action), ...):
    # 70% intended direction, 15% each perpendicular direction.
    _OUTCOMES = {
        0: ((0.7, 0), (0.15, 3), (0.15, 1)),  # Up    -> Up, Left, Right
        1: ((0.7, 1), (0.15, 0), (0.15, 2)),  # Right -> Right, Up, Down
        2: ((0.7, 2), (0.15, 1), (0.15, 3)),  # Down  -> Down, Right, Left
        3: ((0.7, 3), (0.15, 2), (0.15, 0)),  # Left  -> Left, Down, Up
    }

    def __init__(self, env, gamma=0.9, theta=1e-6):
        """Set up a random initial policy and a zero value function.

        Args:
            env: Grid environment whose layout (goal, mines) is already set.
            gamma: Discount factor.
            theta: Convergence threshold for policy evaluation; also used as
                the tie tolerance in policy improvement.
        """
        self.env = env
        self.num_states = env.grid_size * env.grid_size
        self.num_actions = env.action_space.n
        self.gamma = gamma  # Discount factor
        self.theta = theta  # Convergence threshold

        # policy[state_index] -> action; starts random (global NumPy RNG)
        self.policy = np.random.randint(self.num_actions, size=self.num_states)
        # value_function[state_index] -> estimated value
        self.value_function = np.zeros(self.num_states)
        # Built by solve(), or lazily on first use of evaluation/improvement
        self.transition_model = None

    def _state_to_index(self, state):
        """Convert a (row, col) state to a single row-major index."""
        # row * grid_size + col enumerates the cells 0 .. grid_size**2 - 1
        return state[0] * self.env.grid_size + state[1]

    def _get_transition_model(self):
        """Build the environment model used by the DP updates.

        Returns:
            ``model[state][action]`` -> list of ``(prob, next_state, reward)``:
            P(s' | s, a) is the probability that taking action a in state s
            leads to state s', and R(s, a, s') is the reward received for
            that transition. Terminal states (goal, mines) loop onto
            themselves with probability 1 and reward 0.
        """
        size = self.env.grid_size
        goal = tuple(self.env.goal_pos)
        mines = {tuple(mine) for mine in self.env.mines}

        model = {}
        for row in range(size):
            for col in range(size):
                state_idx = self._state_to_index((row, col))
                model[state_idx] = {a: [] for a in range(self.num_actions)}

                # Terminal states absorb: every action stays put with 0 reward.
                if (row, col) == goal or (row, col) in mines:
                    for a in range(self.num_actions):
                        model[state_idx][a].append((1.0, state_idx, 0.0))
                    continue

                for action in range(self.num_actions):
                    for prob, actual_action in self._OUTCOMES.get(action, ()):
                        d_row, d_col = self.env.action_to_direction[actual_action]
                        # Clip to the grid: bumping into a wall keeps the agent in place
                        next_pos = (
                            min(max(row + d_row, 0), size - 1),
                            min(max(col + d_col, 0), size - 1),
                        )
                        next_state_idx = self._state_to_index(next_pos)

                        if next_pos == goal:
                            reward = 10.0
                        elif next_pos in mines:
                            reward = -50.0
                        else:
                            reward = -0.1

                        model[state_idx][action].append((prob, next_state_idx, reward))
        return model

    def _ensure_model(self):
        """Build the transition model if it has not been built yet."""
        if self.transition_model is None:
            self.transition_model = self._get_transition_model()

    def _action_value(self, state_idx, action):
        """One-step Bellman backup: expected return of ``action`` in ``state_idx``."""
        total = 0
        for prob, next_s, reward in self.transition_model[state_idx][action]:
            total += prob * (reward + self.gamma * self.value_function[next_s])
        return total

    def policy_evaluation(self):
        """Evaluate the current policy in place.

        Sweeps all states applying the Bellman expectation equation until the
        largest change in a sweep is below ``theta``.
        """
        self._ensure_model()
        while True:
            delta = 0
            for s in range(self.num_states):
                old_v = self.value_function[s]
                self.value_function[s] = self._action_value(s, self.policy[s])
                delta = max(delta, abs(old_v - self.value_function[s]))

            if delta < self.theta:
                break

    def policy_improvement(self):
        """Make the policy greedy with respect to the current value function.

        Returns:
            True if no state changed its action (policy is stable), else False.
        """
        self._ensure_model()
        policy_stable = True
        for s in range(self.num_states):
            old_action = self.policy[s]
            action_values = np.array(
                [self._action_value(s, a) for a in range(self.num_actions)]
            )

            best_action = int(np.argmax(action_values))
            # Keep the current action when it is (numerically) as good as the
            # best one; otherwise the policy can flip forever between equally
            # good actions and solve() would never terminate.
            if action_values[old_action] >= action_values[best_action] - self.theta:
                best_action = old_action
            self.policy[s] = best_action

            if old_action != best_action:
                policy_stable = False
        return policy_stable

    def solve(self):
        """Run policy iteration until the policy converges."""
        print("Building transition model")
        # Always rebuild so the model reflects the environment's current layout
        self.transition_model = self._get_transition_model()
        print("Model built.")

        iteration = 0
        while True:
            iteration += 1
            print(f"\n--- Iteration {iteration} ---")
            print("1. Evaluating policy...")
            self.policy_evaluation()

            print("2. Improving policy...")
            policy_stable = self.policy_improvement()

            if policy_stable:
                print("\nPolicy converged!")
                print(f"Converged in {iteration} iterations.")
                break

    def predict(self, state):
        """Return the learned policy's action for a (row, col) ``state``."""
        state_idx = self._state_to_index(state)
        return self.policy[state_idx]

## Main


In [20]:
if __name__ == "__main__":
    # --- 1. Training Phase ---
    layout_seed = 54321
    agent_seed_train = 12345
    agent_seed_test = 98765
    # Safety cap for the recorded rollout: the environment has no time limit
    # and a policy may prefer stalling over risking a mine.
    max_eval_steps = 1000

    env = GridMazeEnv()
    _ = env.reset(options={"layout_seed": layout_seed, "agent_seed": agent_seed_train})  # Fixed maze layout

    agent = PolicyIterationAgent(env, gamma=0.99)
    # perf_counter is monotonic, so it is the right clock for measuring durations
    start_time = time.perf_counter()
    agent.solve()
    end_time = time.perf_counter()
    print(f"Training time: {end_time - start_time:.2f} seconds")

    # Display the final optimal policy
    policy_grid = agent.policy.reshape(env.grid_size, env.grid_size)
    action_symbols = {0: "↑", 1: "→", 2: "↓", 3: "←"}
    print("\nOptimal Policy:")
    for r in range(env.grid_size):
        row_str = " | ".join([action_symbols[p] for p in policy_grid[r]])
        print(f" {row_str} ")
    env.close()

    # --- 2. Evaluation & Recording Phase ---
    print("\nRecording video of the optimal policy in action...")

    # New environment; the same layout_seed reproduces the training maze,
    # while agent_seed_test gives a different start position.
    video_env = GridMazeEnv(render_mode="rgb_array")
    video_env = RecordVideo(video_env, video_folder="videos", name_prefix="policy-iteration-solution")

    try:
        obs, _ = video_env.reset(options={"layout_seed": layout_seed, "agent_seed": agent_seed_test})

        for _ in range(max_eval_steps):
            action = agent.predict(obs)
            obs, reward, terminated, truncated, info = video_env.step(action)
            if terminated or truncated:
                break
        else:
            print(f"Episode did not finish within {max_eval_steps} steps; stopping the recording.")
    finally:
        # Closing the wrapper finalizes the video even if the rollout failed
        video_env.close()
    print("Video saved in the 'videos' folder.")


goal pos =  (4, 0)
mines pos =  [(2, 3), (1, 0)]
agent pos =  [4 1]
Building transition model
Model built.

--- Iteration 1 ---
1. Evaluating policy...
2. Improving policy...

--- Iteration 2 ---
1. Evaluating policy...
2. Improving policy...

--- Iteration 3 ---
1. Evaluating policy...
2. Improving policy...

--- Iteration 4 ---
1. Evaluating policy...
2. Improving policy...

--- Iteration 5 ---
1. Evaluating policy...
2. Improving policy...

--- Iteration 6 ---
1. Evaluating policy...
2. Improving policy...

Policy converged!
Converged in 6 iterations.
Training time: 0.07 seconds

Optimal Policy:
 ↑ | → | ↓ | ← | ← 
 ↑ | → | ↓ | ↑ | ← 
 ↓ | ↓ | ← | ↑ | → 
 ↓ | ↓ | ← | ↓ | ↓ 
 ↑ | ← | ← | ← | ← 

Recording video of the optimal policy in action...
goal pos =  (4, 0)
mines pos =  [(2, 3), (1, 0)]
agent pos =  [2 4]
Video saved in the 'videos' folder.
