In [22]:
custom_map_8x8 = [
    list("SFFFFFFF"),
    list("HFFFHFFF"),
    list("FFFFFFFH"),
    list("FFFFFFFF"),
    list("FFFFFFFF"),
    list("FFFFFFFF"),
    list("FHFFHFFF"),
    list("FFFFFFFF"),
    list("FFFFFFFF"),
    list("FFFFFFFG")
]
custom_map_50x50 = [
    list(row) for row in [
        "SFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFHFFFFFFFFFFFHFFF",
        "FHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFF",
        "FHFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFFFFFHFFFFFFFFFF",
        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFFFFF",
        "FFHFHFFFFFFFHFFFHFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFF",
        "FFFFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFHFHFFFFFFFFHF",
        "HFFFFFHFFFFFFFFFFFFFFFFHHHFFFFFFFFHFFFHFFFFFFFFFFF",
        "FFFFFHFFFHFFFFFFFFFFFFHHFFFFFFFFFFFFFFFFFHFFHFFHHF",
        "FFFFFFFFFFFFHFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFF",
        "FFFFFFFFFFFFHFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFHFFF",
        "FFFFFFFFFFHFFFFFHFFFFFHFFFHFFFFFFFFFFFFFFFFHFFFFFF",
        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFHFFFFFFFFFFFFFFFHFFFFFHHFFFHFFFHFFFFFF",
        "FFHFFFFHFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFHHFFFFFFHF",
        "FHFFFFHFFFHFFFFFHFFHFHFFFFFFFFFHFFFFFFHFFFFFHFFFFF",
        "FFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHHFFF",
        "FFFFFFFHFFFFFFFHFFFFFFFFFHFHHFFFFFFFFFHFFFFFFFFFFF",
        "HFFFFFFFFHFFFFFHFFFFFFHFHFFFFFFFHHFFFFHFHFHFFHFFFF",
        "FFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFHFFFFFFFFFFF",
        "FFFFFHHFFFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFHFFFFH",
        "FFFFFFFFHFFFFFFHFFFFFFFFFFFFFFFFFFFFHFFFFFHFFFFHFH",
        "FFFHFFFFFFFFHFFFFHFFFFFFFFFFFFFFFFFFHFFFFFFFFFFHFF",
        "FFFFFFFFFFFFFFHFHFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFF",
        "FFFFFFFFFHFFFFFFFFHHFFFFHFFFFFFHHFHFFFFFFFFFFFFFFF",
        "FFFFFHFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFHFFFFFF",
        "FFHFFHHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFHFFFHFFFFFFFFFFFHFFFFFFHFFHFFFFFFFFFFFFFFFFHFF",
        "HFFFFFFFFFHHFHFFFFFFFFHFFFFFFFFFFFFFFFFFHFFFFFFFFF",
        "FFHFFFFFFFFFHFFHFHFFHFFFFFFHFFFFFFFFHFFFFFFFFFFFFF",
        "HFFFFFFFFFFFFFFFFFFFFHFFHHFFHFFFFFHFFFFFHFFFFFFHFF",
        "FFFFHFFFHFFFFFFFFHFFHFFFFFFFFFFFFFFFFFFFFFFFFHFFFF",
        "FFFFFFFHFFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFFFFHFHFFFFFFHFHFFFFFFFHHFFFFFFFFFFFFFF",
        "FFFFFHFFFFHFFFFFFFFFFFFFFHHFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFHFFFFFFFFFFFFFHFFFHFFFFFFFFFFFFFFFFFFF",
        "FFFFFFHFHFFFFHFHFFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFHFF",
        "HFFFFFFFFFFFFFFFFFFFFFFHFFFHFFFHHHFFHFFFFFFFHFFFFF",
        "FFFFFFFFFFFFHFFHFFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFHFFFFFFFFFFFFFHFFFF",
        "FFHHFFFHFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFHFHFFHFFFF",
        "FFFFFHFFFFFFHFFFHHFFHHFFFHFFFFFHHFFFFHFFHFFFFFFFFH",
        "FFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFFFFFFFFF",
        "FFHFFFFFHFHFFFFFFFFFFFHFFFFFHFFFFFFFFFFFHFFFFFFFFF",
        "FFFFFFHFFHFHHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
        "FFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFFFFFFFFFFHFFFFFHHF",
        "FFHFFFHHFFFFHHFHHFFFFFFFFFFFFFHFFFFFFFFFFFFFFHFFFF",
        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFHHFFFFFFFFFFFFH",
        "FFFFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFH",
        "FHFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFFHFFFFG"
    ]
]

In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import time

