In [1]:
import gymnasium as gym
import numpy as np
import tensorflow.keras
import random
from matplotlib import pyplot as plt
from ipywidgets import interact, IntSlider
import math
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tqdm import tqdm

## Helper Functions

We'll define a helper function to visualize the agent's performance and functions to handle training and evaluation to keep the code modular.

In [2]:
def show_video(images, captions):
  """ Show a sequence of images as an interactive plot. """
  if not images:
      print("No images to display.")
      return
  def f(i):
    plt.imshow(images[i])
    plt.title(captions[i])
    plt.axis('off')
    plt.show()
  interact(f, i=IntSlider(min=0, max=len(images)-1, step=1, value=0))

def train_q_table(env, num_steps, lr, gamma, decay_rate, seed):
    """ Trains a Q-table for the given environment. """
    # Set seeds for reproducibility
    random.seed(seed)
    np.random.seed(seed)
    env.action_space.seed(seed)

    # Initialize Q-table
    q_table = np.zeros((env.observation_space.n, env.action_space.n))

    # Reset environment, seeding it for a reproducible training sequence
    state, _ = env.reset(seed=seed)

    # Training loop
    for step in range(num_steps):
        # Epsilon-greedy action selection
        epsilon = math.exp(-decay_rate * step)
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample()  # Explore
        else:
            action = np.argmax(q_table[state, :])  # Exploit

        # Take action
        next_state, reward, done, _, _ = env.step(action)

        # Calculate target y based on Bellman equation
        if done:
            y = reward
        else:
            y = reward + gamma * np.max(q_table[next_state, :])

        # Update Q-table
        q_table[state, action] = q_table[state, action] + lr * (y - q_table[state, action])

        # Update state
        if done:
            state, _ = env.reset() # Reset for the next episode
        else:
            state = next_state

    return q_table

def evaluate_policy(env, q_table, num_trials, seed):
    """ Evaluates a Q-table policy over a number of trials. """
    success_count = 0
    max_steps_per_trial = 100  # Prevent infinite loops in non-optimal policies

    # Seed the environment's RNG to make the sequence of trials reproducible.
    # This reset also provides the starting state for the first trial.
    state, _ = env.reset(seed=seed)
    env.action_space.seed(seed)

    for i in range(num_trials):
        # For subsequent trials, just reset the state to the beginning.
        # This will use the same, seeded RNG to produce the next episode in the sequence.
        if i > 0:
            state, _ = env.reset()

        done = False
        steps = 0
        while not done and steps < max_steps_per_trial:
            action = np.argmax(q_table[state, :])
            state, reward, done, _, _ = env.step(action)
            steps += 1

        if done and reward == 1:
            success_count += 1

    return success_count / num_trials

## Non-Slippery Frozen Lake

First, we'll learn a Q-table for the deterministic version (`is_slippery=False`). In this environment, the agent's actions always succeed as intended.

In [3]:
# Global seed for the notebook
seed = 1234

# Environment setup
env_non_slippery = gym.make("FrozenLake-v1", is_slippery=False, render_mode='rgb_array')

### Q-Learning Algorithm

We implement the Q-learning algorithm as described in the pseudo-code.

In [4]:
# Q-learning parameters
num_steps = 2000
lr = 0.01
gamma = 0.99
decay_rate = 0.0001

# Train the Q-table
q_table_non_slippery = train_q_table(
    env=env_non_slippery,
    num_steps=num_steps,
    lr=lr,
    gamma=gamma,
    decay_rate=decay_rate,
    seed=seed
)

print("Q-table learned for non-slippery environment.")

Q-table learned for non-slippery environment.


### Testing the Learned Policy

Now we test the policy learned by the Q-table. Since the environment is deterministic and the Q-table is well-trained, the agent should find the optimal path to the goal every time.

In [5]:
# Test the policy
state, _ = env_non_slippery.reset(seed=seed)
done = False
max_test_steps = 10000
renders = []
captions = []
step_count = 0

# Initial state
renders.append(env_non_slippery.render())
captions.append(f'Step: {step_count} State: {state} Action: N/A Reward: N/A')

