In [6]:
import numpy as np
import random
from typing import Dict, List, Tuple
import time

class GridWorldMDP:
    def __init__(self, size: int = 5, obstacles: List[Tuple[int, int]] = None):
        self.size = size
        self.states = [(i, j) for i in range(size) for j in range(size)]
        self.actions = ['up', 'down', 'left', 'right']
        
        # Randomly place start and goal states
        self.start_state = (random.randint(0, size-1), random.randint(0, size-1))
        self.goal_state = (random.randint(0, size-1), random.randint(0, size-1))
        
        # Make sure start and goal states are different
        while self.goal_state == self.start_state:
            self.goal_state = (random.randint(0, size-1), random.randint(0, size-1))
            
        # Set obstacles
        self.obstacles = obstacles if obstacles else []
        while len(self.obstacles) < size:  # Add random obstacles
            obs = (random.randint(0, size-1), random.randint(0, size-1))
            if obs not in self.obstacles and obs != self.start_state and obs != self.goal_state:
                self.obstacles.append(obs)

    def get_next_state(self, state: Tuple[int, int], action: str) -> Tuple[int, int]:
        """Determine next state given current state and action."""
        x, y = state
        if action == 'up':
            new_state = (max(0, x-1), y)
        elif action == 'down':
            new_state = (min(self.size-1, x+1), y)
        elif action == 'left':
            new_state = (x, max(0, y-1))
        else:  # right
            new_state = (x, min(self.size-1, y+1))
            
        # Check if new state is an obstacle
        if new_state in self.obstacles:
            return state
        return new_state

    def get_reward(self, state: Tuple[int, int]) -> float:
        """Get reward for being in a state."""
        if state == self.goal_state:
            return 100
        elif state in self.obstacles:
            return -50
        return -1  # Small negative reward for each step

def value_iteration(mdp: GridWorldMDP, gamma: float = 0.9, epsilon: float = 1e-6) -> Dict[Tuple[int, int], float]:
    """Perform value iteration to find optimal values for each state."""
    V = {state: 0 for state in mdp.states}
    while True:
        delta = 0
        V_new = V.copy()
        
        for state in mdp.states:
            if state == mdp.goal_state:
                continue
                
            # Find maximum value over all actions
            max_value = float('-inf')
            for action in mdp.actions:
                next_state = mdp.get_next_state(state, action)
                value = mdp.get_reward(next_state) + gamma * V[next_state]
                max_value = max(max_value, value)
            
            V_new[state] = max_value
            delta = max(delta, abs(V_new[state] - V[state]))
        
        V = V_new
        if delta < epsilon:
            break
    
    return V

def get_optimal_policy(mdp: GridWorldMDP, V: Dict[Tuple[int, int], float], gamma: float = 0.9) -> Dict[Tuple[int, int], str]:
    """Extract optimal policy from value function."""
    policy = {}
    
    for state in mdp.states:
        if state == mdp.goal_state:
            policy[state] = None
            continue
            
        best_action = None
        best_value = float('-inf')
        
        for action in mdp.actions:
            next_state = mdp.get_next_state(state, action)
            value = mdp.get_reward(next_state) + gamma * V[next_state]
            
            if value > best_value:
                best_value = value
                best_action = action
                
        policy[state] = best_action
    
    return policy

# New Policy Iteration Function
def policy_iteration(mdp: GridWorldMDP, gamma: float = 0.9, epsilon: float = 1e-6) -> Tuple[Dict[Tuple[int, int], str], Dict[Tuple[int, int], float]]:
    """
    Perform policy iteration to find the optimal policy.
    
    Returns:
    - policy: Optimal policy as a dictionary mapping each state to an action
    - V: Optimal value function for the policy
    """
    # Start with a random policy
    policy = {state: random.choice(mdp.actions) for state in mdp.states if state != mdp.goal_state}
    V = {state: 0 for state in mdp.states}
    
    while True:
        # Policy Evaluation: Use the current policy to compute V
        V = value_iteration(mdp, gamma, epsilon)  # Policy evaluation using value iteration
        
        # Policy Improvement: Update the policy
        stable = True
        for state in mdp.states:
            if state == mdp.goal_state:
                continue
                
            # Find the best action according to the current V
            best_action = max(mdp.actions, key=lambda a: mdp.get_reward(mdp.get_next_state(state, a)) + gamma * V[mdp.get_next_state(state, a)])
            if policy[state] != best_action:
                stable = False
                policy[state] = best_action
                
        # Stop if the policy is stable (converged)
        if stable:
            return policy, V

def visualize_grid(mdp: GridWorldMDP, path: List[Tuple[int, int]] = None):
    """Visualize the grid world with the path taken."""
    grid = [[' ' for _ in range(mdp.size)] for _ in range(mdp.size)]
    
    # Place obstacles
    for obs in mdp.obstacles:
        grid[obs[0]][obs[1]] = '█'
    
    # Place start and goal
    grid[mdp.start_state[0]][mdp.start_state[1]] = 'S'
    grid[mdp.goal_state[0]][mdp.goal_state[1]] = 'G'
    
    # Place path
    if path:
        for state in path[1:-1]:  # Exclude start and goal states
            if state != mdp.goal_state and state != mdp.start_state:
                grid[state[0]][state[1]] = '•'
    
    # Print the grid
    print('\n'.join([' '.join(row) for row in grid]))

