In [173]:
import gym 
import numpy as np
import cv2 as cv
import torch 
import torch.nn as nn      
import torch.nn.functional as F
from collections import namedtuple


In [174]:
def rgb_to_grayscale(frame):
    """Collapse a colour frame to a single luminance channel.

    The first three entries of the last axis are weighted with
    0.2989 / 0.5870 / 0.1140 and summed; any further channels (such as
    alpha) are ignored.

    Args:
        frame: Array whose last axis holds the colour channels.

    Returns:
        ``np.uint8`` array with the last axis removed. The cast truncates
        the fractional part instead of rounding.
    """
    luma_weights = [0.2989, 0.5870, 0.1140]
    colour_channels = frame[..., :3]
    return np.dot(colour_channels, luma_weights).astype(np.uint8)
def resize_frame(frame, size=(84, 84)):
    """Rescale ``frame`` with OpenCV using area interpolation.

    Args:
        frame: Image array accepted by ``cv.resize``.
        size: Target size, forwarded unchanged as ``cv.resize``'s second
            argument.

    Returns:
        The resized image returned by ``cv.resize``.
    """
    return cv.resize(frame, size, interpolation=cv.INTER_AREA)
def normalize_frame(frame):
    """Divide pixel values by 255 and return them as ``np.float32``.

    Args:
        frame: Numeric array of pixel intensities.

    Returns:
        ``np.float32`` array of the same shape holding ``frame / 255.0``.
    """
    scaled = frame / 255.0
    return scaled.astype(np.float32)
def preprocess_frame(frame):
    """Run the full pipeline: grayscale -> 84x84 resize -> [0, 1] scaling.

    Args:
        frame: Raw colour frame.

    Returns:
        The ``np.float32`` frame produced by ``normalize_frame`` after
        ``rgb_to_grayscale`` and ``resize_frame(..., size=(84, 84))``.
    """
    return normalize_frame(
        resize_frame(rgb_to_grayscale(frame), size=(84, 84))
    )

In [175]:
from collections import deque
class FrameStack:
    """Rolling window over the most recently pushed frames.

    At most ``maxlen`` frames are retained; pushing onto a full window drops
    the oldest frame.
    """

    def __init__(self, maxlen=4):
        self.maxlen = maxlen
        self.frames = deque(maxlen=maxlen)

    def push(self, frame):
        """Append ``frame`` as the newest entry of the window."""
        self.frames.append(frame)

    def get_stack(self):
        """Return the window as one array stacked along a new axis 0.

        When fewer than ``maxlen`` frames are stored, zero-filled
        ``np.float32`` frames are appended *after* the stored ones. Their
        shape is taken from the oldest stored frame, or (84, 84) when the
        window is empty.
        """
        stored = list(self.frames)
        missing = self.maxlen - len(stored)
        if missing <= 0:
            return np.stack(stored, axis=0)

        if self.frames:
            height, width = self.frames[0].shape
        else:
            height, width = 84, 84
        blank = np.zeros((height, width), dtype=np.float32)
        return np.stack(stored + [blank] * missing, axis=0)

    def reset(self):
        """Discard every stored frame."""
        self.frames.clear()

