In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from tqdm import tqdm
from datetime import datetime
import random
import os
import time

In [2]:
# Set random seed for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [None]:
# CutMix function
# Used from the link given in the paper: https://github.com/clovaai/CutMix-PyTorch/blob/master/train.py
def cutmix(inputs, labels, alpha=1.0):
    """Applies CutMix augmentation."""
    lam = np.random.beta(alpha, alpha)
    batch_size = inputs.size(0)
    rand_index = torch.randperm(batch_size).to(inputs.device)
    shuffled_labels = labels[rand_index]
    
    bbx1, bby1, bbx2, bby2 = rand_bbox(inputs.size(), lam)
    inputs[:, :, bbx1:bbx2, bby1:bby2] = inputs[rand_index, :, bbx1:bbx2, bby1:bby2]
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (inputs.size()[-1] * inputs.size()[-2]))
    
    return inputs, labels, shuffled_labels, lam

In [4]:
def rand_bbox(size, lam):
    """Generate a random bounding box for CutMix."""
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1.0 - lam)
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2

In [5]:
# Train function
def train_nn(model, train_loader, criterion, optimizer, device, cutmix_prob=0.5, cutmix_alpha=1.0):
    """Function to train the neural network for one epoch with optional CutMix."""
    model.train()
    running_loss = 0.0
    correct_labels = 0
    total_labels = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        
        if np.random.rand() < cutmix_prob:
            inputs, labels_a, labels_b, lam = cutmix(inputs, labels, alpha=cutmix_alpha)
            outputs = model(inputs)
            loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
        else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted_labels = torch.argmax(outputs, dim=1)
        total_labels += labels.size(0)
        correct_labels += torch.sum(predicted_labels == labels).item()

    train_loss = running_loss / len(train_loader)
    train_accuracy = correct_labels / total_labels
    
    return train_loss, train_accuracy

In [6]:
# Evaluation function
def evaluate_nn(model, test_loader, criterion, device):
    """Function to evaluate the neural network on the test data."""
    model.eval()
    running_loss = 0.0
    correct_labels = 0
    total_labels = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            predicted_labels = torch.argmax(outputs, dim=1)
            total_labels += labels.size(0)
            correct_labels += torch.sum(predicted_labels == labels).item()

    test_loss = running_loss / len(test_loader)
    test_accuracy = correct_labels / total_labels
    return test_loss, test_accuracy

In [10]:
def prepare_model(model_name, num_classes, feature_extractor=False):
    """
    Prepare a model for training, either as a feature extractor or for fine-tuning.
    """
    # Load the pre-trained model
    if model_name == "resnet":
        model = models.resnet50(pretrained=True)
    elif model_name == "squeezenet":
        model = models.squeezenet1_1(pretrained=True)
    elif model_name == "convnext":
        model = models.convnext_base(pretrained=True)
    elif model_name == "vit":
        model = models.vit_b_16(pretrained=True)
    else:
        raise ValueError(f"Unsupported model name: {model_name}")
    
    # Modify the last layer for the dataset
    if model_name == "squeezenet":
        model.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=1)
    elif model_name == "convnext":
        model.classifier[2] = nn.Linear(model.classifier[2].in_features, num_classes)
    elif model_name == "vit":
        model.heads.head = nn.Linear(model.heads.head.in_features, num_classes)
    else:  # ResNet
        model.fc = nn.Linear(model.fc.in_features, num_classes)

    # If using as a fixed feature extractor, freeze all layers except the final layer
    if feature_extractor:
        for param in model.parameters():
            param.requires_grad = False
        
        # Enable gradients only for the last layer
        if model_name == "squeezenet":
            for param in model.classifier[1].parameters():
                param.requires_grad = True
        elif model_name == "convnext":
            for param in model.classifier[2].parameters():
                param.requires_grad = True
        elif model_name == "vit":
            for param in model.heads.head.parameters():
                param.requires_grad = True
        else:  # ResNet
            for param in model.fc.parameters():
                param.requires_grad = True

    return model

