# MLA202: Reinforcement Learning - Take-Home Practical

**Student ID:** 03240424
**Student Name:** Tshering dorji 

This notebook contains solutions for **Problem 1, Problem 2, and Problem 3**.


## Problem 1: Q-Learning on GridWorld (3 marks)
- Implement GridWorld environment
- Implement Q-learning algorithm
- Train agent and visualize rewards
- Show learned policy


In [1]:

import numpy as np
import matplotlib.pyplot as plt


# ------------------------------------------------------------
# Environment Definition
# ------------------------------------------------------------
class GridWorldEnv:
    """
    A compact 5×5 navigation environment.

    Layout:
        S . . . .
        . # . # .
        . . . # .
        . # . . .
        . . . . G

    S = (0,0)
    G = (4,4)
    Walls block movement.
    """

    ACTIONS = {
        0: (-1, 0),   # up
        1: (1, 0),    # down
        2: (0, -1),   # left
        3: (0, 1)     # right
    }

    def __init__(self):
        self.size = 5
        self.start_pos = (0, 0)
        self.goal_pos = (4, 4)

        self.blocked = {(1,1), (1,3), (2,3), (3,1)}

        # Precompute valid state coordinates
        coords = []
        for r in range(self.size):
            for c in range(self.size):
                if (r, c) not in self.blocked:
                    coords.append((r, c))
        self.valid_cells = coords
        self.state_count = len(coords)

        self.current = self.start_pos

    # ---------------------------
    # Core environment operations
    # ---------------------------
    def reset(self):
        self.current = self.start_pos
        return self._to_index(self.current)

    def step(self, action):
        dr, dc = self.ACTIONS[action]
        nr, nc = self.current[0] + dr, self.current[1] + dc

        # Out of map or wall
        if (
            nr < 0 or nr >= self.size or
            nc < 0 or nc >= self.size or
            (nr, nc) in self.blocked
        ):
            # Invalid move → penalty, remain in same location
            reward = -1
            new_cell = self.current
        else:
            new_cell = (nr, nc)

            if new_cell == self.goal_pos:
                reward = 10
                self.current = new_cell
                return self._to_index(new_cell), reward, True
            else:
                reward = -0.1

        self.current = new_cell
        terminal = self.current == self.goal_pos
        return self._to_index(new_cell), reward, terminal

    # ---------------------------
    # State/index conversion
    # ---------------------------
    def _to_index(self, cell):
        try:
            return self.valid_cells.index(cell)
        except ValueError:
            return -1

    def _from_index(self, idx):
        if 0 <= idx < self.state_count:
            return self.valid_cells[idx]
        return None


# ------------------------------------------------------------
# Q-Learning Agent
# ------------------------------------------------------------
def run_q_learning(env, episodes=1000, lr=0.15, disc=0.98, eps=0.12):
    """
    Perform Q-learning on the supplied GridWorld.

    Returns:
        Q-table, rewards list
    """

    num_actions = 4
    Q = np.zeros((env.state_count, num_actions))
    episode_returns = []

    for ep in range(episodes):
        s = env.reset()
        ep_reward = 0

        for _ in range(120):  # execution cap
            # ε-greedy behaviour
            if np.random.rand() < eps:
                a = np.random.randint(num_actions)
            else:
                a = np.argmax(Q[s])

            ns, r, done = env.step(a)

            # compute target
            best_next = np.max(Q[ns])
            target = r + disc * best_next

            # incremental update
            Q[s, a] += lr * (target - Q[s, a])

            ep_reward += r
            s = ns

            if done:
                break

        episode_returns.append(ep_reward)

    return Q, episode_returns


# ------------------------------------------------------------
# Visualization Utilities
# ------------------------------------------------------------
def display_policy(env, Q):
    arrows = {0:'↑', 1:'↓', 2:'←', 3:'→'}
    print("\nDerived Policy:")
    print("-" * 28)

    for r in range(env.size):
        row_symbols = []
        for c in range(env.size):
            if (r, c) == env.start_pos:
                row_symbols.append("S")
            elif (r, c) == env.goal_pos:
                row_symbols.append("G")
            elif (r, c) in env.blocked:
                row_symbols.append("#")
            else:
                idx = env._to_index((r, c))
                action = np.argmax(Q[idx])
                row_symbols.append(arrows[action])
        print("  ".join(row_symbols))
    print("-" * 28)