In [176]:
class AtariWrapper:
    """Gym environment wrapper adding frame skipping, preprocessing and stacking.

    Every observation handed to the caller is the stack held by a
    ``FrameStack`` of frames produced by ``preprocess_frame``.

    Both Gym calling conventions are accepted: ``reset()`` may return either
    the observation or an ``(obs, info)`` tuple, and ``step()`` may return
    either ``(obs, reward, done, info)`` or
    ``(obs, reward, terminated, truncated, info)``. The wrapper itself always
    exposes the 4-tuple form.
    """

    def __init__(self, env_name: str = "ALE/Breakout-v5", frame_skip: int = 4,
                 stack_size: int = 4):
        """Create the underlying environment.

        Args:
            env_name: Environment id passed to ``gym.make``.
            frame_skip: Number of times each action is repeated per ``step``.
            stack_size: Number of preprocessed frames kept in the stack.

        Raises:
            ValueError: If ``frame_skip`` is smaller than 1 (``step`` would
                otherwise have no observation to return).
        """
        if frame_skip < 1:
            raise ValueError("frame_skip must be at least 1")
        self.env = gym.make(env_name)
        self.frame_skip = frame_skip
        self.frame_stack = FrameStack(maxlen=stack_size)
        self.action_space = self.env.action_space

    @staticmethod
    def _unpack_step(result):
        """Normalise a ``step`` result to ``(obs, reward, done, info)``."""
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
            return obs, reward, bool(terminated or truncated), info
        obs, reward, done, info = result
        return obs, reward, done, info

    def reset(self):
        """Reset the environment and return the initial frame stack.

        The first preprocessed frame is repeated until the stack is full, so
        the returned array never contains zero padding.
        """
        result = self.env.reset()
        # Newer Gym versions return (obs, info); older ones return obs only.
        obs = result[0] if isinstance(result, tuple) else result
        self.frame_stack.reset()
        preprocessed_frame = preprocess_frame(obs)
        for _ in range(self.frame_stack.maxlen):
            self.frame_stack.push(preprocessed_frame)
        return self.frame_stack.get_stack()

    def step(self, action):
        """Repeat ``action`` for up to ``frame_skip`` frames.

        Rewards are summed over the repeated frames; the loop stops early when
        the episode ends. Only the last observation is pushed onto the stack.

        Returns:
            Tuple ``(stacked_frames, total_reward, done, info)`` where
            ``info`` comes from the last underlying step.
        """
        total_reward = 0.0
        done = False
        for _ in range(self.frame_skip):
            obs, reward, done, info = self._unpack_step(self.env.step(action))
            total_reward += reward
            if done:
                break
        preprocessed_frame = preprocess_frame(obs)
        self.frame_stack.push(preprocessed_frame)
        return self.frame_stack.get_stack(), total_reward, done, info

    def get_state(self):
        """Return the current frame stack without stepping the environment."""
        return self.frame_stack.get_stack()

    def close(self):
        """Close the underlying environment."""
        self.env.close()
        

In [177]:
class SumTree:
    """Binary sum tree stored in a flat array, used for proportional sampling.

    ``tree`` has ``2 * capacity - 1`` nodes: the last ``capacity`` entries are
    the leaves (one priority per stored item) and every internal node holds
    the sum of its two children, so ``tree[0]`` is the total priority.
    ``data`` is a ring buffer of the stored items; slot ``i`` belongs to leaf
    ``i + capacity - 1``.
    """

    def __init__(self, capacity):
        """Allocate an empty tree.

        Args:
            capacity: Maximum number of items kept before old ones are
                overwritten.

        Raises:
            ValueError: If ``capacity`` is smaller than 1.
        """
        if capacity < 1:
            raise ValueError("capacity must be a positive integer")
        self.capacity = capacity
        # float64 keeps the running sums accurate over many incremental updates.
        self.tree = np.zeros(2 * capacity - 1, dtype=np.float64)
        self.data = np.zeros(capacity, dtype=object)
        self.data_pointer = 0
        self.size = 0

    def _propagate(self, idx, change):
        """Add ``change`` to every ancestor of node ``idx`` up to the root."""
        # Iterative on purpose: a no-op for the root itself (capacity == 1).
        while idx != 0:
            idx = (idx - 1) // 2
            self.tree[idx] += change

    def _retrieve(self, idx, s):
        """Descend from node ``idx`` to the leaf covering cumulative value ``s``.

        A subtree whose sum is not positive is never entered while its sibling
        has positive mass, so rounding or ``s == 0`` cannot select an empty
        leaf unless the whole subtree under ``idx`` is empty.
        """
        tree = self.tree
        node_count = len(tree)
        while True:
            left = 2 * idx + 1
            if left >= node_count:
                return idx
            right = left + 1
            left_sum = tree[left]
            if tree[right] <= 0 or (left_sum > 0 and s <= left_sum):
                idx = left
            else:
                s -= left_sum
                idx = right

    def total_priority(self):
        """Return the sum of all leaf priorities."""
        return self.tree[0]

    def add(self, priority, data):
        """Store ``data`` with ``priority``, overwriting the oldest slot when full."""
        idx = self.data_pointer + self.capacity - 1
        self.data[self.data_pointer] = data
        self.update(idx, priority)
        self.data_pointer += 1
        if self.data_pointer >= self.capacity:
            self.data_pointer = 0
        if self.size < self.capacity:
            self.size += 1

    def update(self, idx, priority):
        """Set the priority of tree node ``idx`` (a leaf index) and fix the sums."""
        change = priority - self.tree[idx]
        self.tree[idx] = priority
        self._propagate(idx, change)

    def get_leaf(self, s):
        """Look up the leaf covering cumulative priority ``s``.

        Returns:
            Tuple ``(tree_index, priority, data)``.
        """
        idx = self._retrieve(0, s)
        data_idx = idx - (self.capacity - 1)
        return idx, self.tree[idx], self.data[data_idx]

    def __len__(self):
        """Number of items currently stored (at most ``capacity``)."""
        return self.size
    


    

