# **Homework 2**

In [9]:
import gdown
import zipfile
import os

In [10]:
#Check if files exists in data folder
if os.path.exists('data/'):
    print('Files already downloaded.')

else:
    output_path = 'data.zip'
    file_id = '1KDN-rFCq9IDJ7_kNW5y5Co100KNpklz-'
    url = f'https://drive.google.com/uc?id={file_id}'
    # Download the zip file
    gdown.download(url, output_path, quiet=False)

    # Extract the contents of the zip file
    with zipfile.ZipFile(output_path, 'r') as zip_ref:
        zip_ref.extractall('data')

    # Remove the zip file
    os.remove(output_path)


Files already downloaded.


In [11]:
if(os.path.exists('test/') and os.path.exists('train')):
    print('Files already extracted')
else:
    print('Extracting the test.zip and train.zip files...')
    # Extract the test.zip file
    with zipfile.ZipFile('data/public/test.zip', 'r') as zip_ref:
        zip_ref.extractall()

    # Extract the train.zip file
    with zipfile.ZipFile('data/public/train.zip', 'r') as zip_ref:
        zip_ref.extractall()

    print('Done!')

Files already extracted


## First Approach: Deep Reinforcement Learning (DRL)

In this approach, we will utilize Deep Reinforcement Learning (DRL) techniques to solve our problem. DRL combines the power of deep neural networks with reinforcement learning algorithms to learn optimal policies in complex environments.

### Step 1: Environment Setup

First, we need to define our environment. This includes selecting an appropriate gym environment or creating a custom environment that suits our problem. The environment should provide observations, actions, and rewards.

### Step 2: Agent Design

Next, we design our DRL agent. The agent consists of a deep neural network, often referred to as the Q-network, which takes observations as input and outputs action values for each possible action. We can use popular deep learning frameworks like PyTorch or TensorFlow to implement the Q-network.

### Step 3: Training Loop

The training loop involves the following steps:

1. Initialize the Q-network with random weights.
2. Observe the current state from the environment.
3. Select an action using an exploration-exploitation strategy, such as epsilon-greedy or softmax.
4. Execute the selected action in the environment and observe the next state and reward.
5. Update the Q-network using the observed state, action, next state, and reward.
6. Repeat steps 2-5 until convergence or a maximum number of iterations.

During training, we can use techniques like experience replay and target networks to stabilize and improve the learning process.

### Step 4: Evaluation

After training, we evaluate the performance of our agent by running it in the environment and measuring its performance metrics, such as average reward or success rate. This helps us assess the effectiveness of our DRL approach.

### Step 5: Fine-tuning and Optimization

Based on the evaluation results, we can fine-tune and optimize our DRL approach. This may involve adjusting hyperparameters, modifying the network architecture, or trying different exploration-exploitation strategies.

### Conclusion

Deep Reinforcement Learning (DRL) offers a powerful approach to solving complex problems by combining deep neural networks with reinforcement learning algorithms. By following the steps outlined above, we can develop and train a DRL agent to learn optimal policies in our environment. However, it is important to note that DRL can be computationally intensive and may require significant computational resources and time for training.

## Second Approach

For the second approach, we will use a different architecture to train our model. Instead of using a pre-trained ResNet18 model, we will use a custom convolutional neural network (CNN) architecture.

### Step 1: Data Loading and Preprocessing

Similar to the first approach, we will load and preprocess our dataset using the same transformations. We will also create data loaders for the training and validation sets.

### Step 2: Model Architecture

In this approach, we will define a custom CNN model. The model will consist of multiple convolutional layers followed by fully connected layers. We will use ReLU activation functions and dropout regularization to prevent overfitting.

### Step 3: Training Loop

We will train the model using a similar training loop as in the first approach. We will iterate over the training set, compute the loss, perform backpropagation, and update the model's weights.

### Step 4: Model Evaluation

After training, we will evaluate the model on the validation set. We will calculate the accuracy of the model by comparing the predicted labels with the ground truth labels.

### Step 5: Save the Model

Finally, we will save the trained model to a file for future use.

