In [1]:
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
from gymnasium.envs.registration import register
import random
import numpy as np
import time
import matplotlib.pyplot as plt

In [2]:
lake_map = generate_random_map(size=4, p=0.9)

In [3]:
register(
    id="FrozenLake20x20Custom-v1",
    entry_point="gymnasium.envs.toy_text.frozen_lake:FrozenLakeEnv",
    kwargs={
        "desc": lake_map,
        "map_name": "FrozenLake20x20Custom",
        "is_slippery": False
    }
)
env = gym.make("FrozenLake20x20Custom-v1", render_mode="ansi")

In [4]:
def value_iteration(env, gamma=0.9, theta=0.001):
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    V = np.zeros(n_states)
    iterations = 0    
    while True:
        iterations += 1
        delta = 0
        for s in range(n_states):
            q_sa = [sum([p*(r+gamma* V[s_]) for p, s_, r, _ in env.unwrapped.P[s][a]]) for a in range(n_actions)]
            max_q = max(q_sa)
            delta = max(delta, abs(max_q-V[s]))
            V[s] = max_q
        if delta < theta:
            break
    policy = np.zeros(n_states, dtype=int)
    for s in range(n_states):
        q_sa = [sum([p*(r+gamma*V[s_]) for p,s_ ,r, _ in env.unwrapped.P[s][a]])
                for a in range(n_actions)]
        policy[s] = np.argmax(q_sa)
    return policy, V, iterations

In [5]:
def policy_iteration(env, gamma=0.9, theta=0.001):
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    policy = np.random.choice(n_actions, size=n_states)
    V = np.zeros(n_states)
    iterations = 0
    while True:
        iterations += 1
        while True:
            delta = 0
            for s in range(n_states):
                a = policy[s]
                v = sum([p * (r + gamma * V[s_]) for p, s_, r, _ in env.unwrapped.P[s][a]])
                delta = max(delta, abs(v - V[s]))
                V[s] = v
            if delta < theta:
                break
        policy_stable = True
        for s in range(n_states):
            old_action = policy[s]
            q_sa = [sum([p * (r + gamma * V[s_]) for p, s_, r, _ in env.unwrapped.P[s][a]])
                    for a in range(n_actions)]
            policy[s] = np.argmax(q_sa)
            if old_action != policy[s]:
                policy_stable = False
        if policy_stable:
            break
    return policy, V, iterations

In [6]:
def epsilon_greedy(Q, state, epsilon=0.1):
    if np.random.random() < epsilon:
        return np.random.randint(Q.shape[1])  # Random action
    else:
        return np.argmax(Q[state])

In [7]:
def monte_carlo(env, episodes=1000, gamma=0.9, epsilon=0.1):
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = np.zeros((n_states, n_actions))
    N = np.zeros((n_states, n_actions))
    episode_lengths = [] 
    for episode in range(episodes):
        episode_states = []
        s, _ = env.reset()
        done = False
        steps = 0
        while not done:
            a = epsilon_greedy(Q, s, epsilon)
            s_, r, done, truncated, info = env.step(a)
            episode_states.append((s, a, r))
            s = s_
            steps += 1
        episode_lengths.append(steps)
        G = 0
        for t in reversed(range(len(episode_states))):
            s, a, r = episode_states[t]
            G = gamma * G + r
            N[s][a] += 1
            Q[s][a] += (G - Q[s][a]) / N[s][a]    
    policy = np.argmax(Q, axis=1)
    return policy, Q, episode_lengths

In [8]:
def q_learning(env, episodes=1000, alpha=0.1, gamma=0.9, epsilon=0.1):
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = np.zeros((n_states, n_actions))
    episode_lengths = []    
    for episode in range(episodes):
        s, _ = env.reset()
        done = False
        steps = 0
        while not done:
            a = epsilon_greedy(Q, s, epsilon)
            s_, r, done, _, _ = env.step(a)
            a_ = np.argmax(Q[s_])
            Q[s][a] += alpha * (r + gamma * Q[s_][a_] - Q[s][a])
            s = s_
            steps += 1
        episode_lengths.append(steps)
    return np.argmax(Q, axis=1), Q, episode_lengths

In [9]:
def sarsa(env, episodes=1000, alpha=0.1, gamma=0.9, epsilon=0.1):
    Q = np.zeros((env.observation_space.n, env.action_space.n))
    episode_lengths = []  
    for episode in range(episodes):
        s, _ = env.reset()
        a = epsilon_greedy(Q, s, epsilon)
        done = False
        steps = 0
        while not done:
            s_, r, done, _, _ = env.step(a)
            a_ = epsilon_greedy(Q, s_, epsilon)
            Q[s][a] += alpha * (r + gamma * Q[s_][a_] - Q[s][a])
            s, a = s_, a_
            steps += 1
        episode_lengths.append(steps)
    return np.argmax(Q, axis=1), Q, episode_lengths

In [10]:
def evaluate_policy(env, policy, episodes=100):
    total_rewards = 0
    episode_lengths = []
    for _ in range(episodes):
        obs, _ = env.reset()
        done = False
        steps = 0
        episode_reward = 0
        while not done:
            action = policy[obs]
            obs, reward, done, truncated, info = env.step(action)
            episode_reward += reward
            steps += 1
        total_rewards += episode_reward
        episode_lengths.append(steps)
    return total_rewards / episodes, np.mean(episode_lengths)

In [11]:
st = time.time()
policy_vi, V_vi,iterations_vi = value_iteration(env)
et = time.time()
time_vi = et-st
avg_reward_vi, avg_length_vi = evaluate_policy(env, policy_vi)

In [12]:
st = time.time()
policy_pi, V_pi,iterations_pi = policy_iteration(env)
et = time.time()
time_pi = et-st
avg_reward_pi, avg_length_pi = evaluate_policy(env, policy_pi)

In [None]:
st = time.time()
policy_mc, Q_mc,episode_lengths_mc = monte_carlo(env)
et = time.time()
time_mc = et-st
avg_reward_mc, avg_length_mc = evaluate_policy(env, policy_mc)

In [None]:
st = time.time()
policy_q , Q_q,episode_lengths_q  = q_learning(env)
et = time.time()
time_q = et-st
avg_reward_q, avg_length_q = evaluate_policy(env, policy_q)

In [None]:
st = time.time()
policy_s , V_s,episode_lengths_s  = sarsa(env)
et = time.time()
time_s = et-st
avg_reward_s, avg_length_s = evaluate_policy(env, policy_s)

In [None]:
print("VI Avg Reward:", avg_reward_vi)
print("PI Avg Reward:", avg_reward_pi)
print("Montecarlo Avg Reward:", avg_reward_mc)
print("Q-learning Avg Reward:", avg_reward_q)
print("Sarsa Avg Reward:", avg_reward_s)


In [None]:
print("VI Convergence Taken:", time_vi)
print("PI Convergence Taken:", time_pi)
print("Montecarlo ConvergenceTaken:", time_mc)
print("Q-learning Convergence Taken:", time_q)
print("Sarsa Convergence Taken:", time_s)