In [1]:
import os
import random
from collections import deque

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T

In [2]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [3]:
# Create the ALE Carnival environment with the full action set enabled
env = gym.make("ALE/Carnival-v5", full_action_space=True)
# Number of discrete actions; later used to size the Q-network output layer
# and as the upper bound when drawing a random exploratory action
action_space = env.action_space.n

In [4]:
# Show the human-readable name of each available action
env.unwrapped.get_action_meanings()

['NOOP',
 'FIRE',
 'UP',
 'RIGHT',
 'LEFT',
 'DOWN',
 'UPRIGHT',
 'UPLEFT',
 'DOWNRIGHT',
 'DOWNLEFT',
 'UPFIRE',
 'RIGHTFIRE',
 'LEFTFIRE',
 'DOWNFIRE',
 'UPRIGHTFIRE',
 'UPLEFTFIRE',
 'DOWNRIGHTFIRE',
 'DOWNLEFTFIRE']

In [5]:
# Start a seeded episode, then display the first frame together with the info dict
observation, info = env.reset(seed=123, options={})
observation, info

(array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        ...,
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]], dtype=uint8),
 {'lives': 0,
  'episode_frame_number': 0,
  'frame_number': 0,
  'seeds': (693650678, 2973253328)})

In [6]:
# Draw one random action from the environment's action space and display it
action = env.action_space.sample()
action

14

In [7]:
# Apply the sampled action for one step and display the reward it produced
observation, reward, terminated, truncated, info = env.step(action)
reward

0.0

In [8]:
# Frame returned by the step above
observation

array([[[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]],

       [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]],

       [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]],

       ...,

       [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]],

       [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]],

       [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0],
        ...,
        [0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]]], dtype=uint8)

In [9]:
# Info dict returned by the step above
info

{'lives': 0, 'episode_frame_number': 4, 'frame_number': 4}

In [10]:
# `terminated` flag returned by the step above
terminated

False

In [11]:
# `truncated` flag returned by the step above
truncated

False

In [12]:
# Frame preprocessing: the returned tensor always lives on `device`
def preprocess_observation(obs):
    """Convert one raw environment frame into a network-ready tensor.

    The frame is turned into a PIL image, reduced to grayscale, resized to
    84x84 and converted to a tensor with torchvision's ``ToTensor``. A leading
    batch dimension is then added and the result is moved to ``device``.

    Args:
        obs: Raw frame as returned by ``env.reset()`` / ``env.step()``.

    Returns:
        The preprocessed frame as a tensor on ``device`` with a leading batch
        dimension of size 1.
    """
    stages = [
        T.ToPILImage(),
        T.Grayscale(),
        T.Resize((84, 84)),
        T.ToTensor(),
    ]
    pipeline = T.Compose(stages)
    frame = pipeline(obs)
    batched = frame.unsqueeze(0)
    return batched.to(device)

