# Frozen Lake

## Utils

In [1]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/953.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m368.6/953.9 kB[0m [31m11.4 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m952.3/953.9 kB[0m [31m14.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [2]:
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple
import gymnasium as gym
import random
import copy

### Sample an episode


function simulates a single episode of a game following the given policy pi

INPUTS


- pi: (16, 4) numpy array
    - pi[s, a] is probability of agent taking action a given he is in state s
    - for given s: we choose actions 0, 1, 2, 3 with probabilities pi[s, 0], pi[s, 1], pi[s, 2], pi[s, 3], respectively


OPUTPUTS:

    
- episode: named tuple that collects trajectory information {s0, a0, r0, ..., s_T, a_T, r_T}
  - single episode has form: episode(states=[0, 1, 2], actions=[1, 2, 3], rewards=[0, 0, 1], terminated=True, truncated=False)
    

In [5]:
episode = namedtuple('episode',
                     'states, actions, rewards, terminated, truncated')

In [6]:
def sample_episode(pi: np.ndarray = 0.25 * np.ones((16, 4)), is_slippery_bool: bool = True) -> episode:
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=is_slippery_bool)

    s, info = env.reset(seed=42)
    terminated, truncated = False, False
    states, actions, rewards = [], [], []
    while not truncated and not terminated:
        a = np.random.choice([0, 1, 2, 3], p=pi[s, :])
        s_new, r, terminated, truncated, _ = env.step(a)
        states.append(s)
        actions.append(a)
        rewards.append(r)
        s = s_new
    env.close()
    return episode(states, actions, rewards, terminated, truncated)

### Visualize policy and q value

In [7]:
def plot_result(matrix: np.ndarray, text: str):
    # Define actions and their corresponding arrow directions
    actions = ['left', 'down', 'right', 'up']
    dx = [-1, 0, 1, 0]
    dy = [0, -1, 0, 1]

    # Create a 4x4 plot
    fig, axs = plt.subplots(4, 4, figsize=(10, 10))
    fig.suptitle(f'{text}', ha='center', fontsize=16)

    # Iterate through the matrix and plot arrows
    for idx, cell in enumerate(matrix):
        i, j = divmod(idx, 4)
        max_prob = np.max(cell)
        for action, p in enumerate(cell):
            color = 'red' if p == max_prob else 'black'
            axs[i, j].arrow(0.5, 0.5, dx[action] * 0.2 * p, dy[action] * 0.2 * p, head_width=0.05, head_length=0.1,
                            fc=color, ec=color)
            axs[i, j].text(0.5 + dx[action] * 0.3, 0.5 + dy[action] * 0.3, f'{p:.2f}', color=color, fontsize=12)
        axs[i, j].set_xlim(0, 1)
        axs[i, j].set_ylim(0, 1)
        axs[i, j].axis('off')
    plt.show()

### Calculate mean return of a given policy

In [8]:
def calculate_mean_return(pi: np.ndarray,
                          N_runs: int = 50000, gamma: float = 0.9,
                          is_slippery_bool: bool = False):
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=is_slippery_bool)
    mean_return = 0
    s, _ = env.reset(seed=42)
    for n_run in range(N_runs):
        terminated, truncated = False, False
        discount_factor = 1
        while not terminated and not truncated:
            a = np.argmax(pi[s, :])
            new_s, r, terminated, truncated, _ = env.step(a)
            mean_return += discount_factor * r
            discount_factor *= gamma
            s = new_s
        s, _ = env.reset()
    env.close()
    mean_return /= N_runs
    return mean_return

### Convert policy from exporatory to greedy

In [10]:
def policy2greedy(pi: np.ndarray):
    greedy_policy = np.zeros_like(pi)
    for s in range(15):
        max_val = np.max(pi[s, :])
        greedy_actions=[]
        for a in range(4):
            if pi[s, a] == max_val:
                greedy_actions.append(a)
        greedy_policy[s, greedy_actions] = 1 / len(greedy_actions)
    return greedy_policy

# Algorithms

## Random policy

In [11]:
pi = 0.25 * np.ones((16, 4))

In [16]:
calculate_mean_return(pi=pi, N_runs=50000, gamma=1, is_slippery_bool=False)

0.0

In [17]:
calculate_mean_return(pi=pi, N_runs=50000, gamma=1, is_slippery_bool=True)

0.0

## Dynamic Programing

### Policy evaluation

In [57]:
def policy_evaluation(P: dict, pi: np.ndarray, gamma: float = .9, theta: float = 1e-8):
    v = np.zeros((16,))

    # in-place policy evaluation
    max_diff = 1e10
    while max_diff > theta:
        max_diff = 0
        for s in range(15):
            vs = 0
            for a in range(4):
                for prob, s_new, r, _ in P[s][a]:
                    vs += pi[s, a]* prob * (r + gamma * v[s_new])

            max_diff = max(abs(v[s] - vs), max_diff)
            v[s] = vs
    return v

In [58]:
def q_value(P: dict, v: np.ndarray, s: int, gamma: float = 1):
    q = np.zeros((4, ))
    for a in range(4):
        for prob, s_new, r, _ in P[s][a]:
            q[a] += prob * (r + gamma * v[s_new])
    return q

### Policy iteration

In [70]:
def policy_iteration(P: dict, pi: np.ndarray = 0.25 * np.ones((16, 4)),
                     theta: float = 1e-8, gamma: float = 0.9):
    converge = False
    q = np.zeros((16, 4))
    while not converge:
        # 1. policy evaluation
        v = policy_evaluation(P, pi, gamma=gamma, theta=theta)

        # 2. policy improvement
        pi_prim = np.zeros((16, 4))
        for s in range(15):
            q[s] = q_value(P, v, s, gamma)
            max_el = np.max(q[s])
            greedy_actions = []
            for a in range(4):
                if q[s][a] == max_el:
                    greedy_actions.append(a)
            pi_prim[s, greedy_actions] = 1 / len(greedy_actions)

        # 3. stop if pi converged
        if np.max(abs(policy_evaluation(P, pi)[0] - policy_evaluation(P, pi_prim)[0])) < theta * 1e2:
            converge = True

        # 4. Replace policy with new policy
        pi = copy.copy(pi_prim)

    pi = policy2greedy(pi)
    return pi, v, q

### Non-slipery Frozen Lake

In [78]:
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False)
P = env.P
env.close()

In [79]:
pi_dp, v_dp, q_dp = policy_iteration(P=P, theta = 1e-8, gamma = 0.9)

In [80]:
calculate_mean_return(pi=pi_dp, N_runs=50000, gamma = .9, is_slippery_bool=False)

0.5904899999995002

### Slipery Frozen Lake

In [81]:
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
P = env.P
env.close()

In [82]:
pi_dp, v_dp, q_dp = policy_iteration(P=P, theta = 1e-8, gamma = 0.9)

In [84]:
calculate_mean_return(pi=pi_dp, N_runs=50000, gamma = .9, is_slippery_bool=True)

0.06862379022899782

## Monte Carlo


In [85]:
def monte_carlo_control(N_episodes: int = 10, epsilon: float = 0.1, gamma: float = 0.9, is_slippery_bool: bool = True):
    q = np.zeros((16, 4))
    pi = .25 * np.ones((16, 4))

    for n_episode in range(1, N_episodes + 1):

        trajectory = sample_episode(pi=pi, is_slippery_bool=is_slippery_bool)

        G = 0
        for step in reversed(range(len(trajectory.states))):  # start from the last step
            s, a, r = trajectory.states[step], trajectory.actions[step], trajectory.rewards[step]

            first_visit = True
            for s_prev, a_prev in zip(trajectory.states[0:step-1], trajectory.actions[0:step-1]):
                if s_prev == s and a_prev == a:
                    first_visit = False

            if first_visit:
                G = r + gamma * G
                q[s, a] += (G - q[s, a]) / n_episode

                a_star = np.max(q[s, :])
                greedy_actions = []
                for i in range(4):
                    if q[s, i] == a_star:
                        greedy_actions.append(i)
                greedy_action = random.choice(greedy_actions)
                pi[s, :] = epsilon / 4
                pi[s, greedy_action] += 1 - epsilon
    pi = policy2greedy(pi=pi)
    return pi, q

### Monte Carlo Non-slippery Frozen Lake

In [86]:
q_mc, pi_mc = monte_carlo_control(N_episodes=10000, epsilon=0.1, is_slippery_bool=False)

In [87]:
calculate_mean_return(pi=pi_mc, N_runs=50000, gamma=0.9, is_slippery_bool=False)

0.5904900000000002

### Monte Carlo Slippery Frozen Lake

In [88]:
q_mc, pi_mc = monte_carlo_control(N_episodes=10000, epsilon=0.1, is_slippery_bool=True)

In [89]:
calculate_mean_return(pi=pi_mc, N_runs=50000, gamma=0.9, is_slippery_bool=True)

0.02308751716963706

## Sarsa and Expected Sarsa

In [92]:
def sarsa_control(expected_sarsa: bool = False,
                  N_episodes: int = 1000,
                  alpha: float = 0.1, epsilon: float = 0.1, gamma: float = 0.9,
                  is_slippery_bool: bool = False):
    q = np.zeros((16, 4))
    pi = .25 * np.ones((16, 4))
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=is_slippery_bool)
    s, info = env.reset(seed=42)

    for n_episode in range(1, N_episodes + 1):
        a = np.random.choice([0, 1, 2, 3], p=pi[s, :])
        terminated, truncated = False, False

        while not terminated and not truncated:
            # take action a and observe reward r, following state s_new (terminated, truncated)
            s_new, r, terminated, truncated, _ = env.step(a)

            # sample action a_new from s_new following current policy
            a_new = np.random.choice([0, 1, 2, 3], p=pi[s_new, :])

            if not expected_sarsa:
                q[s, a] += alpha * (r + gamma * q[s_new, a_new] - q[s, a])
            else:
                expected_q = 0
                for action in range(4):
                    expected_q += pi[s_new, action] * q[s_new, action]
                q[s, a] += alpha * (r + gamma * expected_q - q[s, a])

            # update policy pi for state s
            a_star = np.max(q[s, :])
            greedy_actions = []
            for i in range(4):
                if q[s, i] == a_star:
                    greedy_actions.append(i)
            greedy_action = random.choice(greedy_actions)
            pi[s, :] = epsilon / 4
            pi[s, greedy_action] += 1 - epsilon

            s = s_new
            a = a_new

        s, info = env.reset()

    env.close()
    pi = policy2greedy(pi)
    return pi, q

