1) Mount Google Drive in Google Colab.

In [None]:
# Mount Google Drive at /content/drive; every dataset path used below lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


2) Inspect the Source - Number of Images Per Class, Sizes of Images Per Class, Channels of Images Per Class, and Total Number of Images.

In [None]:
import os
from PIL import Image
from collections import Counter, defaultdict

# ── Configuration ──────────────────────────────────────────────────────────
# Root of the labelled source images. inspect_directory() below treats every
# immediate subfolder of this path as one class.
src_dir = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images"
# ── End configuration ──────────────────────────────────────────────────────

def inspect_directory(path):
    """
    Print a per-class summary of an image folder laid out as `path/<class>/<file>`.

    For every class subfolder the function counts the files that carry an
    image extension, tallies the PIL mode and the (width, height) of each
    image that can be opened, and prints one table row per class followed by
    the grand total.

    Args:
        path: Directory whose immediate subdirectories are the classes.
            Non-directory entries directly under `path` are ignored.

    Returns:
        None. The summary is written to stdout.

    Notes:
        * A file with an image extension that cannot be opened is still
          counted; the failure is reported on its own line and the file is
          left out of the mode/resolution tallies.
        * A class folder without any image file does not get a table row.
    """
    # `.tif` is listed next to `.tiff` so this filter accepts the same
    # extensions as the target-dataset inspection later in the notebook.
    image_exts = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif')

    class_counts = Counter()              # class name -> number of image files
    class_modes  = defaultdict(Counter)   # class name -> {PIL mode: count}
    class_res    = defaultdict(Counter)   # class name -> {(width, height): count}

    # Sorted so the table rows come out in a stable, alphabetical order.
    for cls in sorted(os.listdir(path)):
        cls_path = os.path.join(path, cls)
        if not os.path.isdir(cls_path):
            continue
        for fname in os.listdir(cls_path):
            if not fname.lower().endswith(image_exts):
                continue
            class_counts[cls] += 1
            img_path = os.path.join(cls_path, fname)
            try:
                # The context manager closes the file handle after the header is read.
                with Image.open(img_path) as img:
                    class_modes[cls][img.mode] += 1
                    class_res[cls][img.size] += 1
            except Exception as e:
                # Best effort: report the unreadable file and keep scanning.
                print(f"❌ Error opening {img_path}: {e}")

    total = sum(class_counts.values())
    print(f"\nInspection of `{path}`")
    print(f"{'Class':<12}  Images  Modes               Top Resolutions")
    print("-"*60)
    for cls, cnt in class_counts.items():
        modes_s = ", ".join(f"{m}:{n}" for m, n in class_modes[cls].items())
        # Only the three most frequent resolutions are shown per class.
        top3    = class_res[cls].most_common(3)
        res_s   = ", ".join(f"{w}×{h}:{n}" for (w, h), n in top3)
        print(f"{cls:<12}  {cnt:<6}  {modes_s:<18}  {res_s}")
    print(f"\n→ Total images: {total}\n")

# Run the inspection on the labelled source folder; the summary table is printed below.
inspect_directory(src_dir)


Inspection of `/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images`
Class         Images  Modes               Top Resolutions
------------------------------------------------------------
AbdomenCT     130     RGB:130             224×224:130
BreastMRI     130     RGB:130             224×224:130
CXR           130     RGB:130             224×224:130
ChestCT       130     RGB:130             224×224:130
HandXR        130     RGB:130             224×224:130
HeadCT        130     RGB:130             224×224:130

→ Total images: 780



3) Inspect the target (unlabelled) dataset to verify that it contains 780 images of size 224×224 with 3 channels.

In [None]:
import os
from PIL import Image
from collections import Counter

# ── Configuration ──────────────────────────────────────────────────────────
unlabelled_dir = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images"
# ── End configuration ──────────────────────────────────────────────────────

# Names of the entries directly inside the folder that carry an image extension
candidate_names = [
    entry for entry in os.listdir(unlabelled_dir)
    if entry.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff'))
]

# Every candidate is counted, whether or not it can be opened afterwards
total_images = len(candidate_names)
mode_counts  = Counter()   # PIL mode -> number of images
size_counts  = Counter()   # (width, height) -> number of images

# Read mode and size of each candidate; unreadable files are reported and skipped
for entry in candidate_names:
    full_path = os.path.join(unlabelled_dir, entry)
    try:
        with Image.open(full_path) as picture:
            mode_counts.update([picture.mode])
            size_counts.update([picture.size])
    except Exception as err:
        print(f"❌ Error opening {full_path}: {err}")

# Summary
print(f"\nInspection of unlabelled target directory: `{unlabelled_dir}`")
print(f"→ Total images: {total_images}\n")

print("Channel modes:")
for mode_name, n_seen in mode_counts.items():
    print(f"  {mode_name}: {n_seen}")

print("\nImage resolutions:")
for resolution, n_seen in size_counts.most_common():
    res_w, res_h = resolution
    print(f"  {res_w}×{res_h}: {n_seen}")


Inspection of unlabelled target directory: `/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images`
→ Total images: 780

Channel modes:
  RGB: 780

Image resolutions:
  224×224: 780


4) Let us normalize the source dataset.

In [None]:
import os
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm

# Purpose of this cell: compute the per-channel mean/std of the source images,
# build a Normalize transform from them, and verify the result on one batch
# and on the full dataset.

# ── Configuration ────────────────────────────────────────────────────────────
data_dir = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images"
batch_size = 64   # for batch-level stats
num_workers = 4
# ── End configuration ─────────────────────────────────────────────────────────

# 1) Pre-normalization: load as Tensor [0,1] but no Normalize
pre_transform = transforms.Compose([
    transforms.ToTensor()   # converts to [C,H,W] in [0.0,1.0]
])
pre_ds = datasets.ImageFolder(data_dir, transform=pre_transform)
pre_loader = DataLoader(pre_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# Compute channel sums & squared sums
# (running totals over all pixels, so the whole dataset never has to sit in memory)
n_channels = 3
cnt = 0                               # number of pixels seen per channel
sum_ = torch.zeros(n_channels)        # per-channel sum of pixel values
sum_sq = torch.zeros(n_channels)      # per-channel sum of squared pixel values

for imgs, _ in tqdm(pre_loader, desc="Pre-norm stats"):
    # imgs shape: [B, C, H, W]
    b, c, h, w = imgs.shape
    cnt += b * h * w
    sum_ += imgs.sum(dim=[0,2,3])
    sum_sq += (imgs ** 2).sum(dim=[0,2,3])

# Population statistics: var = E[x^2] - (E[x])^2, per channel
mean_pre = sum_ / cnt
var_pre = (sum_sq / cnt) - (mean_pre ** 2)
std_pre = torch.sqrt(var_pre)

print("Pre-normalization mean:", mean_pre)
print("Pre-normalization std: ", std_pre)


# 2) One-batch post-normalization: take first batch with Normalize()
normalize = transforms.Normalize(mean=mean_pre.tolist(), std=std_pre.tolist())
batch_transform = transforms.Compose([
    transforms.ToTensor(),
    normalize
])
batch_ds = datasets.ImageFolder(data_dir, transform=batch_transform)
batch_loader = DataLoader(batch_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# Get one batch
# NOTE(review): with shuffle=False the first batch presumably holds images of a
# single class (ImageFolder appears to list samples class by class), which would
# explain why its mean/std are not close to 0/1 — confirm before reading this
# check as a sign of a normalization problem.
imgs_batch, _ = next(iter(batch_loader))  # [B, C, H, W]
mean_batch = imgs_batch.mean(dim=[0,2,3])
std_batch  = imgs_batch.std(dim=[0,2,3])

print("\nOne-batch post-norm mean:", mean_batch)
print("One-batch post-norm std: ", std_batch)


# 3) Full-dataset post-normalization: entire loader with Normalize
# (a second loader over the same normalized dataset, built with the same arguments as batch_loader)
post_loader = DataLoader(batch_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers)

cnt2 = 0
sum2 = torch.zeros(n_channels)
sum2_sq = torch.zeros(n_channels)

# Same running-sum scheme as in step 1, now on the normalized tensors
for imgs, _ in tqdm(post_loader, desc="Post-norm full stats"):
    b, c, h, w = imgs.shape
    cnt2 += b * h * w
    sum2 += imgs.sum(dim=[0,2,3])
    sum2_sq += (imgs ** 2).sum(dim=[0,2,3])

# Expected outcome: mean ≈ 0 and std ≈ 1 per channel
mean_post = sum2 / cnt2
var_post = (sum2_sq / cnt2) - (mean_post ** 2)
std_post = torch.sqrt(var_post)

print("\nPost-normalization mean:", mean_post)
print("Post-normalization std: ", std_post)

Pre-norm stats: 100%|██████████| 13/13 [00:05<00:00,  2.32it/s]


Pre-normalization mean: tensor([0.3542, 0.3542, 0.3542])
Pre-normalization std:  tensor([0.2794, 0.2794, 0.2794])

One-batch post-norm mean: tensor([0.3144, 0.3144, 0.3144])
One-batch post-norm std:  tensor([0.3413, 0.3413, 0.3413])


Post-norm full stats: 100%|██████████| 13/13 [00:04<00:00,  2.78it/s]


Post-normalization mean: tensor([3.1939e-08, 3.1939e-08, 3.1939e-08])
Post-normalization std:  tensor([1.0000, 1.0000, 1.0000])





5) Let us build the VGG-19 architecture.
Inspired by: https://arxiv.org/abs/1409.1556

In [None]:
import torch
import torch.nn as nn

# VGG-19 layout (configuration "E"): integers are conv output widths, "M" is a 2×2 max-pool.
# Five stages: 2, 2, 4, 4 and 4 convolutions, each stage closed by one pooling step.
cfg_vgg19 = (
    [64] * 2 + ["M"]
    + [128] * 2 + ["M"]
    + [256] * 4 + ["M"]
    + [512] * 4 + ["M"]
    + [512] * 4 + ["M"]
)

def make_layers(cfg):
    """
    Turn a VGG-style layout list into the convolutional feature extractor.

    Args:
        cfg: Sequence whose entries are either the string "M" (append a
            2×2 / stride-2 max-pool) or an integer (append a 3×3, padding-1
            convolution with that many output channels, followed by an
            in-place ReLU).

    Returns:
        nn.Sequential holding the layers in the order given by `cfg`.
        The first convolution expects 3 input channels.
    """
    modules = []
    prev_width = 3  # channel count feeding the next convolution
    for entry in cfg:
        if entry != "M":
            modules.append(nn.Conv2d(prev_width, entry, kernel_size=3, padding=1))
            modules.append(nn.ReLU(inplace=True))
            prev_width = entry
            continue
        modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
    return nn.Sequential(*modules)

class VGG19(nn.Module):
    """
    VGG-19 image classifier: convolutional features, 7×7 adaptive average
    pooling, and a three-layer fully connected head.

    Args:
        num_classes: Width of the final linear layer (number of output logits).
        init_weights: When True, re-initialise all Conv2d/Linear layers with
            `_initialize_weights()` right after construction.
    """

    def __init__(self, num_classes=6, init_weights=True):
        super().__init__()
        # The attribute names and the layer order inside each Sequential
        # determine the state_dict keys of saved checkpoints.
        self.features = make_layers(cfg_vgg19)
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        flat_dim = 512 * 7 * 7  # channels × pooled height × pooled width
        head = [
            nn.Linear(flat_dim, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head)
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Map an image batch to class logits of shape [batch, num_classes]."""
        pooled = self.avgpool(self.features(x))
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

    def _initialize_weights(self):
        """Kaiming-normal for conv weights, N(0, 0.01) for linear weights, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)

# Instantiate the model for 6 classes
# (placed on the GPU when one is available, otherwise on the CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VGG19(num_classes=6).to(device)

# Visualize model summary (install torchinfo if needed)
try:
    from torchinfo import summary
except ImportError:
    import sys
    # Install torchinfo using pip for the current python executable
    # (`!` is an IPython shell escape, so this cell only runs inside a notebook kernel;
    #  the package version is not pinned)
    !{sys.executable} -m pip install torchinfo
    from torchinfo import summary

# Layer-by-layer table of output shapes and parameter counts for one 3×224×224 input
summary(model, input_size=(1, 3, 224, 224))

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


Layer (type:depth-idx)                   Output Shape              Param #
VGG19                                    [1, 6]                    --
├─Sequential: 1-1                        [1, 512, 7, 7]            --
│    └─Conv2d: 2-1                       [1, 64, 224, 224]         1,792
│    └─ReLU: 2-2                         [1, 64, 224, 224]         --
│    └─Conv2d: 2-3                       [1, 64, 224, 224]         36,928
│    └─ReLU: 2-4                         [1, 64, 224, 224]         --
│    └─MaxPool2d: 2-5                    [1, 64, 112, 112]         --
│    └─Conv2d: 2-6                       [1, 128, 112, 112]        73,856
│    └─ReLU: 2-7                         [1, 128, 112, 112]        --
│    └─Conv2d: 2-8                       [1, 128, 112, 112]        147,584
│    └─ReLU: 2-9                         [1, 128, 112, 112]        --
│    └─MaxPool2d: 2-10                   [1, 128, 56, 56]          --
│    └─Conv2d: 2-11                      [1, 256, 56, 56]          29

6) Let us prepare the data loaders for split.

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

