# Reinforcement Learning - Monte Carlo and TD learning

> In this notebook, we will be implementing Monte Carlo control and Temporal difference learning algorithms - Q learning and SARSA algorithms. We will be testing our implementation on FrozenLake environment and some custom versions we made. Finally we will compare and try to draw conclusions.

## Importing libs

In [37]:
import gymnasium as gym
from gymnasium.envs.registration import register
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv
from gymnasium import spaces
from tqdm import tqdm
import numpy as np
import time
import random
import pprint

## Timer decorator

In [38]:
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        duration = end - start
        if args and hasattr(args[0], '__dict__'):
            setattr(args[0], f'{func.__name__}_time', duration)
        print(f"Function '{func.__name__}' took {duration:.4f} seconds")
        return result
    return wrapper

## Custom Environment



Here we are creating a new class for our custom environment with a 5*5 custom map and similar methods to the standard environments in the gymnasium package.

In [39]:
def generate_frozenlake_desc(size, hole_prob=0.2, seed=None):
    import random
    rng = random.Random(seed)
    # Initialize grid as frozen
    desc = [['F'] * size for _ in range(size)]
    # Set start and goal
    desc[0][0] = 'S'
    desc[size-1][size-1] = 'G'
    
    # Generate a monotonic path (right/down moves only)
    path = [(0, 0)]
    r, c = 0, 0
    while (r, c) != (size-1, size-1):
        choices = []
        if r < size-1:
            choices.append((r+1, c))
        if c < size-1:
            choices.append((r, c+1))
        r, c = rng.choice(choices)
        path.append((r, c))
    
    # Set holes for non-path cells (start/goal skipped automatically)
    path_set = set(path)
    for r in range(size):
        for c in range(size):
            if (r, c) not in path_set and rng.random() < hole_prob:
                desc[r][c] = 'H'  # Only set holes; 'F' is already default
    return desc

In [40]:
class CustomFrozenLakeEnv(gym.Env):
    def __init__(self, slippery=False):
        self.size = 5
        self.n_states = self.size * self.size
        self.n_actions = 4
        self.slippery = slippery
        
        # Define custom 50x50 map
        self.desc = np.array(generate_frozenlake_desc(50, hole_prob=0.2, seed=42), dtype='<U1')
        
        # Calculate positions
        self.start_pos = None
        self.goal_pos = []
        self.hole_pos = []
        for row in range(self.size):  ## idea was that there may be many goals but not doing that anymore
            for col in range(self.size):
                if self.desc[row, col] == 'S':
                    self.start_pos = (row, col)
                elif self.desc[row, col] == 'G':
                    self.goal_pos.append((row, col))
                elif self.desc[row, col] == 'H':
                    self.hole_pos.append((row, col))
        
        # first is row and second is coulumn
        self.actions = {
            0: (0, -1),    # Left
            1: (1, 0),     # Down
            2: (0, 1),     # Right
            3: (-1, 0)     # Up
        }
        
        # Build transition model
        self.P = self._make_transition_model()
        
        # Define spaces and state
        self.action_space = spaces.Discrete(self.n_actions)
        self.observation_space = spaces.Discrete(self.n_states)
        self.state = None

    def reset(self, seed=None, **kwargs):
        super().reset(seed=seed, **kwargs)
        self.state = self.pos_to_state(self.start_pos) #what;s the satte of the start position
        return self.state, {} # we ain't sending any info

    def step(self, action):
        current_pos = self.state_to_pos(self.state) # we are only storing state not position so this roundabout
        row, col = current_pos
        
        # if it is slippery then some stochastic flavour
        if self.slippery:
            action = self.np_random.choice([action, (action + 1) % 4, (action - 1) % 4])
        

        # calculus :)
        dr, dc = self.actions[action]
        new_row, new_col = row + dr, col + dc
        
        # Ensure within bounds
        new_row = np.clip(new_row, 0, self.size - 1)
        new_col = np.clip(new_col, 0, self.size - 1)
        new_pos = (new_row, new_col)
        new_state = self.pos_to_state(new_pos)
        
        # Check for hole or goal
        terminated = False
        reward = 0.0
        if new_pos in self.hole_pos:
            terminated = True
        elif new_pos in self.goal_pos:
            terminated = True
            reward = 1.0
        
        self.state = new_state #change state
        return new_state, reward, terminated, False, {}

    

    def _make_transition_model(self):
        P = {s: {a: [] for a in range(self.n_actions)} for s in range(self.n_states)} #initializing P[s][a] to an empty list
        for s in range(self.n_states):
            row, col = self.state_to_pos(s)
            # Check if current state is terminal
            current_pos = (row, col)
            if current_pos in self.hole_pos or current_pos in self.goal_pos:
                for a in range(self.n_actions):
                    # Terminal state: stay indefinitely with 0 reward
                    P[s][a] = [(1.0, s, 0.0, True)]
                continue  # Skip normal transition logic

            # Existing transition logic for non-terminal states
            for a in range(self.n_actions):
                outcomes = []
                actions_to_consider = ([a] if not self.slippery else [a, (a+1)%4, (a-1)%4])
                prob = 1.0 if not self.slippery else 1/3 # stochastic case
                for a2 in actions_to_consider:
                    #compute new state
                    dr, dc = self.actions[a2]
                    nr = np.clip(row + dr, 0, self.size - 1)
                    nc = np.clip(col + dc, 0, self.size - 1)
                    new_pos = (nr, nc)
                    ns = self.pos_to_state(new_pos)
                    #check and assign prob accordingly
                    done = new_pos in self.hole_pos or new_pos in self.goal_pos
                    reward = 1.0 if new_pos in self.goal_pos else 0.0
                    outcomes.append((prob, ns, reward, done))
                P[s][a] = outcomes
        return P

    # easy peasy
    def pos_to_state(self, pos):
        row, col = pos
        return row * self.size + col

    def state_to_pos(self, state):
        row = state // self.size
        col = state % self.size
        return (row, col)