### SARSA Non-slippery Frozen Lake

In [93]:
q_sarsa, pi_sarsa = sarsa_control(expected_sarsa=False, N_episodes=10000, epsilon=0.1, is_slippery_bool=False)

In [94]:
calculate_mean_return(pi=pi_sarsa, N_runs=50000, gamma=0.9, is_slippery_bool=False)

0.5904900000000002

### SARSA Slippery Frozen Lake

In [95]:
q_sarsa, pi_sarsa = sarsa_control(expected_sarsa=False, N_episodes=10000, epsilon=0.1, is_slippery_bool=True)

In [96]:
calculate_mean_return(pi=pi_sarsa, N_runs=50000, gamma=0.9, is_slippery_bool=True)

0.06375893235768844

### Expected SARSA Non-slippery Frozen Lake

In [97]:
q_sarsa, pi_sarsa = sarsa_control(expected_sarsa=True, N_episodes=10000, epsilon=0.1, is_slippery_bool=False)

In [98]:
calculate_mean_return(pi=pi_sarsa, N_runs=50000, gamma=0.9, is_slippery_bool=False)

0.5904900000000002

### Expected SARSA Slippery Frozen Lake

In [99]:
q_sarsa, pi_sarsa = sarsa_control(expected_sarsa=True, N_episodes=10000, epsilon=0.1, is_slippery_bool=True)