import torch
import os
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset
from torch.utils.data.dataset import random_split

# Purpose of this cell: split the labelled source images 75/10/15 into
# train/val/test with a fixed seed and wrap each split in a DataLoader.

# Paths
source_dir = '/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images'

# Precomputed normalization stats
# (the rounded per-channel values printed by the source normalization cell)
mean = [0.3542, 0.3542, 0.3542]
std  = [0.2794, 0.2794, 0.2794]

# Transforms
# Training images get light augmentation (random horizontal flip, rotation up to ±8°);
# validation/test images are only converted to tensors and normalized.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(8),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])
val_test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

# 1. Load base dataset (no transform yet)
# Used only for its length and class list; the per-split datasets are built separately below.
base_ds = ImageFolder(source_dir, transform=None)

# 2. Compute split sizes and indices
# 75% train, 10% validation, remainder test (585 / 78 / 117 for the 780 source images).
n_total = len(base_ds)
n_train = int(0.75 * n_total)
n_val   = int(0.10 * n_total)
n_test  = n_total - n_train - n_val

# Use random_split for reproducible indices
# The split is drawn over plain index positions, not stratified by class.
# NOTE(review): random_split presumably returns Subset wrappers rather than plain
# index lists; they are passed on as the `indices` argument below — confirm.
train_ds_idx, val_ds_idx, test_ds_idx = random_split(
    list(range(n_total)),
    [n_train, n_val, n_test],
    generator=torch.Generator().manual_seed(1906525)
)

# 3. Create Subsets with appropriate transforms
# Each split wraps its own ImageFolder so the training split alone is augmented.
train_ds = Subset(
    ImageFolder(source_dir, transform=train_transform), train_ds_idx
)
val_ds   = Subset(
    ImageFolder(source_dir, transform=val_test_transform), val_ds_idx
)
test_ds  = Subset(
    ImageFolder(source_dir, transform=val_test_transform), test_ds_idx
)

# 4. DataLoaders
batch_size  = 32
num_workers = 4

# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=num_workers)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=num_workers)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, num_workers=num_workers)

# 5. Sanity check
print("Classes:", base_ds.classes)
print(f"Train size: {len(train_ds)}, Val size: {len(val_ds)}, Test size: {len(test_ds)}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Classes: ['AbdomenCT', 'BreastMRI', 'CXR', 'ChestCT', 'HandXR', 'HeadCT']
Train size: 585, Val size: 78, Test size: 117


7) Let us start modelling for the source MedMNIST dataset samples with VGG-19 and save the checkpoints.

In [None]:
# ─── 0. Mount & Imports ─────────────────────────────────────────────────────
from google.colab import drive
drive.mount('/content/drive')

import os, random, numpy as np
import torch, torch.nn as nn, torch.backends.cudnn as cudnn
import pandas as pd, seaborn as sns, matplotlib.pyplot as plt

from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset
from torch.utils.data.dataset import random_split
from torch.optim import SGD, Adam, RMSprop, Adagrad, AdamW
import torch.optim.lr_scheduler as lr_scheduler

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, mean_squared_error,
    confusion_matrix
)

# ─── 1. Determinism ─────────────────────────────────────────────────────────
# Seed Python, NumPy and PyTorch (CPU and all GPUs) and request deterministic cuDNN kernels.
SEED = 1906525
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
cudnn.deterministic = True
cudnn.benchmark = False

# ─── 2. Paths & Hyperparams ─────────────────────────────────────────────────
SRC_DIR = '/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images'

# Output folders on Drive: model weights, confusion-matrix images, metrics spreadsheet.
CKPT_DIR    = '/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Source- Training/Checkpoints'
CM_DIR      = '/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Source- Training/Confusion Matrices'
METRICS_DIR = '/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Source- Training/Performance Metrics'
os.makedirs(CKPT_DIR, exist_ok=True)
os.makedirs(CM_DIR, exist_ok=True)
os.makedirs(METRICS_DIR, exist_ok=True)

# Per-channel normalization statistics of the source images
# (the rounded values printed by the source normalization cell).
MEAN = [0.3542, 0.3542, 0.3542]
STD  = [0.2794, 0.2794, 0.2794]

# Training images are augmented (random flip, rotation up to ±8°); evaluation images are not.
train_tf = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(8),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])
val_tf = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])

# Experiment grid: every optimizer is trained once per epoch budget (5 × 5 = 25 runs),
# all with the same learning rate and batch size.
OPTIMIZERS   = [SGD, Adam, RMSprop, Adagrad, AdamW]
BATCH_SIZE   = 32
EPOCHS_LIST  = [5, 10, 20, 40, 80]
LR           = 1e-3
DEVICE       = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ─── 3. Prepare DataLoaders ─────────────────────────────────────────────────
# base_ds is used for its length and class list only; 75% / 10% / remainder split.
base_ds = ImageFolder(SRC_DIR, transform=None)
n = len(base_ds)
n_train = int(0.75 * n)
n_val   = int(0.10 * n)
n_test  = n - n_train - n_val

# Seeded split over index positions (not stratified by class).
idx_train, idx_val, idx_test = random_split(
    list(range(n)), [n_train, n_val, n_test],
    generator=torch.Generator().manual_seed(SEED)
)

# Separate ImageFolder instances so only the training split carries augmentation.
train_ds = Subset(ImageFolder(SRC_DIR, transform=train_tf), idx_train)
val_ds   = Subset(ImageFolder(SRC_DIR, transform=val_tf),   idx_val)
test_ds  = Subset(ImageFolder(SRC_DIR, transform=val_tf),   idx_test)

# The same three loaders are reused by every run of the experiment grid.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,  num_workers=4)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

# ─── 4. VGG-19 Definition ────────────────────────────────────────────────────
# VGG-19 layout: integers are conv output widths, 'M' marks a 2×2 max-pool.
cfg = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 256, 'M',
    512, 512, 512, 512, 'M',
    512, 512, 512, 512, 'M',
]

def make_layers(cfg):
    """
    Build the convolutional feature extractor described by `cfg`.

    Each integer entry adds a 3×3, padding-1 convolution with that many output
    channels followed by an in-place ReLU; each 'M' adds a 2×2 / stride-2
    max-pool. The first convolution takes 3 input channels.

    Returns:
        nn.Sequential with the layers in `cfg` order.
    """
    stack = []
    channels = 3  # width of the tensor entering the next convolution
    for spec in cfg:
        is_pool = spec == 'M'
        if is_pool:
            stack += [nn.MaxPool2d(2, 2)]
        else:
            conv = nn.Conv2d(channels, spec, 3, padding=1)
            stack.extend((conv, nn.ReLU(True)))
            channels = spec
    return nn.Sequential(*stack)

