# ResNeXT Experiment Notebook
summary_goes_here

## Import Libraries

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import torchvision.transforms as T
import numpy as np
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import matplotlib.pyplot as plt

**Making sure the GPU is available for increased speed for computations**

In [2]:
USE_GPU = True
num_class = 100
dtype = torch.float32 # we will be using float throughout this tutorial

if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Constant to control how frequently we print train loss
print_every = 100

print('using device:', device)

using device: cuda


## Preparing The Dataset
We run the first cell to initialize our dataloader with our dataset, but also to find the mean and standard deviation inside the images of the dataset in order to normalize the dataset correctly.

In [6]:
# Define transformations
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL image to tensor
    # No resizing
    # No normalization for now
])

# Define paths to your train, val, and test folders
train_path = './archive/train'
val_path = './archive/test'
test_path = './archive/test'

# Create dataset instances
train_dataset = datasets.ImageFolder(train_path, transform=transform)
val_dataset = datasets.ImageFolder(val_path, transform=transform)
test_dataset = datasets.ImageFolder(test_path, transform=transform)

# Calculate mean and std for normalization
mean = [0., 0., 0.]
std = [0., 0., 0.]
for img, _ in train_dataset:
    for i in range(3):  # Iterate over channels
        mean[i] += img[i, :, :].mean()
        std[i] += img[i, :, :].std()

num_samples = len(train_dataset)
for i in range(3):  # Calculate mean and std per channel
    mean[i] /= num_samples
    std[i] /= num_samples

print(f"mean: {mean}")
print(f"std: {std}")

# Define transformations with calculated mean and std for normalization
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL image to tensor
    transforms.Normalize(mean=mean, std=std)  # Normalize images
])

# Apply the new transformation to the datasets
train_dataset = datasets.ImageFolder(train_path, transform=transform)
val_dataset = datasets.ImageFolder(val_path, transform=transform)
test_dataset = datasets.ImageFolder(test_path, transform=transform)

mean: [tensor(0.5077), tensor(0.5077), tensor(0.5077)]
std: [tensor(0.2120), tensor(0.2120), tensor(0.2120)]


In [26]:
# Define paths to your train, val, and test folders
train_path = './archive/train'
val_path = './archive/test'
test_path = './archive/test'

# Define transformations with calculated mean and std for normalization
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL image to tensor
    transforms.Normalize((0.5077, 0.5077, 0.5077), (0.2120, 0.2120, 0.2120))  # Normalize images
])

# Apply the new transformation to the datasets
train_dataset = datasets.ImageFolder(train_path, transform=transform)
val_dataset = datasets.ImageFolder(val_path, transform=transform)
test_dataset = datasets.ImageFolder(test_path, transform=transform)

# Define batch size
batch_size = 64

# Create data loaders
loader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
loader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
loader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

## Function Definitions

In [55]:
def flatten(x):
    N = x.shape[0] # read in N, C, H, W
    return x.view(N, -1)  # "flatten" the C * H * W values into a single vector per image

def check_accuracy_part34(loader, model):
    if loader.dataset:
        print('Checking accuracy on test set')   
    num_correct = 0
    num_samples = 0
    model.eval()  # set model to evaluation mode
    with torch.no_grad():
        for (x, y) in loader:
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            y = y.to(device=device, dtype=torch.long)
            scores = model(x)
            _, preds = scores.max(1)
            num_correct += (preds == y).sum()
            num_samples += preds.size(0)
        acc = float(num_correct) / num_samples
        print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
    return acc

def train_part34(model, optimizer, epochs=1):
    """
    Train a model on FER-2013 using the PyTorch Module API.
    
    Inputs:
    - model: A PyTorch Module giving the model to train.
    - optimizer: An Optimizer object we will use to train the model
    - epochs: (Optional) A Python integer giving the number of epochs to train for
    
    Returns: The accuracy of the model
    """
    #print(f"print_every: {print_every}")
    loss_list = []
    model = model.to(device=device)  # move the model parameters to CPU/GPU
    for e in range(epochs):
        for t, (x, y) in enumerate(loader_train):
            model.train()  # put model to training mode
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            y = y.to(device=device, dtype=torch.long)

            scores = model(x)
            loss = F.cross_entropy(scores, y)

            # Zero out all of the gradients for the variables which the optimizer
            # will update.
            optimizer.zero_grad()

            # This is the backwards pass: compute the gradient of the loss with
            # respect to each  parameter of the model.
            loss.backward()
            loss_list.append(loss.item())
            # Actually update the parameters of the model using the gradients
            # computed by the backwards pass.
            optimizer.step()
            #print(f"t+1: {t+1}")
            if (t + 1) % print_every == 0:
                print('Epoch %d, Iteration %d, loss = %.4f' % (e, t + 1, loss.item()))
                check_accuracy_part34(loader_test, model)
                print()        
    # Plotting the loss history during the training
    plt.plot(loss_list)
    plt.xlabel('Iterations')
    plt.ylabel('Loss')
    plt.title('Plot of Loss Over Iterations')
    plt.show()
    return check_accuracy_part34(loader_test, model)


# Experiment Starts Here

In [53]:
model = None
optimizer = None