class CustomFrozenLakeEnv(gym.Env):
    def __init__(self, grid, slip_prob=0.2):
        super().__init__()
        self.map = grid
        self.nrow = len(self.map)
        self.ncol = len(self.map[0])
        self.nS = self.nrow * self.ncol
        self.nA = 4
        self.slip_prob = slip_prob

        self.action_space = spaces.Discrete(self.nA)
        self.observation_space = spaces.Discrete(self.nS)
        self.pos_to_state = lambda r, c: r * self.ncol + c
        self.state_to_pos = lambda s: (s // self.ncol, s % self.ncol)
        self.state = None
        self.P = self._build_transition_matrix()

    def _build_transition_matrix(self):
        P = {s: {a: [] for a in range(self.nA)} for s in range(self.nS)}
        for r in range(self.nrow):
            for c in range(self.ncol):
                s = self.pos_to_state(r, c)
                tile = self.map[r][c]
                for a in range(self.nA):
                    transitions = []
                    directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]  # L, D, R, U

                    for i, (dr, dc) in enumerate(directions):
                        prob = 1 - self.slip_prob if i == a else self.slip_prob / 3.0
                        new_r = min(max(r + dr, 0), self.nrow - 1)
                        new_c = min(max(c + dc, 0), self.ncol - 1)
                        new_s = self.pos_to_state(new_r, new_c)
                        new_tile = self.map[new_r][new_c]
                        reward = 1.0 if new_tile == 'G' else -1.0 if new_tile == 'H' else -0.01
                        done = new_tile in "GH"
                        transitions.append((prob, new_s, reward, done))

                    P[s][a] = transitions
        return P

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.state = (0, 0)
        return self._get_obs(), {}

    def step(self, action):
        r, c = self.state
        directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]  # L, D, R, U

        if np.random.rand() < self.slip_prob:
            action = np.random.choice([a for a in range(self.nA) if a != action])

        dr, dc = directions[action]
        r = min(max(r + dr, 0), self.nrow - 1)
        c = min(max(c + dc, 0), self.ncol - 1)
        self.state = (r, c)

        tile = self.map[r][c]
        reward = 1.0 if tile == 'G' else -1.0 if tile == 'H' else -0.01
        done = tile in "GH"
        return self._get_obs(), reward, done, False, {}

    def _get_obs(self):
        r, c = self.state
        return self.pos_to_state(r, c)
    
def evaluate_policy(env, policy, episodes=100):
    total_reward = 0
    total_steps = 0
    success_count = 0

    for _ in range(episodes):
        state = env.reset()[0]
        done = False
        steps = 0

        while not done:
            action = policy[state]
            state, reward, done, _, _ = env.step(action)
            steps += 1
            total_reward += reward
            if reward == 1.0:  # goal reached
                success_count += 1

        total_steps += steps

    avg_reward = total_reward / episodes
    avg_length = total_steps / episodes
    success_rate = success_count / episodes

    return  avg_length,avg_reward



def run_and_evaluate(algo_name, algo_func, env, episodes, is_model_based=False):
    start_time = time.time()
    if is_model_based:
        policy = algo_func(env)
        iterations = episodes
    else:
        Q = algo_func(env, episodes)
        policy = np.argmax(Q, axis=1)
        iterations = episodes
    duration = time.time() - start_time
    avg_reward, avg_length = evaluate_policy(env, policy, 100)
    print(f"{algo_name:<20} {duration:.4f}     {avg_length:.3f}         {avg_reward:.3f}   {iterations}")
    return policy



helper function

In [None]:
import time

def greedy_action(Q_s):
    return np.random.choice(np.flatnonzero(Q_s == Q_s.max()))

def get_policy(Q):
    return np.argmax(Q, axis=1)

def evaluate_policy(env, policy, episodes=1000):
    total_reward = 0
    total_length = 0
    for _ in range(episodes):
        state, _ = env.reset()
        done = False
        while not done:
            action = policy[state]
            state, reward, done, _, _ = env.step(action)
            total_reward += reward
            total_length += 1
    return total_length / episodes, total_reward / episodes



In [25]:
def monte_carlo(env, episodes, gamma=0.99, epsilon=0.1):
    Q = np.zeros((env.nS, env.nA))
    N = np.zeros((env.nS, env.nA))

    for ep in range(episodes):
        state, _ = env.reset()
        episode = []
        done = False
        while not done:
            action = env.action_space.sample() if np.random.rand() < epsilon else greedy_action(Q[state])
            next_state, reward, done, _, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state

        G = 0
        visited = set()
        for t in reversed(range(len(episode))):
            s, a, r = episode[t]
            G = gamma * G + r
            if (s, a) not in visited:
                visited.add((s, a))
                N[s, a] += 1
                Q[s, a] += (G - Q[s, a]) / N[s, a]
    return Q