class VGG19(nn.Module):
    """
    VGG-19 classifier: convolutional features, 7×7 adaptive average pooling
    and a three-layer fully connected head ending in `num_classes` logits.

    All Conv2d/Linear layers are re-initialised by `_init_weights()` at the
    end of construction.
    """

    def __init__(self, num_classes=6):
        super().__init__()
        # Attribute names and layer order fix the state_dict keys of the
        # checkpoints written by the training loop.
        self.features = make_layers(cfg)
        self.avgpool  = nn.AdaptiveAvgPool2d((7,7))
        flat_dim = 512*7*7  # channels × pooled height × pooled width
        head = [
            nn.Linear(flat_dim, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head)
        self._init_weights()

    def forward(self, x):
        """Map an image batch to class logits of shape [batch, num_classes]."""
        pooled = self.avgpool(self.features(x))
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

    def _init_weights(self):
        """Kaiming-normal for conv weights, N(0, 0.01) for linear weights, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)

def evaluate_split(model, loader, criterion=None, plot_cm=False, cm_name=None):
    """
    Run `model` over every batch of `loader` and compute classification metrics.

    Args:
        model: Network returning class logits of shape [batch, n_classes].
        loader: DataLoader whose dataset is an ImageFolder-like object with a
            `.classes` attribute, or a Subset wrapping one.
        criterion: Optional loss module. When given, the sample-weighted mean
            loss over the split is returned as the last tuple element.
        plot_cm: When True (and `cm_name` is set) the confusion matrix is
            saved as a heatmap image.
        cm_name: File name of the heatmap, written into `CM_DIR`.

    Returns:
        Tuple `(acc, prec, rec, spec, f1, auc, mse, loss)`:
        accuracy; macro precision, recall, specificity and F1; one-vs-one
        ROC AUC (NaN when it cannot be computed); mean squared error between
        true and predicted class indices; and the mean loss (None without a
        criterion).

    Raises:
        ValueError: If the loader yields no samples.

    Notes:
        NOTE(review): `mse` compares class indices numerically, so its value
        depends on the alphabetical class order — treat it with care.
    """
    model.eval()
    has_criterion = criterion is not None  # explicit test; do not rely on module truthiness
    all_preds, all_labels, all_probs = [], [], []
    total_loss, total_samples = 0.0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            logits = model(x)
            if has_criterion:
                # criterion returns the batch mean; weight by batch size so the
                # final average is per sample even when the last batch is short
                total_loss += criterion(logits, y).item() * x.size(0)
            all_probs.extend(torch.softmax(logits, 1).cpu().numpy())
            all_preds.extend(logits.argmax(1).cpu().numpy())
            all_labels.extend(y.cpu().numpy())
            total_samples += x.size(0)

    if total_samples == 0:
        raise ValueError("evaluate_split: the loader yielded no samples")

    # Accept both a Subset-wrapped dataset and a plain dataset with `.classes`.
    dataset = loader.dataset
    class_names = getattr(dataset, 'dataset', dataset).classes
    class_ids = list(range(len(class_names)))

    acc = accuracy_score(all_labels, all_preds)
    prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    try:
        # Passing `labels` ties the score columns to the class indices, so the
        # AUC stays computable when a class is missing from this split.
        auc = roc_auc_score(all_labels, np.asarray(all_probs),
                            multi_class='ovo', labels=class_ids)
    except Exception as err:
        # Best effort: AUC is optional, but the reason for a NaN is reported.
        print(f"⚠ AUC could not be computed: {err}")
        auc = float('nan')
    mse = mean_squared_error(all_labels, all_preds)

    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    total = cm.sum()
    # Macro specificity: per-class TN / (TN + FP), treating each class as "positive" in turn.
    specs = []
    for i in range(cm.shape[0]):
        tp = cm[i, i]
        fn = cm[i, :].sum() - tp
        fp = cm[:, i].sum() - tp
        tn = total - tp - fp - fn
        specs.append(tn / (tn + fp) if (tn + fp) > 0 else 0.)
    spec = float(np.mean(specs))

    if plot_cm and cm_name:
        fig, ax = plt.subplots(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names,
                    yticklabels=class_names,
                    ax=ax)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.tight_layout()
        fig.savefig(os.path.join(CM_DIR, cm_name))
        plt.close(fig)  # release the figure; this function is called many times

    loss = total_loss / total_samples if has_criterion else None
    return acc, prec, rec, spec, f1, auc, mse, loss

# ─── 5. Training & Logging ─────────────────────────────────────────────────
# One independent training run per (optimizer, epoch budget) pair. After each
# run the weights, a test-set confusion matrix and the metrics table are
# written to Drive, so an interrupted session keeps every completed run.
master = []
metrics_file = os.path.join(METRICS_DIR, 'VGG_19_sample_source.xlsx')

for opt_cls in OPTIMIZERS:
    for num_epochs in EPOCHS_LIST:
        run_name = f"{opt_cls.__name__}_ep{num_epochs}"
        print(f"\n▶ Run: {run_name}")
        # Fresh model, optimizer and scheduler: runs never share weights.
        model = VGG19(num_classes=len(base_ds.classes)).to(DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = opt_cls(model.parameters(), lr=LR)
        # Divide the learning rate by 10 once validation loss has not improved for 5 epochs.
        scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.1, patience=5)

        def get_grad_norm():
            """Global L2 norm of the gradients for one training batch, measured in eval mode."""
            model.eval(); model.zero_grad()
            x, y = next(iter(train_loader))
            loss = criterion(model(x.to(DEVICE)), y.to(DEVICE))
            loss.backward()
            norm = sum(p.grad.norm().item()**2 for p in model.parameters() if p.grad is not None)**0.5
            model.zero_grad()  # do not leave the probe gradients on the parameters
            return norm

        grad_before = get_grad_norm()

        for epoch in range(1, num_epochs+1):
            model.train()  # evaluate_split() switches to eval mode, so re-enable training mode each epoch
            for x, y in train_loader:
                x, y = x.to(DEVICE), y.to(DEVICE)
                optimizer.zero_grad()
                loss = criterion(model(x), y)
                loss.backward()
                optimizer.step()

            # Only the validation loss (last tuple element) drives the scheduler.
            val_loss = evaluate_split(model, val_loader, criterion)[-1]
            scheduler.step(val_loss)

        # Final metrics of the finished run. Train metrics are measured through
        # train_loader, i.e. on augmented images.
        train_acc, _, _, _, _, _, train_mse, _ = evaluate_split(model, train_loader)
        val_acc, _, _, _, _, _, val_mse, _ = evaluate_split(model, val_loader)
        # The confusion matrix is saved once per run, from the final model, under
        # the run's own name, so no run overwrites the image of another.
        test_acc, test_prec, test_rec, test_spec, test_f1, test_auc, test_mse, _ = evaluate_split(
            model, test_loader, plot_cm=True, cm_name=f"{run_name}.png"
        )
        grad_after = get_grad_norm()

        print(f"Completed {opt_cls.__name__} for {num_epochs} epochs → Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}")

        torch.save(model.state_dict(), os.path.join(CKPT_DIR, f"{run_name}.pth"))

        master.append({
            'Optimizer': opt_cls.__name__, 'Epochs': num_epochs,
            'train_acc': train_acc, 'val_acc': val_acc, 'test_acc': test_acc,
            'train_mse': train_mse, 'val_mse': val_mse, 'test_mse': test_mse,
            'precision': test_prec, 'sensitivity': test_rec,
            'specificity': test_spec, 'f1': test_f1, 'auc': test_auc,
            'grad_before': grad_before, 'grad_after': grad_after
        })
        # Rewrite the spreadsheet after every run so finished runs survive a disconnect.
        pd.DataFrame(master).to_excel(metrics_file, index=False)

print("Training completed, saved the results for MedMNIST (samples) with VGG‑19 model.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

▶ Run: SGD_ep5
Completed SGD for 5 epochs → Train Acc: 0.4957, Test Acc: 0.4615

▶ Run: SGD_ep10
Completed SGD for 10 epochs → Train Acc: 0.8291, Test Acc: 0.8034

▶ Run: SGD_ep20
Completed SGD for 20 epochs → Train Acc: 0.9556, Test Acc: 0.9402

▶ Run: SGD_ep40
Completed SGD for 40 epochs → Train Acc: 0.8427, Test Acc: 0.8462

▶ Run: SGD_ep80
Completed SGD for 80 epochs → Train Acc: 0.9795, Test Acc: 0.9658

▶ Run: Adam_ep5
Completed Adam for 5 epochs → Train Acc: 0.8427, Test Acc: 0.8034

▶ Run: Adam_ep10
Completed Adam for 10 epochs → Train Acc: 0.1658, Test Acc: 0.1538

▶ Run: Adam_ep20
Completed Adam for 20 epochs → Train Acc: 0.9880, Test Acc: 0.9658

▶ Run: Adam_ep40
Completed Adam for 40 epochs → Train Acc: 0.1692, Test Acc: 0.1709

▶ Run: Adam_ep80
Completed Adam for 80 epochs → Train Acc: 0.8274, Test Acc: 0.8120

▶ Run: RMSprop_ep5
Completed RMSpr

8) Let us inspect the target (unlabelled) dataset's channels and image sizes.

In [None]:
import os
from pathlib import Path
from collections import Counter
from PIL import Image

TARGET_DIR = Path("/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images")

IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

# Recursive scan: keep regular files whose lower-cased suffix is an image extension
files = []
for candidate in TARGET_DIR.rglob("*"):
    if candidate.suffix.lower() in IMAGE_EXTS and candidate.is_file():
        files.append(candidate)

print(f"Total image files: {len(files)}")

mode_counter = Counter()   # PIL mode -> number of images
size_counter = Counter()   # (width, height) -> number of images
dimensions = []            # one (width, height) entry per readable image

# Read mode and size of every file; unreadable files are reported and skipped
for image_file in files:
    try:
        with Image.open(image_file) as opened:
            pil_mode = opened.mode  # e.g., 'RGB', 'L', etc.
            width, height = opened.size
            dims = (width, height)
            mode_counter[pil_mode] += 1
            size_counter[dims] += 1
            dimensions.append(dims)
    except Exception as err:
        print(f"Error with {image_file}: {err}")

print("\nImage modes (counts):")
for mode_name, n_seen in mode_counter.items():
    print(f"  Mode: {mode_name}, Count: {n_seen}")

print("\nUnique image dimensions (width × height) and their counts:")
for resolution, n_seen in size_counter.most_common():
    res_w, res_h = resolution
    print(f"  {res_w}×{res_h}: {n_seen} images")

def channels_from_mode(mode):
    """
    Return the number of bands (channels) PIL reports for the image mode `mode`.

    A throw-away 1×1 image is created in that mode and its band names are counted.
    """
    probe = Image.new(mode, (1, 1))
    band_names = probe.getbands()
    return len(band_names)

print("\nChannels per mode:")
for seen_mode in mode_counter:
    band_count = channels_from_mode(seen_mode)
    print(f"  Mode: {seen_mode}, Channels: {band_count}")

# Optional: Compute min/max/avg dimensions
# if dimensions:
#     ws = [w for w, _ in dimensions]
#     hs = [h for _, h in dimensions]
#     print(f"\nWidth — min: {min(ws)}, max: {max(ws)}, avg: {sum(ws)/len(ws):.2f}")
#     print(f"Height — min: {min(hs)}, max: {max(hs)}, avg: {sum(hs)/len(hs):.2f}")

Total image files: 780

Image modes (counts):
  Mode: RGB, Count: 780

Unique image dimensions (width × height) and their counts:
  224×224: 780 images

Channels per mode:
  Mode: RGB, Channels: 3


9) Let us do normalization for the target (unlabelled) dataset.

In [None]:
import os
from pathlib import Path
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class UnlabeledImageDataset(Dataset):
    """
    Dataset over the image files that sit directly inside one folder (no labels).

    Args:
        root_dir: Folder to scan. Only its immediate entries are considered.
        transform: Optional callable applied to each RGB PIL image.

    Each item is the (optionally transformed) image alone; there is no label.

    Notes:
        * Paths are sorted so that index -> file is the same on every run and
          every machine; directory iteration order on its own is arbitrary.
        * Only regular files are kept, so a directory whose name ends in an
          image extension cannot break `__getitem__`.
    """

    # Lower-cased file suffixes treated as images.
    _IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'})

    def __init__(self, root_dir, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.image_paths = sorted(
            p for p in self.root_dir.iterdir()
            if p.suffix.lower() in self._IMAGE_EXTS and p.is_file()
        )

    def __len__(self):
        """Number of image files found in `root_dir`."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image `idx` as RGB and apply `transform` when one was given."""
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of staying open until garbage collection.
        with Image.open(self.image_paths[idx]) as handle:
            img = handle.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img

# Purpose of this section: compute the per-channel mean/std of the unlabelled
# target images, build a Normalize transform from them, and verify the result
# on one batch and on the whole dataset.

# 1. Define paths
root = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images"

# 2. Base loader to compute pre-normalization mean & std
# (ToTensor only, so the statistics are taken on the un-normalized tensors)
base_ds = UnlabeledImageDataset(root, transform=transforms.ToTensor())
base_loader = DataLoader(base_ds, batch_size=64, shuffle=False, num_workers=2)

# Running per-channel totals over all pixels
sum_ = torch.zeros(3)
sum_sq = torch.zeros(3)
total_pixels = 0

# The dataset yields images only (no labels), so each batch is a single tensor
for imgs in base_loader:
    batch_pixels = imgs.size(0) * imgs.size(2) * imgs.size(3)   # batch × height × width
    total_pixels += batch_pixels
    sum_ += imgs.sum(dim=[0, 2, 3])
    sum_sq += (imgs ** 2).sum(dim=[0, 2, 3])

# Population statistics: std = sqrt(E[x^2] - (E[x])^2), per channel
mean_pre = sum_ / total_pixels
std_pre = torch.sqrt(sum_sq / total_pixels - mean_pre**2)

print("Pre-normalization mean:", mean_pre)
print("Pre-normalization std :", std_pre)

# 3. Transformation including normalization
norm_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean_pre.tolist(), std_pre.tolist())
])

