# Bailey's Reinforcement Learning

## Part 1: Policy Iteration on the Frozen Lake Environment

In [1]:
import numpy as np
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
from typing import NamedTuple

class Params(NamedTuple):
    """Immutable bundle of algorithm and environment settings for the run."""

    # Discount factor
    gamma: float
    # Convergence threshold
    theta: float
    # Number of test episodes
    episodes: int
    # Size of the FrozenLake map
    map_size: int
    # Seed for reproducibility
    seed: int
    # Whether the environment is slippery
    is_slippery: bool
    # Probability a tile is frozen
    proba_frozen: float

def policy_evaluation(policy, env, params: Params):
    """Estimate the state-value function of ``policy`` by repeated sweeps.

    Args:
        policy: Indexable by state; ``policy[s]`` iterates over the
            probability of taking each action in state ``s``.
        env: Environment exposing ``observation_space.n`` and a transition
            table ``unwrapped.P`` where ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        params: Supplies ``gamma`` (discount factor) and ``theta`` (sweeps
            stop once the largest per-state change drops below it).

    Returns:
        NumPy array holding one value estimate per state.
    """
    n_states = env.observation_space.n
    transitions = env.unwrapped.P  # Transition dynamics of the raw environment
    discount = params.gamma
    values = np.zeros(n_states)

    converged = False
    while not converged:
        largest_change = 0
        for state in range(n_states):
            # Expected one-step return under the policy. Values are updated
            # in place, so states later in the sweep already see this update.
            backup = 0
            for action, pi_a in enumerate(policy[state]):
                for p, nxt, r, terminal in transitions[state][action]:
                    # Terminal transitions contribute no bootstrapped value.
                    backup += pi_a * p * (r + discount * values[nxt] * (not terminal))
            change = abs(values[state] - backup)
            if change > largest_change:
                largest_change = change
            values[state] = backup
        converged = largest_change < params.theta
    return values

def policy_improvement(env, V, params: Params):
    """Return the deterministic policy that acts greedily with respect to ``V``.

    Args:
        env: Environment exposing ``observation_space.n``,
            ``action_space.n`` and a transition table ``unwrapped.P`` where
            ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        V: State values, indexable by state.
        params: Supplies ``gamma`` (discount factor).

    Returns:
        Array of shape ``(n_states, n_actions)`` with a single 1.0 per row
        marking the chosen action; ties go to the lowest action index
        because ``np.argmax`` returns the first maximum.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    transitions = env.unwrapped.P  # Transition dynamics of the raw environment
    greedy = np.zeros([n_states, n_actions])

    for state in range(n_states):
        action_values = np.zeros(n_actions)
        for action in range(n_actions):
            expected_return = 0.0
            for p, nxt, r, terminal in transitions[state][action]:
                # Terminal transitions contribute no bootstrapped value.
                expected_return += p * (r + params.gamma * V[nxt] * (not terminal))
            action_values[action] = expected_return
        greedy[state, np.argmax(action_values)] = 1.0
    return greedy

def policy_iteration(env, params: Params):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Starts from the uniform random policy and stops as soon as an
    improvement step reproduces the current policy exactly.

    Args:
        env: Environment exposing ``observation_space.n`` and
            ``action_space.n``; it is passed through to the evaluation and
            improvement steps.
        params: Settings forwarded to the evaluation and improvement steps.

    Returns:
        Tuple ``(policy, V)``: the final policy array of shape
        ``(n_states, n_actions)`` and the state values computed for it.
    """
    n_actions = env.action_space.n
    policy = np.ones([env.observation_space.n, n_actions]) / n_actions
    while True:
        V = policy_evaluation(policy, env, params)
        improved = policy_improvement(env, V, params)
        if (policy == improved).all():
            # Greedy step changed nothing, so the policy is stable.
            return policy, V
        policy = improved

def evaluate_policy(env, policy, params: Params):
    """Roll out the greedy action of ``policy`` and print summary statistics.

    Runs ``params.episodes`` episodes, always taking the highest-probability
    action for the current state, then prints the mean episode length and
    the fraction of episodes that finished on the goal tile.

    Args:
        env: Gymnasium-style environment whose ``reset()`` returns
            ``(state, info)`` and whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
            ``env.unwrapped.desc`` must be an array of byte tiles that
            contains ``b'G'`` for the goal.
        policy: Indexable by state; ``policy[s]`` holds action probabilities.
        params: Supplies ``episodes``, the number of rollouts.

    Returns:
        None. Results are printed to standard output.
    """
    goal_state = np.where(env.unwrapped.desc.flatten() == b'G')[0][0]
    total_steps = []
    success_count = 0

    for _ in range(params.episodes):
        state, _info = env.reset()
        steps = 0
        terminated = truncated = False
        # End the episode on truncation as well as termination: a policy
        # that never reaches a terminal state would otherwise spin forever
        # even after the environment reports that its step limit was hit.
        while not (terminated or truncated):
            action = np.argmax(policy[state])
            state, _reward, terminated, truncated, _info = env.step(action)
            steps += 1
        total_steps.append(steps)
        if state == goal_state:
            success_count += 1
    print(f"\nAverage Steps per Episode: {np.mean(total_steps):.2f}")
    print(f"Success Rate: {success_count / params.episodes:.2%}")

# Settings for this run: discounting, convergence, evaluation and map layout
params = Params(
    gamma=0.95,
    theta=1e-6,
    episodes=100,
    map_size=7,
    seed=123,
    is_slippery=False,
    proba_frozen=0.9,
)

# Generate a seeded random map, then build the FrozenLake environment on it
lake_map = generate_random_map(
    size=params.map_size,
    p=params.proba_frozen,
    seed=params.seed,
)
env = gym.make("FrozenLake-v1", desc=lake_map, is_slippery=params.is_slippery)

# Solve the environment with policy iteration
optimal_policy, optimal_value = policy_iteration(env, params)

# Render the policy as a grid of arrows, one symbol per tile
policy_arrows = {0: "←", 1: "↓", 2: "→", 3: "↑"}
policy_grid = np.array(
    [
        policy_arrows[np.argmax(action_probs)] if np.max(action_probs) > 0 else "•"
        for action_probs in optimal_policy
    ]
).reshape(params.map_size, params.map_size)
print("\nOptimal Policy:")
print("\n".join(" ".join(row) for row in policy_grid))

# Roll the policy out in the environment and report the statistics
evaluate_policy(env, optimal_policy, params)



Optimal Policy:
↓ ↓ ↓ ↓ ↓ ↓ ←
↓ ↓ → ↓ ↓ ↓ ↓
↓ ↓ ← ↓ ↓ ↓ ↓
→ ↓ → ↓ ↓ ↓ ↓
← ↓ ← ↓ ↓ ↓ ↓
↓ ↓ ↓ ↓ ↓ ↓ ↓
→ → → → → → ←

Average Steps per Episode: 12.00
Success Rate: 100.00%