In [178]:

Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
class PrioritizedReplayBuffer:
    """Proportional prioritized replay buffer backed by a ``SumTree``.

    Priorities stored in the tree are already raised to ``alpha``;
    ``max_priority`` is kept on that same scale so new transitions can be
    inserted with it directly.
    """

    def __init__(self, capacity=10000, alpha=0.6, beta_start=0.4, beta_increment=1e-4):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of stored transitions.
            alpha: Exponent applied to ``|TD error| + epsilon`` to obtain a priority.
            beta_start: Initial importance-sampling exponent.
            beta_increment: Amount added to ``beta`` on every ``sample`` call
                (capped at 1.0).
        """
        self.capacity = capacity
        self.alpha = alpha
        self.tree = SumTree(capacity)
        self.beta = beta_start
        self.beta_increment_per_sampling = beta_increment
        self.max_priority = 1.0
        self.epsilon = 1e-6  # Small value to avoid zero priority

    def get_beta(self):
        """Advance ``beta`` by one increment (capped at 1.0) and return it."""
        beta = min(1.0, self.beta + self.beta_increment_per_sampling)
        self.beta = beta
        return beta

    def push(self, state, action, reward, next_state, done):
        """Store a transition with the current maximum priority."""
        experience = Experience(state, action, reward, next_state, done)
        # max_priority is already on the alpha-exponentiated scale; applying
        # alpha again would shrink the priority of new transitions.
        self.tree.add(self.max_priority, experience)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions with probability proportional to priority.

        The priority mass is split into ``batch_size`` equal segments and one
        value is drawn uniformly from each.

        Returns:
            Tuple ``(batch, indices, weights)``: the sampled ``Experience``
            objects, their tree indices (``np.int32``) and their
            importance-sampling weights (``np.float32``) normalised so that
            the largest possible weight is 1.

        Raises:
            ValueError: If the buffer holds no transitions.
        """
        total = self.tree.total_priority()
        if len(self.tree) == 0 or total <= 0:
            raise ValueError("cannot sample from an empty replay buffer")

        beta = self.get_beta()
        # Only filled leaves count: empty slots have priority 0 and would make
        # the maximum weight infinite, zeroing every normalised weight.
        leaves = self.tree.tree[-self.capacity:]
        min_priority = leaves[leaves > 0].min()

        segment = total / batch_size
        batch = []
        indices = np.empty(batch_size, dtype=np.int32)
        weights = np.empty(batch_size, dtype=np.float32)
        for i in range(batch_size):
            s = np.random.uniform(segment * i, segment * (i + 1))
            idx, p, data = self.tree.get_leaf(s)
            while p <= 0:
                # Landed on an unfilled slot (possible through rounding); redraw.
                idx, p, data = self.tree.get_leaf(np.random.uniform(0.0, total))
            batch.append(data)
            indices[i] = idx
            # Equals (prob * batch_size) ** -beta divided by the same expression
            # for the minimum probability; the common factors cancel.
            weights[i] = (p / min_priority) ** (-beta)
        return batch, indices, weights

    def update_priorities(self, indices, priorities):
        """Set new priorities from absolute TD errors.

        Args:
            indices: Tree indices as returned by ``sample``.
            priorities: TD errors; each becomes ``(|error| + epsilon) ** alpha``.
        """
        for idx, priority in zip(indices, priorities):
            priority = float((np.abs(priority) + self.epsilon) ** self.alpha)
            self.tree.update(int(idx), priority)
            self.max_priority = max(self.max_priority, priority)

    def __len__(self):
        """Number of transitions currently stored."""
        return self.tree.size




