In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Summarise what is available under the input directory. The path of every
# single file is not printed (an image dataset would flood the output), so the
# walk reports one line per folder that actually contains files.
for dirname, _, filenames in os.walk('/kaggle/input'):
    if filenames:
        print(f"{dirname}: {len(filenames)} files")

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Library

In [None]:
import os, random, time, math, sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from torchvision import transforms as T
from torchvision.datasets import ImageFolder
import torchvision.models as models

from sklearn.metrics import classification_report, roc_curve, roc_auc_score
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.preprocessing import label_binarize
from sklearn.manifold import TSNE
import umap.umap_ as umap



# Device Settings

In [None]:
# Filesystem locations: dataset root and the folder that receives outputs.
DATA_DIR = "/kaggle/input/monkeypox-skin-lesion-dataset/Augmented Images/Augmented Images"
SAVE_DIR = "/kaggle/working"
os.makedirs(SAVE_DIR, exist_ok=True)

# Run on the GPU whenever PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

print(f"Device: {DEVICE}")


# DATA SETTINGS

In [None]:
# --- Reproducibility ---------------------------------------------------------
SEED = 42

# --- Input / model geometry --------------------------------------------------
IMG_SIZE = 224
FEATURE_DIM = 2048   # width of the ResNet-50 feature vector once `fc` is removed
PROJ_DIM = 128       # output width of the projection head

# --- SimCLR pretraining ------------------------------------------------------
BATCH_SIZE_SSL = 128   # number of image pairs per step
EPOCHS_SSL = 2000
LR_SSL = 3e-4
TEMPERATURE = 0.5      # NT-Xent temperature

# --- Supervised stages (linear evaluation / test) ----------------------------
BATCH_SIZE_SUP = 128
EPOCHS_LINEAR = 2000
LR_LINEAR = 3e-4

# Seed the three random number generators used in this notebook, in the order
# torch -> numpy -> random.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(SEED)

# Utils

In [None]:
def set_seed(seed=SEED):
    """Re-seed the torch, numpy and stdlib ``random`` generators with ``seed``.

    Args:
        seed: Integer seed; defaults to the notebook-wide ``SEED``.
    """
    # Same order as the top-level seeding: torch, then numpy, then random.
    for seeder in (torch.manual_seed, np.random.seed, random.seed):
        seeder(seed)

def plot_curve(values, title, xlab="Epoch", ylab="Value"):
    """Show ``values`` as a line plot in a new figure.

    Args:
        values: Sequence of numbers, one per step.
        title: Figure title.
        xlab: Label for the x axis.
        ylab: Label for the y axis.
    """
    # The x axis is 1-based so the first value sits at x = 1.
    steps = range(1, len(values) + 1)

    plt.figure()
    plt.plot(steps, values)
    plt.title(title)
    plt.xlabel(xlab)
    plt.ylabel(ylab)
    plt.grid(True)
    plt.show()


# SimCLR Transformation

In [None]:
# Per-channel normalisation statistics shared by the contrastive and the
# supervised pipelines (the standard ImageNet mean / std), kept in one place so
# the two pipelines cannot drift apart.
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]


class SimCLRTransform:
    """
    Returns two augmented views for each image.

    The same random pipeline (random resized crop, horizontal flip, colour
    jitter, random grayscale, Gaussian blur, tensor conversion, normalisation)
    is applied twice to the input, so the two views differ only through the
    random draws.

    Args:
        size: Side length of the square output crop.
        jitter_strength: Multiplier applied to the colour-jitter ranges
            (brightness / contrast / saturation = 0.8 * strength,
            hue = 0.2 * strength). The default 1.0 reproduces the previously
            hard-coded values.
        blur_kernel_size: Kernel size passed to ``T.GaussianBlur``. The default
            3 reproduces the previously hard-coded value.
    """
    def __init__(self, size=224, jitter_strength=1.0, blur_kernel_size=3):
        s = jitter_strength
        color_jitter = T.ColorJitter(0.8*s, 0.8*s, 0.8*s, 0.2*s)
        self.train_transform = T.Compose([
            T.RandomResizedCrop(size=size, scale=(0.2, 1.0)),
            T.RandomHorizontalFlip(p=0.5),
            T.RandomApply([color_jitter], p=0.8),
            T.RandomGrayscale(p=0.2),
            # Blur is applied to every view (it is not wrapped in RandomApply).
            T.GaussianBlur(kernel_size=blur_kernel_size),
            T.ToTensor(),
            T.Normalize(mean=NORM_MEAN, std=NORM_STD),
        ])

    def __call__(self, x):
        """Return ``(view_1, view_2)``: two independent augmentations of ``x``."""
        return self.train_transform(x), self.train_transform(x)

