In [None]:
# Upload the ROM archives to Google Colab (must be done every time you use this notebook)
from google.colab import files
uploaded = files.upload()

Saving HC ROMS.zip to HC ROMS.zip
Saving ROMS.zip to ROMS.zip


In [None]:
# Install the ROMs on Google Colab (must be done every time you use this notebook)
!python -m atari_py.import_roms .

In [None]:
import gym
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import deque
import numpy as np
import random
import datetime

In [None]:
# Magnitude of the reward that main() stores in replay memory: +reward_number when
# the environment reward is positive, -reward_number otherwise.
reward_number = 0.37

In [None]:
class CNN(torch.nn.Module):
    """Convolutional Q-network: three conv stages followed by one linear head.

    Layout: Conv(3x3) -> ReLU -> MaxPool(3) -> Conv(5x5) -> ReLU -> MaxPool(3)
    -> Conv(7x7) -> ReLU -> flatten -> Linear.

    Args:
        in_channels: Number of channels of the input images. Default 3.
        num_actions: Number of values produced per sample (one per action).
            Default 9.
        input_size: ``(height, width)`` of the input images, used only to
            size the linear head. The default ``(128, 128)`` yields 2304
            flattened features, i.e. the head size that used to be
            hard-coded, so ``CNN()`` builds exactly the original network.

    NOTE(review): ``preprocess_frame`` in this notebook emits single-channel
    84x84 frames, which the defaults do not accept. If that pipeline is the
    intended input, build the network with ``in_channels=1`` and
    ``input_size=(84, 84)`` (and ``num_actions`` equal to the environment's
    action count) -- confirm.
    """

    def __init__(self, in_channels=3, num_actions=9, input_size=(128, 128)):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=7),
            nn.ReLU(),
        )
        # Size the head by pushing an all-zero probe image through the conv
        # stack instead of hard-coding the flattened feature count. The probe
        # draws no random numbers, so weight initialisation is unaffected.
        with torch.no_grad():
            probe = torch.zeros(1, in_channels, *input_size)
            flat_features = self.conv_layers(probe).flatten(1).size(1)
        self.fc = nn.Linear(in_features=flat_features, out_features=num_actions)

    def forward(self, x):
        """Map a batch of images ``(N, C, H, W)`` to ``(N, num_actions)`` values."""
        x = self.conv_layers(x)
        # Collapse channel and spatial axes, keeping the batch axis.
        x = torch.flatten(x, 1)
        return self.fc(x)

In [None]:
# Use the first CUDA device when one is visible, otherwise stay on the CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Build the Q-network and place it on that device
# NOTE(review): CNN() is built for 3-channel input, whereas preprocess_frame
# produces single-channel frames -- confirm the two are meant to match.
model = CNN().to(device)
# Adam over the network weights, trained against a mean-squared-error objective
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.MSELoss()