In [179]:
class DQN(nn.Module):
    """Three-layer convolutional Q-network with a two-layer fully connected head.

    Args:
        input_dim: ``(channels, height, width)`` of a single observation.
        output_dim: Number of Q-values produced per observation.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        channels, height, width = input_dim

        self.conv1 = nn.Conv2d(channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Flattened feature count, measured by pushing a dummy input through
        # the convolutional layers.
        self._conv_output_dim = self._get_conv_output_size((channels, height, width))

        self.fc1 = nn.Linear(self._conv_output_dim, 512)
        self.fc2 = nn.Linear(512, output_dim)

    def _get_conv_output_size(self, shape):
        """Return the number of features the conv layers emit for one input of ``shape``."""
        probe = torch.zeros(1, *shape)  # batch size = 1
        with torch.no_grad():
            features = self.conv3(self.conv2(self.conv1(probe)))
        return int(features[0].numel())

    def forward(self, x):
        """Map a batch of observations to a batch of Q-value vectors."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        flat = x.view(len(x), -1)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

In [180]:
def compute_double_dqn_loss(policy_net, target_net,states,actions,rewards,next_states,dones,gamma,is_weights):
    """Importance-weighted squared TD loss with Double-DQN targets.

    The greedy next action is chosen by ``policy_net`` and evaluated by
    ``target_net``.

    Args:
        policy_net: Network being trained; maps states to per-action Q-values.
        target_net: Network used to evaluate the selected next action.
        states: Batch of current states.
        actions: 1-D integer tensor of the actions taken.
        rewards: 1-D tensor of rewards.
        next_states: Batch of successor states.
        dones: 1-D tensor of terminal flags (float or bool); terminal
            transitions do not bootstrap.
        gamma: Discount factor.
        is_weights: 1-D tensor of per-sample importance-sampling weights.

    Returns:
        Tuple ``(loss, td_errors)``: the scalar loss (carrying gradients with
        respect to ``policy_net``) and the detached per-sample TD errors.
    """
    q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # Targets are constants for the optimiser, so no autograd graph is built
    # for the two next-state forward passes.
    with torch.no_grad():
        next_actions = policy_net(next_states).argmax(dim=1, keepdim=True)
        next_q_values = target_net(next_states).gather(1, next_actions).squeeze(1)
        # Cast first: ``1 - tensor`` is not defined for bool tensors.
        not_done = 1.0 - dones.to(next_q_values.dtype)
        targets = rewards + gamma * next_q_values * not_done

    td_errors = q_values - targets
    loss = (td_errors ** 2 * is_weights).mean()
    return loss, td_errors.detach()