# For supervised loaders (linear eval / test): deterministic resize + normalise,
# no random augmentation.
SUPERVISED_TRANSFORM = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
    T.Normalize(mean=NORM_MEAN, std=NORM_STD),
])

# Contrastive Pair Dataset

In [None]:
class ContrastivePairDataset(Dataset):
    """Dataset over the files of an ``ImageFolder`` that yields view pairs.

    The class labels stored alongside each path are ignored; every item is the
    pair produced by ``transform_pair`` for one image.

    Args:
        image_folder: Source of the ``(path, label)`` list and the file loader.
        transform_pair: Callable mapping one image to a 2-tuple of views.
    """
    def __init__(self, image_folder: ImageFolder, transform_pair):
        self.transform_pair = transform_pair
        self.loader = image_folder.loader
        self.imgs = image_folder.imgs

    def __len__(self):
        """Number of images in the underlying folder."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Load image ``idx`` and return its two augmented views."""
        path = self.imgs[idx][0]
        image = self.loader(path)
        # If the folder's loader did not hand back a PIL image, read the file
        # again with PIL and force three channels.
        if not isinstance(image, Image.Image):
            image = Image.open(path).convert("RGB")
        first_view, second_view = self.transform_pair(image)
        return first_view, second_view


# Data Load & Split (40% SSL train / 60% test)

In [None]:
# Labelled view of the dataset (deterministic resize + normalise).
full_dataset_supervised = ImageFolder(root=DATA_DIR, transform=SUPERVISED_TRANSFORM)
CLASS_TO_IDX = full_dataset_supervised.class_to_idx
IDX_TO_CLASS = {v:k for k,v in CLASS_TO_IDX.items()}
NUM_CLASSES = len(CLASS_TO_IDX)
print("Classes:", CLASS_TO_IDX)

# Fraction of all images used for SSL pretraining and downstream training;
# everything else becomes the held-out test split.
SSL_TRAIN_FRACTION = 0.4

# NOTE(review): DATA_DIR points at a folder of pre-augmented images. If several
# files derive from the same original photo, a purely random split can put
# near-duplicates on both sides of the train/test boundary — confirm.
n_total = len(full_dataset_supervised)
n_train_ssl = int(SSL_TRAIN_FRACTION * n_total)
n_test = n_total - n_train_ssl
ssl_train_sup, test_sup = random_split(full_dataset_supervised, [n_train_ssl, n_test], generator=torch.Generator().manual_seed(SEED))

# SSL Contrastive dataset uses the same images but returns pairs
ssl_train_contrast = ContrastivePairDataset(
    ImageFolder(root=DATA_DIR),  # raw loader
    transform_pair=SimCLRTransform(size=IMG_SIZE)
)
# Indices are shared between the two ImageFolder instances, so both must list
# the same number of files.
assert len(ssl_train_contrast) == n_total, "contrastive and supervised datasets are out of sync"

# random_split returns Subset objects; keep the chosen indices as a plain list
# of ints so they can be reused against either dataset.
ssl_train_indices = list(ssl_train_sup.indices)
ssl_train_subset = Subset(ssl_train_contrast, ssl_train_indices)

ssl_train_loader = DataLoader(ssl_train_subset, batch_size=BATCH_SIZE_SSL, shuffle=True, num_workers=2, drop_last=True)

# Supervised loaders for downstream / test
train_sup_loader = DataLoader(Subset(full_dataset_supervised, ssl_train_indices),
                              batch_size=BATCH_SIZE_SUP, shuffle=True, num_workers=2)
test_loader = DataLoader(test_sup, batch_size=BATCH_SIZE_SUP, shuffle=False, num_workers=2)

print(f"Total: {n_total} | SSL Train: {len(ssl_train_subset)} | Test: {len(test_sup)}")

# For downstream we also want a val split from the SSL-train portion
val_ratio = 0.2
n_train_down = int((1 - val_ratio) * len(ssl_train_indices))
n_val_down = len(ssl_train_indices) - n_train_down
train_down_split, val_down_split = random_split(ssl_train_indices, [n_train_down, n_val_down],
                                                generator=torch.Generator().manual_seed(SEED))