while not done and step_count < max_test_steps:
    step_count += 1
    # Choose action with max utility from the Q-table
    action = np.argmax(q_table_non_slippery[state, :])

    # Apply action
    next_state, reward, done, _, _ = env_non_slippery.step(action)

    # Render and store frame for video
    renders.append(env_non_slippery.render())
    captions.append(f'Step: {step_count} State: {next_state} Action: {action} Reward: {reward}')

    # Update state
    state = next_state

if done and reward == 1:
    print(f"Success! Reached the goal in {step_count} steps.")
else:
    print(f"Failure. Did not reach the goal after {step_count} steps.")

# Display the successful run
show_video(renders, captions)
env_non_slippery.close()

Success! Reached the goal in 6 steps.


interactive(children=(IntSlider(value=0, description='i', max=6), Output()), _dom_classes=('widget-interact',)…

## Slippery Frozen Lake

Now we will learn a Q-table for the slippery version of Frozen Lake (`is_slippery=True`), where actions do not always have the intended effect. This makes the problem significantly harder.

In [6]:
# Environment setup for the slippery version
env_slippery = gym.make("FrozenLake-v1", is_slippery=True, render_mode='rgb_array')

### Training and Evaluation (2,000 steps)

First, we train with the same parameters as the non-slippery case (2,000 steps). We expect the performance to be poor due to the environment's stochasticity and the short training time.

In [7]:
# Train with 2,000 steps
q_table_slippery_2k = train_q_table(
    env=env_slippery,
    num_steps=2000,
    lr=0.01,
    gamma=0.99,
    decay_rate=0.0001,
    seed=seed
)

# Evaluate the policy over 1000 trials
success_rate_2k = evaluate_policy(env_slippery, q_table_slippery_2k, num_trials=1000, seed=seed)

print(f"Success rate over 1000 trials (2,000 training steps): {success_rate_2k:.2%}")

Success rate over 1000 trials (2,000 training steps): 0.00%


### Training and Evaluation (200,000 steps)

To improve performance, we drastically increase the number of training steps to 200,000 and use a slower epsilon decay rate to encourage more exploration.

In [8]:
# Train with 200,000 steps and a slower decay rate
num_steps_200k = 200000
decay_rate_slower = 0.00001

q_table_slippery_200k = train_q_table(
    env=env_slippery,
    num_steps=num_steps_200k,
    lr=0.01,
    gamma=0.99,
    decay_rate=decay_rate_slower,
    seed=seed
)

# Evaluate the new policy
success_rate_200k = evaluate_policy(env_slippery, q_table_slippery_200k, num_trials=1000, seed=seed)
env_slippery.close()

print(f"Success rate over 1000 trials (200,000 training steps): {success_rate_200k:.2%}")

Success rate over 1000 trials (200,000 training steps): 71.80%


### Why did the success rate improve?

The success rate improved significantly for two main reasons:

1.  **More Training Steps:** The slippery environment is stochastic, meaning the same action in the same state can lead to different next states. A small number of training steps (like 2,000) is insufficient for the agent to experience the full range of possible outcomes for each state-action pair. By increasing the training steps to 200,000, the agent gathers a much larger and more representative sample of transitions. This allows the Q-values to converge more closely to their true expected values, which properly account for the environment's randomness.

2.  **Slower Epsilon Decay:** The decay rate for epsilon was reduced from 0.0001 to 0.00001. A slower decay means that the `epsilon` value stays higher for longer, forcing the agent to perform more random (exploratory) actions throughout the extended training period. In a stochastic environment, thorough exploration is critical to avoid settling on a suboptimal policy that might seem good based on early, lucky outcomes. The prolonged exploration ensures the agent discovers more robust paths to the goal that are less susceptible to the environment's slipperiness.

In summary, the combination of significantly more experience and a more patient exploration strategy allowed the agent to build a much more accurate and robust model of the stochastic environment, leading to a higher success rate.

# Part 2: n-step Deep Q-Learning (Cart Pole)

In this part, we implement an n-step Deep Q-Learning agent to solve the CartPole-v1 environment. We will use a Keras neural network to approximate the Q-function. The agent will be trained by running full episodes and then performing a batch update on the network using the collected experience.

## Setting up the Environment and Model

First, we set the seed for reproducibility across all relevant libraries. Then we define a function to create our Q-network. The network will be a simple multi-layer perceptron (MLP) with one hidden layer, as specified in the instructions.

In [9]:
# Configuration parameters
seed = 1234
gamma = 0.99  # Discount factor for past rewards

# Set seeds
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Create the environment
env = gym.make("CartPole-v1", render_mode='rgb_array')
env.reset(seed=seed)
env.action_space.seed(seed)

num_actions = env.action_space.n
state_shape = env.observation_space.shape

def create_q_model():
    """Creates a Keras model for the Q-network."""
    model = keras.Sequential([
        layers.InputLayer(input_shape=state_shape),
        layers.Dense(32, activation="relu"),
        layers.Dense(num_actions, activation="linear") # Linear activation for Q-values
    ])
    return model

## N-step Q-Learning Implementation

Here is the core of the n-step Q-learning algorithm. We will run for a specified number of episodes. In each episode, we collect states, actions, and rewards. At the end of the episode, we calculate the discounted cumulative rewards (also called returns) and use them as targets to update our Q-network in a single batch.

In [10]:
def train_dql(num_episodes, decay_rate, learning_rate):
    """Trains a Deep Q-Learning model on the CartPole environment."""
    q_network = create_q_model()
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    mse_loss = tf.keras.losses.MeanSquaredError()

    global_step_counter = 0
    episode_rewards_history = []

    # Maximum possible reward for normalization, as per instructions.
    # The environment caps episodes at 500 steps.
    max_possible_reward = 500.0

    print("Starting training...")
    pbar = tqdm(range(num_episodes))
    for episode in pbar:
        # ---- 1. Collect experience by playing one episode ----
        state, _ = env.reset()
        episode_states = []
        episode_actions = []
        episode_rewards = []
        done = False

        while not done:
            global_step_counter += 1

            # Epsilon-greedy action selection
            epsilon = math.exp(-decay_rate * global_step_counter)
            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()  # Explore
            else:
                q_values = q_network(tf.expand_dims(state, 0))
                action = tf.argmax(q_values[0]).numpy()  # Exploit

            # Apply action
            next_state, reward, done, _, _ = env.step(action)

            # Store experience
            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)

            state = next_state

        episode_rewards_history.append(len(episode_rewards))

        # ---- 2. Calculate discounted cumulative rewards (returns) ----
        n_steps = len(episode_rewards)
        returns = np.zeros_like(episode_rewards, dtype=np.float32)
        running_return = 0.0
        # Iterate backwards from the last step
        for i in reversed(range(n_steps)):
            running_return = episode_rewards[i] + gamma * running_return
            returns[i] = running_return

        # ---- 3. Normalize returns ----
        returns_normalized = returns / max_possible_reward

        # ---- 4. Prepare data for batch update ----
        states_tensor = tf.convert_to_tensor(episode_states, dtype=tf.float32)
        actions_tensor = tf.convert_to_tensor(episode_actions, dtype=tf.int32)
        returns_tensor = tf.convert_to_tensor(returns_normalized, dtype=tf.float32)

        # Create indices for tf.gather_nd to select the Q-values of actions taken
        action_indices = tf.stack([tf.range(n_steps, dtype=tf.int32), actions_tensor], axis=1)

        # ---- 5. Update the Q-network ----
        with tf.GradientTape() as tape:
            # Predict Q-values for all actions for the states in the episode
            all_q_values = q_network(states_tensor)
            # Select the Q-values for the actions that were actually taken
            action_q_values = tf.gather_nd(all_q_values, action_indices)
            # Calculate the loss between the predicted Q-values and the calculated returns
            loss = mse_loss(returns_tensor, action_q_values)

        # Calculate gradients and update the model
        grads = tape.gradient(loss, q_network.trainable_variables)
        optimizer.apply_gradients(zip(grads, q_network.trainable_variables))

        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards_history[-100:])
            pbar.set_description(f"Episode {episode + 1}/{num_episodes}, Avg Reward (last 100): {avg_reward:.2f}, Epsilon: {epsilon:.4f}")

    print("\nTraining finished.")
    return q_network, episode_rewards_history