In [181]:
class AdvancedDQNAgent:
    """Double-DQN agent with epsilon-greedy exploration and a prioritized buffer.

    Required ``config`` keys: ``gamma``, ``epsilon_start``, ``epsilon_end``,
    ``epsilon_decay``, ``device``, ``learning_rate`` and ``replay_buffer``.
    Optional keys: ``batch_size`` (default 32) and ``initial_replay_size``
    (default: the batch size).
    """

    def __init__(self,state_shape,action_size,config):
        """Build the policy/target networks and the optimiser.

        Args:
            state_shape: ``(channels, height, width)`` of one observation.
            action_size: Number of discrete actions.
            config: Hyper-parameter dictionary (see class docstring).
        """
        self.state_shape = state_shape
        self.action_size = action_size
        self.gamma = config['gamma']
        self.epsilon_start = config['epsilon_start']
        self.epsilon_end = config['epsilon_end']
        self.epsilon_decay = config['epsilon_decay']
        self.device = config['device']
        # Previously both values were hard-coded to 32 inside update().
        self.batch_size = config.get('batch_size', 32)
        self.initial_replay_size = max(
            self.batch_size, config.get('initial_replay_size', self.batch_size))
        self.policy_net = DQN(state_shape, action_size).to(config['device'])
        self.target_net = DQN(state_shape, action_size).to(config['device'])
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=config['learning_rate'])
        self.replay_buffer = config['replay_buffer']
        self.steps_done = 0
        self.step_count = 0

    def select_action(self, state):
        """Pick an action epsilon-greedily and advance ``step_count``.

        Epsilon decays exponentially from ``epsilon_start`` towards
        ``epsilon_end`` with time constant ``epsilon_decay`` (in calls to this
        method).
        """
        epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                  np.exp(-1. * self.step_count / self.epsilon_decay)
        self.step_count += 1

        if np.random.rand() < epsilon:
            return np.random.randint(self.action_size)

        state_tensor = torch.as_tensor(
            state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.policy_net(state_tensor)
        return q_values.argmax(1).item()

    def store_transition(self,state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.replay_buffer.push(state, action, reward, next_state, done)

    def _to_tensor(self, values, dtype):
        """Convert a sequence to a tensor of NumPy ``dtype`` on the agent's device."""
        return torch.as_tensor(np.asarray(values, dtype=dtype), device=self.device)

    def update(self):
        """Run one optimisation step on a prioritized mini-batch.

        Returns:
            The scalar loss, or ``None`` while the buffer holds fewer than
            ``initial_replay_size`` transitions.
        """
        if len(self.replay_buffer) < self.initial_replay_size:
            return None

        batch, indices, is_weights = self.replay_buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = self._to_tensor(np.stack(states), np.float32)
        actions = self._to_tensor(actions, np.int64)
        rewards = self._to_tensor(rewards, np.float32)
        next_states = self._to_tensor(np.stack(next_states), np.float32)
        dones = self._to_tensor(dones, np.float32)
        is_weights = self._to_tensor(is_weights, np.float32)

        loss, td_errors = compute_double_dqn_loss(
            self.policy_net, self.target_net, states, actions,
            rewards, next_states, dones, self.gamma, is_weights)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Feed the new TD errors back so sampling priorities stay current.
        self.replay_buffer.update_priorities(indices, td_errors.abs().cpu().numpy())
        return loss.item()

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())


In [182]:
# Hyper-parameters shared by the replay buffer, the agent and the training loop.
config = dict(
    learning_rate=2e-4,          # Adam step size
    gamma=0.99,                  # Discount factor for future rewards
    buffer_size=20000,           # Capacity of the replay buffer
    epsilon_start=1.0,           # Exploration rate at step 0
    epsilon_end=0.01,            # Asymptotic exploration rate
    epsilon_decay=1000000,       # Time constant (in action selections) of the exponential epsilon decay
    batch_size=16,               # Transitions per training step
    target_update_freq=500,      # Target-network sync period, in agent action-selection steps
    initial_replay_size=5000,    # Transitions required in the buffer before training starts
    alpha=0.6,                   # PER prioritization exponent
    beta_start=0.4,              # Initial PER importance-sampling exponent
    max_episodes=1000,           # Upper bound on training episodes
    target_score=12.0,           # Mean score over 50 episodes that stops training
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)




In [183]:

def train_agent(env, agent, config):
    """Train ``agent`` on ``env`` until the target score or the episode limit.

    After every environment step the agent stores the transition, performs
    one ``update()`` and, whenever ``agent.step_count`` is a multiple of
    ``config['target_update_freq']``, synchronises its target network.
    Training stops early once the mean reward of the last 50 episodes reaches
    ``config['target_score']``.

    Args:
        env: Object with ``reset() -> state`` and
            ``step(action) -> (next_state, reward, done, info)``.
        agent: Object providing ``select_action``, ``store_transition``,
            ``update``, ``update_target_network`` and ``step_count``.
        config: Dict with ``max_episodes``, ``target_update_freq`` and
            ``target_score``.

    Returns:
        Tuple ``(rewards_per_episode, losses, agent)``; ``losses`` holds every
        non-``None`` value returned by ``agent.update()``.
    """
    rewards_per_episode = []
    recent_scores = deque(maxlen=50)  # rolling window for the stop criterion
    losses = []

    for episode in range(config['max_episodes']):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store_transition(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

            loss = agent.update()
            if loss is not None:
                # update() returns None while the buffer is still warming up.
                losses.append(loss)

            if agent.step_count % config['target_update_freq'] == 0:
                agent.update_target_network()

        rewards_per_episode.append(total_reward)
        recent_scores.append(total_reward)

        mean_recent = np.mean(list(recent_scores))
        print(f"Episode {episode}: Reward = {total_reward}, Mean(50) = {mean_recent:.2f}")

        if mean_recent >= config['target_score']:
            print("Target performance reached.")
            break

    return rewards_per_episode,losses, agent

In [184]:
import matplotlib.pyplot as plt

def plot_training_results(episode_rewards, mean_scores=None, losses=None, target_score=12.0):
    """Create a 2x2 overview figure of a training run and show it.

    Panels: per-episode reward with a 10-episode rolling mean, 50-episode
    mean score against the target, training loss, and a reward histogram.

    Args:
        episode_rewards: Sequence of total rewards, one per episode.
        mean_scores: Optional precomputed 50-episode means, one per episode.
            Anything that is not a 1-D sequence of matching length is ignored
            and the means are recomputed from ``episode_rewards``.
        losses: Optional sequence of training losses.
        target_score: Height of the horizontal target line.
    """
    rolling_window = 10
    n_episodes = len(episode_rewards)
    fig, axs = plt.subplots(2, 2, figsize=(14, 10))

    # Episode rewards with rolling mean
    axs[0, 0].plot(episode_rewards, label='Episode Reward', alpha=0.6)
    if n_episodes >= rolling_window:
        rolling_mean = np.convolve(
            episode_rewards, np.ones(rolling_window) / rolling_window, mode='valid')
        # The first 'valid' value averages episodes 0..9, so it belongs at x = 9.
        axs[0, 0].plot(range(rolling_window - 1, n_episodes), rolling_mean,
                       label='Rolling Mean (10)', color='red')
    axs[0, 0].set_title("Episode Rewards")
    axs[0, 0].set_xlabel("Episode")
    axs[0, 0].set_ylabel("Reward")
    axs[0, 0].legend()

    # Mean scores vs target
    has_usable_means = (
        mean_scores is not None
        and np.ndim(mean_scores) == 1
        and len(mean_scores) == n_episodes
    )
    if not has_usable_means:
        mean_scores = [np.mean(episode_rewards[max(0, i-49):i+1]) for i in range(n_episodes)]
    axs[0, 1].plot(mean_scores, label='Mean Score (50)', color='orange')
    axs[0, 1].axhline(y=target_score, color='green', linestyle='--', label='Target Score')
    axs[0, 1].set_title("Mean Score Over 50 Episodes")
    axs[0, 1].set_xlabel("Episode")
    axs[0, 1].set_ylabel("Mean Score")
    axs[0, 1].legend()

    # Loss evolution
    if losses is not None and len(losses) > 0:
        axs[1, 0].plot(losses, label='Loss')
        axs[1, 0].set_title("Training Loss")
        axs[1, 0].set_xlabel("Training Step")
        axs[1, 0].set_ylabel("Loss")
        axs[1, 0].legend()
    else:
        axs[1, 0].text(0.5, 0.5, 'No losses recorded', ha='center', va='center')
        axs[1, 0].set_title("Training Loss")

    # Histogram of scores
    axs[1, 1].hist(episode_rewards, bins=20, color='purple', alpha=0.7)
    axs[1, 1].set_title("Score Distribution")
    axs[1, 1].set_xlabel("Episode Reward")
    axs[1, 1].set_ylabel("Frequency")

    plt.tight_layout()
    plt.show()


In [185]:
if __name__ == "__main__":
    print("=" * 60)
    print("Assignment: Prioritized Experience Replay with Double DQN")
    print("Author: Your Name | Course: RL Assignment")
    print("=" * 60)

    # Environment setup
    # NOTE(review): the recorded run of this cell failed with
    # "NamespaceNotFound: Namespace `ALE` does not exist" — presumably the ALE
    # environments are not registered with the installed gym; confirm the
    # gym / ale-py versions before re-running.
    env = AtariWrapper(env_name="ALE/Breakout-v5", frame_skip=4)
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        state_shape = env.reset().shape  # (4, 84, 84)
        action_size = env.action_space.n

        # Instantiate replay buffer
        buffer = PrioritizedReplayBuffer(capacity=config['buffer_size'],
                                         alpha=config['alpha'],
                                         beta_start=config['beta_start'])
        config['replay_buffer'] = buffer

        # Create agent
        agent = AdvancedDQNAgent(state_shape=state_shape, action_size=action_size, config=config)

        # Run training on the environment created above; train_agent returns
        # three values (rewards, losses, agent).
        rewards, losses, trained_agent = train_agent(env=env, agent=agent, config=config)
    finally:
        # Release the emulator even when setup or training fails.
        env.close()

    # Plot results (the second argument is recomputed inside the function).
    plot_training_results(rewards, None, losses)

    # The figure is only shown, not written to disk.
    print("Training complete.")

Assignment: Prioritized Experience Replay with Double DQN
Author: Your Name | Course: RL Assignment


NamespaceNotFound: Namespace `ALE` does not exist. Have you installed the proper package for `ALE`?

In [170]:
import gym
# Smoke test: play one Breakout episode with uniformly random actions in a
# visible window, then release the environment.
env = gym.make("ALE/Breakout-v5", render_mode="human")
obs, info = env.reset()

done = False
while True:
    random_action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(random_action)
    done = terminated or truncated
    env.render()
    if done:
        # The episode ended through either termination or truncation.
        break
env.close()

NamespaceNotFound: Namespace `ALE` does not exist. Have you installed the proper package for `ALE`?

In [172]:
from ale_py import ALEInterface

import os

ale = ALEInterface()

# Load the ROM first. The location can be overridden with the
# BREAKOUT_ROM_PATH environment variable; the default is the path this
# notebook was originally run with.
rom_path = os.environ.get(
    "BREAKOUT_ROM_PATH",
    '/Users/kanishksharma/miniconda/envs/spikeverse/lib/python3.10/site-packages/AutoROM/roms/breakout.bin',
)
ale.loadROM(rom_path)

# Now get the minimal action set (note: despite its name, `roms` holds the
# list of actions, not ROMs).
roms = ale.getMinimalActionSet()
print("Available minimal action set:", roms)


Available minimal action set: [<Action.NOOP: 0>, <Action.FIRE: 1>, <Action.RIGHT: 3>, <Action.LEFT: 4>]


A.L.E: Arcade Learning Environment (version 0.7.5+db37282)
[Powered by Stella]
Game console created:
  ROM file:  /Users/kanishksharma/miniconda/envs/spikeverse/lib/python3.10/site-packages/AutoROM/roms/breakout.bin
  Cart Name: Breakout - Breakaway IV (1978) (Atari)
  Cart MD5:  f34f08e5eb96e500e851a80be3277a56
  Display Format:  AUTO-DETECT ==> NTSC
  ROM Size:        2048
  Bankswitch Type: AUTO-DETECT ==> 2K

Running ROM file...
Random seed is 1748870542