## Conclusion

In this second approach, we used a custom CNN architecture to train our model. This approach allows us to have more control over the model's architecture and potentially achieve better performance. However, it requires more manual design and experimentation compared to using a pre-trained model like ResNet18.

It is important to note that the choice of architecture depends on the specific problem and dataset. It is recommended to experiment with different architectures and hyperparameters to find the best model for your task.


In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR

# Step 1: Data Loading and Preprocessing
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(96),   # Randomly resize and crop the image
    transforms.RandomHorizontalFlip(),  # Randomly flip the image horizontally
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),  # Adjust color
    transforms.RandomRotation(30),       # Randomly rotate the image
    transforms.ToTensor(),               # Convert the image to a PyTorch tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize the image
])

# Define data transformations for validation (you can modify these based on your needs)
val_transform = transforms.Compose([
    transforms.Resize(96),
    transforms.CenterCrop(96),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Load your dataset (adjust this based on your dataset structure)
train_dataset = ImageFolder(root='train', transform=train_transform)
val_dataset = ImageFolder(root='test', transform=val_transform)

num_workers = 2

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=num_workers)

# Print shapes of one batch of training and validation data
for images, labels in train_loader:
    print('Image batch dimensions:', images.shape)
    print('Image label dimensions:', labels.shape)
    break

# Get Input Shape from train_loader
input_shape = next(iter(train_loader))[0].shape
print('Input Shape:', input_shape)


Image batch dimensions: torch.Size([64, 3, 96, 96])
Image label dimensions: torch.Size([64])


In [13]:
class DQN(nn.Module):
    def __init__(self, n_frames, n_actions, h_dimension):
        super(DQN, self).__init__()

        # CNN
        self.layers_cnn = nn.Sequential(
            nn.Conv2d(n_frames, 6, kernel_size=(7, 7), stride=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Conv2d(6, 12, kernel_size=(4, 4)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Flatten(),
            nn.Linear(432, h_dimension),
            nn.ReLU(),
            nn.Linear(h_dimension, n_actions)
        )

    def forward(self, x):
        o = self.layers_cnn(x)  # (BS, ACTIONS)
        return o

In [14]:
import random
from collections import deque
import numpy as np

class DQNAgent:
    def __init__(self,
                 action_space,
                 epsilon=1.0,
                 gamma=0.95,
                 epsilon_min=0.1,
                 epsilon_decay=0.9999,
                 lr=1e-3,
                 memory_len=5000,
                 frames=3,
                 hidden_dimension=None,
                 device=None):

        self.device = device
        self.epsilon = epsilon
        self.gamma = gamma
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.memory_len = memory_len
        self.lr = lr
        self.memory = deque(maxlen=self.memory_len)
        self.action_space = action_space

        self.target_model = DQN(frames, len(self.action_space), hidden_dimension).to(self.device)
        self.model =        DQN(frames, len(self.action_space), hidden_dimension).to(self.device)

        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def is_explore(self):
        flip = np.random.rand() <= self.epsilon
        return flip

    def act(self, state, is_only_random=False, is_only_exploit=False):
        if not is_only_exploit and self.is_explore() or is_only_random:
            action_index = np.random.randint(len(self.action_space))
            # print(action_index, self.ACTION_SPACE[action_index])
        else:
            q_values = self.target_model(state)[0]
            action_index = torch.argmax(q_values)
            # print("predicted action", action_index)
        return self.action_space[action_index]

    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, self.action_space.index(action), reward, next_state, done))

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        train_state = []
        train_target = []

        for state, action_index, reward, next_state, done in minibatch:
            # state = torch.Tensor(state)
            target = self.model(state)[0]
            train_state.append(target)

            target_copy = target.detach().clone().to(self.device)
            if done:
                target_copy[action_index] = reward
            else:
                t = self.target_model(next_state)[0]
                target_copy[action_index] = reward + self.gamma * torch.max(t)
            train_target.append(target_copy)

        # Actual training
        criterion = nn.MSELoss()
        pred, tru = torch.stack(train_state), torch.stack(train_target)
        loss = criterion(pred, tru)

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load_model(self, name):
        self.model = torch.load(name)
        self.target_model = torch.load(name)
        self.model.eval()

    def save_model(self, name):
        torch.save(self.target_model, name)

In [15]:
class Config:

    SEED = 1

    STARTING_EPISODE_TRAIN = 0
    ENDING_EPISODE_TRAIN = STARTING_EPISODE_TRAIN + 1000

    STARTING_EPISODE_TEST = ENDING_EPISODE_TRAIN + 1
    ENDING_EPISODE_TEST = STARTING_EPISODE_TEST + 100

    SKIP_FRAMES = 2
    TRAINING_BATCH_SIZE = 64
    UPDATE_TARGET_MODEL_FREQUENCY = 5
    N_FRAMES = 3
    HIDDEN_DIMENSION_FC = 150

    GAS_WEIGHT = 1.3

    ACTION_SPACE = [
        (-1, 1, 0.2), (0, 1, 0.2), (1, 1, 0.2),  # .  Action Space Structure
        (-1, 1, 0), (0, 1, 0), (1, 1, 0),        # (Steering Wheel, Gas, Break)
        (-1, 0, 0.2), (0, 0, 0.2), (1, 0, 0.2),  # .  -1~1     0~1        0~1
        (-1, 0, 0), (0, 0, 0), (1, 0, 0)
    ]


In [16]:

import os
from datetime import datetime
import cv2
import torch
import matplotlib.pyplot as plt
import random
import numpy as np

import json


def write_json_to_file(data, file_path):
    """
    Write JSON data to a file.

    Parameters:
    - data: A dictionary representing the JSON data.
    - file_path: The path where the JSON file will be written.
    """
    try:
        with open(file_path, 'w') as json_file:
            json.dump(data, json_file, indent=4)
        print(f"JSON data successfully written to {file_path}")
    except Exception as e:
        print(f"Error writing JSON data to {file_path}: {e}")


def read_json_from_file(file_path):
    """
    Read JSON data from a file.

    Parameters:
    - file_path: The path of the JSON file to be read.

    Returns:
    - A dictionary representing the JSON data.
    - If there is an error reading the file, returns None.
    """
    try:
        with open(file_path, 'r') as json_file:
            data = json.load(json_file)
        print(f"JSON data successfully read from {file_path}")
        return data
    except Exception as e:
        print(f"Error reading JSON data from {file_path}: {e}")
        return None


def make_all_paths(is_dynamic_root=True, dir_name="rl_class"):
    ROOT = "data"

    if is_dynamic_root:
        date_str = datetime.now().strftime("%m-%d-%Y_%H-%M-%S")
        dir_name = "rl_class_{}".format(date_str)
    else:
        dir_name = dir_name

    path_root = ROOT + "/" + dir_name + "/"
    dirs = ["models", "plots", "videos"]
    for d in dirs:
        path = path_root + d
        if not os.path.exists(path):
            os.makedirs(path)
        print(">> Created dir", path)
    return path_root


def plot_state_car(data, title=None):
    assert len(data.shape) == 3, "Can only handle 3D mats."
    assert data.shape[0] < 10, "Too many states to plot. Adjust the plots position first."

    # Create a figure with three subplots
    fig, axs = plt.subplots(1, data.shape[0], figsize=(10, 4))

    # Plot each image using imshow()
    for i in range(data.shape[0]):
        axs[i].imshow(data[i], cmap='gray')  # You can adjust the colormap if needed
        axs[i].axis('off')                   # Turn off axis labels

    plt.title(title)
    plt.show()


def plot_frame_car(data, title=None):
    plt.imshow(data, cmap="gray")  # You can adjust the colormap if needed
    plt.axis('off')  # Turn off axis labels
    plt.title(title)
    plt.show()


def preprocess_frame_car(frame):
    def crop(frame):
        # Crop to 84x84
        return frame[:-12, 6:-6]

    def make_img_gray(frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def normalize(frame):
        return frame / 255.0

    # frame = crop(frame)
    frame = make_img_gray(frame)
    frame = frame.astype(float)
    frame = normalize(frame)
    # frame = frame * 2 - 1   # maps [0,1] to [-1,1]
    return frame


def seed_everything(seed=42):
    # Set seed for Python random module
    random.seed(seed)

    # Set seed for NumPy
    np.random.seed(seed)

    # Set seed for PyTorch
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # disable if deterministic mode is desired


In [17]:

#import cv2   # open cv
import torch
from matplotlib import pyplot as plt
import gymnasium as gym
from collections import deque
import numpy as np
from gymnasium.wrappers import RecordVideo


def train_car_racing():
    seed_everything(seed=Config.SEED)
    PATH_ROOT = make_all_paths(is_dynamic_root=True)
    write_json_to_file(dict(Config.__dict__), file_path=PATH_ROOT + "config.json")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print('>> Using device:', device)

    agent = DQNAgent(frames=Config.N_FRAMES, action_space=Config.ACTION_SPACE, device=device,
                     hidden_dimension=Config.HIDDEN_DIMENSION_FC)

    # https://www.gymlibrary.dev/environments/box2d/car_racing/
    env = gym.make('CarRacing-v2', render_mode="rgb_array")  # , render_mode='human')
    env = RecordVideo(env, PATH_ROOT + 'videos', episode_trigger=lambda x: x % Config.UPDATE_TARGET_MODEL_FREQUENCY == 0)

    epi_total_rewards = []
    for e in range(Config.STARTING_EPISODE_TRAIN, Config.ENDING_EPISODE_TRAIN + 1):
        env.episode_id = e

        epi_total_reward = 0
        epi_negative_reward_counter = 0
        epi_time_frame_counter = 1
        epi_done = False

        init_state = env.reset(seed=e)[0]  # 96, 96, 3 pixels image RGB
        init_state = preprocess_frame_car(init_state)  # 96, 96 pixels image GRAY

        # (1) EVALUATE STATE: S
        state_queue = deque([init_state] * Config.N_FRAMES, maxlen=Config.N_FRAMES)
        # plot_state_car(np.array(state_queue))  # visualize S0

        while True:
            state_tensor = torch.Tensor(np.array(state_queue)).unsqueeze(0).to(device)
            action = agent.act(state_tensor)

            # (2) EXECUTE ACTION (for several steps)
            # (3) EVALUATE S' STATE, REWARD
            reward = 0
            for _ in range(Config.SKIP_FRAMES):
                # execute action
                next_state, r, epi_done, _, _ = env.step(action)
                # plot_frame_car(next_state)
                reward += r
                if epi_done:
                    break

            # (4) ADJUST REWARD
            # if getting negative reward 10 times after the tolerance steps, terminate this episode
            if epi_time_frame_counter > 100 and reward < 0:
                epi_negative_reward_counter += 1
            else:
                epi_negative_reward_counter = 0

            # extra bonus for the model if it uses full gas
            if action[1] == 1 and action[2] == 0:
                reward *= Config.GAS_WEIGHT

            epi_total_reward += reward

            # plot_state_car(np.array(state_queue), title="STATE 0")
            # process state S'
            next_state = preprocess_frame_car(next_state)
            next_state_queue = deque([frame for frame in state_queue], maxlen=Config.N_FRAMES)
            next_state_queue.append(next_state)
            # plot_state_car(np.array(next_state_queue), title="STATE 1")

            next_state_tensor = torch.Tensor(np.array(next_state_queue)).unsqueeze(0).to(device)

            # (5) STORE OBSERVATIONS
            # Memorizing saving state, action reward tuples
            agent.memorize(state_tensor, action, reward, next_state_tensor, epi_done)

            # S = S'
            state_queue = next_state_queue

            # early stop if the number of
            if epi_negative_reward_counter >= 25 or epi_total_reward < 0:
                break

            # (6) TRAIN ON BATCHES OF OBSERVATIONS
            # train the model with tuple, if there are enough tuples
            if len(agent.memory) > Config.TRAINING_BATCH_SIZE:
                agent.replay(Config.TRAINING_BATCH_SIZE)

            epi_time_frame_counter += 1
        epi_total_rewards += [epi_total_reward]

        # >>> ON EPISODE END
        # print stats
        stats_string = 'Episode: {}/{}, Scores(Time Frames): {}, Total Rewards: {:.2}, Epsilon: {:.2}'
        print(stats_string.format(
            e,
            Config.ENDING_EPISODE_TRAIN,
            epi_time_frame_counter,
            float(epi_total_reward),
            float(agent.epsilon))
        )

        if e % Config.UPDATE_TARGET_MODEL_FREQUENCY == 0:
            # plot rewards stats
            plt.plot(epi_total_rewards, label="cum rew", color="blue")
            plt.title("Rewards during episode episode")
            plt.savefig(PATH_ROOT + 'plots/reward_{}.pdf'.format(e))

            # save model frequently
            agent.save_model(PATH_ROOT + 'models/trial_{}.h5'.format(e))

            # swap model
            agent.update_target_model()
            write_json_to_file({"CUM_REW": epi_total_rewards}, PATH_ROOT + "/stats.json")

    env.close()


#train_car_racing()

## Second Approach ##

In [18]:
import torch
import torch.nn as nn

class CNN(nn.Module):
    def __init__(self, input_shape, num_classes):
        super(CNN, self).__init__()

        self.layers = nn.Sequential(
            nn.Conv2d(3, 15, kernel_size=(5, 5)),
            nn.ReLU(),
            nn.Conv2d(15, 20, kernel_size=(5, 5)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Conv2d(20, 30, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Flatten(),
            nn.Linear(30 * ((input_shape[0] - 12) // 4) * ((input_shape[1] - 12) // 4), 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 96),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(96, num_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        return self.layers(x)


In [19]:
# Import ResNet18 model
from torchvision.models import resnet18

# Load pre-trained ResNet18 model and extract features
resnet_model = resnet18(pretrained=True)
resnet_features = nn.Sequential(*list(resnet_model.children())[:-1])

# Create your model instance
model = CNN(resnet_features)

# Example forward pass
image_batch = torch.randn(64, 3, 96, 96)  # Example image batch
label_batch = torch.randint(5, (64,))  # Example label batch
output = model(image_batch)



In [20]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
criterion.to(device)

# Optimizer and Scheduler
optimizer = optim.RMSprop(model.parameters(), lr=0.0001)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)  # Optional learning rate scheduler


In [21]:
# Step 3: Training loop
num_epochs = 50

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0

    for images, labels in train_loader:  # Assuming you have a DataLoader for your training set
        optimizer.zero_grad()  # Zero the gradients
        outputs = model(images)  # Forward pass
        loss = criterion(outputs, labels)  # Compute the loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        running_loss += loss.item()

    # Print the average loss for the epoch
    average_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss}')


Epoch 1/50, Loss: 1.5102678549289703
Epoch 2/50, Loss: 1.4958185875415801
Epoch 3/50, Loss: 1.4893801987171174
Epoch 4/50, Loss: 1.4854220819473267
Epoch 5/50, Loss: 1.4866083586215972
Epoch 6/50, Loss: 1.4780626440048217
Epoch 7/50, Loss: 1.4749259889125823
Epoch 8/50, Loss: 1.4737242865562439


: 

In [46]:

# After training, you can evaluate the model on your validation set
model.eval()  # Set the model to evaluation mode
correct = 0
total = 0

with torch.no_grad():
    for images, labels in val_loader:  # Assuming you have a DataLoader for your validation set
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Validation Accuracy: {accuracy}')


Validation Accuracy: 0.6569661695161877


In [8]:
torch.save(model.state_dict(), 'MyCNN.pth')

In [9]:

# Step 5: Model Evaluation
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in val_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Validation Accuracy: {accuracy}')


# Repeat steps 2-5 with a different architecture for Approach B
# ...

# Provide analysis and comments
# ...

Validation Accuracy: 0.6711531465987632