def evaluate_policy_dql(env, q_network, num_trials=100):
    """Evaluates the performance of a trained DQL policy."""
    print(f"\nEvaluating policy over {num_trials} trials...")
    total_steps = 0
    success_count = 0
    all_steps = []

    for _ in range(num_trials):
        state, _ = env.reset()
        done = False
        steps = 0
        while not done:
            q_values = q_network(tf.expand_dims(state, 0))
            action = tf.argmax(q_values[0]).numpy()
            state, _, done, _, _ = env.step(action)
            steps += 1

        all_steps.append(steps)
        total_steps += steps
        if steps >= 200:
            success_count += 1

    avg_steps = total_steps / num_trials
    success_rate = success_count / num_trials

    print(f"Average steps per episode: {avg_steps:.2f}")
    print(f"Success rate (>= 200 steps): {success_rate:.2%}")
    return avg_steps, success_rate

### Experiment 1: Standard Training

We train for 3000 episodes with a slow epsilon decay rate. This gives the agent ample time to explore the environment and learn a robust policy.

In [11]:
# Experiment 1: Parameters from instructions
num_episodes_exp1 = 3000
decay_rate_exp1 = 0.00001
learning_rate_exp1 = 0.001

trained_model_exp1, _ = train_dql(num_episodes_exp1, decay_rate_exp1, learning_rate_exp1)
evaluate_policy_dql(env, trained_model_exp1)



