In [96]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import matplotlib.pyplot as plt
from collections import deque
import ale_py
import os

In [97]:
# Reference ale_py explicitly so the Atari (ALE) environments are available to Gymnasium.
# NOTE(review): in recent Gymnasium versions the `import ale_py` itself may already do the
# registration and this call only marks the import as used - confirm against the installed version.
gym.register_envs(ale_py)

# Compute device used by the cells below. The name `device` is read by the network/training
# code but was not assigned anywhere in the notebook, so a fresh "Restart & Run All" raised
# NameError. Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Neural Network Q-function Approximator

In [75]:
class QNetwork(nn.Module):
    """Fully connected Q-function approximator: state vector in, one Q-value per action out.

    The network is ``n_layers - 1`` hidden blocks (``Linear`` of width ``hidden_dim``
    followed by ``ReLU``) and a final ``Linear`` layer of width ``output_dim``.
    With ``n_layers <= 1`` it degenerates to a single linear map.

    Weights keep PyTorch's default ``nn.Linear`` initialisation. An earlier variant of
    this class re-initialised every ``Linear`` layer with Xavier-uniform weights and
    zero biases; that variant is not active here.
    """

    def __init__(self, input_dim, output_dim, hidden_dim=256, n_layers=2):
        super().__init__()
        # Feature widths at every layer boundary: input, each hidden layer, output.
        widths = [input_dim] + [hidden_dim] * (n_layers - 1) + [output_dim]
        last_linear = len(widths) - 2

        modules = []
        for position, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:])):
            modules.append(nn.Linear(fan_in, fan_out))
            # Every Linear except the output layer is followed by a ReLU.
            if position != last_linear:
                modules.append(nn.ReLU())
        self.model = nn.Sequential(*modules)

    def forward(self, x):
        """Return the Q-values predicted for the input ``x``."""
        q_values = self.model(x)
        return q_values

# replay buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random minibatch sampling."""

    def __init__(self, capacity=1000000):
        # A deque with `maxlen` drops the oldest transition once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns a 5-tuple of NumPy arrays ``(states, actions, rewards, next_states,
        dones)``, each with the batch as its leading axis.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one tuple per field.
        states, actions, rewards, next_states, dones = zip(*picked)
        fields = (states, actions, rewards, next_states, dones)
        return tuple(np.array(field) for field in fields)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