In [13]:
# Convolutional Q-network: one single-channel frame in, one Q-value per action out
class DQN(nn.Module):
    """Three convolutional layers followed by two fully connected layers.

    The first linear layer expects ``64 * 7 * 7`` features, which is what the
    convolution stack produces for a 1x84x84 input (84 -> 20 -> 9 -> 7).

    Args:
        action_space: Number of actions, i.e. the width of the output layer.
    """

    def __init__(self, action_space):
        super().__init__()
        # Feature extractor
        self.conv1 = nn.Conv2d(1, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        # Q-value head
        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, action_space)

    def forward(self, x):
        """Return Q-values of shape ``(batch, action_space)`` for a batch of frames."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = torch.relu(conv(x))
        # Collapse channels and spatial dims, keeping the batch dimension
        flat = x.view(x.size(0), -1)
        hidden = torch.relu(self.fc1(flat))
        return self.fc2(hidden)

In [14]:
# Hyperparameters
gamma = 0.99                      # discount applied to the bootstrapped next-state value
epsilon_start = 1.0               # exploration rate at step 0
epsilon_end = 0.1                 # exploration rate the schedule decays towards
epsilon_decay = 100_000           # step scale of the exponential epsilon decay
learning_rate = 1e-4              # Adam learning rate
batch_size = 32                   # transitions sampled per optimisation step
memory_size = 10_000              # replay buffer capacity
target_update_frequency = 1_000   # steps between target-network weight copies

In [15]:
# Fixed-capacity experience replay buffer
class ReplayMemory:
    """Bounded store of ``(state, action, reward, next_state, done)`` tuples.

    Args:
        capacity: Maximum number of transitions kept; once full, each new
            transition displaces the oldest one.
    """

    def __init__(self, capacity):
        # A deque with `maxlen` drops items from the opposite end when full
        self.memory = deque([], maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [16]:
# Initialize DQN and target network
policy_net = DQN(action_space).to(device)
target_net = DQN(action_space).to(device)
# Start the target network as an exact copy of the policy network's weights
target_net.load_state_dict(policy_net.state_dict())
# Put the target network in evaluation mode; it is never handed to the optimizer
# and only changes when the policy weights are copied into it
target_net.eval()

DQN(
  (conv1): Conv2d(1, 32, kernel_size=(8, 8), stride=(4, 4))
  (conv2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=3136, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=18, bias=True)
)

In [17]:
# Adam updates only the policy network's parameters
optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
# Replay buffer holding at most `memory_size` transitions
memory = ReplayMemory(memory_size)

In [18]:
def select_action(state, steps_done):
    """Choose an action epsilon-greedily.

    Epsilon starts at ``epsilon_start`` and decays exponentially towards
    ``epsilon_end`` as ``steps_done`` grows, on a scale of ``epsilon_decay``.

    Args:
        state: Preprocessed observation tensor for a single frame.
        steps_done: Number of environment steps taken so far.

    Returns:
        The chosen action index as a Python int.
    """
    decay_factor = np.exp(-1. * steps_done / epsilon_decay)
    epsilon = epsilon_end + (epsilon_start - epsilon_end) * decay_factor

    # Explore: uniformly random action
    if random.random() <= epsilon:
        return random.randrange(action_space)

    # Exploit: action with the highest predicted Q-value
    with torch.no_grad():
        q_values = policy_net(state)
        return q_values.max(dim=1).indices.item()

In [19]:
def optimize_model():
    """Run one gradient step on ``policy_net`` from a random replay minibatch.

    Does nothing until ``memory`` holds at least ``batch_size`` transitions.
    Otherwise it samples ``batch_size`` transitions and moves the policy
    network's Q(s, a) towards the one-step target

        reward + gamma * max_a' Q_target(s', a') * (1 - done)

    using the smooth L1 (Huber) loss, then applies one ``optimizer`` step.

    Returns:
        None.
    """
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    # Transpose the list of (s, a, r, s', done) tuples into one tuple per field
    states, actions, rewards, next_states, dones = zip(*transitions)

    states = torch.cat(states).to(device)
    next_states = torch.cat(next_states).to(device)
    # Build the small per-transition tensors directly on the target device
    actions = torch.tensor(actions, dtype=torch.long, device=device)
    rewards = torch.tensor(rewards, dtype=torch.float, device=device)
    dones = torch.tensor(dones, dtype=torch.float, device=device)

    # Q-value the policy network assigns to the action that was actually taken
    state_action_values = policy_net(states).gather(1, actions.unsqueeze(1))

    # The targets are constants for this update, so evaluate the target network
    # without recording an autograd graph (same values as forward + detach,
    # but without the wasted memory and bookkeeping).
    with torch.no_grad():
        next_state_values = target_net(next_states).max(1)[0]
        # (1 - dones) removes the bootstrap term for transitions flagged as done
        expected_state_action_values = (next_state_values * gamma * (1 - dones)) + rewards

    loss = nn.functional.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [24]:
# Training loop
num_episodes = 200
steps_done = 0  # global step counter: drives the epsilon schedule and target-network syncs

for episode in range(num_episodes):
    obs, _ = env.reset()
    state = preprocess_observation(obs)

    total_reward = 0
    done = False

    while not done:
        action = select_action(state, steps_done)
        next_obs, reward, terminated, truncated, _ = env.step(action)
        # The episode is over on either signal. Ignoring `truncated` would keep
        # stepping an environment that has already ended and must be reset.
        done = terminated or truncated
        total_reward += reward

        next_state = preprocess_observation(next_obs)
        # Store `terminated` rather than `done`: only a true terminal state should
        # zero the bootstrapped value in optimize_model(); a truncated episode
        # was merely cut short.
        memory.push(state, action, reward, next_state, terminated)

        state = next_state
        optimize_model()
        steps_done += 1

        # Periodically copy the policy weights into the target network
        if steps_done % target_update_frequency == 0:
            target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode {episode + 1}: Total Reward: {total_reward}")

env.close()

Episode 1: Total Reward: 960.0
Episode 2: Total Reward: 680.0
Episode 3: Total Reward: 240.0
Episode 4: Total Reward: 480.0
Episode 5: Total Reward: 880.0
Episode 6: Total Reward: 700.0
Episode 7: Total Reward: 820.0
Episode 8: Total Reward: 500.0
Episode 9: Total Reward: 280.0
Episode 10: Total Reward: 740.0
Episode 11: Total Reward: 800.0
Episode 12: Total Reward: 820.0
Episode 13: Total Reward: 1180.0
Episode 14: Total Reward: 440.0
Episode 15: Total Reward: 280.0
Episode 16: Total Reward: 840.0
Episode 17: Total Reward: 1480.0
Episode 18: Total Reward: 720.0
Episode 19: Total Reward: 540.0
Episode 20: Total Reward: 860.0
Episode 21: Total Reward: 420.0
Episode 22: Total Reward: 680.0
Episode 23: Total Reward: 480.0
Episode 24: Total Reward: 740.0
Episode 25: Total Reward: 740.0
Episode 26: Total Reward: 1900.0
Episode 27: Total Reward: 280.0
Episode 28: Total Reward: 940.0
Episode 29: Total Reward: 1060.0
Episode 30: Total Reward: 900.0
Episode 31: Total Reward: 900.0
Episode 32: T

In [25]:
# Save the trained policy network's weights (state_dict only) under saved_models/
model_dir = 'saved_models'
os.makedirs(model_dir, exist_ok=True)  # no error if the directory already exists
model_path = os.path.join(model_dir, 'dqn_carnival.pth')

torch.save(policy_net.state_dict(), model_path)
print(f"Model saved at {model_path}")

Model saved at saved_models\dqn_carnival.pth


In [26]:
import gymnasium as gym
import torch
import torchvision.transforms as T
import imageio

# This cell reuses the `DQN` class and `preprocess_observation` defined earlier
# in the notebook; it only recreates the environment and reloads the weights.

# Initialize environment with rgb_array mode so env.render() returns image frames
env = gym.make("ALE/Carnival-v5", full_action_space=True, render_mode="rgb_array")

# Load the saved model
model_path = 'saved_models/dqn_carnival.pth'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Size the output layer from the environment that is actually being played
policy_net = DQN(env.action_space.n).to(device)
# The checkpoint is a plain state_dict of tensors, so the restricted
# (weights_only) loader is sufficient and avoids unpickling arbitrary objects.
policy_net.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
policy_net.eval()

# Play one episode greedily and collect the rendered frames
frames = []
try:
    obs, _ = env.reset()
    done = False

    while not done:
        state = preprocess_observation(obs)
        with torch.no_grad():
            action = policy_net(state).max(1)[1].item()
        obs, _, terminated, truncated, _ = env.step(action)

        done = terminated or truncated

        # Capture the current frame (an RGB array because of render_mode="rgb_array")
        frame = env.render()
        frames.append(frame)
finally:
    # Release the emulator even if the episode fails part-way through
    env.close()

# Save frames as a gif
gif_path = 'carnival_gameplay.gif'
imageio.mimsave(gif_path, frames, fps=30)

print(f"Gameplay saved as {gif_path}")

Gameplay saved as carnival_gameplay.gif