In [None]:
def preprocess_frame(frame):
    """Turn one RGB frame into a float32 tensor of shape ``(1, 84, 84)``.

    The frame is converted to grayscale, area-resampled to 84x84, divided by
    255 and wrapped in a tensor with one extra leading axis.

    NOTE(review): the result has a single leading axis, while ``CNN`` is built
    for 3-channel input and ``main`` reads the model output along ``dim=1`` --
    confirm which of batch/channel this axis is meant to be.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    small = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
    # Dividing the 8-bit image by 255.0 yields floating-point pixel values
    scaled = small / 255.0
    return torch.from_numpy(scaled).float().unsqueeze(0)

def update_target_network(model, target_model):
    """Overwrite ``target_model``'s state with a copy of ``model``'s state."""
    online_state = model.state_dict()
    target_model.load_state_dict(online_state)

In [None]:
def train(model, target_model, memory, batch_size, gamma, optimizer, criterion):
    """Run one DQN optimisation step on a random minibatch drawn from ``memory``.

    Args:
        model: Online Q-network being optimised; maps a batch of states to
            one value per action.
        target_model: Network used to evaluate the next states.
        memory: Sized sequence of ``(state, action, reward, next_state, done)``
            tuples of tensors. Each element is joined with ``torch.cat`` along
            dim 0, so every state must already carry a leading batch axis of
            size 1 and action/reward/done must be 1-element tensors.
        batch_size: Number of transitions to sample.
        gamma: Discount factor applied to the next-state value.
        optimizer: Optimizer holding ``model``'s parameters.
        criterion: Loss called as ``criterion(predicted_q, expected_q)``.

    Returns:
        None. Does nothing while ``memory`` holds fewer than ``batch_size``
        transitions.
    """
    if len(memory) < batch_size:
        return
    # Work on the device the online network lives on, so the function does
    # not depend on a notebook-level ``device`` variable.
    device = next(model.parameters()).device

    # Sample a batch of transitions and regroup them field by field
    transitions = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*transitions)
    batch_state = torch.cat(states).to(device)
    # gather() needs int64 indices
    batch_action = torch.cat(actions).to(device).long()
    batch_reward = torch.cat(rewards).to(device).float()
    batch_next_state = torch.cat(next_states).to(device)
    # Done flags may be stored as bool; ``1 - bool_tensor`` raises in PyTorch,
    # so convert to float before building the terminal mask.
    batch_done = torch.cat(dones).to(device).float()

    # Q-value of the action actually taken in each sampled state
    q_values = model(batch_state).gather(1, batch_action.unsqueeze(1)).squeeze(1)

    # Best next-state value according to the target network; no gradient is
    # needed through this branch.
    with torch.no_grad():
        next_q_max, _ = target_model(batch_next_state).max(dim=1)

    # Bellman target; the bootstrap term is dropped for terminal transitions
    expected_q_values = batch_reward + gamma * next_q_max * (1 - batch_done)

    loss = criterion(q_values, expected_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
def _plot_series(values, title, ylabel):
    """Draw one per-episode curve in its own figure."""
    plt.plot(values)
    plt.title(title)
    plt.xlabel('Episode')
    plt.ylabel(ylabel)
    plt.show()


def main():
    """Train the notebook-level ``model`` on Breakout with DQN and plot progress.

    Uses the notebook globals ``model``, ``device``, ``optimizer``,
    ``criterion`` and ``reward_number``. A checkpoint named
    ``breakout_model_<episode>.pt`` is written every 50 episodes, and the
    per-episode reward and step count are plotted at the end.

    NOTE(review): ``preprocess_frame`` returns a ``(1, 84, 84)`` tensor while
    ``CNN()`` is built for 3 input channels and 9 outputs. Confirm the frame
    shape matches the network input and that the output width equals
    ``env.action_space.n``; otherwise the greedy branch can pick an action
    index the environment does not accept.
    """
    # Initialize the environment
    env = gym.make('Breakout-v0')
    # Initialize the replay memory
    memory = deque(maxlen=100000)
    # Initialize the target network as a frozen copy of the online network
    target_model = CNN()
    update_target_network(model, target_model)
    target_model.to(device)
    target_model.eval()
    # Training hyper-parameters
    epsilon = 1.0
    epsilon_decay = 0.9999
    epsilon_min = 0.1
    gamma = 0.99
    batch_size = 32
    num_episodes = 1000
    target_update_interval = 10000
    # Environment steps taken across ALL episodes. The target-network sync is
    # keyed on this counter; the per-episode counter restarts at zero every
    # episode and so would almost never reach the interval.
    total_steps = 0
    # Per-episode history for the final plots
    rewards = []
    steps = []
    try:
        for episode in range(num_episodes):
            # Reset the environment and the per-episode statistics
            state = preprocess_frame(env.reset()).to(device)
            done = False
            episode_reward = 0
            episode_steps = 0
            while not done:
                # Choose an action using an epsilon-greedy policy
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    # Inference only: do not build an autograd graph
                    with torch.no_grad():
                        q_values = model(state)
                    _, action = q_values.max(dim=1)
                    action = action.item()
                # Take the action and observe the next state and reward
                next_state, reward, done, _ = env.step(action)
                next_state = preprocess_frame(next_state).to(device)
                # Every step without a positive reward is stored with a negative
                # reward. NOTE(review): confirm zero-reward steps are meant to
                # be penalised.
                stored_reward = reward_number if reward > 0 else -reward_number
                # The done flag is stored as a float so that the ``1 - done``
                # mask in train() is valid tensor arithmetic (it is not for bool).
                memory.append((
                    state,
                    torch.tensor([action]),
                    torch.tensor([stored_reward]),
                    next_state,
                    torch.tensor([float(done)]),
                ))
                # Update the state and the counters
                state = next_state
                episode_reward += reward
                episode_steps += 1
                total_steps += 1
                # Train the model
                train(model, target_model, memory, batch_size, gamma, optimizer, criterion)
                # Sync the target network every `target_update_interval` steps
                if total_steps % target_update_interval == 0:
                    update_target_network(model, target_model)
            # Decay the epsilon value once per episode.
            # NOTE(review): at 0.9999 per episode, epsilon only falls to about
            # 0.905 after 1000 episodes; confirm per-step decay was not intended.
            epsilon = max(epsilon * epsilon_decay, epsilon_min)
            # Save the episode reward and steps
            rewards.append(episode_reward)
            steps.append(episode_steps)
            # Print the episode number, reward, and epsilon
            print('Episode: {}, Reward: {}, Epsilon: {:.4f}'.format(episode + 1, episode_reward, epsilon))
            # Save the model every 50 episodes
            if (episode + 1) % 50 == 0:
                torch.save(model.state_dict(), 'breakout_model_{}.pt'.format(episode + 1))
    finally:
        # Release the emulator even when training is interrupted or fails
        env.close()
    # Plot the rewards and steps
    _plot_series(rewards, 'Rewards', 'Reward')
    _plot_series(steps, 'Steps', 'Steps')