def plot_rewards(history):
    plt.figure(figsize=(12, 4))

    plt.plot(history, alpha=0.55)
    plt.title("Episode Reward Progression")
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.grid(alpha=0.3)

    plt.tight_layout()
    plt.savefig("gridworld_rewards.png", dpi=300)
    plt.close()
    print("Reward curve saved as gridworld_rewards.png")


# ------------------------------------------------------------
# Agent Evaluation
# ------------------------------------------------------------
def evaluate(env, Q, trials=10):
    successes = 0
    steps_list = []

    for _ in range(trials):
        s = env.reset()
        counter = 0

        for _ in range(120):
            a = np.argmax(Q[s])
            s, r, done = env.step(a)
            counter += 1

            if done:
                successes += 1
                steps_list.append(counter)
                break

    print(f"Success: {successes}/{trials}")
    if steps_list:
        print(f"Mean steps to goal: {np.mean(steps_list):.1f}")


# ------------------------------------------------------------
# Driver
# ------------------------------------------------------------
if __name__ == "__main__":

    env = GridWorldEnv()

    Q, logs = run_q_learning(
        env,
        episodes=1000,
        lr=0.15,
        disc=0.98,
        eps=0.15
    )

    display_policy(env, Q)
    plot_rewards(logs)
    evaluate(env, Q, trials=20)

    print("\nQ-table Summary:")
    print("Shape:", Q.shape)
    print("Non-zero entries:", np.count_nonzero(Q))
    print("Largest Q value:", np.max(Q))
    print("Smallest Q value:", np.min(Q))



Derived Policy:
----------------------------
S  →  ↓  →  ↓
↓  #  ↓  #  ↓
→  →  ↓  #  ↓
↓  #  →  →  ↓
→  →  →  →  G
----------------------------
Reward curve saved as gridworld_rewards.png
Success: 20/20
Mean steps to goal: 8.0

Q-table Summary:
Shape: (21, 4)
Non-zero entries: 80
Largest Q value: 9.999999999999995
Smallest Q value: -0.5261204578470817


## Problem 2: Deep Q-Network (DQN) Completion (4 marks)

**Task:** Complete the DQN implementation to solve the CartPole-v1 environment.

- Define a neural network with 2 hidden layers of size 128 and ReLU activations.
- Implement a replay buffer for experience replay.
- Implement a DQN agent with epsilon-greedy action selection.
- Train the agent for 500 episodes.
- Plot rewards over episodes to show learning progress.
- Achieve an average reward > 400 over the last 100 episodes.

In [2]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt


# --------------------------------------------------------------------
# Neural Network Model for Q-value Approximation
# --------------------------------------------------------------------
class QNetwork(nn.Module):
    """Feed-forward neural network used to approximate Q(s,a)."""

    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.layers = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.layers(x)


# --------------------------------------------------------------------
# Experience Replay Buffer
# --------------------------------------------------------------------
class Memory:
    """Stores experience tuples for sampling during training."""

    def __init__(self, size):
        self.storage = deque(maxlen=size)

    def add(self, s, a, r, s_next, done):
        self.storage.append((s, a, r, s_next, done))

    def sample(self, n):
        batch = random.sample(self.storage, n)
        s, a, r, s_next, d = zip(*batch)

        return (
            np.array(s, dtype=np.float32),
            np.array(a, dtype=np.int64),
            np.array(r, dtype=np.float32),
            np.array(s_next, dtype=np.float32),
            np.array(d, dtype=np.float32)
        )

    def __len__(self):
        return len(self.storage)


