<a href="https://colab.research.google.com/github/ecflorui/genesys-lab/blob/main/Resnet101-Training_Experiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import glob
import random
import numpy as np
from PIL import Image
from tqdm import tqdm
from collections import defaultdict

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from timm.data import Mixup  # for mixup regularization

# === Paths and Constants ===
v5_path = "/content/drive/MyDrive/v5"
v6_path = "/content/drive/MyDrive/v6 (1)"
image_ext = '.jpg'
save_path = 'best_model_resnet101.pth'
target_class_size = 400
batch_size = 32
epochs = 30
patience = 5

# === Dataset with Class Balancing ===
class BalancedImageLabelDataset(Dataset):
    def __init__(self, root_dirs, transform=None, target_size=400):
        self.samples = []
        self.labels = []
        self.transform = transform
        class_dict = defaultdict(list)

        for root in root_dirs:
            for image_file in glob.glob(os.path.join(root, f'*{image_ext}')):
                npy_file = image_file.replace(image_ext, '.npy')
                if os.path.exists(npy_file):
                    label_vec = np.load(npy_file)
                    label = int(np.argmax(label_vec))
                    class_dict[label].append((image_file, label))

        for label, items in class_dict.items():
            if len(items) >= target_size:
                chosen = items[:target_size]
            else:
                extra = random.choices(items, k=target_size - len(items))
                chosen = items + extra
            self.samples.extend(chosen)
            self.labels.extend([label] * target_size)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label

# === Transforms ===
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.RandomGrayscale(p=0.1),
    transforms.GaussianBlur(3),
    transforms.RandomErasing(p=0.25, scale=(0.02, 0.2), ratio=(0.3, 3.3), value='random'),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

# === Dataset Split ===
full_dataset = BalancedImageLabelDataset([v5_path, v6_path])
labels = full_dataset.labels
indices = list(range(len(full_dataset)))

train_indices, val_indices = train_test_split(
    indices, test_size=0.2, stratify=labels, random_state=42
)

train_ds = Subset(full_dataset, train_indices)
val_ds = Subset(full_dataset, val_indices)

full_dataset.transform = None
train_ds.dataset.transform = train_transform
val_ds.dataset.transform = val_transform

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)

# === Model ===
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
base_model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)

# Check input features to final layer:
num_features = base_model.fc.in_features
print(f"Final FC layer input features: {num_features}")

# Replace final fc with custom head:
base_model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_features, 256),
    nn.ReLU(inplace=True),
    nn.BatchNorm1d(256),
    nn.Linear(256, 4)  # 4 classes
)
model = base_model.to(device)

# === Loss, Optimizer, Scheduler ===
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# === Mixup ===
mixup_fn = Mixup(
    mixup_alpha=0.2, cutmix_alpha=1.0,
    prob=1.0, switch_prob=0.5, mode='batch',
    label_smoothing=0.1, num_classes=4
)

# === Evaluation Function ===
def evaluate(loader):
    model.eval()
    correct, total, loss_sum = 0, 0, 0
    with torch.no_grad():
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss_sum += loss.item() * imgs.size(0)
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    return loss_sum / total, correct / total

# === Training Loop with AMP, Mixup, and Early Stopping ===
best_val_acc = 0.0
patience_counter = 0
scaler = torch.cuda.amp.GradScaler()

for epoch in range(epochs):
    model.train()
    loop = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}')
    for imgs, labels in loop:
        imgs, labels = imgs.to(device), labels.to(device)

        # Apply mixup
        if mixup_fn is not None:
            imgs, labels = mixup_fn(imgs, labels)

        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            outputs = model(imgs)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

    train_loss, train_acc = evaluate(train_loader)
    val_loss, val_acc = evaluate(val_loader)
    scheduler.step()

    print(f"Epoch {epoch+1}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}, Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), save_path)
        print("Saved best model.")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}.")
            break


  scaler = torch.cuda.amp.GradScaler()


Final FC layer input features: 2048


  with torch.cuda.amp.autocast():
Epoch 1/30: 100%|██████████| 40/40 [03:46<00:00,  5.67s/it]


Epoch 1: Train Acc=0.7453, Val Acc=0.6844, Train Loss=0.9689, Val Loss=1.0353
Saved best model.


Epoch 2/30: 100%|██████████| 40/40 [03:46<00:00,  5.67s/it]


Epoch 2: Train Acc=0.8906, Val Acc=0.8156, Train Loss=0.6443, Val Loss=0.7497
Saved best model.


Epoch 3/30: 100%|██████████| 40/40 [03:49<00:00,  5.74s/it]


Epoch 3: Train Acc=0.9453, Val Acc=0.8562, Train Loss=0.5514, Val Loss=0.6785
Saved best model.


Epoch 4/30: 100%|██████████| 40/40 [03:52<00:00,  5.82s/it]


Epoch 4: Train Acc=0.9477, Val Acc=0.8500, Train Loss=0.5084, Val Loss=0.6770


Epoch 5/30: 100%|██████████| 40/40 [03:53<00:00,  5.84s/it]


Epoch 5: Train Acc=0.9609, Val Acc=0.8844, Train Loss=0.4747, Val Loss=0.5943
Saved best model.


Epoch 6/30: 100%|██████████| 40/40 [03:50<00:00,  5.77s/it]


Epoch 6: Train Acc=0.9805, Val Acc=0.8938, Train Loss=0.4284, Val Loss=0.5610
Saved best model.


Epoch 7/30: 100%|██████████| 40/40 [03:51<00:00,  5.78s/it]


Epoch 7: Train Acc=0.9914, Val Acc=0.9219, Train Loss=0.3992, Val Loss=0.5388
Saved best model.


Epoch 8/30: 100%|██████████| 40/40 [03:49<00:00,  5.74s/it]


Epoch 8: Train Acc=0.9945, Val Acc=0.9094, Train Loss=0.3935, Val Loss=0.5380


Epoch 9/30: 100%|██████████| 40/40 [03:49<00:00,  5.73s/it]


Epoch 9: Train Acc=0.9992, Val Acc=0.9125, Train Loss=0.3785, Val Loss=0.5334


Epoch 10/30: 100%|██████████| 40/40 [03:49<00:00,  5.74s/it]


Epoch 10: Train Acc=0.9922, Val Acc=0.9281, Train Loss=0.3946, Val Loss=0.5268
Saved best model.


Epoch 11/30: 100%|██████████| 40/40 [03:48<00:00,  5.71s/it]


KeyboardInterrupt: 