# 4. Loader for post-normalization checks
norm_ds = UnlabeledImageDataset(root, transform=norm_transform)
norm_loader = DataLoader(norm_ds, batch_size=64, shuffle=False, num_workers=2)

# 5. One-batch check
# A single unshuffled batch of 64 images; its statistics need not be close to 0/1
# even when the dataset-wide statistics in step 6 are.
imgs = next(iter(norm_loader))
print("One-batch post-normalization mean:", imgs.mean(dim=[0, 2, 3]))
print("One-batch post-normalization std :", imgs.std(dim=[0, 2, 3]))

# 6. Full dataset post-normalization stats
# (`total_pixels` is reset and reused; `imgs` is rebound by the loop)
sum_norm = torch.zeros(3)
sum_norm_sq = torch.zeros(3)
total_pixels = 0

for imgs in norm_loader:
    batch_pixels = imgs.size(0) * imgs.size(2) * imgs.size(3)
    total_pixels += batch_pixels
    sum_norm += imgs.sum(dim=[0, 2, 3])
    sum_norm_sq += (imgs ** 2).sum(dim=[0, 2, 3])

# Expected outcome: mean ≈ 0 and std ≈ 1 per channel
mean_post = sum_norm / total_pixels
std_post = torch.sqrt(sum_norm_sq / total_pixels - mean_post**2)

print("Post-normalization mean (whole dataset):", mean_post)
print("Post-normalization std  (whole dataset):", std_post)

Pre-normalization mean: tensor([0.2554, 0.2554, 0.2554])
Pre-normalization std : tensor([0.2900, 0.2900, 0.2900])
One-batch post-normalization mean: tensor([-0.4267, -0.4267, -0.4267])
One-batch post-normalization std : tensor([0.8331, 0.8331, 0.8331])
Post-normalization mean (whole dataset): tensor([-5.6691e-08, -5.6691e-08, -5.6691e-08])
Post-normalization std  (whole dataset): tensor([1.0000, 1.0000, 1.0000])


10) Let us do Domain Adaptation with DANN using VGG-19's feature extractor for 10 epochs in training.

Inspired by: https://jmlr.org/papers/v17/15-239.html

In [None]:
import os, random, numpy as np, pandas as pd
from PIL import Image
from tqdm import tqdm

import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets

import matplotlib.pyplot as plt, seaborn as sns
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_auc_score

# ─── Reproducibility ──────────────────────────────────────
# Seed Python, NumPy and PyTorch (CPU and all GPUs) and request deterministic cuDNN kernels.
# NOTE(review): PYTHONHASHSEED is set after the interpreter has started, so it
# presumably only affects child processes — confirm that this is sufficient.
SEED = 1906525
os.environ["PYTHONHASHSEED"] = str(SEED)
random.seed(SEED); np.random.seed(SEED)
torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ─── Paths ───────────────────────────────
# SRC_DIR: labelled source images. CKPT_DIR: source-trained checkpoints to adapt from.
# TGT_IMG_ROOT / TGT_CSV: target images and a CSV that, judging by its name,
# holds labels for the target samples — presumably for evaluation only; verify.
# NOTE(review): the CKPT_DIR folder name is spelled "Souce"; it has to match the
# folder on Drive, so it is left untouched here.
SRC_DIR = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images"
CKPT_DIR = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/Top-5 Souce Checkpoints"
TGT_IMG_ROOT = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images"
TGT_CSV = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Target sample labels.csv"

# Output folders for the DANN experiments (created if missing).
CONF_DIR = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-DANN/Confusion Matrices"
METRICS_DIR = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-DANN/Performance Metrics"
os.makedirs(CONF_DIR, exist_ok=True); os.makedirs(METRICS_DIR, exist_ok=True)

# ─── Normalization (per-branch) ──────────────────────────
# Each domain is normalized with its own statistics (the rounded values printed
# by the source and target normalization cells); no augmentation is applied here.
mean_src = [0.3542, 0.3542, 0.3542]; std_src  = [0.2794, 0.2794, 0.2794]
mean_tgt = [0.2554, 0.2554, 0.2554]; std_tgt  = [0.2900, 0.2900, 0.2900]

tx_src = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean_src, std_src)])
tx_tgt = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean_tgt, std_tgt)])

# ─── VGG‑19 features builder (cfg E) ─────────────────────
# Layout table keyed by configuration letter; integers are conv output widths, "M" is a 2×2 max-pool.
cfgs = {
    "E": [
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, 256, "M",
        512, 512, 512, 512, "M",
        512, 512, 512, 512, "M",
    ],
}

def make_layers(cfg):
    """
    Assemble the convolutional stack described by `cfg`.

    An integer entry contributes a 3×3, padding-1 convolution with that many
    output channels plus an in-place ReLU; an "M" entry contributes a
    2×2 / stride-2 max-pool. The first convolution takes 3 input channels.

    Returns:
        nn.Sequential with the layers in `cfg` order.
    """
    def conv_block(width_in, width_out):
        # One convolution and its activation, in the order they are appended.
        return [nn.Conv2d(width_in, width_out, 3, padding=1), nn.ReLU(True)]

    built = []
    current = 3
    for item in cfg:
        if item == "M":
            built.append(nn.MaxPool2d(2, 2))
            continue
        built.extend(conv_block(current, item))
        current = item
    return nn.Sequential(*built)

# ─── GRL (λ scales reversed gradient; don't also weight the loss) ─────────
from torch.autograd import Function
class GradReverse(Function):
    """
    Gradient reversal layer.

    Forward: returns the input unchanged (as a view).
    Backward: multiplies the incoming gradient by `-lambd`.
    """

    @staticmethod
    def forward(ctx, x, lambd):
        # Remember the scale for the backward pass.
        ctx.lambd = lambd
        passthrough = x.view_as(x)
        return passthrough

    @staticmethod
    def backward(ctx, grad_output):
        flipped = grad_output.neg()
        scaled = flipped * ctx.lambd
        # One entry per forward input: gradient for `x`, none for `lambd`.
        return scaled, None