def simulate_episode(mdp: GridWorldMDP, policy: Dict[Tuple[int, int], str], max_steps: int = 100) -> Tuple[List[Tuple[int, int]], bool]:
    """Simulate one episode following the given policy with some randomness."""
    current_state = mdp.start_state
    path = [current_state]
    steps = 0
    
    while steps < max_steps:
        if current_state == mdp.goal_state:
            return path, True
            
        # Add some randomness to make it more dynamic
        if random.random() < 0.2:  # 20% chance of random action
            action = random.choice(mdp.actions)
        else:
            action = policy[current_state]
            
        current_state = mdp.get_next_state(current_state, action)
        path.append(current_state)
        steps += 1
    
    return path, False

# Now you can call policy_iteration like this:
# mdp = GridWorldMDP(size=5)
# optimal_policy, optimal_values = policy_iteration(mdp)

def run_simulation():
    """Run a complete simulation using policy iteration."""
    # Create MDP
    mdp = GridWorldMDP(size=4)
    
    print("Initial Grid:")
    visualize_grid(mdp)
    
    # Compute optimal policy using policy iteration
    print("\nComputing optimal policy using Policy Iteration...")
    optimal_policy, optimal_values = policy_iteration(mdp)
    
    # Simulate episode
    print("\nSimulating episode using optimal policy...")
    path, success = simulate_episode(mdp, optimal_policy)
    
    print("\nFinal Path:")
    visualize_grid(mdp, path)
    
    if success:
        print("\nSuccess! Agent reached the goal! 🎉")
    else:
        print("\nBetter luck next time! Agent didn't reach the goal in time. 😞")
    
    # Print some statistics
    print(f"\nPath length: {len(path)} steps")
    print(f"Start state: {mdp.start_state}")
    print(f"Goal state: {mdp.goal_state}")

# You can run a simulation using the policy iteration function:
if __name__ == "__main__":
    print("=== Policy Iteration Simulation ===")
    run_simulation()

=== Policy Iteration Simulation ===
Initial Grid:
    █  
    G  
S     █
  █ █  

Computing optimal policy using Policy Iteration...

Simulating episode using optimal policy...

Final Path:
    █  
• • G  
S     █
• █ █  

Success! Agent reached the goal! 🎉

Path length: 6 steps
Start state: (2, 0)
Goal state: (1, 2)


# Lab 4

In [7]:
def policy_improvement(mdp: GridWorldMDP, V: Dict[Tuple[int, int], float], gamma: float = 0.9) -> Dict[Tuple[int, int], str]:
    """Perform policy improvement given the current value function."""
    new_policy = {}

    for state in mdp.states:
        if state == mdp.goal_state:
            new_policy[state] = None  # No action needed for the goal state
            continue

        # Find the best action based on the current value function
        best_action = None
        best_value = float('-inf')

        for action in mdp.actions:
            next_state = mdp.get_next_state(state, action)
            value = mdp.get_reward(next_state) + gamma * V[next_state]

            if value > best_value:
                best_value = value
                best_action = action

        new_policy[state] = best_action

    return new_policy

In [11]:
# Create an instance of GridWorldMDP
mdp = GridWorldMDP(size=5)

# Run value iteration to get the value function
values = value_iteration(mdp)

# Get the initial policy based on the value function
initial_policy = get_optimal_policy(mdp, values)

# Display initial state-value function
print("Evaluating initial policy...")
print("Policy evaluation complete.")
print("Initial state-value function:")
for state, value in values.items():
    print(f"State: {state}, Value: {value:.2f}")

# Apply policy improvement using the value function
print("\nImproving policy...")
improved_policy = policy_improvement(mdp, values)
print("Policy improvement complete.")

# Display initial and improved policy
print("\nInitial policy:")
for state, action in initial_policy.items():
    print(f"State: {state}, Action: {action}")

print("\nImproved policy:")
for state, action in improved_policy.items():
    print(f"State: {state}, Action: {action}")

Evaluating initial policy...
Policy evaluation complete.
Initial state-value function:
State: (0, 0), Value: -10.00
State: (0, 1), Value: -10.00
State: (0, 2), Value: -10.00
State: (0, 3), Value: -10.00
State: (0, 4), Value: -10.00
State: (1, 0), Value: -10.00
State: (1, 1), Value: -10.00
State: (1, 2), Value: -10.00
State: (1, 3), Value: -10.00
State: (1, 4), Value: -10.00
State: (2, 0), Value: -10.00
State: (2, 1), Value: -10.00
State: (2, 2), Value: -10.00
State: (2, 3), Value: -10.00
State: (2, 4), Value: -10.00
State: (3, 0), Value: -10.00
State: (3, 1), Value: -10.00
State: (3, 2), Value: -10.00
State: (3, 3), Value: 89.00
State: (3, 4), Value: 100.00
State: (4, 0), Value: -10.00
State: (4, 1), Value: -10.00
State: (4, 2), Value: 89.00
State: (4, 3), Value: 100.00
State: (4, 4), Value: 0.00

Improving policy...
Policy improvement complete.

Initial policy:
State: (0, 0), Action: up
State: (0, 1), Action: up
State: (0, 2), Action: up
State: (0, 3), Action: up
State: (0, 4), Action