In [5]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere beneath /kaggle/input.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [6]:

import gymnasium as gym
import numpy as np
import time
from gymnasium.envs.registration import register
from gymnasium.envs.toy_text.frozen_lake import generate_random_map


# Step 1: Register Custom Environments

# Hand-written 4x4 layout, one string per grid row.
# Tile legend -- S: Start, F: Frozen, H: Hole, G: Goal
custom_map = ["SFFH", "HFHF", "FFFH", "HFFG"]

# Non-slippery lake played on the hand-written layout, capped at 100 steps.
register(
    id="FrozenLake-Custom-v1",
    entry_point="gymnasium.envs.toy_text:FrozenLakeEnv",
    max_episode_steps=100,
    kwargs=dict(desc=custom_map, is_slippery=False),
)

# Slippery lake on a size-8 layout from generate_random_map, capped at 200
# steps. The layout is produced once, when this statement is executed.
register(
    id="FrozenLake-Expanded-v1",
    entry_point="gymnasium.envs.toy_text:FrozenLakeEnv",
    max_episode_steps=200,
    kwargs=dict(desc=generate_random_map(size=8), is_slippery=True),
)

# Instantiate the stock environment and the two registered above.
original_env = gym.make("FrozenLake-v1", is_slippery=True)
custom_env, expanded_env = (
    gym.make(env_id)
    for env_id in ("FrozenLake-Custom-v1", "FrozenLake-Expanded-v1")
)


# Step 2: Value Iteration

def value_iteration(env, gamma=0.99, theta=1e-8):
    """Compute state values and a greedy policy by value iteration.

    Every sweep replaces each state's value, in place, with the best one-step
    expected return over all actions. Sweeping stops once the largest change
    seen in a sweep is below ``theta``; the returned policy is greedy with
    respect to the final values.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and ``unwrapped.P``, where ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: Discount factor applied to the value of the next state.
        theta: Convergence threshold on the largest per-sweep value change.

    Returns:
        ``(policy, V)``: an integer array holding the greedy action for each
        state and a float array holding the value of each state.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    transitions = env.unwrapped.P

    def action_values(state, values):
        """Return the one-step expected return of every action in ``state``."""
        q = np.zeros(n_actions)
        for action in range(n_actions):
            for prob, next_state, reward, done in transitions[state][action]:
                # A transition flagged ``done`` ends the episode, so nothing
                # is bootstrapped from next_state: only the reward counts.
                future = 0.0 if done else gamma * values[next_state]
                q[action] += prob * (reward + future)
        return q

    V = np.zeros(n_states)
    while True:
        delta = 0.0
        for s in range(n_states):
            best_value = np.max(action_values(s, V))
            delta = max(delta, abs(best_value - V[s]))
            V[s] = best_value  # in-place update: later states see it this sweep
        if delta < theta:
            break

    # Greedy policy extraction; np.argmax breaks ties towards the lowest action.
    policy = np.zeros(n_states, dtype=int)
    for s in range(n_states):
        policy[s] = np.argmax(action_values(s, V))
    return policy, V


# Step 3: Policy Iteration

def policy_iteration(env, gamma=0.99, theta=1e-8):
    """Compute state values and a policy by policy iteration.

    Starts from a random policy (drawn with ``np.random.randint``) and
    alternates two phases until the improvement phase changes no action:
    iterative in-place evaluation of the current policy, then greedy
    improvement against the evaluated values.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and ``unwrapped.P``, where ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: Discount factor applied to the value of the next state.
        theta: Convergence threshold for the policy-evaluation sweeps.

    Returns:
        ``(policy, V)``: an integer array holding the chosen action for each
        state and a float array holding the value of each state under it.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    transitions = env.unwrapped.P

    def expected_return(state, action, values):
        """Return the one-step expected return of ``action`` in ``state``."""
        total = 0.0
        for prob, next_state, reward, done in transitions[state][action]:
            # A transition flagged ``done`` ends the episode, so nothing is
            # bootstrapped from next_state: only the reward counts.
            future = 0.0 if done else gamma * values[next_state]
            total += prob * (reward + future)
        return total

    policy = np.random.randint(n_actions, size=n_states)
    V = np.zeros(n_states)

    while True:
        # Policy evaluation: sweep until the values settle for this policy.
        while True:
            delta = 0.0
            for s in range(n_states):
                previous = V[s]
                V[s] = expected_return(s, policy[s], V)
                delta = max(delta, abs(previous - V[s]))
            if delta < theta:
                break

        # Policy improvement: act greedily; np.argmax breaks ties towards the
        # lowest action index.
        is_policy_stable = True
        for s in range(n_states):
            old_action = policy[s]
            best_action = np.argmax(
                [expected_return(s, a, V) for a in range(n_actions)]
            )
            policy[s] = best_action
            if old_action != best_action:
                is_policy_stable = False

        if is_policy_stable:
            return policy, V