def grad_reverse(x, lambd=1.0):
    """Apply the gradient reversal layer to `x`; gradients flowing back are scaled by `-lambd`."""
    reversed_x = GradReverse.apply(x, lambd)
    return reversed_x

# ─── DANN head (shared features; task head; domain head) ──────────────────
class DANN(nn.Module):
    """Domain-adversarial network: shared features, a task head and a domain head.

    Args:
        features: Convolutional feature extractor; its output is pooled to 7x7 and
            flattened to 512*7*7 values per sample before the bottleneck.
        num_classes: Number of task classes produced by `cls_head`.
        bottleneck_dim: Width of the shared bottleneck representation.
    """

    def __init__(self, features, num_classes, bottleneck_dim=256):
        super().__init__()
        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        flat_dim = 512 * 7 * 7
        self.bottleneck = nn.Sequential(
            nn.Linear(flat_dim, bottleneck_dim),
            nn.BatchNorm1d(bottleneck_dim),
            nn.ReLU(True),
        )
        self.cls_head = nn.Linear(bottleneck_dim, num_classes)
        domain_layers = [
            nn.Linear(bottleneck_dim, 100),
            nn.ReLU(True),
            nn.Linear(100, 2),
        ]
        self.dom_head = nn.Sequential(*domain_layers)

    def forward(self, x, lambda_=0.0):
        """Return `(task_logits, domain_logits)` for a batch.

        `lambda_` is the gradient-reversal scale applied between the bottleneck and
        the domain head; it does not affect the forward values.
        """
        pooled = self.avgpool(self.features(x))
        z = self.bottleneck(pooled.flatten(1))
        y_logits = self.cls_head(z)
        # The domain head sees the same activations, but gradients flowing back
        # through this branch are reversed and scaled.
        d_logits = self.dom_head(grad_reverse(z, lambda_))
        return y_logits, d_logits

# ─── Load VGG‑19 features from our full classifier checkpoints ───────────
def load_vgg19_features(ckpt_path, num_classes):
    """Load a full VGG-19 classifier checkpoint and return only its convolutional features.

    The checkpoint is loaded into an `nn.Sequential` laid out as
    `[features, avgpool, flatten, fc, relu, drop, fc, relu, drop, fc(num_classes)]`,
    so its keys are expected to look like `0.<n>.weight` for the convolutions and
    `3.*`, `6.*`, `9.*` for the three linear layers.

    Args:
        ckpt_path: Path to a state-dict checkpoint readable by `torch.load`.
        num_classes: Output width of the final linear layer used for loading.

    Returns:
        The `features` sub-module (index 0 of the sequential) with loaded weights.

    Raises:
        RuntimeError: If, after dropping only the final classifier layer, any
            feature-extractor weight is still absent from the checkpoint, or if a
            remaining tensor has the wrong shape.
    """
    features = make_layers(cfgs["E"])
    model = nn.Sequential(
        features, nn.AdaptiveAvgPool2d((7,7)), nn.Flatten(),
        nn.Linear(512*7*7, 4096), nn.ReLU(True), nn.Dropout(),
        nn.Linear(4096, 4096), nn.ReLU(True), nn.Dropout(),
        nn.Linear(4096, num_classes)
    )
    sd = torch.load(ckpt_path, map_location=device)

    try:
        model.load_state_dict(sd, strict=True)
    except RuntimeError:
        # Only the final classifier may legitimately differ (different class count).
        # Match it by prefix: a suffix test such as endswith("9.weight") would also
        # hit the convolution stored under "0.19.weight" and silently discard it.
        head_prefix = f"{len(model) - 1}."
        sd = {k: v for k, v in sd.items() if not k.startswith(head_prefix)}
        report = model.load_state_dict(sd, strict=False)

        # strict=False tolerates *any* missing key, so verify that the part this
        # function actually returns was really populated from the checkpoint.
        missing_features = [k for k in report.missing_keys if k.startswith("0.")]
        if missing_features:
            raise RuntimeError(
                f"Checkpoint {ckpt_path!r} does not provide {len(missing_features)} "
                f"feature-extractor tensors (e.g. {missing_features[:3]}); "
                f"unexpected keys in checkpoint: {list(report.unexpected_keys)[:3]}. "
                "The feature extractor would be left randomly initialised."
            )
    return model[0]  # features

# ─── Datasets & loaders ───────────────────────────────────────────────────
# Source labeled
src_ds = datasets.ImageFolder(root=SRC_DIR, transform=tx_src)
# The source folder names define the class vocabulary used everywhere else.
CLASS_NAMES = src_ds.classes
N_CLASSES = len(CLASS_NAMES)
CLASS_TO_IDX = {name: idx for idx, name in enumerate(CLASS_NAMES)}


def src_loader(bs=32, shuffle=True, num_workers=2):
    """Return a DataLoader over the labelled source dataset.

    Args:
        bs: Batch size.
        shuffle: Whether to reshuffle every epoch.
        num_workers: Number of loader worker processes.
    """
    loader_kwargs = dict(
        batch_size=bs,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True,
    )
    return DataLoader(src_ds, **loader_kwargs)

# Target unlabeled from flat folder + CSV for eval
class TargetFlatCSV(Dataset):
    """Target images stored in one flat folder, with labels read from a CSV.

    The first CSV column is taken as the image file name and the second as the
    class name. Class names are converted to indices through the module-level
    `CLASS_TO_IDX` (the source class vocabulary), so a name that is not in that
    mapping raises KeyError at construction time.

    Each item is `(transformed_image, class_index, file_name)`.
    """

    def __init__(self, root, csv_path, transform):
        self.root = root
        self.transform = transform

        table = pd.read_csv(csv_path)
        name_col, label_col = table.iloc[:, 0], table.iloc[:, 1]
        self.names = name_col.astype(str).tolist()
        self.cls_names = label_col.astype(str).tolist()
        # Labels are expressed in the source vocabulary so predictions are comparable.
        self.cls_idx = [CLASS_TO_IDX[label] for label in self.cls_names]

    def __len__(self):
        return len(self.names)

    def __getitem__(self, i):
        fname = self.names[i]
        rgb = Image.open(os.path.join(self.root, fname)).convert("RGB")
        return self.transform(rgb), self.cls_idx[i], fname

tgt_ds = TargetFlatCSV(root=TGT_IMG_ROOT, csv_path=TGT_CSV, transform=tx_tgt)


def tgt_loader(bs=32, shuffle=True, num_workers=2):
    """Return a DataLoader over the target dataset.

    Args:
        bs: Batch size.
        shuffle: Whether to reshuffle every epoch.
        num_workers: Number of loader worker processes.
    """
    loader_kwargs = dict(
        batch_size=bs,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True,
    )
    return DataLoader(tgt_ds, **loader_kwargs)

# ─── Metrics helpers ───────────────────────────────────────────────────────
def save_confusion_matrix(y_true, y_pred, class_names, save_path):
    """Compute the confusion matrix, write it as a heat-map image, and return it.

    Args:
        y_true: Ground-truth class indices.
        y_pred: Predicted class indices.
        class_names: Class names used as tick labels; their count fixes the label
            set `0 .. len(class_names)-1`, so absent classes still get a row/column.
        save_path: Destination image path.

    Returns:
        The confusion matrix returned by `confusion_matrix` (rows = actual,
        columns = predicted).
    """
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        cm,
        annot=True, fmt='d', cmap='Blues',
        xticklabels=class_names, yticklabels=class_names,
        cbar=False, linewidths=.5,
        ax=ax,
    )
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix (Target)')
    fig.tight_layout()
    fig.savefig(save_path, dpi=220)
    # Close explicitly: this is called once per run and figures would otherwise pile up.
    plt.close(fig)
    return cm

def specificity_from_cm(cm):
    """Per-class specificity, TN / (TN + FP), from a square confusion matrix.

    Args:
        cm: 2-D NumPy array with rows = actual class and columns = predicted class.

    Returns:
        1-D NumPy array with one specificity value per class. A tiny epsilon in the
        denominator yields 0.0 instead of a division error when TN + FP is zero.
    """
    tp = np.diag(cm)
    fp = cm.sum(axis=0) - tp      # predicted as k but actually another class
    fn = cm.sum(axis=1) - tp      # actually k but predicted as another class
    tn = cm.sum() - (tp + fp + fn)
    return np.array(tn / (tn + fp + 1e-12))

def macro_auc(y_true, y_proba, n_classes):
    """Macro-averaged ROC-AUC computed from one-hot encoded labels.

    Args:
        y_true: Integer class indices, one per sample.
        y_proba: Array of shape (N, C) with per-class scores (softmax probabilities
            at the call sites in this notebook).
        n_classes: Number of classes C, used to size the one-hot encoding.

    Returns:
        The value returned by `roc_auc_score` with `average="macro"`.
    """
    # Row i of the identity matrix is the one-hot vector for class i.
    identity = np.eye(n_classes)
    y_true_oh = identity[np.asarray(y_true)]
    return roc_auc_score(y_true_oh, y_proba, average="macro", multi_class="ovr")