Starting training...


Episode 3000/3000, Avg Reward (last 100): 113.36, Epsilon: 0.1610: 100%|██████████| 3000/3000 [01:31<00:00, 32.95it/s]



Training finished.

Evaluating policy over 100 trials...
Average steps per episode: 269.60
Success rate (>= 200 steps): 96.00%


(269.6, 0.96)

### Experiment 2: Rapid Decay and Short Training

Now, we drastically change the parameters. We train for only 100 episodes and use a much faster epsilon decay rate.

In [12]:
# Experiment 2: Modified parameters
num_episodes_exp2 = 100
decay_rate_exp2 = 0.001
learning_rate_exp2 = 0.001

trained_model_exp2, _ = train_dql(num_episodes_exp2, decay_rate_exp2, learning_rate_exp2)
evaluate_policy_dql(env, trained_model_exp2)

env.close()

Starting training...


Episode 100/100, Avg Reward (last 100): 18.09, Epsilon: 0.1638: 100%|██████████| 100/100 [00:01<00:00, 74.03it/s]



Training finished.

Evaluating policy over 100 trials...
Average steps per episode: 15.80
Success rate (>= 200 steps): 0.00%


### Why did the performance drop so drastically in Experiment 2?

The performance plummeted in the second experiment due to two interconnected factors:

1.  **Premature Exploitation:** The `decay_rate` was increased from `0.00001` to `0.001`. This causes the exploration probability, `epsilon`, to decrease very rapidly. The agent stops taking random actions and starts exploiting its learned knowledge far too early. Since it has only been trained for a few episodes, its "knowledge" is based on a tiny, unrepresentative sample of the environment. It likely latches onto a poor, short-sighted strategy and never explores enough to find the better, long-term solution required to balance the pole for 200+ steps.

2.  **Insufficient Training Data:** Training for only 100 episodes is not enough time for the neural network to learn the complex dynamics of the CartPole environment. In the early stages, episodes are very short as the agent's policy is essentially random. With only 100 short episodes, the total number of (state, action, reward) samples collected is extremely small. The Q-network cannot generalize from such sparse data and fails to learn a meaningful policy.

In essence, the second experiment combines the worst of both worlds: it forces the agent to commit to a strategy before it has had a chance to explore (due to rapid epsilon decay) and it doesn't provide enough experience for any strategy it learns to be a good one (due to the low number of episodes). The first experiment succeeded because the long training duration and slow decay rate allowed for a healthy balance between exploration and exploitation, which is crucial for effective reinforcement learning.