This is another custom environment similar to FrozenLake with a 4*4 size except with a twist. The agent will be rewarded only when it collects key along its path to reach the goal. This forces the agent to adopt a particular route. Kind of like travelling in a traffic where you are required to visit a stop (say a petrol pump)

In [41]:
class ExpandedFrozenLakeEnv(gym.Env):
    
    def __init__(self, slippery=False):
        self.size = 4
        self.n_states_base = self.size * self.size
        self.n_states = self.n_states_base * 2  # Double for key status
        self.n_actions = 4
        self.slippery = slippery
        
        # Define 4x4 map with key
        self.desc = np.array([
            ['S', 'F', 'F', 'F'],
            ['F', 'H', 'F', 'K'],
            ['F', 'F', 'F', 'F'],
            ['H', 'F', 'F', 'G']
        ], dtype='<U1')
        
        # Identify key, start, goal, and hole positions
        self.start_pos = None
        self.goal_pos = []
        self.hole_pos = []
        self.key_pos = None
        for row in range(self.size): # same idea as for the above class
            for col in range(self.size):
                if self.desc[row, col] == 'S':
                    self.start_pos = (row, col)
                elif self.desc[row, col] == 'G':
                    self.goal_pos.append((row, col))
                elif self.desc[row, col] == 'H':
                    self.hole_pos.append((row, col))
                elif self.desc[row, col] == 'K':
                    self.key_pos = (row, col)
        
        self.actions = {
            0: (0, -1),   # Left
            1: (1, 0),    # Down
            2: (0, 1),    # Right
            3: (-1, 0)    # Up
        }

        self.P = self._make_transition_model()

        self.action_space = spaces.Discrete(self.n_actions)
        self.observation_space = spaces.Discrete(self.n_states)
        # Initialize state
        self.state = None
        self.has_key = None

    def reset(self, seed=None, **kwargs):
        super().reset(seed=seed, **kwargs)
        self.has_key = False
        self.state = self.pos_to_state(self.start_pos)
        return self.get_full_state(), {}

    def step(self, action):
        current_pos = self.state_to_pos(self.state)
        row, col = current_pos
        
        # act 
        if self.slippery:
            action = self.np_random.choice([action, (action + 1) % 4, (action - 1) % 4])
        
        dr, dc = self.actions[action]
        new_row, new_col = row + dr, col + dc
        
        # Ensure within bounds
        new_row = np.clip(new_row, 0, self.size - 1)
        new_col = np.clip(new_col, 0, self.size - 1)
        new_pos = (new_row, new_col)
        new_state = self.pos_to_state(new_pos)
        
        # Check if key is collected
        if new_pos == self.key_pos:
            self.has_key = True
        
        # Check for hole or goal
        terminated = False
        reward = 0.0
        if new_pos in self.hole_pos:
            terminated = True
        elif new_pos in self.goal_pos and self.has_key:
            terminated = True
            reward = 1.0
        
        self.state = new_state
        full_state = self.get_full_state()
        return full_state, reward, terminated, False, {}

    def _make_transition_model(self):
        n_base = self.n_states_base  # 16 for 4x4 grid
        P = {s: {a: [] for a in range(self.n_actions)} for s in range(self.n_states)} #initialize P[s][a] to an empty list
        
        for s in range(self.n_states):
            base_state = s % n_base
            has_key = (s >= n_base)
            row, col = self.state_to_pos(base_state)
            current_pos = (row, col)

            # Terminal states: hole OR goal with key
            if current_pos in self.hole_pos or (current_pos in self.goal_pos and has_key):
                for a in range(self.n_actions):
                    P[s][a] = [(1.0, s, 0.0, True)]
                continue #skipping normal logic for now

            for a in range(self.n_actions):
                outcomes = []
                actions_to_consider = [a] if not self.slippery else [a, (a+1)%4, (a-1)%4]
                prob = 1.0 if not self.slippery else 1.0/len(actions_to_consider)
                
                for a2 in actions_to_consider:
                    #compute new state
                    dr, dc = self.actions[a2]
                    nr = np.clip(row + dr, 0, self.size-1)
                    nc = np.clip(col + dc, 0, self.size-1)
                    new_pos = (nr, nc)
                    new_base_state = self.pos_to_state(new_pos)
                    
                    # Update key status
                    new_has_key = has_key
                    if not has_key and new_pos == self.key_pos:
                        new_has_key = True
                    
                    next_state = new_base_state + (n_base * int(new_has_key))
                    
                    # Check terminal conditions
                    terminated = False
                    reward = 0.0
                    if new_pos in self.hole_pos:
                        terminated = True
                    elif new_pos in self.goal_pos and new_has_key:
                        terminated = True
                        reward = 1.0
                    
                    outcomes.append((prob, next_state, reward, terminated))
                
                P[s][a] = outcomes
        
        return P

    #trivial stuff
    def pos_to_state(self, pos):
        row, col = pos
        return row * self.size + col

    def state_to_pos(self, state):
        row = state // self.size
        col = state % self.size
        return (row, col)

    def get_full_state(self):
        return self.state + (self.n_states_base * int(self.has_key))