# ─── Training + evaluation for one (ckpt, lambda) ──────────────────────────
def train_and_eval_dann(ckpt_path, ckpt_name, lambda_val, epochs=10, bs=32, lr=1e-4):
    """Adapt one source checkpoint with DANN at a fixed GRL scale, then score it on the target set.

    Each step computes the task loss on the source batch and the domain loss on the
    concatenated source+target batch, and optimises their sum. After training, the
    model is evaluated on every target image and a confusion-matrix image is saved
    under `CONF_DIR`.

    Args:
        ckpt_path: Source-classifier checkpoint handed to `load_vgg19_features`.
        ckpt_name: Label used in the confusion-matrix file name and the result row.
        lambda_val: Gradient-reversal scale used for the domain pass.
        epochs: Number of passes over the source loader.
        bs: Batch size of the source and target training loaders.
        lr: Adam learning rate.

    Returns:
        dict: One metrics row (counts, accuracy, miss rate, macro precision / recall /
        specificity / F1 and macro AUC; AUC is NaN when it cannot be computed).
    """
    # loaders
    Ls = src_loader(bs=bs, shuffle=True)
    Lt = tgt_loader(bs=bs, shuffle=True)

    # model/opt
    feats = load_vgg19_features(ckpt_path, N_CLASSES)
    model = DANN(feats, N_CLASSES).to(device)
    opt = optim.Adam(model.parameters(), lr=lr)
    ce = nn.CrossEntropyLoss()

    model.train()
    it_tgt = iter(Lt)
    for _ in range(epochs):
        for x_s, y_s in Ls:
            # The bottleneck contains BatchNorm1d, which raises in train mode when it
            # receives a single sample. Skip a trailing size-1 source batch instead of
            # aborting the whole run at the end of an epoch.
            if x_s.size(0) < 2:
                continue
            x_s, y_s = x_s.to(device), y_s.to(device)
            try:
                x_t, _, _ = next(it_tgt)
            except StopIteration:
                # Target loader exhausted: restart it so source drives the epoch length.
                it_tgt = iter(Lt)
                x_t, _, _ = next(it_tgt)
            x_t = x_t.to(device)

            # Classifier on source only
            y_logits_s, _ = model(x_s, lambda_=0.0)
            L_cls = ce(y_logits_s, y_s)

            # Domain on concatenated (source=0, target=1) with GRL scaling = lambda_val
            x_dom = torch.cat([x_s, x_t], dim=0)
            _, d_logits = model(x_dom, lambda_=lambda_val)  # scale via GRL only
            d_labels = torch.cat([
                torch.zeros(x_s.size(0), dtype=torch.long),
                torch.ones(x_t.size(0), dtype=torch.long)
            ], dim=0).to(device)
            L_dom = ce(d_logits, d_labels)

            loss = L_cls + L_dom  # GRL already applies λ to the gradient
            opt.zero_grad(); loss.backward(); opt.step()

    # ── Evaluate on ALL target images (order from dataset)
    model.eval()
    all_true, all_pred, all_prob = [], [], []
    with torch.no_grad():
        for xb, yb, _names in DataLoader(tgt_ds, batch_size=64, shuffle=False, num_workers=2):
            xb = xb.to(device)
            y_logits, _ = model(xb, lambda_=0.0)
            probs = torch.softmax(y_logits, dim=1)
            pred = probs.argmax(dim=1)
            all_true.extend(yb.tolist())
            all_pred.extend(pred.cpu().tolist())
            all_prob.append(probs.cpu().numpy())
    # np.concatenate rejects an empty list; fall back to an empty (0, C) array.
    all_prob = np.concatenate(all_prob, axis=0) if all_prob else np.zeros((0, N_CLASSES))

    # ── Metrics
    correct = int(np.sum(np.array(all_true)==np.array(all_pred)))
    total = len(all_true)
    acc = correct / total if total > 0 else 0.0
    miss = 1.0 - acc

    # macro precision/recall(FN=Sensitivity)/F1
    prec, rec, f1, _ = precision_recall_fscore_support(all_true, all_pred, labels=list(range(N_CLASSES)), average='macro', zero_division=0)
    # macro specificity from confusion matrix
    cm = save_confusion_matrix(all_true, all_pred, CLASS_NAMES,
                               os.path.join(CONF_DIR, f"{ckpt_name}_{lambda_val}.png"))
    spec_macro = specificity_from_cm(cm).mean()

    # macro AUC (OvR)
    try:
        auc_macro = macro_auc(all_true, all_prob, N_CLASSES)
    except Exception:
        auc_macro = float('nan')  # if some classes never appear / degenerate

    row = {
        "Checkpoint": ckpt_name,
        "GAN Type": "DANN",
        "DA Hyperparameter": "lambda",
        "DA Hyperparameter Value": lambda_val,
        "Correctly Identified Images": correct,
        "Incorrectly Identified Images": total - correct,
        "Image Classification Accuracy": acc,
        "Image Miss Rate": miss,
        "Precision (macro)": prec,
        "Sensitivity/Recall (macro)": rec,
        "Specificity (macro)": spec_macro,
        "F1-Score (macro)": f1,
        "AUC-ROC (macro OvR)": auc_macro
    }
    return row

# ─── Run all checkpoints × lambdas ─────────────────────────────────────────
lambda_vals = [0.01, 0.05, 0.1, 0.5, 1.0]
# os.listdir returns entries in arbitrary order; sort them so the sequence of runs
# (and therefore the row order of the results table) is the same in every session.
ckpt_files = sorted(f for f in os.listdir(CKPT_DIR) if f.endswith(".pth"))
results = []

for fname in tqdm(ckpt_files, desc="Checkpoints x lambdas (DANN)"):
    ckpt_path = os.path.join(CKPT_DIR, fname)
    # Compact the file stem into a label that is safe to embed in output file names.
    ckpt_name = os.path.splitext(fname)[0].replace(" ", "").replace("-", "").replace("__", "_")
    for lam in lambda_vals:
        row = train_and_eval_dann(ckpt_path, ckpt_name, lam, epochs=10, bs=32, lr=1e-4)
        results.append(row)

# ─── Save Excel with 'Checkpoint' before 'DA Hyperparameter' ──────────────
df = pd.DataFrame(results)

# Identification columns go first, in this order; every metric column keeps its
# original relative position after them.
lead_cols = ["Checkpoint", "GAN Type", "DA Hyperparameter", "DA Hyperparameter Value"]
cols = list(df.columns)
for col in lead_cols:
    assert col in cols, f"Missing column {col}"

ordered = lead_cols + [c for c in df.columns if c not in lead_cols]
df = df[ordered]

save_xlsx = os.path.join(METRICS_DIR, "VGG19_DANN_metrics.xlsx")
df.to_excel(save_xlsx, index=False)
print(f"✅ Saved metrics to: {save_xlsx}")
print(f"✅ Confusion matrices in: {CONF_DIR}")

Checkpoints x lambdas (DANN): 100%|██████████| 5/5 [1:32:38<00:00, 1111.79s/it]


✅ Saved metrics to: /content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-DANN/Performance Metrics/VGG19_DANN_metrics.xlsx
✅ Confusion matrices in: /content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-DANN/Confusion Matrices


11) Let us do Domain Adaptation with ADDA using VGG-19's feature extractor.

This implementation follows ADDA (Adversarial Discriminative Domain Adaptation; Tzeng et al., CVPR 2017): https://openaccess.thecvf.com/content_cvpr_2017/papers/Tzeng_Adversarial_Discriminative_Domain_CVPR_2017_paper.pdf

In [None]:
import os, re, random, numpy as np, pandas as pd
from copy import deepcopy
from PIL import Image
from tqdm import tqdm

import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets

import matplotlib.pyplot as plt, seaborn as sns
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_auc_score

# ───────────────────────────── Reproducibility ─────────────────────────────
SEED = 1906525
os.environ["PYTHONHASHSEED"] = str(SEED)
# Seed every RNG in use with the same value: Python, NumPy, Torch CPU, Torch CUDA.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)
# Ask cuDNN for deterministic kernels and disable its auto-tuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# ──────────────────────────────── Paths ────────────────────────────────────
SRC_DIR  = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Source - MedMNIST Labelled/Images"
# NOTE(review): "Souce" looks like a typo for "Source", but the literal has to match
# the folder name as it exists on Drive — confirm before renaming either side.
CKPT_DIR = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/Top-5 Souce Checkpoints"

TGT_IMG_ROOT = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Images"
TGT_CSV      = "/content/drive/MyDrive/Research Project 2025/Preprocessed Datasets/Samples/Target - VS Unlabelled/Target sample labels.csv"

CONF_DIR    = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-ADDA/Confusion Matrices"
METRICS_DIR = "/content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-ADDA/Performance Metrics"
# Make sure both output folders exist before any run tries to write into them.
for _out_dir in (CONF_DIR, METRICS_DIR):
    os.makedirs(_out_dir, exist_ok=True)

# ─────────────────────────── Normalization (per branch) ────────────────────
# Each domain is normalised with its own statistics; the same value is repeated
# for all three channels.
mean_src, std_src = [0.3542] * 3, [0.2794] * 3
mean_tgt, std_tgt = [0.2554] * 3, [0.2900] * 3

tx_src = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean_src, std_src),
])
tx_tgt = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean_tgt, std_tgt),
])

# ─────────────────────────── VGG-19 blocks we need ─────────────────────────
# Encoder = conv features + avgpool + Flatten + FC4096 + FC4096   (outputs 4096-d)
# ClassifierHead = Linear(4096 -> num_classes)
cfgs = {
    "E": [
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, 256, "M",
        512, 512, 512, 512, "M",
        512, 512, 512, 512, "M",
    ],
}


def make_layers(cfg):
    """Translate a VGG-style layer list into an `nn.Sequential`.

    Args:
        cfg: Iterable of entries; "M" inserts a 2x2 / stride-2 max-pool, any other
            entry is used as the output-channel count of a 3x3, padding-1
            convolution followed by an in-place ReLU.

    Returns:
        nn.Sequential of the generated layers; the first convolution takes 3 channels.
    """
    stack = []
    prev_channels = 3
    for entry in cfg:
        is_pool = entry == "M"
        if is_pool:
            stack += [nn.MaxPool2d(2, 2)]
        else:
            stack.extend((nn.Conv2d(prev_channels, entry, 3, padding=1), nn.ReLU(True)))
            prev_channels = entry
    return nn.Sequential(*stack)

class VGG19Encoder(nn.Module):
    """VGG-19 without its final class-logit layer; produces a 4096-d vector per sample.

    Pipeline: `features` -> 7x7 adaptive average pool -> flatten -> two
    Linear/ReLU/Dropout stages of width 4096.

    Args:
        features: Convolutional feature extractor whose output has 512 channels.
    """

    def __init__(self, features):
        super().__init__()
        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        # Layer positions inside `fc` matter: state-dicts are exchanged with it by index.
        fc_layers = [
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, 4096), nn.ReLU(True), nn.Dropout(),
            nn.Linear(4096, 4096), nn.ReLU(True), nn.Dropout(),
        ]
        self.fc = nn.Sequential(*fc_layers)

    def forward(self, x):
        pooled = self.avgpool(self.features(x))
        embedding = self.fc(pooled)  # (N, 4096)
        return embedding