In [39]:
def run_training(
    model_name,
    train_loader,
    test_loader,
    lr,
    num_epochs,
    num_classes,
    cutmix_alpha=1.0,
    cutmix_prob=0.5,
    weight_decay=1e-4,
    log_dir="runs",
    save_path="saved_models"
):
    """
    Run training for a model.

    Args:
        model_name (str): Name of the model.
        train_loader: DataLoader for training data.
        test_loader: DataLoader for testing data.
        lr (float): Learning rate.
        num_epochs (int): Number of epochs.
        num_classes (int): Number of classes in the dataset.
        alpha (float): Mixup parameter.
        cutmix_prob (float): Probability of applying CutMix.
        weight_decay (float): Weight decay for optimizer.
        log_dir (str): Directory for TensorBoard logs.
        save_path (str): Directory to save the trained model.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Prepare the model
    model = prepare_model(model_name, num_classes)
    model = model.to(device)

    # Define optimizer and learning rate scheduler
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    # Loss function
    criterion = nn.CrossEntropyLoss()


    # Training loop
    train_losses, train_accuracies = [], []
    test_losses, test_accuracies = [], []
    learning_rates = []

    # TensorBoard setup
    writer = SummaryWriter(log_dir=log_dir)

    for epoch in tqdm(range(num_epochs)):
        # Training
        train_loss, train_accuracy = train_nn(model, train_loader, criterion, optimizer, device, cutmix_alpha, cutmix_prob)
        # Evaluation
        test_loss, test_accuracy = evaluate_nn(model, test_loader, criterion, device)
        
        # Get current learning rate
        current_lr = optimizer.param_groups[0]['lr']
        learning_rates.append(current_lr)
        
        # Update scheduler
        scheduler.step()
        
        # Store metrics
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        # Logging to TensorBoard
        writer.add_scalar("Loss/Train", train_loss, epoch)
        writer.add_scalar("Loss/Test", test_loss, epoch)
        writer.add_scalar("Accuracy/Train", train_accuracy, epoch)
        writer.add_scalar("Accuracy/Test", test_accuracy, epoch)
        writer.add_scalar("Learning_Rate", current_lr, epoch)

        # Logging to console
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")
        print(f"Learning Rate: {current_lr:.6f}")



    # Save the model
    os.makedirs(save_path, exist_ok=True)
    model_save_path = os.path.join(save_path, f"{model_name}_final.pth")
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved to {model_save_path}")

    # Close TensorBoard writer
    writer.close()

    return model, train_losses, train_accuracies, test_losses, test_accuracies, learning_rates

In [51]:
def combine_training(
    model_name,
    train_loader,
    test_loader,
    lr,
    fixed_num_epochs,
    full_num_epochs,
    num_classes,
    freeze_feature_extractor=False,
    full_finetune=False,
    weight_decay=1e-4,
    cutmix_alpha=1.0,
    cutmix_prob=0.5,
    log_dir="runs",
    save_path="saved_models"
):
    """
    Combined training function for using the model as a fixed feature extractor
    and/or fully fine-tuning the model.

    Args:
        model_name (str): Name of the model.
        train_loader: DataLoader for training data.
        test_loader: DataLoader for testing data.
        lr (float): Learning rate.
        num_epochs (int): Number of epochs.
        num_classes (int): Number of classes in the dataset.
        freeze_feature_extractor (bool): Whether to freeze feature extractor layers.
        full_finetune (bool): Whether to fine-tune the entire model.
        weight_decay (float): Weight decay for optimizer.
        cutmix_alpha (float): Mixup parameter.
        cutmix_prob (float): Probability of applying CutMix.
        log_dir (str): Directory for TensorBoard logs.
        save_path (str): Directory to save the trained model.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Number of classes
    num_classes = len(train_loader.dataset.classes)

    # Prepare the model
    model = prepare_model(model_name, num_classes)

    # writer = SummaryWriter(log_dir)

    # If freeze_feature_extractor is enabled
    if freeze_feature_extractor:
        print(f"Using {model_name} as a fixed feature extractor.")
        for param in model.parameters():
            param.requires_grad = False

        # # Unfreeze the classifier layer
        # for param in model.fc.parameters():
        #     param.requires_grad = True

        num_logits = model.fc.in_features
        model.fc = nn.Linear(num_logits, num_classes)

        # Training the model as a fixed feature extractor
        # optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr, weight_decay=weight_decay)
        # scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

        print("Training with fixed feature extractor...")
        start_time = time.time()
        model, train_losses, train_accuracies, test_losses, test_accuracies, learning_rates = run_training(
            model_name=model_name,
            train_loader=train_loader,
            test_loader=test_loader,
            lr=lr,
            num_epochs=fixed_num_epochs,
            num_classes=num_classes,
            cutmix_alpha=cutmix_alpha,
            cutmix_prob=cutmix_prob,
            weight_decay=weight_decay,
            log_dir=f"{log_dir}/fixed_feature_extractor",
            save_path=f"{save_path}/fixed_feature_extractor",
        )
        elapsed_time = time.time() - start_time
        # writer.add_scalar("Training Time/Fixed Feature Extractor", elapsed_time)
        print(f"Fixed feature extractor training completed in {elapsed_time:.2f} seconds.")

    # If full_finetune is enabled
    if full_finetune:
        print(f"Fine-tuning the entire {model_name} model.")
        for param in model.parameters():
            param.requires_grad = True

        # Training the model with full fine-tuning
        # optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
        # scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

        print("Training with full fine-tuning...")
        start_time = time.time()
        model, train_losses, train_accuracies, test_losses, test_accuracies, learning_rates = run_training(
            model_name=model_name,
            train_loader=train_loader,
            test_loader=test_loader,
            lr=lr,
            num_epochs=full_num_epochs,
            num_classes=num_classes,
            cutmix_alpha=cutmix_alpha,
            cutmix_prob=cutmix_prob,
            weight_decay=weight_decay,
            log_dir=f"{log_dir}/full_finetune",
            save_path=f"{save_path}/full_finetune",
        )
        elapsed_time = time.time() - start_time
        # writer.add_scalar("Training Time/Full Fine-Tuning", elapsed_time)
        print(f"Full fine-tuning completed in {elapsed_time:.2f} seconds.")

    # Close TensorBoard writer
    # writer.close()