### Registering these custom environmnts

In [42]:
# Register environments
gym.register(
    id="CustomFrozenLake-v1",
    entry_point=CustomFrozenLakeEnv,
    kwargs={'slippery': False},
)

gym.register(
    id="ExpandedFrozenLake-v1",
    entry_point=ExpandedFrozenLakeEnv,
    kwargs={'slippery': False},
)

gym.register(
    id="CustomFrozenLake-v1-slip",
    entry_point=CustomFrozenLakeEnv,
    kwargs={'slippery': True},
)

gym.register(
    id="ExpandedFrozenLake-v1-slip",
    entry_point=ExpandedFrozenLakeEnv,
    kwargs={'slippery': True},
)

We will iterate over these environment ids to test.

In [43]:
# env ids to test
env_ids = ['FrozenLake-v1','CustomFrozenLake-v1','ExpandedFrozenLake-v1','CustomFrozenLake-v1-slip','ExpandedFrozenLake-v1-slip']  # frozenlake-v1 is for the default 

## Monte Carlo Implementation

In [44]:
@timer
def monte_carlo(env, episodes=10000, alpha=0.1, discount=0.99, epsilon=0.1):
    """
    Monte Carlo control using first-visit method and epsilon-greedy policy.
    Returns Q table of state-action values.
    """
    n_actions = env.action_space.n
    n_states = env.observation_space.n

    

    Q = np.zeros((n_states, n_actions)) #q value function initialization

    for ep in range(episodes):
        state, _ = env.reset()
        done = False
        episode = []

        # generating an episode
        while not done:
            #exploration
            if random.random() < epsilon:
                action = env.action_space.sample() # choose any action randomly with a probability of epsilon
            else :
                best_actions = np.argwhere(Q[state] == np.max(Q[state])).flatten() #listof all actions with the max q return
                action = int(np.random.choice(best_actions)) #choose any one from them randomly

            # perform the chosen action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode.append((state,action,reward))
            state = next_state

        G = 0
        visited = set()
        for t in range(len(episode)-1,-1,-1): # compute return backwards
            s, a, r = episode[t]
            G = discount*G + r # since we are traversing backwards we can discount without keeping track of the number of terms
            if (s, a) not in visited:
                visited.add((s, a))
                # Incremental update
                Q[s, a] += alpha * (G - Q[s, a])


    return Q



