In [31]:
!pip install torch wandb gymnasium numpy



In [32]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import wandb
import os
import random
import math
import gymnasium as gym
import numpy as np
from collections import deque, namedtuple
import time
from gymnasium.wrappers import RecordVideo
import itertools

In [None]:
# Authenticate with Weights & Biases without embedding an API key in the
# notebook: the key is read from the WANDB_API_KEY environment variable. When
# the variable is unset, None is passed so wandb uses its own credential
# lookup instead of an empty key.
wandb.login(key=os.environ.get("WANDB_API_KEY") or None)
wandb.init(project="RL assignment 2", name="assignment2_run1")



0,1
accuracy,▁███████████████████████████████████████
epoch,▁▁▁▁▁▁▂▂▂▂▂▂▂▂▂▃▃▃▃▃▄▅▅▅▅▅▅▆▆▆▆▆▆▆▆▇████
loss,█▅▄▄▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁

0,1
accuracy,1.0
epoch,10000.0
loss,0.0


In [37]:
# One stored environment step. `next_state` holds whatever the caller recorded
# as the successor of `state`.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])

class ReplayMemory:
    """
    Fixed-capacity experience replay buffer.

    Transitions are kept in arrival order; once `capacity` entries are held,
    adding another one discards the oldest. Training batches are drawn at
    random from the stored transitions.
    """

    def __init__(self, capacity):
        # A bounded deque evicts its oldest element automatically when full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition given as (state, action, next_state, reward)."""
        transition = Transition._make(args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` stored transitions picked at random."""
        buffer = self.memory
        return random.sample(buffer, batch_size)

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.memory)

In [38]:
class DQN(nn.Module):
    """
    Feed-forward Q-network.

    Takes a state tensor and produces one Q-value per action: two hidden
    linear layers of 128 units, each followed by ReLU, then a linear output
    layer with `n_actions` units and no activation.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        hidden_units = 128
        # The attribute names layer1..layer3 become the state_dict keys.
        self.layer1 = nn.Linear(n_observations, hidden_units)
        self.layer2 = nn.Linear(hidden_units, hidden_units)
        self.layer3 = nn.Linear(hidden_units, n_actions)

    def forward(self, x):
        """Run the forward pass and return the raw Q-values for `x`."""
        hidden = x
        for layer in (self.layer1, self.layer2):
            hidden = F.relu(layer(hidden))
        # Output layer is left un-activated: Q-values are unbounded.
        return self.layer3(hidden)

In [39]:
# Use the GPU if available, otherwise use the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def select_action(state, policy_net, n_actions, epsilon):
    """
    Choose an action with an epsilon-greedy policy.

    Args:
        state: Input handed straight to `policy_net`. The greedy branch takes
            the max over dim 1 and reshapes to (1, 1), so the network output
            must be 2-D with a single row (presumably a (1, n_observations)
            state -- confirm against callers).
        policy_net: Callable returning Q-values for `state`.
        n_actions: Number of discrete actions to draw from when exploring.
        epsilon: Exploration probability, expected in [0, 1].

    Returns:
        A (1, 1) tensor with the chosen action index. The exploration branch
        returns a `torch.long` tensor on the module-level `device`.
    """
    # random.random() lies in [0.0, 1.0). Using >= (not >) makes the bounds
    # exact: epsilon == 0.0 is always greedy, even when the draw is exactly
    # 0.0, and epsilon == 1.0 always explores.
    if random.random() >= epsilon:
        # EXPLOITATION: index of the largest Q-value, shaped [[action]]
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)

    # EXPLORATION: uniformly random action index
    return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

In [40]:
class DiscretizeActionWrapper(gym.ActionWrapper):
    """
    Presents a continuous action space as a small set of discrete choices.

    The wrapped env's action bounds (first component of `low` / `high`) are
    split into `num_discrete_actions` evenly spaced values. For Pendulum-v1,
    whose torque lies in [-2.0, 2.0], the default of 3 actions maps
    {0, 1, 2} to {-2.0, 0.0, 2.0}.
    """
    def __init__(self, env, num_discrete_actions=3):
        super().__init__(env)
        self.num_actions = num_discrete_actions
        # Lookup table: discrete index -> continuous value, built from the
        # wrapped env's own bounds.
        continuous_space = self.env.action_space
        lowest = continuous_space.low[0]
        highest = continuous_space.high[0]
        self.action_map = np.linspace(lowest, highest, self.num_actions)
        # Advertise the discrete space to the agent.
        self.action_space = gym.spaces.Discrete(self.num_actions)

    def action(self, action_index):
        # Translate the discrete index into a 1-element float32 action array.
        torque = self.action_map[action_index]
        return np.array([torque], dtype=np.float32)

In [None]:
def optimize_model(policy_net, target_net, optimizer, memory, batch_size, gamma, loss_fn):
    """
    Perform one DQN optimization step on `policy_net`.

    Samples `batch_size` transitions from `memory`, builds the TD target
    `reward + gamma * max_a' Q_target(s', a')` (with the bootstrap term set to
    zero for transitions whose `next_state` is None), and takes one optimizer
    step on `loss_fn(Q(s, a), target)`.

    Args:
        policy_net: Network being trained; called on the concatenated states.
        target_net: Network used (without gradients) for the bootstrap value.
        optimizer: Optimizer over `policy_net`'s parameters.
        memory: Object supporting `len()` and `sample(batch_size)`, yielding
            items with `state`, `action`, `next_state` and `reward` attributes.
        batch_size: Number of transitions per update.
        gamma: Discount factor.
        loss_fn: Callable `(prediction, target) -> scalar loss`.

    Returns:
        None. Does nothing when `memory` holds fewer than `batch_size` items.
    """
    if len(memory) < batch_size:
        return  # Not enough experiences in memory to sample a batch

    transitions = memory.sample(batch_size)

    # Stack the per-transition tensors into batch tensors.
    state_batch = torch.cat([t.state for t in transitions])
    action_batch = torch.cat([t.action for t in transitions])
    reward_batch = torch.cat([t.reward for t in transitions])
    next_states = [t.next_state for t in transitions]

    # Work on whatever device the sampled states live on instead of relying on
    # a module-level global.
    batch_device = state_batch.device

    # Q(s, a) for the actions that were actually taken.
    Q_current = policy_net(state_batch).gather(1, action_batch)

    # Bootstrap value max_a' Q_target(s', a'); stays 0 for terminal transitions.
    Q_target_next = torch.zeros(batch_size, device=batch_device)
    non_final_next_states = [s for s in next_states if s is not None]
    # Guard: torch.cat raises on an empty list, which happens when every
    # sampled transition is terminal.
    if non_final_next_states:
        non_final_mask = torch.tensor(
            [s is not None for s in next_states],
            device=batch_device,
            dtype=torch.bool,
        )
        with torch.no_grad():
            Q_target_next[non_final_mask] = target_net(torch.cat(non_final_next_states)).max(1)[0]

    # Match the reward dtype to the Q-values so a float64 reward (e.g. built
    # from a NumPy scalar) does not promote the target to double precision.
    reward_batch = reward_batch.to(Q_current.dtype)
    Q_target = reward_batch + (Q_target_next * gamma)

    loss = loss_fn(Q_current, Q_target.unsqueeze(1))

    # Backpropagation with element-wise gradient clipping.
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [None]:
# --- Main Training Function ---
def run_experiment(
    env_name,
    num_episodes=500,
    batch_size=128,
    gamma=0.99,
    eps_decay=1000,
    tau=0.005,
    lr=1e-4,
    memory_size=10000,
    wandb_run_name=None,
    wandb_group=None,
    eps_start=0.9,
    eps_end=0.05,
):
    """
    Train a DQN agent on `env_name` and log per-episode metrics to wandb.

    Args:
        env_name: Gymnasium environment id. "Pendulum-v1" is wrapped with
            DiscretizeActionWrapper (5 discrete actions); other ids are used
            as-is and must expose a discrete action space (`.n`).
        num_episodes: Number of training episodes.
        batch_size: Replay batch size per optimization step.
        gamma: Discount factor.
        eps_decay: Decay constant (in environment steps) of the exponential
            epsilon schedule.
        tau: Soft-update rate for the target network.
        lr: AdamW learning rate.
        memory_size: Replay buffer capacity.
        wandb_run_name: Run name; defaults to "<env_name>_DQN_<unix time>".
        wandb_group: Optional wandb group.
        eps_start: Initial exploration probability.
        eps_end: Exploration probability the schedule decays towards.

    Returns:
        (model_path, config): path of the saved policy state_dict
        ("<wandb_run_name>.pth") and the hyperparameter dict logged to wandb.
    """
    if wandb_run_name is None:
        wandb_run_name = f"{env_name}_DQN_{int(time.time())}"

    config = {
        "env_name": env_name,
        "model_type": "DQN",
        "num_episodes": num_episodes,
        "batch_size": batch_size,
        "gamma": gamma,
        "eps_start": eps_start,
        "eps_end": eps_end,
        "eps_decay": eps_decay,
        "tau": tau,
        "lr": lr,
        "memory_size": memory_size,
    }

    run = wandb.init(
        project="cmps458_assignment2",
        name=wandb_run_name,
        group=wandb_group,
        config=config
    )

    env = None
    exit_code = 1  # reported to wandb unless training runs to completion
    try:
        # Setup Environment
        env = gym.make(env_name)
        if env_name == "Pendulum-v1":
            env = DiscretizeActionWrapper(env, num_discrete_actions=5)

        n_actions = env.action_space.n
        state, info = env.reset()
        n_observations = len(state)

        # Initialize Networks and Optimizer
        policy_net = DQN(n_observations, n_actions).to(device)
        target_net = DQN(n_observations, n_actions).to(device)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

        optimizer = torch.optim.AdamW(policy_net.parameters(), lr=lr, amsgrad=True)
        memory = ReplayMemory(memory_size)
        loss_fn = nn.SmoothL1Loss()

        print(f"--- Starting Training: {wandb_run_name} (Group: {wandb_group}) ---")

        # Training Loop
        steps_done = 0
        for i_episode in range(num_episodes):
            state, info = env.reset()
            state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

            episode_duration = 0
            episode_reward = 0

            while True:
                # Exponentially decay epsilon with the global step count.
                epsilon = eps_end + (eps_start - eps_end) * math.exp(-1. * steps_done / eps_decay)
                action = select_action(state, policy_net, n_actions, epsilon)
                steps_done += 1

                observation, reward, terminated, truncated, _ = env.step(action.item())
                episode_duration += 1
                episode_reward += reward
                # Fix the dtype explicitly: a NumPy float64 reward would
                # otherwise yield a float64 tensor and a double-precision target.
                reward = torch.tensor([reward], dtype=torch.float32, device=device)
                done = terminated or truncated

                # Only true termination ends bootstrapping; on truncation the
                # successor state is still stored.
                next_state = None if terminated else torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
                memory.push(state, action, next_state, reward)
                state = next_state

                # Optimize model
                optimize_model(policy_net, target_net, optimizer, memory, batch_size, gamma, loss_fn)

                # Soft update of target network: theta' <- (1 - tau) * theta' + tau * theta
                with torch.no_grad():
                    for target_param, policy_param in zip(target_net.parameters(), policy_net.parameters()):
                        target_param.data.mul_(1 - tau).add_(policy_param.data, alpha=tau)

                if done:
                    break

            # Log episode metrics
            wandb.log({
                "episode": i_episode,
                "duration": episode_duration,
                "episode_reward": episode_reward,
                "epsilon": epsilon
            })

        print(f"--- Training Complete: {wandb_run_name} ---")
        model_path = f"{wandb_run_name}.pth"
        torch.save(policy_net.state_dict(), model_path)
        exit_code = 0
        return model_path, config
    finally:
        # Always release the environment and close the wandb run, including
        # when training is interrupted (e.g. KeyboardInterrupt) or raises.
        if env is not None:
            env.close()
        run.finish(exit_code=exit_code)


# --- Test & Record Function ---
def test_and_record(
    env_name,
    model_type,
    model_path,
    num_tests=100,
    wandb_run_name=None,
    wandb_group=None,
    config=None
):
    """
    Evaluate a trained DQN greedily and record a video of the first episode.

    Args:
        env_name: Gymnasium environment id. "Pendulum-v1" is wrapped with
            DiscretizeActionWrapper (5 discrete actions).
        model_type: Label used in log messages, the default run name and the
            video file prefix.
        model_path: Path of a state_dict saved from a DQN with matching
            observation/action sizes.
        num_tests: Number of evaluation episodes.
        wandb_run_name: Run name; defaults to "test_<model_type>".
        wandb_group: Optional wandb group.
        config: Dict stored as the wandb run config; a minimal one is built
            when omitted.

    Returns:
        None. Per-episode and average metrics (and the video, if one was
        written) are logged to wandb.
    """
    print(f"\n--- Testing {model_type} on {env_name} ---")

    if wandb_run_name is None:
        wandb_run_name = f"test_{model_type}"

    if config is None:
        config = {"env_name": env_name, "model_type": model_type, "num_tests": num_tests}

    run = wandb.init(
        project="cmps458_assignment2_tests",
        name=wandb_run_name,
        group=wandb_group,
        config=config
    )

    env = None
    exit_code = 1  # reported to wandb unless evaluation runs to completion
    try:
        # Setup Environment
        env = gym.make(env_name, render_mode="rgb_array")
        if env_name == "Pendulum-v1":
            env = DiscretizeActionWrapper(env, num_discrete_actions=5)

        # Wrap for Video Recording - use simpler path for Colab compatibility
        video_folder = f"./videos_{wandb_run_name}"
        os.makedirs(video_folder, exist_ok=True)
        env = RecordVideo(
            env,
            video_folder=video_folder,
            episode_trigger=lambda e: e == 0,
            name_prefix=f"{model_type}",
            disable_logger=True  # Disable verbose logging
        )

        # Load Model. Sizes come from the space definitions rather than from a
        # throwaway env.reset(): an extra reset on the recording wrapper can
        # use up the episode-0 recording slot before evaluation starts.
        n_actions = env.action_space.n
        n_observations = env.observation_space.shape[0]

        model = DQN(n_observations, n_actions).to(device)
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.eval()

        # Run Tests
        test_durations = []
        test_rewards = []

        for i_episode in range(num_tests):
            state, info = env.reset()
            state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

            episode_duration = 0
            episode_reward = 0

            while True:
                action = select_action(state, model, n_actions, epsilon=0.0)  # Greedy
                observation, reward, terminated, truncated, _ = env.step(action.item())
                episode_duration += 1
                episode_reward += reward

                if terminated or truncated:
                    break

                state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

            wandb.log({
                "test_episode": i_episode,
                "test_duration": episode_duration,
                "test_reward": episode_reward
            })
            test_durations.append(episode_duration)
            test_rewards.append(episode_reward)

        # Close environment to ensure video is written before it is uploaded
        env.close()
        env = None

        # Average over the episodes actually run; NaN (not a ZeroDivisionError)
        # when num_tests is 0.
        n_run = len(test_durations)
        avg_duration = sum(test_durations) / n_run if n_run else float("nan")
        avg_reward = sum(test_rewards) / n_run if n_run else float("nan")

        print(f"Testing complete. Video saved in '{video_folder}'")
        print(f"Average duration: {avg_duration:.2f}, Average reward: {avg_reward:.2f}")

        # Upload video to wandb if it exists. os.listdir order is arbitrary,
        # so sort to make the selected file deterministic.
        video_files = sorted(f for f in os.listdir(video_folder) if f.endswith('.mp4'))
        if video_files:
            video_path = os.path.join(video_folder, video_files[0])
            wandb.log({"test_video": wandb.Video(video_path, fps=30, format="mp4")})
            print(f"Video uploaded to wandb: {video_files[0]}")

        wandb.log({"avg_test_duration": avg_duration, "avg_test_reward": avg_reward})
        exit_code = 0
    finally:
        # Always release the environment and close the wandb run, even when
        # evaluation is interrupted or raises.
        if env is not None:
            env.close()
        run.finish(exit_code=exit_code)

In [None]:
# --- MountainCar-v0: DQN ---
# The learning rate is defined once and used for both the run label and the
# training call, so the wandb run name matches the value actually trained
# with (previously the label said 0.001 while the default lr was used).
learning_rate = 1e-3
group_name = "MountainCar_Test"
run_name = f"MountainCar_DQN_lr-{learning_rate}"
model_path, config = run_experiment(
    env_name="MountainCar-v0",
    num_episodes=500,
    lr=learning_rate,
    wandb_run_name=run_name,
    wandb_group=group_name
)
test_and_record(
    "MountainCar-v0", run_name, model_path,
    wandb_run_name=f"test_{run_name}",
    wandb_group=group_name,
    config=config
)


[34m[1mwandb[0m: [32m[41mERROR[0m The nbformat package was not found. It is required to save notebook history.


--- Starting Training for CartPole_DQN_lr-0.001 (Group: CartPole_LR_Test) ---
Config: {'env_name': 'CartPole-v1', 'model_type': 'DQN', 'num_episodes': 500, 'batch_size': 128, 'gamma': 0.99, 'eps_start': 0.9, 'eps_end': 0.05, 'eps_decay': 1000, 'tau': 0.005, 'lr': 0.0001, 'memory_size': 10000}


KeyboardInterrupt: 

In [None]:
# --- MountainCar-v0 Hyperparameter Sweep ---

def _sweep_mountaincar(group_name, tag, param_name, values, **fixed_kwargs):
    """
    Train and then test one MountainCar-v0 DQN per value of one hyperparameter.

    Args:
        group_name: wandb group shared by every run of this sweep.
        tag: Short label used in the run name: "MountainCar_DQN_<tag>-<value>".
        param_name: Keyword argument of `run_experiment` being varied.
        values: Values to try, in order.
        **fixed_kwargs: Extra `run_experiment` keyword arguments held constant
            for the whole sweep (e.g. num_episodes, lr).
    """
    for value in values:
        run_name = f"MountainCar_DQN_{tag}-{value}"

        model_path, config = run_experiment(
            env_name="MountainCar-v0",
            wandb_run_name=run_name,
            wandb_group=group_name,
            **{param_name: value},  # <-- Variable we are testing
            **fixed_kwargs
        )
        test_and_record(
            "MountainCar-v0", run_name, model_path,
            wandb_run_name=f"test_{run_name}",
            wandb_group=group_name,
            config=config
        )


# 1. Test Learning Rates (lr)
lr_tests = [1e-3, 1e-4]
_sweep_mountaincar("MountainCar_LR_Test", "lr", "lr", lr_tests, num_episodes=500)

# 2. Test Discount Factors (gamma)
gamma_tests = [0.99, 0.9]
_sweep_mountaincar("MountainCar_Gamma_Test", "gamma", "gamma", gamma_tests, num_episodes=500)

# 3. Test Epsilon Decay Rates
# Note: sweeps 3 and 4 train for 1000 episodes with lr=1e-3, unlike sweeps 1-2.
eps_decay_tests = [1000, 5000]
_sweep_mountaincar(
    "MountainCar_EpsDecay_Test", "eps", "eps_decay", eps_decay_tests,
    num_episodes=1000, lr=1e-3
)

# 4. Test Replay Memory Sizes
memory_tests = [10000, 50000]
_sweep_mountaincar(
    "MountainCar_Memory_Test", "mem", "memory_size", memory_tests,
    num_episodes=1000, lr=1e-3
)