# Resolve the split (positions inside ssl_train_indices) back to dataset
# indices, so Subset receives plain int lists rather than nested Subsets.
train_down_indices = [ssl_train_indices[i] for i in train_down_split.indices]
val_down_indices = [ssl_train_indices[i] for i in val_down_split.indices]
assert len(train_down_indices) + len(val_down_indices) == len(ssl_train_indices)

train_down_loader = DataLoader(Subset(full_dataset_supervised, train_down_indices),
                               batch_size=BATCH_SIZE_SUP, shuffle=True, num_workers=2)
val_down_loader = DataLoader(Subset(full_dataset_supervised, val_down_indices),
                             batch_size=BATCH_SIZE_SUP, shuffle=False, num_workers=2)


# SimCLR setup

In [None]:
class ProjectionHead(nn.Module):
    """MLP head: Linear -> ReLU -> Linear.

    Args:
        in_dim: Width of the incoming feature vector.
        proj_dim: Width of the projected output.
        hidden_dim: Width of the single hidden layer.
    """
    def __init__(self, in_dim=FEATURE_DIM, proj_dim=PROJ_DIM, hidden_dim=2048):
        super().__init__()
        layers = [
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, proj_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Project ``x`` through the MLP."""
        projected = self.net(x)
        return projected

class SimCLR(nn.Module):
    """ResNet-50 encoder (randomly initialised) followed by a projection head.

    Args:
        proj_dim: Output width of the projection head.
    """
    def __init__(self, proj_dim=PROJ_DIM):
        super().__init__()
        # No pretrained weights: the encoder is learned by the contrastive task.
        self.encoder = models.resnet50(weights=None)
        # Replace the classification layer so the encoder emits raw features.
        self.encoder.fc = nn.Identity()
        self.projector = ProjectionHead(in_dim=FEATURE_DIM, proj_dim=proj_dim)

    def forward(self, x):
        """Return ``(features, embedding)`` for a batch of images.

        ``features`` is the encoder output; ``embedding`` is the projection of
        those features, L2-normalised along dim 1.
        """
        features = self.encoder(x)
        embedding = F.normalize(self.projector(features), dim=1)
        return features, embedding

# NT-Xent (InfoNCE) loss
def nt_xent_loss(z_i, z_j, temperature=0.5):
    """
    Normalised-temperature cross-entropy loss over a batch of view pairs.

    Row ``k`` of ``z_i`` and row ``k`` of ``z_j`` form a positive pair; every
    other row of the concatenated batch acts as a negative.

    Args:
        z_i, z_j: [B, D] embeddings, expected to be L2-normalised so that the
            dot product is a cosine similarity.
        temperature: Softmax temperature dividing the similarities.

    Returns:
        Scalar loss, averaged over all 2B anchors.
    """
    batch_size = z_i.size(0)
    z = torch.cat([z_i, z_j], dim=0)                 # [2B, D]
    logits = torch.matmul(z, z.T) / temperature      # [2B, 2B]

    # Remove self-similarity. -inf (rather than a large finite constant) is
    # representable in every float dtype and contributes exactly zero to the
    # softmax denominator.
    self_mask = torch.eye(2 * batch_size, dtype=torch.bool, device=z.device)
    logits = logits.masked_fill(self_mask, float("-inf"))

    # Positive of anchor k is k + B for the first half and k - B for the second.
    targets = (torch.arange(2 * batch_size, device=z.device) + batch_size) % (2 * batch_size)

    # cross_entropy evaluates -log(exp(pos) / sum(exp(all))) through a
    # log-sum-exp, so it stays finite where explicit exp() calls would overflow
    # (e.g. small temperatures).
    return F.cross_entropy(logits, targets)


In [None]:
# SimCLR pretraining: optimise encoder + projection head with the NT-Xent loss
# on pairs of augmented views, then persist the encoder weights only.
simclr = SimCLR(proj_dim=PROJ_DIM).to(DEVICE)
opt_ssl = torch.optim.Adam(simclr.parameters(), lr=LR_SSL)

ssl_loss_history = []
encoder_ckpt_path = os.path.join(SAVE_DIR, "simclr_resnet50_encoder.pth")

print("Starting SimCLR pretraining...")
for epoch in range(1, EPOCHS_SSL + 1):
    simclr.train()
    running, seen = 0.0, 0
    for v1, v2 in tqdm(ssl_train_loader, desc=f"SSL Epoch {epoch}/{EPOCHS_SSL}"):
        v1, v2 = v1.to(DEVICE), v2.to(DEVICE)
        _, z1 = simclr(v1)
        _, z2 = simclr(v2)
        loss = nt_xent_loss(z1, z2, temperature=TEMPERATURE)
        opt_ssl.zero_grad()
        loss.backward()
        opt_ssl.step()
        running += loss.item() * v1.size(0)
        seen += v1.size(0)
    # The loader drops the last incomplete batch, so fewer samples than
    # len(dataset) are processed per epoch; average over what was actually seen.
    if seen == 0:
        raise RuntimeError(
            "SSL loader produced no batches: the training subset is smaller "
            "than BATCH_SIZE_SSL while drop_last=True."
        )
    epoch_loss = running / seen
    ssl_loss_history.append(epoch_loss)
    print(f"SSL Epoch {epoch}: loss={epoch_loss:.4f}")

# Save encoder weights
torch.save(simclr.encoder.state_dict(), encoder_ckpt_path)
print("Saved SimCLR encoder to:", encoder_ckpt_path)


In [None]:
def extract_features(dataloader, encoder, device=None):
    """Run ``encoder`` over every batch and collect its outputs as numpy arrays.

    The encoder is switched to eval mode and gradients are disabled.

    Args:
        dataloader: Iterable of ``(images, labels)`` tensor batches.
        encoder: Module mapping an image batch to a feature batch.
        device: Device the images are moved to before the forward pass. When
            omitted, the device of the encoder's first parameter is used, so
            inputs always match the model; an encoder without parameters
            receives the batches unmoved.

    Returns:
        ``(features, labels)``: numpy arrays concatenated along axis 0.

    Raises:
        ValueError: from ``np.concatenate`` if ``dataloader`` yields no batch.
    """
    encoder.eval()
    if device is None:
        first_param = next(encoder.parameters(), None)
        device = first_param.device if first_param is not None else None

    feats, labs = [], []
    with torch.no_grad():
        for imgs, labels in dataloader:
            if device is not None:
                imgs = imgs.to(device)
            h = encoder(imgs)
            feats.append(h.cpu().numpy())
            # .cpu() is a no-op for CPU labels and keeps GPU labels usable.
            labs.append(labels.cpu().numpy())
    feats = np.concatenate(feats, axis=0)
    labs = np.concatenate(labs, axis=0)
    return feats, labs

def subset_for_vis(dataloader, max_samples=1000, batch_size=None):
    """Materialise at most ``max_samples`` items of ``dataloader`` in memory.

    Args:
        dataloader: Iterable of ``(images, labels)`` tensor batches.
        max_samples: Upper bound on the number of samples kept.
        batch_size: Batch size of the returned loader; defaults to the
            notebook-wide ``BATCH_SIZE_SUP``.

    Returns:
        A non-shuffling ``DataLoader`` over the collected ``(image, label)``
        pairs, in the order they were drawn.

    Raises:
        ValueError: if no sample could be collected (empty loader or
            ``max_samples <= 0``).
    """
    xs, ys = [], []
    total = 0
    if max_samples > 0:
        for imgs, labels in dataloader:
            take = min(imgs.size(0), max_samples - total)
            xs.append(imgs[:take])
            ys.append(labels[:take])
            total += take
            # Stop as soon as the cap is reached, so the source loader is not
            # asked for a batch that would be thrown away.
            if total >= max_samples:
                break
    if not xs:
        raise ValueError("subset_for_vis collected no samples (empty loader or max_samples <= 0)")
    X = torch.cat(xs, dim=0)
    y = torch.cat(ys, dim=0)
    if batch_size is None:
        batch_size = BATCH_SIZE_SUP
    return DataLoader(list(zip(X, y)), batch_size=batch_size, shuffle=False)

# Draw up to 1000 samples from the downstream-training loader for plotting.
vis_loader = subset_for_vis(train_down_loader, max_samples=1000)

# Rebuild a bare ResNet-50 (classification layer replaced by identity) and load
# the saved encoder weights into it, so visualisation uses a fresh eval model.
vis_state_dict = torch.load(os.path.join(SAVE_DIR, "simclr_resnet50_encoder.pth"), map_location=DEVICE)
encoder_vis = models.resnet50(weights=None)
encoder_vis.fc = nn.Identity()
encoder_vis.load_state_dict(vis_state_dict)
encoder_vis = encoder_vis.to(DEVICE)
encoder_vis.eval()

vis_features, vis_labels = extract_features(vis_loader, encoder_vis)

# t-SNE (2D)
print("Computing t-SNE on train features...")
tsne = TSNE(
    n_components=2,
    init="pca",
    learning_rate="auto",
    perplexity=30,
    random_state=SEED,
)
tsne_2d = tsne.fit_transform(vis_features)

plt.figure(figsize=(6,5))
# One scatter call per class so each class gets its own colour and legend entry.
for class_id in np.unique(vis_labels):
    members = vis_labels == class_id
    plt.scatter(tsne_2d[members,0], tsne_2d[members,1], s=8, label=IDX_TO_CLASS[class_id], alpha=0.7)
plt.legend()
plt.title("SimCLR Train Features (t-SNE)")
plt.grid(True)
plt.show()

In [None]:
# UMAP (2D) of the same features that were embedded with t-SNE.
print("Computing UMAP on train features...")
reducer = umap.UMAP(n_components=2, random_state=SEED)
umap_2d = reducer.fit_transform(vis_features)

plt.figure(figsize=(6,5))
# One scatter call per class so each class gets its own colour and legend entry.
for class_id in np.unique(vis_labels):
    members = vis_labels == class_id
    plt.scatter(umap_2d[members,0], umap_2d[members,1], s=8, label=IDX_TO_CLASS[class_id], alpha=0.7)
plt.legend()
plt.title("SimCLR Train Features (UMAP)")
plt.grid(True)
plt.show()

****SSL Loss Curve****

In [None]:
# Per-epoch SimCLR loss recorded during pretraining.
plot_curve(
    ssl_loss_history,
    title="SimCLR Training Loss",
    xlab="Epoch",
    ylab="NT-Xent Loss",
)


****Downstream Linear Evaluation****

In [None]:
# Freeze encoder, train linear classifier on train_down, validate on val_down
encoder_linear = models.resnet50(weights=None)
encoder_linear.fc = nn.Identity()
encoder_linear.load_state_dict(torch.load(os.path.join(SAVE_DIR, "simclr_resnet50_encoder.pth"), map_location=DEVICE))
encoder_linear = encoder_linear.to(DEVICE)
for p in encoder_linear.parameters():
    p.requires_grad = False
encoder_linear.eval()

# The encoder is frozen, runs in eval mode, and the supervised transform has no
# random component, so the features of a given image are identical in every
# epoch. Encode both splits once and train the linear head on the cached
# features instead of re-running ResNet-50 on every image in every epoch.
print("Caching frozen-encoder features for linear evaluation...")
train_feats_np, train_labels_np = extract_features(train_down_loader, encoder_linear)
val_feats_np, val_labels_np = extract_features(val_down_loader, encoder_linear)

feat_train_loader = DataLoader(
    torch.utils.data.TensorDataset(torch.from_numpy(train_feats_np), torch.from_numpy(train_labels_np)),
    batch_size=BATCH_SIZE_SUP, shuffle=True)
feat_val_loader = DataLoader(
    torch.utils.data.TensorDataset(torch.from_numpy(val_feats_np), torch.from_numpy(val_labels_np)),
    batch_size=BATCH_SIZE_SUP, shuffle=False)

classifier = nn.Linear(FEATURE_DIM, NUM_CLASSES).to(DEVICE)
opt_lin = torch.optim.Adam(classifier.parameters(), lr=LR_LINEAR)
criterion_ce = nn.CrossEntropyLoss()

lin_train_losses, lin_val_losses = [], []
lin_train_accs, lin_val_accs = [], []

print("Starting linear evaluation...")
for epoch in range(1, EPOCHS_LINEAR + 1):
    # Train classifier
    classifier.train()
    running_loss, correct, total = 0.0, 0, 0
    for feats, labels in tqdm(feat_train_loader, desc=f"Linear Epoch {epoch}/{EPOCHS_LINEAR}", leave=False):
        feats, labels = feats.to(DEVICE), labels.to(DEVICE)
        logits = classifier(feats)
        loss = criterion_ce(logits, labels)
        opt_lin.zero_grad()
        loss.backward()
        opt_lin.step()

        running_loss += loss.item() * feats.size(0)
        preds = logits.argmax(1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    train_loss = running_loss / total
    train_acc = correct / total
    lin_train_losses.append(train_loss)
    lin_train_accs.append(train_acc)

    # Validate
    classifier.eval()
    running_loss, correct, total = 0.0, 0, 0
    # Reset every epoch: after the loop these hold the final epoch's
    # predictions, which the classification-report cell reads.
    all_preds, all_labels = [], []
    with torch.no_grad():
        for feats, labels in feat_val_loader:
            feats, labels = feats.to(DEVICE), labels.to(DEVICE)
            logits = classifier(feats)
            loss = criterion_ce(logits, labels)
            running_loss += loss.item() * feats.size(0)
            preds = logits.argmax(1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            all_preds.append(preds.cpu().numpy())
            all_labels.append(labels.cpu().numpy())

    val_loss = running_loss / total
    val_acc = correct / total
    lin_val_losses.append(val_loss)
    lin_val_accs.append(val_acc)
    print(f"Linear Epoch {epoch}: TrainLoss={train_loss:.4f} Acc={train_acc:.3f} | ValLoss={val_loss:.4f} Acc={val_acc:.3f}")


****Downstream classification report****

In [None]:
# Per-class metrics on the validation split, using the predictions collected
# during the last linear-evaluation epoch.
val_y, val_pred = (np.concatenate(chunks) for chunks in (all_labels, all_preds))
val_class_names = [IDX_TO_CLASS[i] for i in range(NUM_CLASSES)]
print("\nDownstream Validation Classification Report:")
print(classification_report(val_y, val_pred, target_names=val_class_names))


****Downstream curves****

In [None]:
# Loss and accuracy of the linear head per epoch. The x axis is 1-based so it
# matches the epoch numbers printed during training and the axis used by
# plot_curve.
lin_epochs = range(1, len(lin_train_losses) + 1)

plt.figure(figsize=(12,4))
plt.subplot(1,2,1)
plt.plot(lin_epochs, lin_train_losses, label='Train Loss')
plt.plot(lin_epochs, lin_val_losses, label='Val Loss')
plt.title('Linear Eval Loss')
plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.grid(True); plt.legend()

plt.subplot(1,2,2)
plt.plot(lin_epochs, lin_train_accs, label='Train Acc')
plt.plot(lin_epochs, lin_val_accs, label='Val Acc')
plt.title('Linear Eval Accuracy')
plt.xlabel('Epoch'); plt.ylabel('Accuracy'); plt.grid(True); plt.legend()
plt.tight_layout(); plt.show()


****Testing****

In [None]:
# Score the test split with the frozen encoder followed by the linear head,
# keeping labels, class probabilities and hard predictions for later cells.
classifier.eval()
label_chunks, prob_chunks, pred_chunks = [], [], []

with torch.no_grad():
    for imgs, labels in test_loader:
        imgs = imgs.to(DEVICE)
        labels = labels.to(DEVICE)
        logits = classifier(encoder_linear(imgs))

        label_chunks.append(labels.cpu().numpy())
        prob_chunks.append(F.softmax(logits, dim=1).cpu().numpy())
        pred_chunks.append(logits.argmax(1).cpu().numpy())

test_labels_all = np.concatenate(label_chunks)
test_probs_all = np.concatenate(prob_chunks)
test_preds_all = np.concatenate(pred_chunks)

test_class_names = [IDX_TO_CLASS[i] for i in range(NUM_CLASSES)]
print("\nTest Classification Report:")
print(classification_report(test_labels_all, test_preds_all, target_names=test_class_names))


****ROC curve****

In [None]:
# ROC / AUC on the test split. Column 1 of the probability matrix is used as
# the score, i.e. the class with index 1 is treated as the positive class.
positive_scores = test_probs_all[:,1]
fpr, tpr, _ = roc_curve(test_labels_all, positive_scores)
auc = roc_auc_score(test_labels_all, positive_scores)

plt.figure()
plt.plot(fpr, tpr, label=f"AUC={auc:.3f}")
plt.plot([0,1], [0,1], '--')  # chance diagonal
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve (Test)")
plt.grid(True)
plt.legend()
plt.show()


In [None]:
# 2-D t-SNE embedding of the encoder features of the test split, coloured by
# class, as a visual check of how separable the classes are.
print("Computing t-SNE on TEST features...")
test_features, test_labels_for_vis = extract_features(test_loader, encoder_linear)
tsne_test = TSNE(
    n_components=2,
    init="pca",
    learning_rate="auto",
    perplexity=30,
    random_state=SEED,
)
tsne_test_2d = tsne_test.fit_transform(test_features)

plt.figure(figsize=(6,5))
# One scatter call per class so each class gets its own colour and legend entry.
for class_id in np.unique(test_labels_for_vis):
    members = test_labels_for_vis == class_id
    plt.scatter(tsne_test_2d[members,0], tsne_test_2d[members,1], s=12, label=IDX_TO_CLASS[class_id], alpha=0.75)
plt.legend()
plt.title("Test Features (t-SNE)")
plt.grid(True)
plt.show()