## Temporal Difference Implementation

A generic TD update for $Q(s_t, a_t)$ takes the form:

$$
Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[\text{Target} - Q(s_t, a_t)\right],
$$

where $\alpha$ is a step-size (learning rate), and Target is an estimate of the return just one-step ahead plus estimated future values.

- In **SARSA**, the target is:

$$
r_{t+1} + \gamma Q(s_{t+1}, a_{t+1}),
$$

using the next action $a_{t+1}$ actually chosen by the current policy (**on-policy**).

- In **Q-learning**, the target is:

$$
r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a'),
$$

using the best possible next action according to current $Q$ (**off-policy**, because it imagines following the greedy policy from the next state even if the       behavior policy actually explores).

Summing up:

- **SARSA update**:

$$
Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[r_{t+1} + \gamma Q(s_{t+1}, a_{t+1}) - Q(s_t, a_t)\right].
$$

- **Q-learning update**:

$$
Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)\right].
$$

In both cases, during learning we select actions via an $\epsilon$-greedy policy over current $Q$: with probability $\epsilon$ choose a random action, else choose:

$$
\arg\max_a Q(s, a).
$$

This ensures exploration.


In [45]:
@timer
# -------------------------------------------------
# Q_LEARNING (On-Policy Temporal-Difference)
# -------------------------------------------------
def q_learning(env, episodes=1000, alpha=0.1, discount=0.99, epsilon=0.1):
    """
    Q-Learning algorithm with epsilon-greedy exploration.
    Returns Q value function
    """
    n_actions = env.action_space.n
    n_states = env.observation_space.n
    Q = np.zeros((n_states, n_actions)) #init q value functions for all pairs

    for ep in range(episodes):
        state, _ = env.reset()
        done = False

        
        while not done:
            # Epsilon-greedy action selection
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                best_actions = np.argwhere(Q[state] == np.max(Q[state])).flatten()
                action = int(np.random.choice(best_actions))
            
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Q-Learning update (off-policy)
            best_next = 0 if done else np.max(Q[next_state]) # choose the best q of the nest state for updating irrespective of our current policy (off policy)
            Q[state, action] += alpha * (reward + discount * best_next - Q[state, action])
            
            state = next_state
    return Q
    