# Step 4: Evaluate Policy on Episodes

def evaluate_policy(env, policy, episodes=100):
    """Roll out ``policy`` for several episodes and average the results.

    Each episode starts with ``env.reset()`` and keeps stepping with the
    action ``policy[obs]`` until the environment reports the episode as
    either terminated or truncated.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        policy: Indexable mapping from an observation to an action.
        episodes: Number of episodes to run.

    Returns:
        ``(mean_total_reward, mean_steps)`` taken over all episodes.
    """
    total_rewards = []
    steps_taken = []
    for _ in range(episodes):
        obs, _info = env.reset()
        terminated = truncated = False
        total_reward, steps = 0, 0
        # Truncation must end the episode too; otherwise a policy that never
        # reaches a terminal state would keep this loop running forever.
        while not (terminated or truncated):
            obs, reward, terminated, truncated, _info = env.step(policy[obs])
            total_reward += reward
            steps += 1
        total_rewards.append(total_reward)
        steps_taken.append(steps)
    return np.mean(total_rewards), np.mean(steps_taken)


# Step 5: Compare Performance

def evaluate_all(envs, names):
    """Run both planners on each environment and print a comparison.

    For every ``(env, name)`` pair a header line is printed, followed by one
    line per planner with the average reward, the average episode length
    reported by ``evaluate_policy`` and the time the planner took. Only the
    planner call is timed; the rollouts are not included.

    Args:
        envs: Environments to evaluate.
        names: Display names, paired with ``envs`` by position (``zip``
            stops at the shorter of the two).
    """
    for env, name in zip(envs, names):
        print(f"\n=== Evaluating {name} ===")
        for method_name, algorithm in [
            ("Value Iteration", value_iteration),
            ("Policy Iteration", policy_iteration)
        ]:
            # perf_counter is monotonic and high-resolution, so the interval
            # is not distorted by adjustments to the system clock.
            start = time.perf_counter()
            policy, _ = algorithm(env)
            elapsed = time.perf_counter() - start
            avg_reward, avg_steps = evaluate_policy(env, policy)
            print(f"{method_name}: Avg Reward = {avg_reward:.2f}, Avg Steps = {avg_steps:.2f}, Time = {elapsed:.4f}s")


# Final Execution

evaluate_all(
    [original_env, custom_env, expanded_env],
    ["Original FrozenLake (4x4, Slippery)",
     "Custom FrozenLake (4x4, Not Slippery)",
     "Expanded FrozenLake (8x8, Slippery)"]
)



=== Evaluating Original FrozenLake (4x4, Slippery) ===
Value Iteration: Avg Reward = 0.86, Avg Steps = 49.74, Time = 0.0796s
Policy Iteration: Avg Reward = 0.79, Avg Steps = 46.65, Time = 0.0180s

=== Evaluating Custom FrozenLake (4x4, Not Slippery) ===
Value Iteration: Avg Reward = 1.00, Avg Steps = 6.00, Time = 0.0015s
Policy Iteration: Avg Reward = 1.00, Avg Steps = 6.00, Time = 0.0020s

=== Evaluating Expanded FrozenLake (8x8, Slippery) ===
Value Iteration: Avg Reward = 0.01, Avg Steps = 96.45, Time = 0.2742s
Policy Iteration: Avg Reward = 0.02, Avg Steps = 88.02, Time = 0.2889s
