ResNeXt

Setup environment

In [None]:
import os
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split, Dataset,  Subset
import torch.optim as optim
from tqdm import tqdm
import torchvision
from torchvision import datasets, transforms
import torchvision.models as models
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
from sklearn.model_selection import KFold
import numpy as np
from PIL import Image
%matplotlib inline

# Hyper-parameters, gathered in one place for quick adjustments
BATCH_SIZE = 32
k_folds = 5
num_epochs = 10

# Seed PyTorch's default generator for reproducibility
torch.manual_seed(0)

# Dataset locations on the local machine
combined_data_path = "./Dataset/train"
test_data_path = "./Dataset/test"

# Report GPU availability and run on the GPU whenever one is present
print("cuda:", torch.cuda.is_available())
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


Prep the training and validation datasets

In [None]:

# Training transform. The augmentation steps are currently disabled (they are
# kept below, commented out, for reference), so this pipeline is the same
# deterministic resize + normalize as `transform_val`.
transform_train = transforms.Compose([
    # transforms.RandomResizedCrop(size=224, scale=(0.5, 1.0), ratio=(0.75, 1.33), interpolation=transforms.InterpolationMode.BICUBIC),
    # transforms.RandomHorizontalFlip(),
    # transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    # transforms.RandomAffine(degrees=5, translate=(0.05, 0.05), scale=(0.95, 1.05)),
    # transforms.ToTensor(),
    # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# Validation/test transform: never augmented
transform_val = transforms.Compose([
    transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Labeled dataset (one sub-folder per class) using the training transform
full_dataset = datasets.ImageFolder(root=combined_data_path, transform=transform_train)

# K-fold splitter. shuffle=True without a random_state draws from NumPy's
# global RNG, which torch.manual_seed() does not seed, so the folds would
# differ between runs. Pinning random_state makes the split reproducible.
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=0)

# # Split the dataset into train and validation sets
# num_train = len(full_dataset)
# num_val = int(0.2 * num_train)
# num_train -= num_val
# train_subset, val_subset = random_split(full_dataset, [num_train, num_val])

# # Manually set the transform for validation subset
# val_subset.dataset.transform = transform_val

# # Create DataLoaders for training and validation sets
# train_loader = DataLoader(dataset=train_subset, batch_size=BATCH_SIZE, shuffle=True)
# val_loader = DataLoader(dataset=val_subset, batch_size=BATCH_SIZE, shuffle=False)

# # Display training/validation set count
# print(f"Training image count: {len(train_subset)}")
# print(f"Validation image count: {len(val_subset)}")


Prep the testing dataset

In [None]:
# # For the test set, since there are no labels, we need to create a custom DataSet.
# class UnlabeledDataset(datasets.VisionDataset):
#     def __init__(self, root_dir, transform=None):
#         super(UnlabeledDataset, self).__init__(root_dir, transform=transform)
#         self.images = [os.path.join(root_dir, img) for img in os.listdir(root_dir)]

#     def __getitem__(self, index):
#         image_path = self.images[index]
#         image = datasets.folder.default_loader(image_path)  # default loader is PIL.Image.open
#         if self.transform:
#             image = self.transform(image)
#         return image, str(image_path)

#     def __len__(self):
#         return len(self.images)

# Custom dataset loader to handle unlabeled data.
class UnlabeledDataset(Dataset):
    """Dataset over the files of a single flat directory, without labels.

    Each item is a ``(image, file_name)`` pair so that predictions can be
    matched back to the file they were made for.

    Args:
        root_dir: Directory that contains the image files.
        transform: Optional callable applied to each loaded RGB PIL image.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # os.listdir() returns entries in arbitrary order and also lists
        # sub-directories (e.g. Jupyter's ``.ipynb_checkpoints``), which
        # Image.open() cannot read. Keep regular files only and sort them so
        # the sample order is stable from run to run.
        self.image_list = sorted(
            name for name in os.listdir(root_dir)
            if os.path.isfile(os.path.join(root_dir, name))
        )

    def __len__(self):
        """Return the number of files found in ``root_dir``."""
        return len(self.image_list)

    def __getitem__(self, idx):
        """Load file ``idx`` as RGB and return ``(image, file_name)``."""
        img_name = os.path.join(self.root_dir, self.image_list[idx])
        image = Image.open(img_name).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, self.image_list[idx]

# Unlabeled test images go through the validation transform and are served
# in a fixed (unshuffled) order.
test_dataset = UnlabeledDataset(test_data_path, transform=transform_val)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)


Setup Model with ResNeXt

In [None]:
# Pretrained backbone. The commented line is the smaller ResNeXt-50 alternative.
# resnext = models.resnext50_32x4d(pretrained=True)
resnext = torchvision.models.resnext101_32x8d(pretrained=True)

class MyModel(nn.Module):
    """Frozen pretrained backbone followed by a trainable 100-way linear head.

    Args:
        resnext: Backbone module whose final layer is exposed as ``fc`` and
            has an ``in_features`` attribute (e.g. a torchvision ResNeXt).
            The module is deep-copied, so the caller's instance is left
            untouched and can be used to build several independent models.
    """

    def __init__(self, resnext):
        import copy  # local import keeps this block self-contained

        super(MyModel, self).__init__()
        # Work on a private copy. Replacing ``fc`` directly on the caller's
        # module would leave it with an nn.Identity head, so building a
        # second MyModel from the same backbone (one per cross-validation
        # fold) would fail with AttributeError on ``fc.in_features``.
        self.resnext = copy.deepcopy(resnext)
        num_features = self.resnext.fc.in_features  # inputs of the original last layer
        self.resnext.fc = nn.Identity()  # backbone now outputs the pooled features

        # Freeze all pretrained layers
        for param in self.resnext.parameters():
            param.requires_grad = False

        # Trainable classification head
        self.fc = nn.Sequential(
            nn.Dropout(0.5),  # 50% of the features are dropped during training
            nn.Linear(num_features, 100)
        )

    def forward(self, x):
        """Return the head's output (100 logits per sample) for batch ``x``."""
        x = self.resnext(x)
        x = self.fc(x)
        return x

# model = MyModel(resnext)
# model = model.to(device)

# # Load a pre-trained ResNeXt model (or resnext101_32x8d)
# model = models.resnext50_32x4d(pretrained=True)
# # model = models.resnext101_32x8d(pretrained=True)

# # Replace the last fully connected layer
# num_features = model.fc.in_features  # Get the number of inputs for the last layer
# model.fc = nn.Sequential(
#     nn.Dropout(0.5),  # 50% of the neurons are dropped out
#     nn.Linear(num_features, 100)
# )

# # Freeze all layers
# for param in model.parameters():
#     param.requires_grad = False

# # Unfreeze the last layer
# for param in model.fc.parameters():
#     param.requires_grad = True

# # # Unfreeze additional layers
# # for name, child in model.named_children():
# #     if name in ['layer4']:  # Unfreeze 'layer4'
# #         for param in child.parameters():
# #             param.requires_grad = True

# model = model.to(device)


Define Loss Function and Optimizer

In [None]:
# Loss function: cross-entropy computed on the model's raw class logits
criterion = torch.nn.CrossEntropyLoss()

# # Define the optimizer
# optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-2) # weight_decay is L2 regularization
# # optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)


Training and Validation Loop

In [None]:
# Per-epoch metric buckets: entry i collects one value per fold for epoch i,
# so the folds can be averaged epoch-by-epoch once cross-validation is done.
all_train_losses = [[] for _ in range(num_epochs)]
all_val_accuracies = [[] for _ in range(num_epochs)]

# num_epochs = 1  # debug

# Second view of the training folder that applies the validation transform.
# Assigning `val_subset.dataset.transform` would overwrite the transform of
# `full_dataset` itself, which the training subset shares, so training would
# silently lose its own transform. ImageFolder builds its sample list in
# sorted order, so an index refers to the same image in both datasets.
val_dataset = datasets.ImageFolder(root=combined_data_path, transform=transform_val)

# Start the k-fold cross-validation
for fold, (train_ids, val_ids) in enumerate(kfold.split(full_dataset)):
    print(f'Starting fold {fold+1}/{k_folds}')
    train_losses = []
    val_accuracies = []

    # New model and optimizer for every fold
    model = MyModel(resnext)
    model = model.to(device)
    # Only the parameters that are actually trained are handed to the optimizer
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.Adam(trainable_params, lr=0.0005, weight_decay=1e-2)  # weight_decay is L2 regularization

    # Training indices read through the training transform, validation
    # indices through the validation transform
    train_subset = Subset(full_dataset, train_ids)
    val_subset = Subset(val_dataset, val_ids)

    # Create data loaders for the current fold
    train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False)

    # Training and validation loop for the current fold
    for epoch in range(num_epochs):
        # Training pass
        model.train()
        total_train_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        # Validation pass: count correctly classified samples
        model.eval()
        correct = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                predicted = outputs.argmax(dim=1)
                correct += (predicted == labels).sum().item()

        # Mean batch loss, and accuracy expressed in percent so that it
        # matches the '%' in the log line and the 'Accuracy (%)' plot axis
        avg_train_loss = total_train_loss / len(train_loader)
        avg_val_accuracy = 100.0 * correct / len(val_subset)
        train_losses.append(avg_train_loss)
        val_accuracies.append(avg_val_accuracy)

        print(f'Epoch [{epoch+1}/{num_epochs}], Fold [{fold+1}/{k_folds}], Loss: {avg_train_loss:.4f}, Validation Accuracy: {avg_val_accuracy:.2f}%')

    # Aggregate this fold's per-epoch metrics
    for i in range(num_epochs):
        all_train_losses[i].append(train_losses[i])
        all_val_accuracies[i].append(val_accuracies[i])
    # End of the fold
    print(f'Fold {fold+1} completed.\n')

# Calculate the average metrics across all folds
avg_train_losses = [np.mean(losses) for losses in all_train_losses]
avg_val_accuracies = [np.mean(acc) for acc in all_val_accuracies]

Plotting Training Loss and Validation Accuracy

In [None]:
# Two side-by-side panels: fold-averaged training loss (left) and
# fold-averaged validation accuracy (right), both against the epoch number.
epoch_axis = range(1, num_epochs + 1)
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(epoch_axis, avg_train_losses, label='Average Training Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training Loss Across Folds')
loss_ax.legend()

acc_ax.plot(epoch_axis, avg_val_accuracies, label='Average Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.set_title('Validation Accuracy Across Folds')
acc_ax.legend()

plt.show()

# plt.figure(figsize=(12, 6))

# # Plot training loss
# plt.subplot(1, 2, 1)
# plt.plot(train_losses, label='Training Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.title('Training Loss')
# plt.legend()

# # Plot validation accuracy
# plt.subplot(1, 2, 2)
# plt.plot(val_accuracies, label='Validation Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.title('Validation Accuracy')
# plt.legend()

# plt.show()


Evaluate and create the predictions

In [None]:
# Predict with `model`, i.e. the model trained in the last cross-validation
# fold, switched to evaluation mode.
model.eval()

test_predictions = []

with torch.no_grad():
    for images, paths in tqdm(test_loader, desc='Predicting labels'):
        images = images.to(device)

        # Index of the highest logit -> class name taken from the training folder names
        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        predicted_labels = [full_dataset.classes[p] for p in predicted.tolist()]

        # One (file name, label) row per image in the batch
        for path, label in zip(paths, predicted_labels):
            test_predictions.append((Path(path).name, label))

# Create the output folder first: without it, open() raises
# FileNotFoundError and the predictions computed above are never written.
submission_path = Path('./CSV Files/submission_kfold.csv')
submission_path.parent.mkdir(parents=True, exist_ok=True)

with open(submission_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['ID', 'Label'])
    writer.writerows(test_predictions)  # Writing all predictions at once