In [44]:
data_dir = '/home/ahmad/courses/cuda_lab/MA-INF-4308-Lab-Vision-Systems/Assignment-3/dataset'
# log_dir = './runs/human_robot_classifier_' + datetime.now().strftime('%Y%m%d_%H%M%S')

In [45]:
print(log_dir)

./runs/human_robot_classifier_20241118_032834


In [46]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
# Set random seed
set_seed()

In [29]:
# Define transformations
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.RandomApply([
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1)
    ], p=0.8),
    # AutoAugment(AutoAugmentPolicy.IMAGENET),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [30]:
# Load datasets
train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), train_transforms)
val_dataset = datasets.ImageFolder(os.path.join(data_dir, 'val'), val_transforms)

In [31]:
train_dataset.classes, val_dataset.classes

(['person', 'robot'], ['person', 'robot'])

In [32]:
# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)

In [None]:
combine_training(
    model_name="resnet",
    train_loader=train_loader,
    test_loader=val_loader,
    lr=0.001,
    fixed_num_epochs=5,
    full_num_epochs=10,
    num_classes=2,
    freeze_feature_extractor=True,
    full_finetune=False,
    log_dir="logs",
    save_path="models"
)



Using resnet as a fixed feature extractor.
Training with fixed feature extractor...


 20%|█████████                                    | 1/5 [01:21<05:27, 81.98s/it]

Epoch 1/5
Train Loss: 0.7334, Train Accuracy: 0.6912
Test Loss: 0.7313, Test Accuracy: 0.9000
Learning Rate: 0.001000
