In [1]:
# imports
import numpy as np # linear algebra
from PIL import Image # images
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# getting the transfer model
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from torch.utils.data.dataset import random_split
import os
from tqdm import tqdm
from google.colab import drive
# Mount Google Drive so the dataset paths under /content/drive are readable.
drive.mount('/content/drive')
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Mounted at /content/drive


In [2]:
# --- Optimisation settings ---
learning_rate = 5e-4   # step size handed to the optimizer
weight_decay = 5e-5    # coefficient of the model's L2 penalty term
num_epochs = 100       # maximum number of passes over the training split

# --- Input settings ---
batch_size = 32
image_size = 224       # images are resized to image_size x image_size
image_channels = 3

In [3]:
# Seed torch's global RNG so the train/validation split below is reproducible.
torch.manual_seed(42)

# Define the directory where your dataset is stored
train_dir = "/content/drive/MyDrive/ucsc-cse-144-Winter-2024-final-project/train/train"
test_dir = "/content/drive/MyDrive/ucsc-cse-144-Winter-2024-final-project/test/test"

# Training-time preprocessing: resize, random flips and colour jitter, then
# normalise with the channel mean/std used by torchvision's ImageNet-pretrained models.
train_transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.20, contrast=0.20, saturation=0.20, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Deterministic preprocessing (no augmentation) for validation and test images.
test_transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Two views of the same image folder: one augmented (training) and one
# deterministic (validation). random_split() only wraps a dataset in Subset
# objects, so splitting a single augmented ImageFolder would also apply the
# random augmentations to the validation samples and make the validation loss
# (which drives early stopping) noisy.
augmented_dataset = ImageFolder(root=train_dir, transform=train_transform)
plain_dataset = ImageFolder(root=train_dir, transform=test_transform)
assert len(plain_dataset) == len(augmented_dataset), "both views must list the same images"

# Split dataset into training and validation sets (80% / 20%)
train_size = int(0.8 * len(augmented_dataset))
val_size = len(augmented_dataset) - train_size
train_dataset, augmented_val_split = random_split(augmented_dataset, [train_size, val_size])

# Re-use the held-out indices, but read those samples through the
# non-augmented view so validation is deterministic.
val_dataset = torch.utils.data.Subset(plain_dataset, augmented_val_split.indices)

# Create DataLoaders to iterate over the two splits
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_data_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [4]:
# Report how many samples ended up in each split.
print("Train dataset size:", len(train_dataset))
print("Validation dataset size:", len(val_dataset))

Train dataset size: 800
Validation dataset size: 200