In [100]:
calculate_mean_return(pi=pi_sarsa, N_runs=50000, gamma=0.9, is_slippery_bool=True)

0.06375893235768844

## Q-learning

In [101]:
def q_learning(N_episodes: int = 1000,
               alpha: float = 0.01, epsilon: float = 0.1, gamma: float = 0.9,
               is_slippery_bool: bool = False):
    q = np.zeros((16, 4))
    pi = .25 * np.ones((16, 4))
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=is_slippery_bool)
    s, info = env.reset(seed=42)

    for n_episode in range(1, N_episodes + 1):
        terminated, truncated = False, False

        while not terminated and not truncated:
            # sample action a from state s following policy pi
            a = np.random.choice([0, 1, 2, 3], p=pi[s, :])

            # take action a and observe reward r, following state s_new (terminated, truncated)
            s_new, r, terminated, truncated, _ = env.step(a)

            # update q[s, a]
            q_max = np.max(q[s_new, :])
            greedy_actions = []
            for action in range(4):
                if q[s_new, action] == q_max:
                    greedy_actions.append(action)
            a_new_star = random.choice(greedy_actions)
            q[s, a] = q[s, a] + alpha * (r + gamma * q[s_new, a_new_star] - q[s, a])

            # update policy pi for state s
            a_star = np.max(q[s, :])
            greedy_actions = []
            for action in range(4):
                if q[s, action] == a_star:
                    greedy_actions.append(action)
            greedy_action = random.choice(greedy_actions)
            pi[s, :] = epsilon / 4
            pi[s, greedy_action] += 1 - epsilon

            s = s_new

        s, info = env.reset()

    env.close()
    pi = policy2greedy(pi)
    return pi, q

### Q-Learning Non-slippery Frozen Lake

In [107]:
q_q, pi_q = q_learning(N_episodes=10000, epsilon=0.1, is_slippery_bool=False)

In [103]:
calculate_mean_return(pi=pi_q, N_runs=50000, gamma=0.9, is_slippery_bool=False)

0.5904900000000002

### Q-Learning Slippery Frozen Lake

In [114]:
q_q, pi_q = q_learning(N_episodes=10000, epsilon=0.4, is_slippery_bool=True)

In [115]:
calculate_mean_return(pi=pi_q, N_runs=50000, gamma=0.9, is_slippery_bool=True)

0.0483448778038524