### Replication of AlexNet.
#### Due to limited computational resources, this replication is at a limited scale. In particular, the network was trained to recognize only the first 20 classes of images. This restriction can easily be relaxed given sufficient computational power.

In [None]:
import numpy as np
from functools import partial
from typing import Any, Optional

import os
import cv2
import time

import pandas as pd
import torch.nn.init as init

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms

import matplotlib.pyplot as plt

# Device configuration: use the GPU when PyTorch reports one, else the CPU.
_cuda_ready = torch.cuda.is_available()
device = torch.device('cuda') if _cuda_ready else torch.device('cpu')

In [None]:
# Neural Network Architecture.
class AlexNet(nn.Module):
    """AlexNet-style convolutional image classifier.

    Five convolution layers (batch normalisation follows the first two) feed
    an adaptive 6x6 average pool, whose flattened output goes through a
    three-layer fully connected head that emits ``num_classes`` raw scores.

    Args:
        num_classes: Output width of the last linear layer.
        dropout: Drop probability shared by the two dropout layers.
    """

    def __init__(self, num_classes: int = 1000, dropout: float = 0.5) -> None:
        super().__init__()

        # Layers are listed in execution order; their position in the list
        # becomes their index inside the Sequential container.
        conv_layers = [
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),
            nn.BatchNorm2d(96),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(96, 256, kernel_size=5, padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(256, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.features = nn.Sequential(*conv_layers)

        # Forces a 6x6 spatial grid so the head always sees 256 * 6 * 6 inputs.
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))

        head_layers = [
            nn.Dropout(p=dropout),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the classifier scores for the batch ``x``."""
        pooled = self.avgpool(self.features(x))
        # Keep dim 0 and collapse the remaining dims into one vector per row.
        return self.classifier(torch.flatten(pooled, 1))

    # Xavier Initialization
    def initialize_weights(self):
        """Re-initialise every Conv2d/Linear layer in place.

        Weights get Xavier-uniform values; biases, where present, are zeroed.
        Other module types (e.g. batch normalisation) are left untouched.
        """
        for module in self.modules():
            if not isinstance(module, (nn.Conv2d, nn.Linear)):
                continue
            init.xavier_uniform_(module.weight)
            if module.bias is not None:
                init.constant_(module.bias, 0)



In [None]:
# Handle Exception for greyscale images
class ConvertToRGB(object):
    """Callable transform that widens a one-channel tensor to three channels.

    The input is inspected along its first dimension: when that dimension has
    size 1 the single slice is repeated three times along dimension 0; any
    other input is handed back unchanged.
    """

    def __call__(self, img):
        has_single_channel = img.shape[0] == 1
        if not has_single_channel:
            return img
        # Repeating the size-1 leading dimension three times yields the same
        # data in each of the three output channels.
        return torch.cat((img, img, img), dim=0)

# Preprocessing Images
# Per-channel statistics handed to Normalize (three values, one per channel).
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]
_NORMALIZE_STD = [0.229, 0.224, 0.225]

# Steps run in list order: resize, centre-crop to 224, convert to a tensor,
# widen one-channel tensors to three channels, then normalise.
_preprocess_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    ConvertToRGB(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
]
preprocess = transforms.Compose(_preprocess_steps)

In [None]:
# the images were stored locally
image_path = "/Users/Limit/imagenet-object-localization-challenge_100"

# Labels are derived purely from position: the walk is assumed to visit the
# classes one after another, each contributing exactly IMAGES_PER_CLASS files.
# NOTE(review): confirm this holds for the local copy of the data set.
IMAGES_PER_CLASS = 1300
# Due to limited computational resources, only train with first 20 classes of images.
NUM_TRAIN_CLASSES = 20

filenames_image_path = []
label_train = []

for root, dirnames, filenames in os.walk(image_path):
    # Sorting in place makes os.walk descend in a reproducible order, so the
    # position-based labels do not depend on the file system's listing order.
    dirnames.sort()
    for name in sorted(filenames):
        # The label is the number of complete classes seen *before* this
        # file, so image number 1300 still belongs to class 0.
        label_train.append(len(filenames_image_path) // IMAGES_PER_CLASS)
        # os.path.join picks the right separator for the current platform.
        filenames_image_path.append(os.path.join(root, name))
        # Print whenever one class of images finished reading
        if len(filenames_image_path) % IMAGES_PER_CLASS == 0:
            print(len(filenames_image_path))

print("image loading complete")

# Never index past the files that were actually found.
num_train_images = min(NUM_TRAIN_CLASSES * IMAGES_PER_CLASS,
                       len(filenames_image_path))

x_train = []
for processed, image_name in enumerate(filenames_image_path[:num_train_images],
                                       start=1):
    # The context manager closes the file handle once the tensor is built.
    with Image.open(image_name) as input_image:
        input_tensor = preprocess(input_image)

    # Keep the tensor on the GPU when one is available (CPU otherwise).
    # NOTE(review): holding every sample on the GPU can need a lot of device
    # memory; consider keeping them on the CPU and moving batches instead.
    x_train.append(input_tensor.to(device))

    # print the counter whenever finished processing the corresponding class
    if processed % IMAGES_PER_CLASS == 0:
        print(processed)

print('image processing compelte')

y_train = label_train[:num_train_images]

In [None]:
# Define dataset class
class CustomDataset(Dataset):
    """Dataset that pairs already-built samples with their labels by index.

    Args:
        data: Indexable collection of samples; its length is the dataset size.
        labels: Indexable collection of labels, looked up with the same index.
    """

    def __init__(self, data, labels):
        self.labels = labels
        self.data = data

    def __len__(self):
        # Only the samples determine the size; labels are not consulted.
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``; the label is made a tensor."""
        sample = self.data[idx]
        target = torch.tensor(self.labels[idx])
        return sample, target
    


In [None]:
# Define the loss function
criterion = nn.CrossEntropyLoss()
# Initialize model (default constructor arguments, then Xavier re-initialisation)
model = AlexNet().to(device)
model.initialize_weights()
# Define the optimizer
optimizer = optim.SGD(model.parameters(), lr=0.005, weight_decay=5e-4)

# Prepare the data
train_data = x_train  # List of input tensors
train_labels = y_train  # List of corresponding labels
dataset = CustomDataset(train_data, train_labels)
dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

# Train the models
num_epochs = 10
# Make sure dropout / batch-norm layers are in training mode.
model.train()
for epoch in range(num_epochs):
    current_loss = 0.0
    for inputs, labels in dataloader:
        # The model lives on `device`; the batch (labels in particular, which
        # the dataset always builds on the CPU) must be moved there too or
        # the loss computation fails with a device mismatch.
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        # Snapshot of the first parameter tensor before the update
        Before = next(model.parameters()).detach().clone()

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        current_loss += loss.item()
        # Snapshot of the same parameter tensor after the update
        After = next(model.parameters()).detach().clone()
    # Print the current learning detail
    for param_group in optimizer.param_groups:
        print("Learning rate:", param_group['lr'])
    print()
    # NOTE: despite the label below, this section prints the gradients left
    # over from the last batch; the text is kept to preserve the log format.
    print('another way to print learning rate:')
    for group in optimizer.param_groups:
        for p in group['params']:
            print(p.grad)  # Print gradients
    print('end of p.grad')
    print()
    # Prints False when the last step changed the first parameter tensor
    print(torch.equal(Before, After))
    print(f'Epoch {epoch+1} finished')
    # `criterion` returns a per-batch mean, so average over the number of
    # batches (not the number of samples) and report that average rather than
    # whatever the final batch happened to produce.
    epoch_loss = current_loss / len(dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')
    print('**************************************************************')
    print()

In [None]:
# Location of the CSV file holding the validation labels.
labels_path = '/Users/Limit/imagenet_annot/validation_set_labels.csv'
labels_df = pd.read_csv(labels_path)

# Keep only the rows whose 'label' value is at most 20.
# NOTE(review): whether `<= 20` selects exactly the 20 training classes
# depends on how the CSV numbers its labels - confirm against the file.
_label_in_range = labels_df['label'].le(20)
labels_df_leq_20 = labels_df[_label_in_range]
labels_validation_images = labels_df_leq_20['label'].tolist()

In [None]:
model.eval()
# Move the model to the GPU (when available) once, not once per image.
model.to(device)

# find the test images
image_path = "/Users/Limit/imagenet-object-localization-challenge_validation/val"

# Build the ImageId -> label lookup a single time. Besides avoiding a list
# rebuild and linear scan for every file, it ties each image to its own label
# instead of relying on the directory walk matching the CSV row order.
label_by_image_id = dict(zip(labels_df_leq_20['ImageId'].tolist(),
                             labels_df_leq_20['label'].tolist()))

filenames_image_path = []
validation_labels = []
for root, _, filenames in os.walk(image_path):
    for name in filenames:
        image_id = name.split('.')[0]
        if image_id in label_by_image_id:
            # Join with the directory actually being walked so files found in
            # sub-directories resolve correctly.
            filenames_image_path.append(os.path.join(root, name))
            validation_labels.append(label_by_image_id[image_id])

counter = 0
correct_labels = 0
start_time = time.time()
grab_980_max_val = []
for image_name, expected_label in zip(filenames_image_path, validation_labels):
    counter += 1
    # The context manager closes the file handle once the tensor is built.
    with Image.open(image_name) as input_image:
        input_tensor = preprocess(input_image)
    # create a mini-batch as expected by the model, on the model's device
    input_batch = input_tensor.unsqueeze(0).to(device)

    if counter % 100 == 0:
        print("currently at", counter, 'current time is', time.time() - start_time)
    with torch.no_grad():
        output = model(input_batch)

    # The output has unnormalized scores; softmax turns them into probabilities.
    prob_softmax = torch.softmax(output[0], dim=0)
    # Four largest probabilities among the outputs from index 20 onwards.
    top_rest = torch.topk(prob_softmax[20:], k=4)

    # print the prediction results in this format
    print('********************************************************************************')
    print('Predicting Test Sample', counter, ':   Prediction is Correct?')
    if torch.argmax(output[0]).item() == expected_label:
        print('Yes')
        correct_labels += 1
    else:
        print('No')
    print("first 20 classes probability:", prob_softmax[:20])
    print()
    print('max probability is', torch.max(prob_softmax, dim = 0))
    print()
    print('the max probability of the rest of 980 dim is')
    print(top_rest)
    print('End*****************************************************************************')
    print()

    # Stores the *indices* (element [1] of the top-k result) of the 2nd-4th
    # largest entries. NOTE(review): the name suggests values and the top-1
    # entry is skipped - confirm this is intended.
    grab_980_max_val.append(top_rest[1][1:])

# The ratio below is correct / total, i.e. accuracy (the message used to
# call it "error").
print('the overall testing accuracy is')
# Guard against an empty validation selection instead of dividing by zero.
print(correct_labels / counter if counter else float('nan'))

In [None]:
# Persist the trained parameters (the model's state dict) to a file in the
# current working directory.
_weights_file = 'model_weights_alexnet_replication.pth'
_trained_state = model.state_dict()
torch.save(_trained_state, _weights_file)