# --------------------------------------------------------------------
# DQN Agent
# --------------------------------------------------------------------
class DeepQAgent:
    """Encapsulates policy selection, learning, and target network updates."""

    def __init__(self, state_dim, action_dim):
        self.n_actions = action_dim

        # Networks
        self.policy = QNetwork(state_dim, action_dim)
        self.target = QNetwork(state_dim, action_dim)
        self.target.load_state_dict(self.policy.state_dict())
        self.target.eval()

        # Hyperparameters
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_lower = 0.01
        self.epsilon_decay = 0.995
        self.batch_size = 64

        # Replay buffer
        self.memory = Memory(size=10000)

        # Optimizer
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)

    def choose_action(self, state, explore=True):
        """Epsilon-greedy policy."""
        if explore and np.random.rand() < self.epsilon:
            return np.random.randint(self.n_actions)

        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            q_vals = self.policy(state)
        return q_vals.argmax().item()

    def learn(self):
        """Performs a gradient update if enough samples are available."""
        if len(self.memory) < self.batch_size:
            return None

        s, a, r, s_next, d = self.memory.sample(self.batch_size)

        s = torch.tensor(s)
        a = torch.tensor(a).unsqueeze(1)
        r = torch.tensor(r)
        s_next = torch.tensor(s_next)
        d = torch.tensor(d)

        # Current Q-values for actions taken
        q_pred = self.policy(s).gather(1, a).squeeze()

        # Target Q-values using target network
        with torch.no_grad():
            next_q = self.target(s_next).max(1)[0]
            q_target = r + (1 - d) * self.gamma * next_q

        loss = nn.MSELoss()(q_pred, q_target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Epsilon decay
        if self.epsilon > self.epsilon_lower:
            self.epsilon *= self.epsilon_decay

        return loss.item()

    def sync_target(self):
        """Update target network parameters."""
        self.target.load_state_dict(self.policy.state_dict())


# --------------------------------------------------------------------
# Training Routine
# --------------------------------------------------------------------
def train_cartpole(num_episodes=500):
    env = gym.make("CartPole-v1")

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    agent = DeepQAgent(state_dim, action_dim)

    episode_rewards = []
    losses = []

    print("=" * 60)
    print("Training Deep Q-Network on CartPole-v1")
    print("=" * 60)

    for ep in range(num_episodes):

        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result

        total_reward = 0
        ep_losses = []

        done = False
        while not done:
            action = agent.choose_action(state, explore=True)
            step_result = env.step(action)

            # Support both new and old Gym APIs
            if len(step_result) == 5:
                next_state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = step_result

            agent.memory.add(state, action, reward, next_state, done)

            loss = agent.learn()
            if loss is not None:
                ep_losses.append(loss)

            state = next_state
            total_reward += reward

        # Update target network periodically
        if (ep + 1) % 10 == 0:
            agent.sync_target()

        episode_rewards.append(total_reward)
        if ep_losses:
            losses.append(np.mean(ep_losses))

        # Status update
        if (ep + 1) % 50 == 0:
            avg100 = np.mean(episode_rewards[-100:]) if len(episode_rewards) >= 100 else np.mean(episode_rewards)
            print(f"Episode {ep+1}/{num_episodes}")
            print(f"  Latest Reward: {total_reward}")
            print(f"  100-episode Avg: {avg100:.2f}")
            print(f"  Epsilon: {agent.epsilon:.3f}")
            print("-"*40)

        # Solve condition
        if len(episode_rewards) >= 100 and np.mean(episode_rewards[-100:]) >= 475:
            print(f"\nEnvironment solved at episode {ep + 1}!")
            break

    env.close()
    plot_training(episode_rewards, losses)
    return agent, episode_rewards


# --------------------------------------------------------------------
# Plot Function
# --------------------------------------------------------------------
def plot_training(rewards, losses):
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Reward Plot
    axes[0].plot(rewards, label="Reward")
    if len(rewards) >= 100:
        rolling = [np.mean(rewards[max(0, i-99):i+1]) for i in range(len(rewards))]
        axes[0].plot(rolling, label="Moving Avg (100)", linewidth=2)
    axes[0].axhline(475, color="red", linestyle="--", label="Solved Threshold")
    axes[0].set_title("Episode Rewards")
    axes[0].set_xlabel("Episode")
    axes[0].set_ylabel("Reward")
    axes[0].grid(True)
    axes[0].legend()

    # Loss Plot
    if losses:
        axes[1].plot(losses, label="Loss")
        axes[1].set_title("Training Loss")
        axes[1].set_xlabel("Episode")
        axes[1].set_ylabel("Loss")
        axes[1].grid(True)
        axes[1].legend()

    plt.tight_layout()
    plt.savefig("dqn_cartpole_results.png", dpi=300)
    print("Plot saved as dqn_cartpole_results.png")
    plt.close()


# --------------------------------------------------------------------
# Agent Evaluation
# --------------------------------------------------------------------
def evaluate(agent, episodes=10):
    env = gym.make("CartPole-v1")
    scores = []

    print("\nEvaluating trained agent...")

    for i in range(episodes):
        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result
        done = False
        total = 0

        while not done:
            action = agent.choose_action(state, explore=False)
            step_output = env.step(action)

            if len(step_output) == 5:
                state, reward, terminated, truncated, _ = step_output
                done = terminated or truncated
            else:
                state, reward, done, _ = step_output

            total += reward

        scores.append(total)
        print(f"Episode {i+1}: Reward = {total}")

    env.close()

    print("\nEvaluation Summary:")
    print(f"  Average Score: {np.mean(scores):.2f}")
    print(f"  Std Dev: {np.std(scores):.2f}")
    print(f"  Min/Max: {np.min(scores)} / {np.max(scores)}")


# --------------------------------------------------------------------
# Main
# --------------------------------------------------------------------
if __name__ == "__main__":
    model, rewards = train_cartpole(num_episodes=500)
    evaluate(model, episodes=10)
    print("\nTraining Complete.")

Training Deep Q-Network on CartPole-v1
Episode 50/500
  Latest Reward: 203.0
  100-episode Avg: 92.88
  Epsilon: 0.010
----------------------------------------
Episode 100/500
  Latest Reward: 128.0
  100-episode Avg: 152.42
  Epsilon: 0.010
----------------------------------------
Episode 150/500
  Latest Reward: 178.0
  100-episode Avg: 197.37
  Epsilon: 0.010
----------------------------------------
Episode 200/500
  Latest Reward: 19.0
  100-episode Avg: 206.48
  Epsilon: 0.010
----------------------------------------
Episode 250/500
  Latest Reward: 375.0
  100-episode Avg: 270.90
  Epsilon: 0.010
----------------------------------------
Episode 300/500
  Latest Reward: 281.0
  100-episode Avg: 291.11
  Epsilon: 0.010
----------------------------------------
Episode 350/500
  Latest Reward: 20.0
  100-episode Avg: 196.36
  Epsilon: 0.010
----------------------------------------
Episode 400/500
  Latest Reward: 136.0
  100-episode Avg: 122.68
  Epsilon: 0.010
----------------------

## Problem 3: Analysis and Debugging (3 marks)

**Task:** Fix the buggy Q-learning implementation for FrozenLake.

- Identify 4 bugs in the given code.
- Explain why each bug prevents learning.
- Provide the correct fix.
- Compare the performance of buggy vs fixed code.
- Achieve a success rate > 0.70 after 10,000 episodes.

In [3]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt


# ============================================================================
# BUG ANALYSIS (unchanged, but formatted cleanly)
# ============================================================================

"""
BUG 1: Q-table initialization
Impact: Possible dimension mismatch
Fix: Use Q = zeros((n_states, n_actions))

BUG 2: Action selection
Issue: np.argmin instead of np.argmax
Impact: Agent selects worst actions
Fix: Use np.argmax(Q[state])

BUG 3: Q-learning update rule
Issue: Missing learning rate
Fix: Use TD update: Q[s,a] += α (target - current)

BUG 4: No epsilon decay
Impact: Agent never exploits
Fix: Add epsilon = max(eps_min, eps * decay)
"""


# ============================================================================
# BUGGY VERSION (kept identical)
# ============================================================================

def buggy_train_frozenlake(episodes=10000):
    env = gym.make('FrozenLake-v1', is_slippery=True)

    n_states = env.observation_space.n
    n_actions = env.action_space.n

    Q = np.zeros((n_states, n_actions))

    alpha = 0.8
    gamma = 0.95
    epsilon = 0.3

    rewards = []

    for episode in range(episodes):
        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result

        total_reward = 0
        done = False

        while not done:
            # BUG: picks WORST Q-values
            if np.random.random() < epsilon:
                action = np.random.randint(n_actions)
            else:
                action = np.argmin(Q[state])

            result = env.step(action)
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = result

            # BUG: Missing learning rate and TD update
            Q[state, action] = reward + gamma * np.max(Q[next_state])

            state = next_state
            total_reward += reward

        rewards.append(total_reward)

    print(f"Buggy version success rate: {np.mean(rewards[-100:]):.3f}")
    return Q, rewards


# ============================================================================
# FIXED VERSION (cleaner and more structured)
# ============================================================================

def fixed_train_frozenlake(episodes=10000):
    env = gym.make('FrozenLake-v1', is_slippery=True)

    n_states = env.observation_space.n
    n_actions = env.action_space.n

    print("FrozenLake Environment Summary")
    print(f"  States: {n_states}")
    print(f"  Actions: {n_actions}")
    print("-" * 60)

    # Initialize Q-table
    Q = np.zeros((n_states, n_actions))

    # Hyperparameters
    alpha = 0.1
    gamma = 0.99
    epsilon = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.9995

    rewards = []

    print("Training Hyperparameters")
    print(f"  α = {alpha}")
    print(f"  γ = {gamma}")
    print(f"  ε_initial = {epsilon}")
    print(f"  ε_decay = {epsilon_decay}")
    print("-" * 60)

    for episode in range(episodes):
        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result

        total_reward = 0
        done = False
        steps = 0
        max_steps = 100

        while not done and steps < max_steps:

            # FIX: Correct epsilon-greedy
            if np.random.random() < epsilon:
                action = np.random.randint(n_actions)
            else:
                action = np.argmax(Q[state])

            result = env.step(action)
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = result

            # FIX: Proper Q-learning update
            current_q = Q[state, action]
            target_q = reward + gamma * np.max(Q[next_state])
            Q[state, action] = current_q + alpha * (target_q - current_q)

            state = next_state
            total_reward += reward
            steps += 1

        rewards.append(total_reward)

        # FIX: Apply epsilon decay
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        if (episode + 1) % 1000 == 0:
            print(f"Episode {episode+1}: "
                  f"Success(100ep)={np.mean(rewards[-100:]):.3f}, "
                  f"Epsilon={epsilon:.3f}")

    print("\nFinal success rate:", np.mean(rewards[-100:]))
    return Q, rewards


# ============================================================================
# COMPARISON WRAPPER
# ============================================================================

def compare_buggy_vs_fixed():
    print("="*60)
    print("PROBLEM 3: DEBUGGING Q-LEARNING")
    print("="*60)

    episodes = 5000

    print("\nRunning BUGGY version...")
    Q_buggy, rewards_buggy = buggy_train_frozenlake(episodes)

    print("\nRunning FIXED version...")
    Q_fixed, rewards_fixed = fixed_train_frozenlake(episodes)

    print("\nComparison Results")
    print("="*60)
    print(f"Buggy success rate  (last 100): {np.mean(rewards_buggy[-100:]):.3f}")
    print(f"Fixed success rate  (last 100): {np.mean(rewards_fixed[-100:]):.3f}")
    print(f"Total successes (buggy): {sum(rewards_buggy)}")
    print(f"Total successes (fixed): {sum(rewards_fixed)}")

    plot_comparison(rewards_buggy, rewards_fixed)

    print("\nPROBLEM 3 COMPLETE!")
    print("="*60)


# ============================================================================
# PLOTTING
# ============================================================================

def plot_comparison(rewards_buggy, rewards_fixed, window=100):
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    ma_buggy = [np.mean(rewards_buggy[max(0, i-window+1):i+1])
                for i in range(len(rewards_buggy))]
    ma_fixed = [np.mean(rewards_fixed[max(0, i-window+1):i+1])
                for i in range(len(rewards_fixed))]

    axes[0].plot(ma_buggy, label='Buggy', linewidth=2, alpha=0.7)
    axes[0].plot(ma_fixed, label='Fixed', linewidth=2, alpha=0.7)
    axes[0].set_title("Learning Curve Comparison")
    axes[0].set_xlabel("Episode")
    axes[0].set_ylabel("Success Rate")
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    final_buggy = np.mean(rewards_buggy[-100:])
    final_fixed = np.mean(rewards_fixed[-100:])

    axes[1].bar(['Buggy', 'Fixed'], [final_buggy, final_fixed],
                color=['red', 'green'], alpha=0.7)
    axes[1].set_title("Final Success Rate")
    axes[1].set_ylabel("Success Rate")
    axes[1].set_ylim([0, 1])
    axes[1].grid(True, alpha=0.3, axis='y')

    plt.tight_layout()
    plt.savefig("problem3_comparison.png", dpi=300)
    plt.close()
    print("Plot saved: problem3_comparison.png")


# ============================================================================
# MAIN
# ============================================================================

if __name__ == "__main__":
    compare_buggy_vs_fixed()


PROBLEM 3: DEBUGGING Q-LEARNING

Running BUGGY version...
Buggy version success rate: 0.000

Running FIXED version...
FrozenLake Environment Summary
  States: 16
  Actions: 4
------------------------------------------------------------
Training Hyperparameters
  α = 0.1
  γ = 0.99
  ε_initial = 1.0
  ε_decay = 0.9995
------------------------------------------------------------
Episode 1000: Success(100ep)=0.040, Epsilon=0.606
Episode 2000: Success(100ep)=0.160, Epsilon=0.368
Episode 3000: Success(100ep)=0.300, Epsilon=0.223
Episode 4000: Success(100ep)=0.410, Epsilon=0.135
Episode 5000: Success(100ep)=0.450, Epsilon=0.082

Final success rate: 0.45

Comparison Results
Buggy success rate  (last 100): 0.000
Fixed success rate  (last 100): 0.450
Total successes (buggy): 6
Total successes (fixed): 940
Plot saved: problem3_comparison.png

PROBLEM 3 COMPLETE!
