In [None]:
pip install minatar

Collecting minatar
  Downloading MinAtar-1.0.15-py3-none-any.whl.metadata (685 bytes)
Downloading MinAtar-1.0.15-py3-none-any.whl (16 kB)
Installing collected packages: minatar
Successfully installed minatar-1.0.15


In [None]:
pip install ale-py



In [None]:
pip install gymnasium



In [None]:
from time import sleep
import numpy as np
from IPython.display import clear_output
import gymnasium as gym
from gymnasium.envs.registration import register
import torch
from torch import nn


In [None]:
from gymnasium import spaces
from minatar import Environment

class MinAtarEnv(gym.Env):
    """Gymnasium-style wrapper around a MinAtar game.

    Observations are the raw array returned by ``Environment.state()`` with
    shape ``Environment.state_shape()``; ``render`` treats the last axis as
    the channel axis. Actions are integers in ``[0, num_actions)``.
    """

    def __init__(self, game_name):
        """Create the underlying MinAtar game and declare the Gymnasium spaces.

        Args:
            game_name: Name of the MinAtar game, e.g. ``"space_invaders"``.
        """
        super().__init__()
        # Remembered so that ``reset(seed=...)`` can rebuild a seeded game.
        self.game_name = game_name
        self.minatar_env = Environment(env_name=game_name)

        # Define observation and action spaces
        self.observation_space = spaces.Box(
            low=0,
            high=1,
            shape=self.minatar_env.state_shape(),
            dtype=float
        )
        self.action_space = spaces.Discrete(self.minatar_env.num_actions())

    def reset(self, *, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: Optional seed. When given, the MinAtar game is rebuilt with
                this seed so that the game dynamics (not only Gymnasium's own
                ``np_random``) become reproducible.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            # The game owns its own random state, so the seed has to be passed
            # to the Environment constructor to take effect.
            self.minatar_env = Environment(env_name=self.game_name, random_seed=seed)
        self.minatar_env.reset()
        return self.minatar_env.state(), {}

    def step(self, action):
        """Apply ``action`` and advance the game by one frame.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The wrapper
            imposes no time limit, so ``truncated`` is always ``False``.
        """
        reward, terminated = self.minatar_env.act(action)
        return self.minatar_env.state(), reward, terminated, False, {}

    def render(self, mode="human"):
        """Print the current state as a character grid.

        Each cell shows the glyph of the highest-numbered active channel
        (channels are numbered from 1); an empty cell is drawn as a space.

        Args:
            mode: Accepted for backward compatibility; the grid is always
                printed to stdout.
        """
        state = self.minatar_env.state()
        # Number the channels 1..C using the state's real channel count, so
        # games with other than six channels can be rendered as well.
        channel_ids = np.arange(1, state.shape[2] + 1)
        arr = (state * channel_ids).max(axis=2)
        num_to_char = {
          0: ' ',
          1: '\u2569',
          2: '\u2ADA',
          3: '\u2ADA',
          4: '\u2ADA',
          5: '|',
          6: ':'
        }
        # Channels without a dedicated glyph fall back to '#'.
        to_char = np.vectorize(lambda value: num_to_char.get(value, '#'))
        char_grid = to_char(arr)
        print('\n'.join(''.join(row) for row in char_grid))

    def close(self):
        """Release resources; the wrapper holds none, so this is a no-op."""
        pass


# Build the Gymnasium-wrapped MinAtar "space_invaders" game and bind it to the
# notebook-level name `env`.
env = MinAtarEnv("space_invaders")


In [None]:
# Pick the compute backend in order of preference: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")


Using cuda device


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

class DQN(nn.Module):
    """Convolutional Q-network: two unpadded 3x3 convolutions and two linear layers."""

    def __init__(self, input_shape, num_actions):
        """Build the layers.

        Args:
            input_shape: ``(channels, height, width)`` of a single observation.
            num_actions: Number of Q-values produced per observation.
        """
        super().__init__()
        channels, height, width = input_shape
        # Each unpadded 3x3 convolution trims 2 from height and width, so the
        # two of them together trim 4 before the feature map is flattened.
        flat_features = 64 * (height - 4) * (width - 4)
        self.conv1 = nn.Conv2d(channels, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(flat_features, 128)
        self.fc2 = nn.Linear(128, num_actions)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of observations ``(N, C, H, W)`` to Q-values ``(N, num_actions)``."""
        feature_map = self.relu(self.conv2(self.relu(self.conv1(x))))
        flattened = feature_map.view(feature_map.size(0), -1)
        hidden = self.relu(self.fc1(flattened))
        return self.fc2(hidden)


In [None]:
# Experience Replay Buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` discards the oldest transition once full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of NumPy arrays ``(states, actions, rewards,
            next_states, dones)``, row-aligned across the arrays.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = zip(*picked)
        columns = (states, actions, rewards, next_states, dones)
        return tuple(np.array(column) for column in columns)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [None]:
# Epsilon-greedy policy
class EpsilonGreedyPolicy:
    """Epsilon-greedy action selection with multiplicative epsilon decay."""

    def __init__(self, start_epsilon, end_epsilon, decay):
        """Store the initial epsilon, its lower bound and the per-call decay factor."""
        self.epsilon, self.end_epsilon, self.decay = start_epsilon, end_epsilon, decay

    def select_action(self, q_network, state, num_actions, device):
        """Return a random action with probability ``epsilon``, else the greedy one.

        Args:
            q_network: Callable mapping a batched state tensor to Q-values.
            state: A single observation (no batch dimension).
            num_actions: Size of the discrete action set.
            device: Device on which the state tensor is created.
        """
        explore = random.random() < self.epsilon
        if explore:
            return random.randrange(num_actions)

        # Greedy branch: add a batch dimension and take the arg-max Q-value.
        batch = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        with torch.no_grad():
            q_values = q_network(batch)
        return q_values.argmax(dim=1).item()

    def decay_epsilon(self):
        """Multiply epsilon by ``decay`` without going below ``end_epsilon``."""
        decayed = self.epsilon * self.decay
        self.epsilon = decayed if decayed > self.end_epsilon else self.end_epsilon



In [None]:
# Training function
def train_dqn(env, num_episodes=5000, batch_size=32, gamma=0.99, lr=1e-3, buffer_size=10000, verbose=False):
    """Train a DQN agent on ``env`` and plot the per-episode return.

    Args:
        env: Gymnasium-style environment whose observations are channel-last
            ``(H, W, C)`` arrays and whose action space is discrete.
        num_episodes: Number of training episodes.
        batch_size: Minibatch size sampled from the replay buffer. Updates
            start once the buffer holds more than this many transitions.
        gamma: Discount factor.
        lr: Adam learning rate.
        buffer_size: Replay buffer capacity.
        verbose: When True, also print a line at the start of every episode.
            Off by default because it floods the notebook output on long runs.

    Returns:
        The trained online Q-network.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Conv2d expects (C, H, W); the observation space is (H, W, C).
    height, width, channels = env.observation_space.shape
    input_shape = (channels, height, width)
    num_actions = env.action_space.n

    print("Environment Observation Space Shape:", env.observation_space.shape)

    q_network = DQN(input_shape, num_actions).to(device)
    target_network = DQN(input_shape, num_actions).to(device)
    target_network.load_state_dict(q_network.state_dict())

    optimizer = optim.Adam(q_network.parameters(), lr=lr)
    loss_fn = nn.MSELoss()  # built once instead of on every update step
    replay_buffer = ReplayBuffer(buffer_size)
    policy = EpsilonGreedyPolicy(1.0, 0.1, 0.995)
    rewards_per_episode = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        state = np.moveaxis(state, -1, 0)  # (H, W, C) -> (C, H, W)
        if verbose:
            print(f"Episode {episode} started. Initial State Shape: {state.shape}")
        done = False
        total_reward = 0

        while not done:
            action = policy.select_action(q_network, state, num_actions, device)
            next_state, reward, terminated, truncated, _ = env.step(action)
            # The episode ends on either signal, but only a true terminal
            # state may switch off bootstrapping in the TD target below.
            done = terminated or truncated
            next_state = np.moveaxis(next_state, -1, 0)  # (H, W, C) -> (C, H, W)
            replay_buffer.push(state, action, reward, next_state, terminated)
            state = next_state
            total_reward += reward

            if len(replay_buffer) > batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                states = torch.tensor(states, dtype=torch.float32, device=device)
                actions = torch.tensor(actions, dtype=torch.long, device=device).unsqueeze(1)
                rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
                next_states = torch.tensor(next_states, dtype=torch.float32, device=device)
                dones = torch.tensor(dones, dtype=torch.float32, device=device)

                # Q(s, a) for the actions that were actually taken.
                q_values = q_network(states).gather(1, actions).squeeze(1)
                # The TD target is a constant w.r.t. the online network, so
                # skip building an autograd graph for the target network.
                with torch.no_grad():
                    next_q_values = target_network(next_states).max(1)[0]
                    target_q_values = rewards + (gamma * next_q_values * (1 - dones))

                loss = loss_fn(q_values, target_q_values)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        rewards_per_episode.append(total_reward)
        policy.decay_epsilon()

        # Every 10th episode: sync the target network and report progress.
        if episode % 10 == 0:
            target_network.load_state_dict(q_network.state_dict())
            print(f"Episode {episode}: Total Reward: {total_reward}, Epsilon: {policy.epsilon:.2f}")

    # Plot learning curve
    plt.plot(range(num_episodes), rewards_per_episode)
    plt.xlabel("Episodes")
    plt.ylabel("Total Reward")
    plt.title("DQN Learning Curve")
    plt.show()

    return q_network



In [None]:
# Fresh space_invaders environment; MinAtarEnv must already be defined by an earlier cell.
env = MinAtarEnv("space_invaders")
# Train for 5000 episodes and keep the returned Q-network for later cells.
trained_model = train_dqn(env, num_episodes=5000)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Episode 209 started. Initial State Shape: (6, 10, 10)
Episode 210 started. Initial State Shape: (6, 10, 10)
Episode 210: Total Reward: 4, Epsilon: 0.35
Episode 211 started. Initial State Shape: (6, 10, 10)
Episode 212 started. Initial State Shape: (6, 10, 10)
Episode 213 started. Initial State Shape: (6, 10, 10)
Episode 214 started. Initial State Shape: (6, 10, 10)
Episode 215 started. Initial State Shape: (6, 10, 10)
Episode 216 started. Initial State Shape: (6, 10, 10)
Episode 217 started. Initial State Shape: (6, 10, 10)
Episode 218 started. Initial State Shape: (6, 10, 10)
Episode 219 started. Initial State Shape: (6, 10, 10)
Episode 220 started. Initial State Shape: (6, 10, 10)
Episode 220: Total Reward: 18, Epsilon: 0.33
Episode 221 started. Initial State Shape: (6, 10, 10)
Episode 222 started. Initial State Shape: (6, 10, 10)
Episode 223 started. Initial State Shape: (6, 10, 10)
Episode 224 started. Initial State S

In [None]:
def evaluate_dqn(env, q_network, num_episodes=10):
    """Run the greedy policy of ``q_network`` on ``env`` and report returns.

    Args:
        env: Gymnasium-style environment with channel-last ``(H, W, C)``
            observations.
        q_network: Callable mapping a ``(1, C, H, W)`` float tensor to
            Q-values of shape ``(1, num_actions)``.
        num_episodes: Number of evaluation episodes.

    Returns:
        The mean total reward over the evaluated episodes.
    """
    # Evaluate on the device the network already lives on, so a CPU model
    # still works on a CUDA machine. Fall back to the original heuristic for
    # callables that expose no parameters.
    parameters = getattr(q_network, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    total_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        state = np.moveaxis(state, -1, 0)  # (H, W, C) -> (C, H, W)
        done = False
        total_reward = 0

        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            with torch.no_grad():
                action = q_network(state_tensor).argmax(dim=1).item()

            next_state, reward, terminated, truncated, _ = env.step(action)
            # Stop on either signal; ignoring `truncated` would loop forever
            # on an environment that ends episodes by time limit only.
            done = terminated or truncated
            next_state = np.moveaxis(next_state, -1, 0)  # (H, W, C) -> (C, H, W)

            total_reward += reward
            state = next_state

        total_rewards.append(total_reward)
        print(f"Evaluation Episode {episode}: Total Reward: {total_reward}")

    avg_reward = np.mean(total_rewards)
    print(f"Average Reward over {num_episodes} episodes: {avg_reward}")
    return avg_reward

# Evaluate the trained network greedily on `env` for the default number of episodes.
evaluate_dqn(env, trained_model)



Evaluation Episode 0: Total Reward: 1
Evaluation Episode 1: Total Reward: 23
Evaluation Episode 2: Total Reward: 47
Evaluation Episode 3: Total Reward: 19
Evaluation Episode 4: Total Reward: 27
Evaluation Episode 5: Total Reward: 23
Evaluation Episode 6: Total Reward: 19
Evaluation Episode 7: Total Reward: 23
Evaluation Episode 8: Total Reward: 23
Evaluation Episode 9: Total Reward: 8
Average Reward over 10 episodes: 21.3


21.3