In [5]:
class TransferSEResNet(nn.Module):
    """Pretrained ResNet-50 backbone followed by a small fully connected head.

    The backbone's own ``fc`` layer is replaced by ``nn.Identity`` so that it
    emits its pooled feature vector, which is then passed through
    ``fc1 -> dropout -> batchnorm -> fc2 -> dropout -> batchnorm -> fc_out``.
    All backbone parameters are frozen except those in ``layer4``.

    Args:
        n_size: Accepted for interface compatibility; not used by this class.
        num_classes: Number of output logits produced by ``fc_out``.
        reduction: Accepted for interface compatibility; not used by this class.
        weight_decay: Coefficient applied in ``l2_regularization_loss``.
    """

    def __init__(self, n_size, num_classes=100, reduction=16, weight_decay=1e-5):
        super().__init__()

        # Coefficient of the penalty returned by l2_regularization_loss().
        self.weight_decay = weight_decay

        # Pretrained backbone
        resnet_weights = models.ResNet50_Weights.DEFAULT
        self.resnet = models.resnet50(weights=resnet_weights)

        # Freeze the whole backbone, then re-enable gradients for layer4 only.
        # NOTE(review): the frozen stages still contain BatchNorm layers, whose
        # running statistics keep updating in train() mode - confirm this is intended.
        for param in self.resnet.parameters():
            param.requires_grad = False

        for param in self.resnet.layer4.parameters():
            param.requires_grad = True

        # Width of the feature vector the backbone feeds into its fc layer.
        backbone_features = self.resnet.fc.in_features
        # NOTE: despite its name, this attribute stores the backbone feature
        # width, not the number of classes. The name is kept so existing code
        # that reads it keeps working.
        self.num_classes = backbone_features
        # Drop the backbone's classifier; the head below replaces it.
        self.resnet.fc = nn.Identity()
        dropout_rate = 0.50

        self.fc1 = nn.Linear(backbone_features, 1024)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.bn1 = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, 512)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.bn2 = nn.BatchNorm1d(512)
        self.fc_out = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return the class logits for a batch ``x``."""
        x = self.resnet(x)
        x = torch.relu(self.fc1(x))
        x = self.dropout1(x)
        x = self.bn1(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.bn2(x)
        x = self.fc_out(x)
        return x

    def l2_regularization_loss(self):
        """Return ``weight_decay`` times the summed L2 norms of the trainable parameters.

        The penalty is the sum of (unsquared) per-tensor L2 norms. Parameters
        with ``requires_grad=False`` are skipped: they receive no gradient from
        this term, so including them only adds a constant to the reported loss
        and extra work on every training step.
        """
        l2_loss = 0.0
        for param in self.parameters():
            if param.requires_grad:
                l2_loss += torch.norm(param, p=2)
        return self.weight_decay * l2_loss


In [6]:
def se_resnet(num_classes=100):
    """Build a TransferSEResNet with `num_classes` outputs and the notebook-level `weight_decay`."""
    return TransferSEResNet(3, num_classes=num_classes, weight_decay=weight_decay)

In [7]:
# Build the 100-class transfer model.
model = se_resnet(num_classes=100)

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 148MB/s]


TransferSEResNet(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
  

In [8]:
# Create and train the PyTorch model

def run_experiment(model, train_loader, val_loader, num_epochs=10, learning_rate=0.001, weight_decay=0.0001,
                   patience=5, checkpoint_path='best_model.pth'):
    """Train `model` with Adam and early stopping on the validation loss.

    Each epoch runs one pass over `train_loader` (cross-entropy plus
    `model.l2_regularization_loss()`), then evaluates on `val_loader`
    (cross-entropy only, so the printed train and validation losses are not
    on the same scale). Whenever the validation loss improves, the model's
    state dict is saved to `checkpoint_path`.

    Args:
        model: Module to train; must provide `l2_regularization_loss()`.
        train_loader: Iterable of `(inputs, labels)` training batches.
        val_loader: Iterable of `(inputs, labels)` validation batches.
        num_epochs: Maximum number of epochs.
        learning_rate: Learning rate passed to Adam.
        weight_decay: Accepted for interface compatibility but not used here;
            the penalty comes from `model.l2_regularization_loss()`.
        patience: Number of consecutive epochs without validation-loss
            improvement after which training stops.
        checkpoint_path: File the best state dict is written to.
    """
    # float("inf") instead of np.Inf: that alias was removed in NumPy 2.0.
    best_val_loss = float("inf")
    counter = 0  # epochs since the last validation-loss improvement

    # Move the model first so the optimizer is built from the on-device parameters.
    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        # ---- training pass ----
        running_loss = 0.0
        train_correct, train_total = 0, 0
        model.train()
        for inputs, labels in tqdm(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Add the model's own L2 penalty to the classification loss.
            loss = loss + model.l2_regularization_loss()

            loss.backward()
            optimizer.step()

            predicted = outputs.argmax(dim=1)
            running_loss += loss.item()
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        train_loss = running_loss / len(train_loader)
        train_accuracy = train_correct / train_total

        # ---- validation pass ----
        model.eval()
        val_running_loss = 0.0
        val_correct, val_total = 0, 0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader):
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                v_loss = criterion(outputs, labels)
                val_running_loss += v_loss.item()
                predicted = outputs.argmax(dim=1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_loss = val_running_loss / len(val_loader)
        val_accuracy = val_correct / val_total

        # Report before the early-stopping check so the final epoch's metrics
        # are not lost when training stops.
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss}, Train Accuracy: {train_accuracy * 100:.2f}%')
        print(f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {val_loss}, Validation Accuracy: {val_accuracy * 100:.2f}%')

        # Check for early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            counter = 0  # Reset counter since there's an improvement
            torch.save(model.state_dict(), checkpoint_path)
        else:
            counter += 1
            if counter >= patience:
                print("Early stopping...")
                break

# Start from a freshly built model and train it with the notebook-level hyperparameters.
model = se_resnet(num_classes=100)
run_experiment(model, train_data_loader, val_data_loader, num_epochs=num_epochs, learning_rate=learning_rate, weight_decay=weight_decay)

 40%|████      | 10/25 [02:30<03:46, 15.08s/it]


KeyboardInterrupt: 

In [None]:
sample_submission = pd.read_csv("/content/drive/MyDrive/ucsc-cse-144-Winter-2024-final-project/sample_submission.csv")

# Invert ImageFolder's class-name -> index mapping once, so each prediction is
# a dictionary lookup instead of a linear list search.
idx_to_class = {
    class_idx: class_name
    for class_name, class_idx in train_dataset.dataset.class_to_idx.items()
}

# Restore the best checkpoint. map_location keeps loading working when the
# checkpoint was saved on a different device than the current one.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.to(device)  # the input tensors below are moved to `device` as well
model.eval()

# Predict one label per image ID, in the order of the submission file
predicted_labels = []
with torch.no_grad():
    for image_id in tqdm(sample_submission['ID'], total=len(sample_submission)):
        # Load and preprocess the image; the context manager closes the file handle
        image_path = os.path.join(test_dir, image_id)
        with Image.open(image_path) as raw_image:
            image = test_transform(raw_image.convert('RGB')).unsqueeze(0).to(device)  # Add batch dimension

        # Perform inference and map the arg-max index back to its class name
        output = model(image)
        predicted_label_int = output.argmax(dim=1).item()
        predicted_labels.append(idx_to_class[predicted_label_int])

# Assign the whole column at once rather than writing string labels row by row
# into the column that was read from the CSV.
sample_submission['Label'] = predicted_labels

# Save the updated sample submission CSV
sample_submission.to_csv('/content/drive/MyDrive/ucsc-cse-144-Winter-2024-final-project/submission.csv', index=False)