In [46]:
@timer
# -------------------------------------------------
# SARSA (On-Policy Temporal-Difference)
# -------------------------------------------------
def sarsa(env, episodes=1000, alpha=0.1, discount=0.99, epsilon=0.1):
    """
    SARSA algorithm (on-policy TD control) with epsilon-greedy policy.
    Returns Q table of state-action values.
    """
    n_actions = env.action_space.n
    n_states = env.observation_space.n
    Q = np.zeros((n_states, n_actions)) #init q values for all state action pairs

    for ep in range(episodes):
        state, _ = env.reset()

        # Choose initial action (epsilon strategy)
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            best_actions = np.argwhere(Q[state] == np.max(Q[state])).flatten()
            action = int(np.random.choice(best_actions))

        
        done = False
        while not done:
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Choose next action (epsilon-greedy)
            if random.random() < epsilon:
                next_action = env.action_space.sample()
            else:
                best_actions = np.argwhere(Q[next_state] == np.max(Q[next_state])).flatten()
                next_action = int(np.random.choice(best_actions))
            
            # SARSA update (on-policy)
            Q[state, action] += alpha * (reward + discount * Q[next_state, next_action] * (not done) - Q[state, action])
            state, action = next_state, next_action
            
    return Q

## Policy Evaluation

Testing our policy on the environments and  printing the time for al the algorithms and the average return obtained

In [47]:
def evaluate_policy(env, Q, episodes=100, discount=1.0):
    """
    Evaluate a given policy derived from Q (greedy) by running episodes.
    Returns the average total (discounted) return.
    """
    total_return = 0.0
    for ep in range(episodes):
        state, _ = env.reset()
        done = False
        G = 0.0
        t = 0
        while not done:
            # Greedy action
            best_actions = np.argwhere(Q[state] == np.max(Q[state])).flatten()
            action = int(np.random.choice(best_actions))
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            G += (discount**t) * reward
            t += 1 # keeping track of the power to raise discount with
            state = next_state
        total_return += G
    avg_return = total_return / episodes
    return avg_return


## Results

In [48]:
for eid in env_ids:
    print(eid)
    env = gym.make(eid)
    # Train and evaluate each algorithm
    mc_Q = monte_carlo(env, episodes=50000, alpha=0.03, discount=0.99, epsilon=0.1)
    ql_Q = q_learning(env, episodes=50000, alpha=0.1, discount=0.99, epsilon=0.1)
    sa_Q = sarsa(env, episodes=50000, alpha=0.1, discount=0.99, epsilon=0.1)

    print("Average Return (MC):", evaluate_policy(env, mc_Q, episodes=1000, discount=0.99))
    print("Average Return (Q-Learning):", evaluate_policy(env, ql_Q, episodes=1000, discount=0.99))
    print("Average Return (SARSA):", evaluate_policy(env, sa_Q, episodes=1000, discount=0.99))
    print()


FrozenLake-v1
Function 'monte_carlo' took 28.1259 seconds
Function 'q_learning' took 41.7209 seconds
Function 'sarsa' took 34.3838 seconds
Average Return (MC): 0.5296274978890726
Average Return (Q-Learning): 0.5266961739987945
Average Return (SARSA): 0.5042581437466054

CustomFrozenLake-v1
Function 'monte_carlo' took 38.8958 seconds
Function 'q_learning' took 42.8572 seconds
Function 'sarsa' took 40.4023 seconds
Average Return (MC): 0.0
Average Return (Q-Learning): 0.0
Average Return (SARSA): 0.0

ExpandedFrozenLake-v1
Function 'monte_carlo' took 15.6120 seconds
Function 'q_learning' took 11.9723 seconds
Function 'sarsa' took 12.7915 seconds
Average Return (MC): 0.9509900499000175
Average Return (Q-Learning): 0.9509900499000175
Average Return (SARSA): 0.9509900499000175

CustomFrozenLake-v1-slip
Function 'monte_carlo' took 51.3769 seconds
Function 'q_learning' took 55.1089 seconds
Function 'sarsa' took 51.7960 seconds
Average Return (MC): 0.0
Average Return (Q-Learning): 0.0
Average Re