# epsilon greedy action selection
def epsilon_greedy_action(q_network, state, epsilon, n_actions):
    """Pick an action with an epsilon-greedy policy over the network's Q-values.

    Args:
        q_network: module mapping a batch of states to per-action Q-values.
        state: a single observation (array-like of numbers).
        epsilon: probability of choosing a uniformly random action.
        n_actions: number of discrete actions to draw from when exploring.

    Returns:
        int: index of the chosen action.
    """
    if random.random() < epsilon:
        return random.randrange(n_actions)

    # Put the input on whatever device the network lives on instead of relying on a
    # notebook-level `device` global, which is not guaranteed to exist.
    first_param = next(q_network.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    state_tensor = torch.tensor(
        np.asarray(state), dtype=torch.float32, device=target_device
    ).unsqueeze(0)  # add the batch dimension

    with torch.no_grad():
        q_values = q_network(state_tensor)
    return q_values.argmax().item()


Compute the expected Q-value of the next state for Expected SARSA

In [76]:
def compute_expected_q(q_network, next_state, epsilon, n_actions):
    """Expected Q-value of ``next_state`` under an epsilon-greedy policy.

    Every action receives probability ``epsilon / n_actions`` and the greedy action
    receives an additional ``1 - epsilon``; the result is the probability-weighted
    sum of the network's Q-values.

    Args:
        q_network: module mapping a batch of states to per-action Q-values.
        next_state: a single observation (array-like of numbers).
        epsilon: exploration rate of the policy being evaluated.
        n_actions: number of discrete actions (length of the Q-value vector).

    Returns:
        NumPy float scalar with the expected Q-value.
    """
    # Follow the network's own device rather than a notebook-level `device` global,
    # which is not guaranteed to exist.
    first_param = next(q_network.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    next_state_tensor = torch.tensor(
        np.asarray(next_state), dtype=torch.float32, device=target_device
    ).unsqueeze(0)  # add the batch dimension
    with torch.no_grad():
        q_values = q_network(next_state_tensor).squeeze(0)

    # Epsilon-greedy probabilities: uniform exploration mass plus the greedy bonus.
    greedy_action = q_values.argmax().item()
    probs = np.full(n_actions, epsilon / n_actions)
    probs[greedy_action] += 1 - epsilon

    return (probs * q_values.cpu().numpy()).sum()

Single-episode training function that can handle training w/ or w/o replay_buffer

In [77]:
def _epsilon_greedy_expectation(q_next, epsilon):
    """Row-wise expected Q-value of ``q_next`` (batch, n_actions) under epsilon-greedy."""
    n_actions = q_next.shape[-1]
    # sum_a pi(a) * Q(a) with pi = epsilon / n on every action plus (1 - epsilon) on the argmax.
    uniform_part = (epsilon / n_actions) * q_next.sum(dim=-1)
    greedy_part = (1.0 - epsilon) * q_next.max(dim=-1).values
    return uniform_part + greedy_part


def _bootstrap_values(q_next, algorithm, epsilon):
    """Next-state value used in the TD target for the chosen algorithm."""
    if algorithm == 'q_learning':
        return q_next.max(dim=-1).values
    if algorithm == 'expected_sarsa':
        return _epsilon_greedy_expectation(q_next, epsilon)
    raise ValueError("Unknown algorithm")


def _td_update(q_network, optimizer, batch, gamma, epsilon, algorithm, device):
    """Run one gradient step on the mean squared TD error of ``batch``."""
    states, actions, rewards, next_states, terminals = batch
    states_t = torch.tensor(np.asarray(states), dtype=torch.float32, device=device)
    actions_t = torch.tensor(np.asarray(actions), dtype=torch.int64, device=device).unsqueeze(1)
    rewards_t = torch.tensor(np.asarray(rewards), dtype=torch.float32, device=device)
    next_states_t = torch.tensor(np.asarray(next_states), dtype=torch.float32, device=device)
    terminals_t = torch.tensor(np.asarray(terminals), dtype=torch.float32, device=device)

    # Q(s, a) for the actions that were actually taken.
    q_taken = q_network(states_t).gather(1, actions_t).squeeze(1)

    # The target is treated as a constant: no gradient flows through the next-state values.
    with torch.no_grad():
        next_values = _bootstrap_values(q_network(next_states_t), algorithm, epsilon)
        targets = rewards_t + gamma * (1.0 - terminals_t) * next_values

    loss = nn.functional.mse_loss(q_taken, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


def train_episode(env, q_network, optimizer, gamma, epsilon, algorithm, max_steps, replay_buffer=None, batch_size=32):
    """Run one training episode and return its undiscounted total reward.

    Without a replay buffer the network is updated after every step on that single
    transition. With a replay buffer each transition is stored and, once the buffer
    holds at least ``batch_size`` transitions, one minibatch update is done per step.

    Bootstrapping is disabled only when the environment reports ``terminated``.
    A ``truncated`` episode (time limit) still ends the loop, but its last
    transition keeps the bootstrapped next-state value, because the state is not
    actually terminal. The fifth field pushed to ``replay_buffer`` is therefore the
    ``terminated`` flag.

    Args:
        env: Gymnasium-style environment (``reset()`` -> ``(obs, info)``,
            ``step(a)`` -> ``(obs, reward, terminated, truncated, info)``,
            ``action_space.n``).
        q_network: module mapping a batch of states to per-action Q-values.
        optimizer: optimizer over ``q_network``'s parameters.
        gamma: discount factor.
        epsilon: exploration rate for acting and for the Expected SARSA target.
        algorithm: ``'q_learning'`` or ``'expected_sarsa'``.
        max_steps: hard cap on the number of steps in this episode.
        replay_buffer: object with ``push``, ``sample`` and ``__len__``, or ``None``.
        batch_size: minibatch size when a replay buffer is used.

    Returns:
        float: sum of the rewards collected during the episode.

    Raises:
        ValueError: if ``algorithm`` is not one of the supported names.
    """
    if algorithm not in ('q_learning', 'expected_sarsa'):
        raise ValueError("Unknown algorithm")

    # Build tensors on the network's own device instead of relying on a notebook global.
    first_param = next(q_network.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    n_actions = env.action_space.n

    total_reward = 0.0
    state, _ = env.reset()
    done = False
    steps = 0

    while not done and steps < max_steps:
        steps += 1
        action = epsilon_greedy_action(q_network, state, epsilon, n_actions)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        batch = None
        if replay_buffer is None:
            # Online update: treat the current transition as a batch of one.
            batch = (np.asarray(state)[None], [action], [reward],
                     np.asarray(next_state)[None], [terminated])
        else:
            replay_buffer.push(state, action, reward, next_state, terminated)
            if len(replay_buffer) >= batch_size:
                batch = replay_buffer.sample(batch_size)

        if batch is not None:
            _td_update(q_network, optimizer, batch, gamma, epsilon, algorithm, device)

        state = next_state

    return total_reward

Code for a single run (an experiment on only one seed)

In [78]:
def run_experiment(env_name, algorithm, use_replay, epsilon, lr, seed, episodes=1000, gamma=0.99, batch_size=32):
    """Train a fresh Q-network on one environment with one seed.

    Args:
        env_name: Gymnasium environment id passed to ``gym.make``.
        algorithm: ``'q_learning'`` or ``'expected_sarsa'`` (forwarded to ``train_episode``).
        use_replay: when true, train from a replay buffer instead of online updates.
        epsilon: exploration rate.
        lr: Adam learning rate.
        seed: seed applied to the environment, its spaces, NumPy, ``random`` and PyTorch.
        episodes: number of training episodes.
        gamma: discount factor.
        batch_size: minibatch size used with the replay buffer.

    Returns:
        list: total reward of each episode, in order.
    """
    # Per-environment cap on steps per episode; unlisted environments use 500.
    max_steps = {'Acrobot-v1': 500, 'Assault-ram-v5': 1000}.get(env_name, 500)

    env = gym.make(env_name)
    try:
        # Set seeds for reproducibility (must precede network creation so that the
        # initial weights are seeded too).
        env.reset(seed=seed)
        env.action_space.seed(seed)
        env.observation_space.seed(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.manual_seed(seed)

        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n

        q_network = QNetwork(input_dim, output_dim, hidden_dim=256, n_layers=2).to(device)
        optimizer = optim.Adam(q_network.parameters(), lr=lr)

        replay_buffer = ReplayBuffer(capacity=1000000) if use_replay else None
        episode_rewards = []
        for ep in range(episodes):
            total_reward = train_episode(env, q_network, optimizer, gamma, epsilon, algorithm,
                                         max_steps, replay_buffer, batch_size)
            episode_rewards.append(total_reward)
            print(f"Episode: {ep}, Total Reward: {total_reward}")
        return episode_rewards
    finally:
        # Release the environment even when training raises or the cell is interrupted.
        env.close()

Run one configuration over several seeds and aggregate the per-episode rewards (mean and standard deviation)

In [79]:
def run_multiple_trials(env_name, algorithm, use_replay, epsilon, lr, episodes=1000,
                        seeds=None, gamma=0.99, batch_size=32):
    """Repeat ``run_experiment`` over several seeds and aggregate the reward curves.

    Args:
        env_name, algorithm, use_replay, epsilon, lr: forwarded to ``run_experiment``.
        episodes: number of episodes per trial.
        seeds: iterable of integer seeds, one trial each. Defaults to 1..10.
        gamma: discount factor forwarded to ``run_experiment``.
        batch_size: replay minibatch size forwarded to ``run_experiment``.

    Returns:
        tuple: ``(mean_rewards, std_rewards)``, NumPy arrays holding the per-episode
        mean and standard deviation across trials.

    Raises:
        ValueError: if ``seeds`` is empty.
    """
    seeds = list(range(1, 11)) if seeds is None else list(seeds)
    if not seeds:
        raise ValueError("seeds must contain at least one seed")

    all_rewards = []
    for seed in seeds:
        print(f"Seed {seed} ...")
        rewards = run_experiment(env_name, algorithm, use_replay, epsilon, lr, seed,
                                 episodes=episodes, gamma=gamma, batch_size=batch_size)
        all_rewards.append(rewards)

    # Rows are trials, columns are episodes; aggregate across trials.
    all_rewards = np.array(all_rewards)
    mean_rewards = np.mean(all_rewards, axis=0)
    std_rewards = np.std(all_rewards, axis=0)
    return mean_rewards, std_rewards

Plotting for a single experiment on 10 seeds

In [80]:
def plot_results(mean_rewards, std_rewards, episodes, title, filename):
    """Save a mean-reward curve with a +/- one standard deviation band to ``filename``.

    Args:
        mean_rewards: per-episode mean reward (array of length ``episodes``).
        std_rewards: per-episode standard deviation, same length.
        episodes: number of episodes; the x axis runs from 0 to ``episodes - 1``.
        title: figure title.
        filename: path the figure is written to.
    """
    fig, ax = plt.subplots()
    episode_axis = np.arange(episodes)

    # Shaded band spans one standard deviation on either side of the mean.
    lower_band = mean_rewards - std_rewards
    upper_band = mean_rewards + std_rewards

    ax.plot(episode_axis, mean_rewards, label='Mean Reward')
    ax.fill_between(episode_axis, lower_band, upper_band, alpha=0.2)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Reward')
    ax.set_title(title)
    ax.legend()

    fig.savefig(filename)
    # Close explicitly so figures do not accumulate when called in a loop.
    plt.close(fig)

Check how long a single run (1000 episodes, at most 500 steps each) takes on Acrobot for Q-learning and Expected SARSA

In [84]:
import time

# Example for Q-Learning on Acrobot
# Times one online (no replay buffer) Q-learning run: seed 0, epsilon=0.1, lr=0.1, 1000 episodes.
# `rewards_q` holds the per-episode rewards and is read by later cells.
start_time = time.time()
rewards_q = run_experiment("Acrobot-v1", "q_learning", use_replay=False,
                           epsilon=0.1, lr=0.1, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_q = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Q-Learning run time for Acrobot (max 500 steps/episode):", elapsed_time_q, "seconds")

In [85]:
# Keep the most recent Q-learning curve under a name that records the weight initialisation
# it was produced with, so it survives `rewards_q` being overwritten by later runs.
# NOTE(review): `xavier_rewards` is only assigned by the commented-out line below, yet a
# later plotting cell references it - on a fresh kernel that name does not exist.
# xavier_rewards = rewards_q (took 10 mins)
kaming_default_rewards = rewards_q

In [87]:
# Example for Expected SARSA on Acrobot
# Same settings as the Q-learning timing run (seed 0, epsilon=0.1, lr=0.1, no replay buffer).
# `rewards_es` holds the per-episode rewards and is read by later cells.
start_time = time.time()
rewards_es = run_experiment("Acrobot-v1", "expected_sarsa", use_replay=False,
                            epsilon=0.1, lr=0.1, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_es = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Expected SARSA run time for Acrobot (max 500 steps/episode):", elapsed_time_es, "seconds")

In [88]:
# Compare the raw per-episode reward curves of the lr=0.1, no-replay runs.
episodes_range = range(1, 1001)  # 1-based episode index; matches the 1000-episode runs above
plt.figure(figsize=(10, 6))
# `xavier_rewards` exists only if the Xavier-initialised run was executed and saved in this
# session; skip that curve instead of raising NameError on a fresh kernel.
if "xavier_rewards" in globals():
    plt.plot(episodes_range, xavier_rewards, label="Xavier-Q-Learning", color='green')
plt.plot(episodes_range, kaming_default_rewards, label="default initialization_Qlearning", color='red')
plt.plot(episodes_range, rewards_es, label="Expected SARSA", color='blue')
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Acrobot Reward Curves (Max 500 Steps per Episode)")
plt.legend()
plt.show()

In [92]:
import time

# Example for Q-Learning on Acrobot
# Repeats the online Q-learning timing run with a smaller learning rate (lr=0.01).
# Overwrites `rewards_q` and `elapsed_time_q` from the earlier lr=0.1 run.
start_time = time.time()
rewards_q = run_experiment("Acrobot-v1", "q_learning", use_replay=False,
                           epsilon=0.1, lr=0.01, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_q = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Q-Learning run time for Acrobot (max 500 steps/episode):", elapsed_time_q, "seconds")

In [93]:
import time

# Example for Expected SARSA on Acrobot
# Repeats the online Expected SARSA timing run with a smaller learning rate (lr=0.01).
# Overwrites `rewards_es` from the earlier lr=0.1 run. The timing is stored in
# `elapsed_time_es` and reported under the Expected SARSA label (this cell previously
# reused the Q-learning variable and message).
start_time = time.time()
rewards_es = run_experiment("Acrobot-v1", "expected_sarsa", use_replay=False,
                            epsilon=0.1, lr=0.01, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_es = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Expected SARSA run time for Acrobot (max 500 steps/episode):", elapsed_time_es, "seconds")

In [94]:
# Plot the raw per-episode rewards of whichever Q-learning / Expected SARSA runs are
# currently stored in `rewards_q` and `rewards_es`.
episodes_range = range(1, 1001)  # 1-based episode index; assumes both curves have 1000 entries
plt.figure(figsize=(10, 6))
plt.plot(episodes_range, rewards_q, label="Q-Learning", color='green')
plt.plot(episodes_range, rewards_es, label="Expected SARSA", color='red')
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Acrobot Reward Curves (Max 500 Steps per Episode)")
plt.legend()
plt.show()

In [95]:
def exponential_moving_average(data, alpha=0.1):
    """Smooth a sequence with an exponential moving average.

    The first output equals the first input; every later output is
    ``alpha * value + (1 - alpha) * previous_output``.

    Args:
        data: iterable of numbers (list, array, generator, ...). May be empty.
        alpha: weight of the newest value; smaller values smooth more.

    Returns:
        list: smoothed values, same length as ``data`` (empty for empty input).
    """
    smoothed = []
    for value in data:
        if smoothed:
            # Blend the new observation with the running average.
            value = alpha * value + (1 - alpha) * smoothed[-1]
        smoothed.append(value)
    return smoothed

# Smooth the current `rewards_q` / `rewards_es` curves and plot them together.
alpha = 0.05  # Adjust smoothness factor

smoothed_rewards_q = exponential_moving_average(rewards_q, alpha)
smoothed_rewards_es = exponential_moving_average(rewards_es, alpha)

# `episodes_range` is not defined in this cell; it is taken from an earlier plotting cell.
plt.figure(figsize=(10, 6))
plt.plot(episodes_range, smoothed_rewards_q, label="Q-Learning (EMA Smoothed)", color='green')
plt.plot(episodes_range, smoothed_rewards_es, label="Expected SARSA (EMA Smoothed)", color='red')
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Exponentially Smoothed Acrobot Reward Curves")
plt.legend()
plt.show()

# Check how long it takes with replay buffer

In [89]:
import time

# Example for Q-Learning on Acrobot
# Times one Q-learning run trained from a replay buffer: seed 0, epsilon=0.1, lr=0.1,
# batch size 32. Overwrites `rewards_q` and `elapsed_time_q` from earlier cells.
start_time = time.time()
rewards_q = run_experiment("Acrobot-v1", "q_learning", use_replay=True,
                           epsilon=0.1, lr=0.1, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_q = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Q-Learning run time for Acrobot (max 500 steps/episode):", elapsed_time_q, "seconds")

In [90]:
# Example for Expected SARSA on Acrobot
# Times one Expected SARSA run trained from a replay buffer: seed 0, epsilon=0.1, lr=0.1,
# batch size 32. Overwrites `rewards_es` and `elapsed_time_es` from earlier cells.
start_time = time.time()
rewards_es = run_experiment("Acrobot-v1", "expected_sarsa", use_replay=True,
                            epsilon=0.1, lr=0.1, seed=0, episodes=1000, gamma=0.99, batch_size=32)
elapsed_time_es = time.time() - start_time  # wall-clock seconds, includes per-episode printing
print("Expected SARSA run time for Acrobot (max 500 steps/episode):", elapsed_time_es, "seconds")

In [91]:
# Plot the raw per-episode rewards of whichever Q-learning / Expected SARSA runs are
# currently stored in `rewards_q` and `rewards_es`.
episodes_range = range(1, 1001)  # 1-based episode index; assumes both curves have 1000 entries
plt.figure(figsize=(10, 6))
plt.plot(episodes_range, rewards_q, label="Q-Learning", color='green')
plt.plot(episodes_range, rewards_es, label="Expected SARSA", color='red')
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Acrobot Reward Curves (Max 500 Steps per Episode)")
plt.legend()
plt.show()

Run all experiments

In [51]:

# Figures are written below ./plots; create it up front.
os.makedirs("plots", exist_ok=True)

# Sweep dimensions.
env_list = ['Acrobot-v1', 'Assault-ram-v5']   # environments
epsilon_list = [0.05, 0.1, 0.15]              # exploration rates
lr_list = [0.1, 0.01, 0.001]                  # step sizes
algorithms = ['q_learning', 'expected_sarsa'] # algorithms to compare
replay_options = [False, True]                # without / with replay buffer

# Fixed settings.
# NOTE(review): `num_seeds` and `gamma` are assigned here but not passed to
# `run_multiple_trials` in this cell.
num_seeds = 10      # 10 learning trials
episodes = 1000     # 1000 episodes per run
gamma = 0.99        # Discount factor

# Every configuration, ordered environment -> replay -> epsilon -> step size -> algorithm
# (the algorithm varies fastest).
configurations = [
    (env_name, use_replay, epsilon, lr, algo)
    for env_name in env_list
    for use_replay in replay_options
    for epsilon in epsilon_list
    for lr in lr_list
    for algo in algorithms
]

for env_name, use_replay, epsilon, lr, algo in configurations:
    title = (f"{env_name} - {algo.upper()} - " +
             f"{'Replay' if use_replay else 'No Replay'} - " +
             f"ϵ: {epsilon}, LR: {lr}")
    print("Running:", title)

    mean_rewards, std_rewards = run_multiple_trials(env_name, algo, use_replay, epsilon, lr, episodes)

    # One figure per configuration, named after its settings.
    fname = f"plots/{env_name}_{algo}_{'replay' if use_replay else 'no_replay'}_eps{epsilon}_lr{lr}.png"
    plot_results(mean_rewards, std_rewards, episodes, title, fname)
    print(f"Saved plot to {fname}")