In [26]:
def q_learning(env, episodes, gamma=0.99):
    Q = np.zeros((env.nS, env.nA))
    epsilon, alpha = 0.5, 0.5
    for ep in range(episodes):
        decay = 1 - ep / episodes
        e = epsilon * decay
        a = alpha * decay
        state, _ = env.reset()
        done = False
        while not done:
            action = env.action_space.sample() if np.random.rand() < e else greedy_action(Q[state])
            next_state, reward, done, _, _ = env.step(action)
            Q[state, action] += a * (reward + gamma * np.max(Q[next_state]) - Q[state, action])
            state = next_state
    return Q


In [27]:
def sarsa(env, episodes, gamma=0.99):
    Q = np.zeros((env.nS, env.nA))
    epsilon, alpha = 0.5, 0.5
    for ep in range(episodes):
        decay = 1 - ep / episodes
        e = epsilon * decay
        a = alpha * decay
        state, _ = env.reset()
        action = env.action_space.sample() if np.random.rand() < e else greedy_action(Q[state])
        done = False
        while not done:
            next_state, reward, done, _, _ = env.step(action)
            next_action = env.action_space.sample() if np.random.rand() < e else greedy_action(Q[next_state])
            Q[state, action] += a * (reward + gamma * Q[next_state, next_action] - Q[state, action])
            state, action = next_state, next_action
    return Q



In [None]:
def policy_iteration(env, gamma=0.99, theta=1e-5):
    policy = np.zeros(env.nS, dtype=int)
    V = np.zeros(env.nS)
    is_stable = False
    P = env.P

    while not is_stable:
        
        while True:
            delta = 0
            for s in range(env.nS):
                v = V[s]
                a = policy[s]
                V[s] = sum([p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a]])
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break

        
        is_stable = True
        for s in range(env.nS):
            old_action = policy[s]
            q_vals = np.array([sum([p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a]]) for a in range(env.nA)])
            policy[s] = np.argmax(q_vals)
            if old_action != policy[s]:
                is_stable = False
    return policy



In [None]:
def value_iteration(env, gamma=0.99, theta=1e-5):
    V = np.zeros(env.nS)
    P = env.P
    while True:
        delta = 0
        for s in range(env.nS):
            v = V[s]
            V[s] = max([sum([p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a]]) for a in range(env.nA)])
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
    policy = np.zeros(env.nS, dtype=int)
    for s in range(env.nS):
        q_vals = np.array([sum([p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a]]) for a in range(env.nA)])
        policy[s] = np.argmax(q_vals)

    return policy

In [None]:
print(f"{'Algorithm':<20} {'Time (s)':<10}  {'Avg Reward':<12} {'Avg Ep Len':<12} {'Episodes'}")
print("-" * 70)
env = CustomFrozenLakeEnv(custom_map_8x8, slip_prob=0.2)
run_and_evaluate("Monte Carlo", monte_carlo, env, episodes=40000)
run_and_evaluate("SARSA", sarsa, env, episodes=2000)
run_and_evaluate("Q-learning", q_learning, env, episodes=2000)
run_and_evaluate("Policy Iteration", policy_iteration, env, episodes=14, is_model_based=True)
run_and_evaluate("Value Iteration", value_iteration, env, episodes=14, is_model_based=True)


Algorithm            Time (s)    Avg Reward   Avg Ep Len   Episodes
----------------------------------------------------------------------
Monte Carlo          9.6781     0.483         26.700   40000
SARSA                0.4597     0.639         21.080   2000
Q-learning           0.5138     0.631         21.910   2000
Policy Iteration     0.2494     0.340         16.950   14
Value Iteration      0.3088     0.542         18.830   14


array([2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 2, 2, 2, 2, 1, 1, 1, 1, 2, 2, 2,
       2, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 2, 1, 1, 1, 2, 2,
       2, 2, 2, 2, 1, 1, 2, 2, 2, 2, 2, 2, 2, 1])

In [None]:
print(f"{'Algorithm':<20} {'Time (s)':<10}  {'Avg Reward':<12} {'Avg Ep Len':<12} {'Episodes'}")
print("-" * 70)

env = CustomFrozenLakeEnv(custom_map_50x50, slip_prob=0.2)

run_and_evaluate("Monte Carlo", monte_carlo, env, episodes=40000)
run_and_evaluate("SARSA", sarsa, env, episodes=2000)
run_and_evaluate("Q-learning", q_learning, env, episodes=2000)
run_and_evaluate("Policy Iteration", policy_iteration, env, episodes=14, is_model_based=True)
run_and_evaluate("Value Iteration", value_iteration, env, episodes=14, is_model_based=True)

Algorithm            Time (s)    Avg Reward   Avg Ep Len   Episodes
----------------------------------------------------------------------
Monte Carlo          4.5584     -1.379         38.860   40000
SARSA                1.3494     -2.154         116.410   2000
Q-learning           1.4390     -3.095         210.520   2000
Policy Iteration     17.6725     -1.097         102.680   14
Value Iteration      9.6555     -1.099         100.850   14


array([2, 2, 2, ..., 2, 2, 1])