In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, random_split, WeightedRandomSampler
from torchvision import datasets, models, transforms
import torch.optim as optim
import os

# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip()
    ]),
    'validation': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip()
    ])
} # Simplified version: removed data augmentation and normalization

data_dir = os.path.join("drive", "MyDrive", "CVML_project")
class_counts = {}
images_datasets = {}
dataset_sizes = {}
for x in ['train', 'validation']:
    dataset_path = os.path.join(data_dir, x)
    class_counts[x] = [len(os.listdir(os.path.join(dataset_path, y))) for y in ["class0", "class1"]]
    image_datasets[x] = datasets.ImageFolder(dataset_path, data_transforms[x])
    dataset_sizes[x] = len(image_datasets[x])

# Calculate class frequencies and weights
weighted_samplers = {}
for x in ['train', 'validation']:
    total_samples = sum(class_counts[x])
    weight = [(1.0 / count) for count in class_counts[x]]
    print(f"{x} weights:", weight)
    weighted_samplers[x] = WeightedRandomSampler(weight, total_samples)

dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=64,
                                              sampler=weighted_samplers[x], num_workers=2) for x in ['train', 'validation']}

# Use random_split to create datasets for training and validation
# train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

train weights: [3.838918960420746e-05, 0.0020920502092050207]
validation weights: [0.0001529753709652746, 0.009433962264150943]
cuda:0


In [21]:
# Use a pre-trained ResNet18 model
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_classes = 2

for param in model.parameters():
    param.requires_grad = False

# Modify the final fully connected layer to match the number of classes
model.fc = nn.Linear(model.fc.in_features, num_classes)

model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [22]:
# from tempfile import TemporaryDirectory
import time

def train_model(model, criterion, optimizer, scheduler, num_epochs=3):
    since = time.time()

    # Create a temporary directory to save training checkpoints
    # with TemporaryDirectory() as tempdir:
        # best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')

        # torch.save(model.state_dict(), best_model_params_path)
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode
                class_correct = [0.0, 0.0]  # Accumulate correct predictions for each class
                class_total = [0, 0]    # Accumulate total count of images for each class

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                    else:
                        # Calculate accuracy for each class
                        correct = (preds == labels).squeeze()
                        for i in range(len(labels)):
                            label = labels[i]
                            class_correct[label] += correct[i].item()
                            class_total[label] += 1

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                # deep copy the model
                # torch.save(model.state_dict(), best_model_params_path)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')
    # Print accuracy for each class
    for i in range(2):  # Assuming two classes (0 and 1)
        accuracy = class_correct[i] / class_total[i] if class_total[i] != 0 else 0
        print(f'Accuracy for Class {i}: {accuracy:.4f}')

    # load best model weights
    # model.load_state_dict(torch.load(best_model_params_path))
    return model

train_model(model, criterion, optimizer, scheduler, num_epochs=5)

Epoch 0/4
----------
train Loss: 0.0080 Acc: 0.9952
validation Loss: 0.0000 Acc: 1.0000

Epoch 1/4
----------
train Loss: 0.0000 Acc: 1.0000
validation Loss: 0.0000 Acc: 1.0000

Epoch 2/4
----------
train Loss: 0.0000 Acc: 1.0000
validation Loss: 0.0000 Acc: 1.0000

Epoch 3/4
----------
train Loss: 0.0000 Acc: 1.0000
validation Loss: 0.0000 Acc: 1.0000

Epoch 4/4
----------
train Loss: 0.0000 Acc: 1.0000
validation Loss: 0.0000 Acc: 1.0000

Training complete in 7m 23s
Best val Acc: 1.000000
Accuracy for Class 0: 1.0000
Accuracy for Class 1: 0.0000


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  