class MyResNet(nn.Module):    
    def __init__(self):
        super(MyResNet, self).__init__()
        kernel_size = 5
        self.conv1 = nn.Sequential(
                            nn.Conv2d(3, 64, (7,7), stride=2, padding=3),
                            nn.BatchNorm2d(64),
                            nn.MaxPool2d(kernel_size, stride=2, padding=1),
                            nn.ReLU(),
                        )
        self.conv2_x = nn.Sequential(
                            nn.Conv2d(64, 64, kernel_size, padding=2),
                            #nn.BatchNorm2d(64),
                            #nn.ReLU(),
                            #nn.Conv2d(64, 64, kernel_size, padding=2),
                            #nn.ReLU(),
                        )
        self.conv3_x = nn.Sequential(
                            nn.BatchNorm2d(64),
                            nn.ReLU(),
                            nn.Conv2d(64, 128, kernel_size, padding=2),
                            #nn.BatchNorm2d(128),
                            #nn.ReLU(),
                            #nn.Conv2d(128, 128, kernel_size, padding=2),
                            #nn.ReLU(),
                        )
        self.conv4_x = nn.Sequential(
                            nn.BatchNorm2d(128),
                            nn.ReLU(),
                            nn.Conv2d(128, 256, kernel_size, padding=2),
                            #nn.BatchNorm2d(256),
                            #nn.ReLU(),
                            #nn.Conv2d(256, 256, kernel_size, padding=2),
                            #nn.ReLU(),
                        )
        self.conv5_x = nn.Sequential(
                            nn.BatchNorm2d(256),
                            nn.ReLU(),
                            nn.Conv2d(256, 512, kernel_size, stride=2, padding=2),
                            #nn.BatchNorm2d(512),
                            #nn.ReLU(),
                            #nn.Conv2d(512, 512, kernel_size, padding=2),
                            #nn.ReLU(),
                        )
        self.conv6_x = nn.Sequential(
                            #nn.BatchNorm2d(256),
                            nn.BatchNorm2d(512),
                            nn.ReLU(),
                            nn.Conv2d(512, 1024, kernel_size, stride=2, padding=2),
                            #nn.BatchNorm2d(1024),
                            #nn.ReLU(),
                            #nn.Conv2d(1024, 1024, kernel_size, padding=2),
                            #nn.ReLU(),
                        )
        self.last_x = nn.Sequential(
                            #nn.BatchNorm2d(512),
                            nn.BatchNorm2d(1024),
                            nn.ReLU(),
                            nn.MaxPool2d((2, 2)),
                            nn.Flatten(),
                            nn.Linear(1024, 1000),
                            nn.Linear(1000, 100),
                        )
        self.skip_connection1 = nn.Sequential(
                            nn.Conv2d(64, 64, (1,1), stride=1, padding=0),
                        )
        self.skip_connection2 = nn.Sequential(
                            nn.Conv2d(64, 128, (1,1), stride=1, padding=0),
                        )
        self.skip_connection3 = nn.Sequential(
                            nn.Conv2d(128, 256, (1,1), stride=1, padding=0),
                        )
        self.skip_connection4 = nn.Sequential(
                            nn.Conv2d(256, 512, (1,1), stride=2, padding=0),
                        )
        self.skip_connection5 = nn.Sequential(
                            nn.Conv2d(512, 1024, (1,1), stride=2, padding=0),
                        )
        
    def forward(self, x):
        scores = None
        output = self.conv1(x)
        skip = self.skip_connection1(output)
        
        output = self.conv2_x(output)
        output += skip
        output2 = output
        output3 = output
        output4 = output
        skip = self.skip_connection2(output)
        skip2 = self.skip_connection2(output2)
        skip3 = self.skip_connection2(output3)
        
        output = self.conv3_x(output)
        output2 = self.conv3_x(output2)
        output3 = self.conv3_x(output3)
        output += skip
        output2 += skip2
        output3 += skip3
        skip = self.skip_connection3(output)
        skip2 = self.skip_connection3(output2)
        skip3 = self.skip_connection3(output3)
        
        output = self.conv4_x(output)
        output2 = self.conv4_x(output2)
        output3 = self.conv4_x(output3)
        output += skip
        output2 += skip2
        output3 += skip3
        skip = self.skip_connection4(output)
        skip2 = self.skip_connection4(output2)
        skip3 = self.skip_connection4(output3)
        
        output = self.conv5_x(output)
        output2 = self.conv5_x(output2)
        output3 = self.conv5_x(output3)
        skip = self.skip_connection5(output)
        skip2 = self.skip_connection5(output2)
        skip3 = self.skip_connection5(output3)
        
        output = self.conv6_x(output)
        output2 = self.conv6_x(output2)
        output3 = self.conv6_x(output3)
        output += output2 + output3 + skip + skip2 + skip3
        
        scores = self.last_x(output)
        return scores

model = MyResNet()
optimizer = optim.Adam(params=model.parameters(), lr=0.00005)
print(f"Batch size: {batch_size}")

print_every = 100
train_part34(model, optimizer, epochs=10)
print_every = 100

Batch size: 64
Epoch 0, Iteration 10, loss = 1.8521
Checking accuracy on test set
Got 1770 / 7178 correct (24.66)

Epoch 0, Iteration 20, loss = 1.7418
Checking accuracy on test set
Got 1660 / 7178 correct (23.13)

Epoch 0, Iteration 30, loss = 1.7022
Checking accuracy on test set
Got 2239 / 7178 correct (31.19)

Epoch 0, Iteration 40, loss = 1.7655
Checking accuracy on test set


KeyboardInterrupt: 

In [54]:
# Checks your accuracy
best_model = model
check_accuracy_part34(loader_test, best_model)

Checking accuracy on test set
Got 2440 / 7178 correct (33.99)


0.3399275564224018