class ClassifierHead(nn.Module):
    """Single linear layer mapping a 4096-d encoding to `num_classes` logits."""

    def __init__(self, num_classes):
        super().__init__()
        self.fc = nn.Linear(in_features=4096, out_features=num_classes)

    def forward(self, z):
        logits = self.fc(z)
        return logits

class Discriminator(nn.Module):
    """Domain discriminator over 4096-d encodings; returns one raw logit per sample.

    Label convention used by the training loop: source = 1, target = 0
    (trained with BCEWithLogits, so no sigmoid is applied here).
    """

    def __init__(self):
        super().__init__()
        hidden = 1024
        layers = [
            nn.Linear(4096, hidden), nn.ReLU(True), nn.Dropout(0.5),
            nn.Linear(hidden, hidden), nn.ReLU(True), nn.Dropout(0.5),
            nn.Linear(hidden, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, z):
        logits = self.net(z)
        # Collapse (N, 1) to (N,) so it lines up with 1-D label tensors.
        return logits.view(-1)

# Convenience: full source model structure to load checkpoints strictly
class _FullVGG19ForLoad(nn.Module):
    """Complete VGG-19 classifier, used only as a container for loading checkpoints.

    Layout: `features` -> `avgpool` -> `seq`, where `seq` is
    `[Flatten, Linear, ReLU, Dropout, Linear, ReLU, Dropout, Linear(num_classes)]`,
    i.e. the class-logit layer is the last entry of `seq`.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.features = make_layers(cfgs["E"])
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        classifier_layers = [
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, 4096), nn.ReLU(True), nn.Dropout(),
            nn.Linear(4096, 4096), nn.ReLU(True), nn.Dropout(),
            nn.Linear(4096, num_classes),
        ]
        self.seq = nn.Sequential(*classifier_layers)

    def forward(self, x):
        pooled = self.avgpool(self.features(x))
        return self.seq(pooled)

def load_src_encoder_and_classifier(ckpt_path, num_classes):
    """Load a source VGG-19 checkpoint and split it into an encoder and a classifier head.

    The checkpoint is loaded into `_FullVGG19ForLoad`, so its keys are expected to be
    `features.*` and `seq.*`. A strict load is attempted first; if that fails, only the
    class-logit layer (the last entry of `seq`) is dropped and the rest is loaded.

    Args:
        ckpt_path: Path to a state-dict checkpoint readable by `torch.load`.
        num_classes: Output width of the classifier head.

    Returns:
        (encoder, classifier): a `VGG19Encoder` and a `ClassifierHead`, both moved to
        `device`.

    Raises:
        RuntimeError: If any tensor other than the class-logit layer is absent from
            the checkpoint, or if a remaining tensor has the wrong shape.
    """
    import warnings

    full = _FullVGG19ForLoad(num_classes)
    sd = torch.load(ckpt_path, map_location='cpu')

    # The class-logit layer is the LAST module of `full.seq` (index 7). Index 6 is a
    # Dropout and owns no tensors, so filtering on ".6." would never remove the head.
    head_prefix = f"seq.{len(full.seq) - 1}."
    try:
        full.load_state_dict(sd, strict=True)
    except RuntimeError:
        sd = {k: v for k, v in sd.items() if not k.startswith(head_prefix)}
        report = full.load_state_dict(sd, strict=False)

        # strict=False accepts any number of missing keys; make sure nothing except
        # the head was skipped, otherwise the encoder would be (partly) random.
        not_loaded = [k for k in report.missing_keys if not k.startswith(head_prefix)]
        if not_loaded:
            raise RuntimeError(
                f"Checkpoint {ckpt_path!r} does not provide {len(not_loaded)} required "
                f"tensors (e.g. {not_loaded[:3]}); unexpected keys in checkpoint: "
                f"{list(report.unexpected_keys)[:3]}."
            )
        warnings.warn(
            f"Classifier head was not loaded from {ckpt_path!r}; "
            "it keeps its random initialisation."
        )

    # split into Encoder (till penultimate layer) + Classifier head
    encoder = VGG19Encoder(full.features)
    encoder.avgpool.load_state_dict(full.avgpool.state_dict())
    # full.seq: [Flatten, Lin, ReLU, Drop, Lin, ReLU, Drop, Lin(num_classes)]
    # Everything except the last Linear has the same positions as `encoder.fc`,
    # so the state-dict keys line up one-to-one.
    seq_layers = list(full.seq.children())
    enc_seq = nn.Sequential(*seq_layers[:-1])
    encoder.fc.load_state_dict(enc_seq.state_dict())

    classifier = ClassifierHead(num_classes)
    classifier.fc.load_state_dict(seq_layers[-1].state_dict())
    return encoder.to(device), classifier.to(device)

# ───────────────────────────── Datasets & Loaders ──────────────────────────
src_ds = datasets.ImageFolder(root=SRC_DIR, transform=tx_src)
# The source folder names define the class vocabulary used everywhere else.
CLASS_NAMES = src_ds.classes
N_CLASSES = len(CLASS_NAMES)
CLASS_TO_IDX = {name: idx for idx, name in enumerate(CLASS_NAMES)}


def src_loader(bs=32, shuffle=True, num_workers=2):
    """Build a DataLoader over the labelled source dataset.

    Args:
        bs: Batch size.
        shuffle: Whether to reshuffle every epoch.
        num_workers: Number of loader worker processes.
    """
    return DataLoader(
        src_ds,
        batch_size=bs,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True,
    )

class TargetFlatCSV(Dataset):
    """Target images in a flat folder, labelled through a CSV of `[image_name, class_name]`.

    Class names are mapped to indices with the module-level `CLASS_TO_IDX`; a name
    missing from that mapping raises KeyError when the dataset is constructed.
    Items are `(transformed_image, class_index, file_name)`.
    """

    def __init__(self, root, csv_path, transform):
        self.root = root
        self.transform = transform

        frame = pd.read_csv(csv_path)
        self.names = frame.iloc[:, 0].astype(str).tolist()
        self.cls_names = frame.iloc[:, 1].astype(str).tolist()
        self.cls_idx = [CLASS_TO_IDX[label] for label in self.cls_names]

    def __len__(self):
        return len(self.names)

    def __getitem__(self, i):
        file_name = self.names[i]
        image_path = os.path.join(self.root, file_name)
        rgb = Image.open(image_path).convert("RGB")
        sample = self.transform(rgb)
        return sample, self.cls_idx[i], file_name

tgt_ds = TargetFlatCSV(root=TGT_IMG_ROOT, csv_path=TGT_CSV, transform=tx_tgt)


def tgt_loader(bs=32, shuffle=True, num_workers=2):
    """Build a DataLoader over the target dataset.

    Args:
        bs: Batch size.
        shuffle: Whether to reshuffle every epoch.
        num_workers: Number of loader worker processes.
    """
    return DataLoader(
        tgt_ds,
        batch_size=bs,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True,
    )

# ──────────────────────────────── Metrics utils ────────────────────────────
def save_confusion_matrix(y_true, y_pred, class_names, save_path):
    """Compute a confusion matrix, render it as an annotated heat-map, save and return it.

    Args:
        y_true: Ground-truth class indices.
        y_pred: Predicted class indices.
        class_names: Tick labels; the label set is `0 .. len(class_names)-1`, so
            classes that never occur still appear in the matrix.
        save_path: Where the image is written.

    Returns:
        The confusion matrix (rows = actual, columns = predicted).
    """
    n_labels = len(class_names)
    cm = confusion_matrix(y_true, y_pred, labels=list(range(n_labels)))

    fig, ax = plt.subplots(figsize=(10, 8))
    heatmap_style = dict(annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
    sns.heatmap(cm, xticklabels=class_names, yticklabels=class_names, ax=ax, **heatmap_style)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix (Target)')
    fig.tight_layout()
    fig.savefig(save_path, dpi=220)
    # One figure per run: close it so they do not accumulate over the grid.
    plt.close(fig)
    return cm

def specificity_from_cm(cm):
    """Per-class specificity, TN / (TN + FP), from a square confusion matrix.

    Args:
        cm: 2-D NumPy array, rows = actual class, columns = predicted class.

    Returns:
        1-D NumPy array of one value per class; the epsilon in the denominator
        gives 0.0 rather than a division error when TN + FP is zero.
    """
    grand_total = cm.sum()

    def _class_specificity(k):
        tp = cm[k, k]
        fp = cm[:, k].sum() - tp
        fn = cm[k, :].sum() - tp
        tn = grand_total - (tp + fp + fn)
        return tn / (tn + fp + 1e-12)

    return np.array([_class_specificity(k) for k in range(cm.shape[0])])

def macro_auc(y_true, y_proba, n_classes):
    """Macro-averaged ROC-AUC from integer labels and an (N, C) score matrix.

    Args:
        y_true: Integer class indices, one per sample.
        y_proba: Array of shape (N, C) with per-class scores (softmax probabilities
            at the call sites in this notebook).
        n_classes: Number of classes C.

    Returns:
        The value returned by `roc_auc_score` with `average="macro"`.
    """
    labels = np.asarray(y_true)
    # Indexing the identity matrix by label yields the one-hot rows.
    y_true_oh = np.eye(n_classes)[labels]
    return roc_auc_score(y_true_oh, y_proba, average="macro", multi_class="ovr")

# ─────────────────────────────── ADDA Training ─────────────────────────────
def infinite(dl):
    """Yield items from `dl` forever, re-iterating it each time it is exhausted."""
    while True:
        yield from dl

def lr_name(lr):
    """Format a learning rate as a short scientific-notation label, e.g. 1e-3 or 2.5e-4.

    The label is used in file names and result rows, so distinct learning rates must
    map to distinct labels. The mantissa is therefore kept with up to six decimals
    and only trailing zeros are removed (a one-digit mantissa would make 2e-4 and
    2.5e-4 indistinguishable).

    Args:
        lr: Learning rate (any real number).

    Returns:
        str: e.g. "1e-3", "5e-5", "1.5e-4", "1e-10", "1e+00".
    """
    sci = f"{lr:e}"                       # e.g. "2.500000e-04"
    mantissa, sep, exponent = sci.partition("e")
    if not sep:
        # "nan" / "inf" contain no exponent part; return them untouched.
        return sci
    mantissa = mantissa.rstrip("0").rstrip(".")
    # Drop the zero padding of single-digit negative exponents: "e-04" -> "e-4".
    return f"{mantissa}e{exponent}".replace("e-0", "e-")

def sanitize(s):
    """Make a checkpoint name file-name safe.

    Spaces become underscores; every remaining character that is not an ASCII
    letter, digit or underscore is removed.
    """
    underscored = s.replace(" ", "_")
    return re.sub(r"[^A-Za-z0-9_]+", "", underscored)

@torch.no_grad()
def eval_on_target(tgt_encoder, src_classifier, bs=64):
    """Run encoder + classifier over the whole target dataset, in dataset order.

    Both modules are switched to eval mode. The target dataset is the module-level
    `tgt_ds`.

    Args:
        tgt_encoder: Module mapping image batches to encodings.
        src_classifier: Module mapping encodings to class logits.
        bs: Evaluation batch size.

    Returns:
        (y_true, y_pred, probs): two lists of class indices and an (N, C) NumPy array
        of softmax probabilities; `probs` has shape (0, N_CLASSES) if there were no
        batches.
    """
    tgt_encoder.eval()
    src_classifier.eval()

    eval_loader = DataLoader(tgt_ds, batch_size=bs, shuffle=False, num_workers=2)
    y_true, y_pred, prob_chunks = [], [], []
    for images, labels, _names in eval_loader:
        logits = src_classifier(tgt_encoder(images.to(device)))
        batch_probs = torch.softmax(logits, dim=1)

        y_true.extend(labels.tolist())
        y_pred.extend(batch_probs.argmax(dim=1).cpu().tolist())
        prob_chunks.append(batch_probs.cpu().numpy())

    if len(prob_chunks) > 0:
        probs = np.concatenate(prob_chunks, axis=0)
    else:
        probs = np.zeros((0, N_CLASSES))
    return y_true, y_pred, probs

def train_adda_for_ckpt_lr(ckpt_path, ckpt_name, lr, epochs=10, bs=32): #change epochs
    """Run the adversarial ADDA stage for one checkpoint / learning rate and score the result.

    The source encoder and classifier are loaded from the checkpoint and frozen. A
    target encoder is initialised as a copy of the source encoder and trained against
    a domain discriminator (source = 1, target = 0). Finally the target encoder plus
    the frozen source classifier are evaluated on the full target set and a
    confusion-matrix image is written under `CONF_DIR`.

    Args:
        ckpt_path: Source checkpoint handed to `load_src_encoder_and_classifier`.
        ckpt_name: Label used in progress output, the image file name and the row.
        lr: Adam learning rate for both the target encoder and the discriminator.
        epochs: Number of adversarial epochs.
        bs: Batch size of the source and target loaders.

    Returns:
        dict: One metrics row (counts, accuracy, miss rate, macro precision / recall /
        specificity / F1 and macro AUC; AUC is NaN when it cannot be computed).
    """
    # Phase A (already done offline): Source pretraining
    # Load source encoder & classifier from checkpoint; freeze them.
    src_encoder, src_classifier = load_src_encoder_and_classifier(ckpt_path, N_CLASSES)
    for p in list(src_encoder.parameters()) + list(src_classifier.parameters()):
        p.requires_grad_(False)
    src_encoder.eval(); src_classifier.eval()

    # Phase B (ADDA): initialize target encoder from source encoder; train adversarially
    tgt_encoder = deepcopy(src_encoder).train().to(device)
    # deepcopy carries over requires_grad=False from the frozen source encoder. Without
    # re-enabling it, no gradient ever reaches the target encoder, opt_G.step() changes
    # nothing, and every learning rate would evaluate the unadapted source weights.
    for p in tgt_encoder.parameters():
        p.requires_grad_(True)
    D = Discriminator().to(device)

    opt_G = optim.Adam(tgt_encoder.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_D = optim.Adam(D.parameters(),           lr=lr, betas=(0.5, 0.999))
    bce   = nn.BCEWithLogitsLoss()

    Ls = src_loader(bs=bs, shuffle=True)
    Lt = tgt_loader(bs=bs, shuffle=True)
    it_s, it_t = infinite(Ls), infinite(Lt)

    # One epoch = as many steps as the shorter of the two loaders provides.
    steps_per_epoch = min(len(Ls), len(Lt))

    for ep in range(epochs):
        tgt_encoder.train(); D.train()
        loop = tqdm(range(steps_per_epoch), desc=f"[ADDA] {ckpt_name} | lr={lr_name(lr)} | ep {ep+1}/{epochs}", leave=False)
        for _ in loop:
            # ── sample source & target (labels are not used in this stage)
            xs, ys = next(it_s); xt, _, _ = next(it_t)
            xs = xs.to(device); xt = xt.to(device)

            with torch.no_grad():
                z_s = src_encoder(xs)    # fixed
            z_t = tgt_encoder(xt)        # trainable

            # ── (1) Train Discriminator: D(z_s)=1, D(z_t)=0
            # z_t is detached so this step updates the discriminator only.
            opt_D.zero_grad(set_to_none=True)
            d_src = D(z_s.detach()); lbl_src = torch.ones_like(d_src)
            d_tgt = D(z_t.detach()); lbl_tgt = torch.zeros_like(d_tgt)
            loss_D = (bce(d_src, lbl_src) + bce(d_tgt, lbl_tgt)) * 0.5
            loss_D.backward(); opt_D.step()

            # ── (2) Train Target Encoder (Generator side): fool D → want D(z_t)=1
            opt_G.zero_grad(set_to_none=True)
            d_tgt_for_g = D(z_t)               # reuse latest z_t
            lbl_trick   = torch.ones_like(d_tgt_for_g)
            loss_G = bce(d_tgt_for_g, lbl_trick)
            loss_G.backward(); opt_G.step()
            loop.set_postfix({"loss_D": float(loss_D), "loss_G": float(loss_G)})

    # ── Evaluate target using frozen source classifier
    y_true, y_pred, y_prob = eval_on_target(tgt_encoder, src_classifier, bs=64)

    total = len(y_true)
    correct = int(np.sum(np.array(y_true)==np.array(y_pred)))
    acc = correct / (total if total>0 else 1.0)
    miss = 1.0 - acc

    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, labels=list(range(N_CLASSES)), average='macro', zero_division=0
    )

    cm = save_confusion_matrix(
        y_true, y_pred, CLASS_NAMES,
        os.path.join(CONF_DIR, f"{sanitize(ckpt_name)}_{lr_name(lr)}.png")
    )
    spec_macro = specificity_from_cm(cm).mean()

    try:
        auc_macro = macro_auc(y_true, y_prob, N_CLASSES)
    except Exception:
        auc_macro = float('nan')  # handle degenerate cases safely

    row = {
        "Checkpoint": ckpt_name,
        "GAN Type": "ADDA",
        "DA Hyperparameter": "lr",
        "DA Hyperparameter Value": lr_name(lr),
        "Correctly Identified Images": correct,
        "Incorrectly Identified Images": total - correct,
        "Image Classification Accuracy": acc,
        "Classification Rate": acc,
        "Image Miss Rate": miss,
        "Precision (macro)": prec,
        "Sensitivity/Recall (macro)": rec,
        "Specificity (macro)": spec_macro,
        "F1-Score (macro)": f1,
        "AUC-ROC (macro OvR)": auc_macro
    }
    return row

# ───────────────────────────── Run: ckpts × lr grid ────────────────────────
lr_vals = [1e-3, 5e-4, 2e-4, 1e-4, 5e-5]
# os.listdir returns entries in arbitrary order. The RNGs are seeded once at the top
# of this cell, so the random state each run starts from depends on run order; sort
# the files to make that order (and the row order of the results) reproducible.
ckpt_files = sorted(f for f in os.listdir(CKPT_DIR) if f.endswith(".pth"))
results = []

for fname in ckpt_files:
    ckpt_path = os.path.join(CKPT_DIR, fname)
    ckpt_name = os.path.splitext(fname)[0]
    for lr in lr_vals:
        row = train_adda_for_ckpt_lr(ckpt_path, ckpt_name, lr, epochs=10, bs=32)
        results.append(row)

# ───────────────────────────── Save Excel (append) ─────────────────────────
df_new = pd.DataFrame(results)

xlsx_path = os.path.join(METRICS_DIR, "VGG19_ADDA_metrics.xlsx")

# Append to an existing workbook when it can be read; otherwise write the new rows only.
# NOTE(review): if the existing workbook cannot be read, it is overwritten below with
# just the new rows — confirm that discarding the old content is intended.
df_out = None
if os.path.exists(xlsx_path):
    try:
        df_old = pd.read_excel(xlsx_path)
        df_out = pd.concat([df_old, df_new], ignore_index=True)
    except Exception:
        df_out = None
if df_out is None:
    df_out = df_new.copy()

# Ensure the column ordering: 'Checkpoint' before 'DA Hyperparameter'
lead_cols = ["Checkpoint", "GAN Type", "DA Hyperparameter", "DA Hyperparameter Value"]
ordered = lead_cols + [c for c in df_out.columns if c not in lead_cols]
df_out = df_out[ordered]
df_out.to_excel(xlsx_path, index=False)

print(f"✅ Saved metrics to: {xlsx_path}")
print(f"✅ Confusion matrices saved in: {CONF_DIR}")



✅ Saved metrics to: /content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-ADDA/Performance Metrics/VGG19_ADDA_metrics.xlsx
✅ Confusion matrices saved in: /content/drive/MyDrive/Research Project 2025/Results/Samples/VGG-19/Domain Adaptation/VGG_19-ADDA/Confusion Matrices
