
# SimCLR — Tiny-ImageNet (Modded)
Modifikasi yang diterapkan:
1. **Augmentasi tambahan**: `RandomAffine`, `RandomSolarize`, `RandomEqualize`.
2. **Backbone baru**: support `resnet34` selain `resnet18`/`resnet50`.
3. **Projection head**: `Linear → BatchNorm → ReLU → Linear`.
4. **Hyperparameter**: temperature diatur melalui `args.temperature` (pada sel training saat ini bernilai `0.5`; mudah diubah).
5. **Output visual**: plot **loss**, **t-SNE**, dan **PCA**.

> Catatan: Pastikan path dataset **Tiny-ImageNet** sesuai dengan lingkungan Anda.


In [None]:
import os
import shutil
import sys
import csv
import yaml
import math
import random

import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.cuda.amp import GradScaler, autocast

from tqdm import tqdm

import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

import torchvision
from torchvision import datasets, models
from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms, InterpolationMode

import torch.backends.cudnn as cudnn

# Seed every RNG the notebook relies on (PyTorch, NumPy, stdlib `random`)
# with the same value so that repeated runs start from the same state.
for seed_rng in (torch.manual_seed, np.random.seed, random.seed):
    seed_rng(0)

# Report the runtime environment.
print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

In [None]:
class GaussianBlur(object):
    """Blur a single 3-channel PIL image on CPU with a random Gaussian kernel.

    The blur is implemented as two depthwise 1-D convolutions applied after
    reflection padding, so the output has the same spatial size as the input.
    A new sigma is drawn from ``[0.1, 2.0)`` on every call.

    Args:
        kernel_size: requested kernel width; it is rounded to the odd value
            ``2 * (kernel_size // 2) + 1``.
    """

    def __init__(self, kernel_size):
        radius = kernel_size // 2
        kernel_size = radius * 2 + 1  # force an odd width so the kernel is centred
        self.blur_h = nn.Conv2d(3, 3, kernel_size=(kernel_size, 1),
                                stride=1, padding=0, bias=False, groups=3)
        self.blur_v = nn.Conv2d(3, 3, kernel_size=(1, kernel_size),
                                stride=1, padding=0, bias=False, groups=3)
        self.k = kernel_size
        self.r = radius

        # Padding by `radius` on every side compensates for the two unpadded
        # 1-D convolutions that follow.
        self.blur = nn.Sequential(
            nn.ReflectionPad2d(radius),
            self.blur_h,
            self.blur_v
        )

        self.pil_to_tensor = transforms.ToTensor()
        self.tensor_to_pil = transforms.ToPILImage()

    def __call__(self, img):
        """Return a blurred copy of ``img`` (PIL image in, PIL image out)."""
        img = self.pil_to_tensor(img).unsqueeze(0)

        # Build a normalised 1-D Gaussian for a freshly sampled sigma and
        # replicate it for each of the three channels.
        sigma = np.random.uniform(0.1, 2.0)
        x = np.arange(-self.r, self.r + 1)
        x = np.exp(-np.power(x, 2) / (2 * sigma * sigma))
        x = x / x.sum()
        x = torch.from_numpy(x).view(1, -1).repeat(3, 1)

        self.blur_h.weight.data.copy_(x.view(3, 1, self.k, 1))
        self.blur_v.weight.data.copy_(x.view(3, 1, 1, self.k))

        with torch.no_grad():
            img = self.blur(img)
            # Drop only the batch axis; a bare squeeze() would also remove a
            # spatial axis of size 1.
            img = img.squeeze(0)

        img = self.tensor_to_pil(img)
        return img


def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    """Serialise ``state`` to ``filename`` with ``torch.save``.

    When ``is_best`` is truthy the written file is additionally copied to
    ``model_best.pth.tar`` in the current working directory.
    """
    torch.save(state, filename)
    if not is_best:
        return
    shutil.copyfile(filename, 'model_best.pth.tar')


def save_config_file(model_checkpoints_folder, args):
    """Dump ``args`` as YAML to ``<model_checkpoints_folder>/config.yml``.

    The folder is created when missing. The config file is written on every
    call, also when the folder already exists (previously the dump was
    skipped in that case, so re-runs silently kept a stale or missing config).

    Args:
        model_checkpoints_folder: directory that receives ``config.yml``.
        args: object passed as-is to ``yaml.dump``.
    """
    os.makedirs(model_checkpoints_folder, exist_ok=True)
    config_path = os.path.join(model_checkpoints_folder, 'config.yml')
    with open(config_path, 'w') as outfile:
        yaml.dump(args, outfile, default_flow_style=False)


def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy (in percent) for each k in ``topk``.

    Args:
        output: 2-D score tensor, one row per sample.
        target: 1-D tensor of class indices, one per row of ``output``.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one single-element tensor per entry of ``topk``.
    """
    with torch.no_grad():
        largest_k = max(topk)
        n_samples = target.size(0)

        # Indices of the `largest_k` best classes, transposed to (k, batch).
        top_indices = output.topk(largest_k, 1, True, True)[1].t()
        hits = top_indices.eq(target.view(1, -1).expand_as(top_indices))

        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / n_samples)
            for k in topk
        ]


class BaseSimCLRException(Exception):
    """Common base class for the SimCLR-specific errors defined here."""


class InvalidBackboneError(BaseSimCLRException):
    """Error type for an unsupported backbone selection."""


class InvalidDatasetSelection(BaseSimCLRException):
    """Error type for an unsupported dataset selection."""

# augmentasi
Kode ini bertugas memperkaya variasi data gambar sebelum masuk ke model. Proses augmentasi meliputi transformasi seperti crop acak, flipping horizontal, perubahan warna (color jitter), grayscale acak, Gaussian blur, transformasi affine (rotasi dan translasi), solarize, dan equalize. Tujuannya adalah membuat model lebih robust terhadap berbagai kondisi input dan mencegah overfitting, karena model belajar dari beragam variasi gambar meskipun berasal dari dataset yang sama.

In [None]:
class ContrastiveLearningViewGenerator(object):
    """Callable that turns one input into ``n_views`` transformed copies.

    ``base_transform`` is invoked once per view on the same input, so a
    random transform yields a different view each time.
    """

    def __init__(self, base_transform, n_views=2):
        self.n_views = n_views
        self.base_transform = base_transform

    def __call__(self, x):
        views = []
        for _ in range(self.n_views):
            views.append(self.base_transform(x))
        return views


class ContrastiveLearningDataset:
    """Factory for datasets whose samples are lists of augmented views.

    Args:
        root_folder: directory that holds (or will receive) the datasets.
    """

    def __init__(self, root_folder):
        self.root_folder = root_folder

    @staticmethod
    def get_simclr_pipeline_transform(size, s=1):
        """Build the SimCLR augmentation pipeline for ``size`` x ``size`` crops.

        On top of crop / flip / color jitter / grayscale / Gaussian blur the
        pipeline also applies RandomAffine, RandomSolarize and RandomEqualize.

        Args:
            size: output side length of the random resized crop.
            s: multiplier applied to the color-jitter strengths.

        Returns:
            A ``transforms.Compose`` that ends with ``ToTensor``.
        """
        color_jitter = transforms.ColorJitter(0.8 * s, 0.8 * s, 0.8 * s, 0.2 * s)
        data_transforms = transforms.Compose([
            transforms.RandomResizedCrop(size=size, interpolation=InterpolationMode.BICUBIC),
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([color_jitter], p=0.8),
            transforms.RandomGrayscale(p=0.2),
            GaussianBlur(kernel_size=int(0.1 * size)),
            # --- additional augmentations ---
            transforms.RandomAffine(degrees=20, translate=(0.1, 0.1)),
            transforms.RandomSolarize(threshold=128, p=0.2),
            transforms.RandomEqualize(p=0.2),
            transforms.ToTensor(),
        ])
        return data_transforms

    def get_dataset(self, name, n_views):
        """Return the dataset registered under ``name``.

        Args:
            name: one of ``'cifar10'``, ``'stl10'`` or ``'tinyimagenet'``.
            n_views: number of augmented views generated per image.

        Raises:
            InvalidDatasetSelection: if ``name`` is not a registered dataset.
        """
        def make_views(size):
            # One view generator per dataset, sized to that dataset's images.
            return ContrastiveLearningViewGenerator(
                self.get_simclr_pipeline_transform(size), n_views)

        # Lambdas keep construction lazy: only the selected dataset is built
        # (and, for CIFAR-10 / STL-10, downloaded).
        valid_datasets = {
            'cifar10': lambda: datasets.CIFAR10(self.root_folder, train=True,
                                                transform=make_views(32),
                                                download=True),
            'stl10': lambda: datasets.STL10(self.root_folder, split='unlabeled',
                                            transform=make_views(96),
                                            download=True),
            'tinyimagenet': lambda: ImageFolder(
                root=os.path.join(self.root_folder, 'tiny-imagenet-200', 'train'),
                transform=make_views(64)),
        }
        try:
            dataset_fn = valid_datasets[name]
        except KeyError:
            raise InvalidDatasetSelection(
                f"Unknown dataset {name!r}; choose one of: {', '.join(valid_datasets)}"
            ) from None
        return dataset_fn()

# backbone encoder dan projection head
Encoder ini mengubah gambar mentah menjadi representasi fitur berdimensi tinggi yang lebih bermakna. Fitur inilah yang nantinya digunakan untuk proses pembelajaran representasi. Backbone biasanya dipilih yang sudah teruji performanya, sehingga hasil ekstraksi fitur lebih optimal.

Kemudian pada projection head, lapisan tambahan berupa multilayer perceptron (MLP) digunakan untuk memetakan fitur hasil encoder ke ruang embedding yang lebih kecil. Ruang ini berguna untuk membandingkan kemiripan antar sampel dalam proses contrastive learning. Dengan adanya projection head, representasi fitur menjadi lebih terstruktur dan siap digunakan untuk perhitungan loss.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models


class ResNetSimCLR(nn.Module):
    """ResNet encoder whose ``fc`` layer is replaced by an MLP projection head.

    The head is ``Linear -> BatchNorm1d -> ReLU -> Linear(out_dim)`` with a
    hidden width of twice the backbone feature width.

    Args:
        base_model: ``"resnet18"``, ``"resnet34"`` or ``"resnet50"``.
        out_dim: size of the projection output.

    Raises:
        InvalidBackboneError: if ``base_model`` is not a supported name.
    """

    def __init__(self, base_model, out_dim):
        super().__init__()
        # Map names to constructors so only the requested network is built
        # (the previous version instantiated all three ResNets up front).
        self.resnet_dict = {
            "resnet18": models.resnet18,
            "resnet34": models.resnet34,
            "resnet50": models.resnet50,
        }
        self.backbone = self._get_basemodel(base_model)
        dim_mlp = self.backbone.fc.in_features

        # Use the constructor argument, not the notebook-global `args`, so the
        # class works wherever it is instantiated.
        self.backbone.fc = nn.Sequential(
            nn.Linear(dim_mlp, dim_mlp * 2),
            nn.BatchNorm1d(dim_mlp * 2),
            nn.ReLU(inplace=True),
            nn.Linear(dim_mlp * 2, out_dim),
        )

    def _get_basemodel(self, model_name):
        """Instantiate the randomly initialised backbone named ``model_name``."""
        try:
            build_backbone = self.resnet_dict[model_name]
        except KeyError:
            raise InvalidBackboneError(
                "Backbone invalid. Gunakan resnet18, resnet34, atau resnet50."
            ) from None
        return build_backbone(weights=None)

    def forward(self, x):
        """Return the projection-head output for the image batch ``x``."""
        return self.backbone(x)

# loss function
Loss function yang digunakan berbasis contrastive, yaitu NT-Xent Loss. Loss ini mengukur seberapa mirip representasi dua gambar augmentasi dari gambar yang sama dibandingkan dengan gambar lain. Jika representasi dua augmentasi dari gambar yang sama semakin dekat, loss akan semakin kecil. Mekanisme ini melatih model untuk mengelompokkan representasi yang relevan dan menjauhkan yang tidak relevan.

In [None]:
class SimCLR:
    """Minimal SimCLR trainer: NT-Xent loss plus a plain training loop.

    Args:
        model: network mapping an image batch to embeddings; it is moved to
            ``args.device``.
        optimizer: optimizer over the model parameters.
        scheduler: learning-rate scheduler, stepped once per epoch.
        args: namespace providing ``device``, ``epochs``, ``temperature`` and
            ``log_every_n_steps``.
    """

    def __init__(self, model, optimizer, scheduler, args):
        self.model = model.to(args.device)
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.args = args

    def nt_xent_loss(self, z1, z2, temperature=0.5):
        """Return the mean NT-Xent loss for two batches of paired embeddings.

        Args:
            z1: ``[N, D]`` embeddings of the first view.
            z2: ``[N, D]`` embeddings of the second view; row ``i`` is the
                positive for row ``i`` of ``z1``.
            temperature: softmax temperature applied to cosine similarities.
        """
        z1 = F.normalize(z1, dim=1)
        z2 = F.normalize(z2, dim=1)
        N = z1.size(0)
        z = torch.cat([z1, z2], dim=0)  # [2N, D]
        sim = torch.matmul(z, z.t())    # [2N, 2N]
        # Remove self-similarities; every row keeps its 2N - 1 other samples
        # (the positive included) for the denominator.
        mask = torch.eye(2 * N, dtype=torch.bool, device=z.device)
        sim = sim[~mask].view(2 * N, 2 * N - 1)
        pos = torch.sum(z1 * z2, dim=-1)
        pos = torch.cat([pos, pos], dim=0)
        sim = sim / temperature
        denom = torch.logsumexp(sim, dim=1)
        loss = -pos / temperature + denom
        return loss.mean()

    def train(self, train_loader):
        """Train for ``args.epochs`` epochs and return the per-epoch mean loss.

        Args:
            train_loader: iterable of ``(views, _)`` batches where ``views``
                holds exactly two tensors (the two augmented views).

        Returns:
            List with one float per epoch: the loss averaged over all batches
            of that epoch (previously only the last batch's loss was kept).

        Raises:
            ValueError: if ``train_loader`` yields no batches.
        """
        self.model.train()
        device = self.args.device
        log_every = self.args.log_every_n_steps
        loss_history = []
        for epoch in range(1, self.args.epochs + 1):
            window_loss = 0.0  # reset after every progress log
            epoch_loss = 0.0   # accumulated over the whole epoch
            n_batches = 0
            for step, (views, _) in enumerate(train_loader, start=1):
                x1, x2 = views
                x1, x2 = x1.to(device), x2.to(device)
                z1 = self.model(x1)
                z2 = self.model(x2)
                loss = self.nt_xent_loss(z1, z2, temperature=self.args.temperature)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                batch_loss = loss.item()
                window_loss += batch_loss
                epoch_loss += batch_loss
                n_batches = step
                if step % log_every == 0:
                    print(f"Epoch {epoch}/{self.args.epochs} Step {step}/{len(train_loader)} Loss {window_loss/log_every:.4f}")
                    window_loss = 0.0
            if n_batches == 0:
                raise ValueError("train_loader yielded no batches")
            self.scheduler.step()
            mean_loss = epoch_loss / n_batches
            loss_history.append(mean_loss)
            print(f"[Epoch {epoch}] Loss: {mean_loss:.4f}")
        return loss_history

# training loop
Proses pelatihan dilakukan secara iteratif dalam beberapa epoch. Di setiap iterasi, batch data diambil, diaugmentasi, diekstraksi fiturnya oleh backbone encoder, lalu diproyeksikan oleh projection head. Kemudian loss dihitung dan bobot model diperbarui menggunakan optimizer. Proses ini diulang terus hingga model belajar menghasilkan representasi yang stabil dan berkualitas.

In [None]:
import torch
from types import SimpleNamespace
import torch.backends.cudnn as cudnn
from torchvision import datasets, transforms

# ======================
# Argument setup
# ======================
args = SimpleNamespace()
args.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Change the Tiny-ImageNet dataset path if it differs in your environment.
# Expected layout: tiny-imagenet-200/{train, val}
args.data = '/kaggle/input/tiny-imagenet'

# Reproducible cuDNN configuration: deterministic kernels and no autotuner.
# benchmark=True lets cuDNN pick algorithms by timing them, which can vary
# between runs and works against deterministic=True and the fixed seeds.
cudnn.deterministic = True
cudnn.benchmark = False

args.dataset_name = 'tinyimagenet'
args.n_views = 2
args.batch_size = 512
args.out_dim = 256
args.lr = 0.0003
args.weight_decay = 1e-4
args.arch = 'resnet50'   # 'resnet18' and 'resnet34' are also supported
args.workers = 2
args.gpu_index = 0
args.log_dir = './logs/simclr'
args.fp16_precision = True
args.epochs = 10
args.temperature = 0.5
args.seed = 1
args.log_every_n_steps = 50

# ======================
# Dataset
# ======================
# Factory rooted at args.data; get_dataset() resolves the dataset by name.
dataset = ContrastiveLearningDataset(args.data)

# Train dataset (self-supervised pretraining): each sample is a list of
# args.n_views augmented views of the same image.
train_dataset = dataset.get_dataset(args.dataset_name, args.n_views)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=min(args.batch_size, 512),   # capped at 512 (recommended value)
    shuffle=True,
    num_workers=max(2, args.workers, 4),    # never below 4; 4-8 usually works well on a P100
    pin_memory=True,
    persistent_workers=True,
    prefetch_factor=4,
    drop_last=True
)

# Validation dataset (evaluation after training).
# NOTE(review): this resizes to 256 and center-crops to 224, while the
# training views are 64x64 crops - confirm the size mismatch is intended.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])

# Tiny-ImageNet val path.
# NOTE(review): ImageFolder derives labels from sub-directory names; if val/
# only contains an `images/` folder, every sample presumably gets the same
# label - verify against the dataset layout.
val_dataset = datasets.ImageFolder(root=f"{args.data}/tiny-imagenet-200/val", transform=val_transform)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=args.batch_size,
    shuffle=False,
    num_workers=args.workers,
    pin_memory=True
)

# ======================
# Model + Optimizer + Scheduler
# ======================
model = ResNetSimCLR(base_model=args.arch, out_dim=args.out_dim)
optimizer = torch.optim.Adam(model.parameters(), args.lr, weight_decay=args.weight_decay)

# Cosine LR scheduler. SimCLR.train() calls scheduler.step() once per epoch,
# so the annealing horizon is counted in epochs. Using epochs * len(loader)
# here would leave the learning rate almost unchanged for the whole run.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=args.epochs)


# ======================
# SimCLR training
# ======================
simclr = SimCLR(model=model, optimizer=optimizer, scheduler=scheduler, args=args)

# Self-supervised pretraining on train_loader; returns one loss value per epoch.
loss_history = simclr.train(train_loader)

# ======================
# (Optional) Evaluation with val_loader
# ======================
# Example evaluation: collect embeddings and compute loss / kNN accuracy
# val_loss = simclr.evaluate(val_loader)


In [None]:

# Plot the per-epoch training loss returned by the training run.
plt.figure()
plt.plot(range(1, len(loss_history)+1), loss_history, marker='o')
plt.xlabel("Epoch")
plt.ylabel("Loss")
# The title previously contained mis-encoded bytes instead of the em dash.
plt.title("Training Loss — SimCLR (Modded)")
plt.grid(True)
plt.show()


In [None]:
import os
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.transforms import InterpolationMode
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import shutil
from pathlib import Path
from torchvision.models import resnet50, ResNet50_Weights
from sklearn.model_selection import train_test_split

# ============================================================
# 🔹 1. Ekstraksi fitur backbone (pra-projection)
# ============================================================
@torch.no_grad()
def extract_backbone_features(model, x):
    """Return the backbone features ``h`` computed before the projection head.

    The head ``model.backbone.fc`` is temporarily swapped for ``nn.Identity``
    and is always put back, even when the forward pass raises.

    Args:
        model: object exposing ``backbone`` with an ``fc`` attribute.
        x: input batch forwarded through ``model.backbone``.
    """
    original_head = model.backbone.fc
    model.backbone.fc = nn.Identity()
    try:
        return model.backbone(x)
    finally:
        # Restore the projection head so the model is left unchanged.
        model.backbone.fc = original_head


@torch.no_grad()
def collect_features_backbone(dataloader, model, device, max_batches=None):
    """Gather pre-projection backbone features and labels from a dataloader.

    Args:
        dataloader: iterable yielding ``(inputs, labels)`` batches.
        model: model handed to ``extract_backbone_features``; it is switched
            to eval mode and left in that mode.
        device: device the inputs are moved to before the forward pass.
        max_batches: optional cap on the number of batches consumed.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays concatenated over the
        consumed batches.
    """
    model.eval()
    feature_chunks = []
    label_chunks = []
    seen = 0
    for inputs, y in dataloader:
        seen += 1
        batch_features = extract_backbone_features(model, inputs.to(device))
        feature_chunks.append(batch_features.cpu())
        label_chunks.append(y)
        if max_batches is not None and seen >= max_batches:
            break
    return torch.cat(feature_chunks).numpy(), torch.cat(label_chunks).numpy()



# ============================================================
# 2. Simple evaluation transform
# ============================================================
# Deterministic resize to the 64x64 training resolution, no augmentation.
eval_tf = transforms.Compose([
    transforms.Resize((64, 64), interpolation=InterpolationMode.BICUBIC),
    transforms.ToTensor()
])

# ============================================================
# 3. Load the Tiny-ImageNet val set in ImageFolder layout
# ============================================================
data_root = args.data
val_images = os.path.join(data_root, "tiny-imagenet-200", "val", "images")

if os.path.isdir(val_images):
    print("🔧 Menyiapkan val set...")

    val_dir = Path(data_root) / "tiny-imagenet-200/val"
    ann_file = val_dir / "val_annotations.txt"
    target_base = Path("/kaggle/working/val_split")
    # parents=True so the split can be created even when the working
    # directory does not exist yet.
    target_base.mkdir(parents=True, exist_ok=True)

    # Build val_split/<class_name>/<image> once; skipped when the target
    # directory already has content.
    if not any(target_base.iterdir()):
        with open(ann_file, "r") as f:
            for line in f:
                parts = line.strip().split("\t")
                if len(parts) < 2:
                    # Blank or malformed annotation line: nothing to copy.
                    continue
                img_name, class_name = parts[0], parts[1]
                class_dir = target_base / class_name
                class_dir.mkdir(exist_ok=True)
                shutil.copy(val_dir / "images" / img_name, class_dir / img_name)

    val_images = target_base
else:
    print("⚠️ val/images tidak ditemukan, fallback pakai train/")
    val_images = os.path.join(data_root, "tiny-imagenet-200", "train")

# ============================================================
# 4. Dataset & dataloader
# ============================================================
val_ds = ImageFolder(str(val_images), transform=eval_tf)
val_loader = torch.utils.data.DataLoader(val_ds, batch_size=512, shuffle=False, num_workers=args.workers)

# ============================================================
# 5. Backbone feature extraction
# ============================================================
# At most 20 batches of 512 images are embedded.
feats, labels = collect_features_backbone(val_loader, model, args.device, max_batches=20)

# ============================================================
# 6. t-SNE & PCA visualization
# ============================================================
# t-SNE: 2-D embedding of the backbone features, colored by class label.
# NOTE(review): newer scikit-learn releases rename `n_iter` to `max_iter` -
# confirm against the installed version before upgrading.
tsne = TSNE(n_components=2, perplexity=30, n_iter=500, verbose=1, init="pca", learning_rate="auto")
emb_tsne = tsne.fit_transform(feats)
plt.figure()
plt.scatter(emb_tsne[:, 0], emb_tsne[:, 1], c=labels, s=5, alpha=0.6)
plt.title("t-SNE of backbone features (h)")
plt.xlabel("dim 1"); plt.ylabel("dim 2")
plt.show()

# PCA: projection of the same features onto the first two principal components.
pca = PCA(n_components=2)
emb_pca = pca.fit_transform(feats)
plt.figure()
plt.scatter(emb_pca[:, 0], emb_pca[:, 1], c=labels, s=5, alpha=0.6)
plt.title("PCA of backbone features (h)")
plt.xlabel("PC 1"); plt.ylabel("PC 2")
plt.show()

# ============================================================
# 7. Quantitative evaluation (linear probe & k-NN)
# ============================================================
# The extracted features are split 80/20 (stratified by label) so that the
# classifiers are scored on samples they were not fitted on.
# NOTE: ideally Xtr/Ytr would come from the train set and Xte/Yte from val.
Xtr, Xte, Ytr, Yte = train_test_split(
    feats, labels, test_size=0.2, random_state=42, stratify=labels
)

# Standardize with statistics from the fitting split only.
scaler = StandardScaler(with_mean=True, with_std=True)
Xtr_s = scaler.fit_transform(Xtr)
Xte_s = scaler.transform(Xte)

# Linear probe (logistic regression). The deprecated `multi_class` argument
# is omitted: with the default lbfgs solver a multiclass problem is already
# fitted with the multinomial loss.
clf = LogisticRegression(max_iter=2000, n_jobs=-1)
clf.fit(Xtr_s, Ytr)
print("Linear probe acc:", accuracy_score(Yte, clf.predict(Xte_s)))

# k-NN with cosine distance
knn = KNeighborsClassifier(n_neighbors=20, metric="cosine")
knn.fit(Xtr_s, Ytr)
print("kNN acc:", accuracy_score(Yte, knn.predict(Xte_s)))



# ============================================================
# 8. Tuning notes (optional)
# ============================================================
# Experiment tips:
# - Temperature sweep: start from args.temperature = 0.5 (usually stable),
#   then compare against 0.3 / 0.7.
# - Augmentation: lower solarize/equalize (p=0.05-0.1) and use a smaller
#   random-affine angle.
# - Projection head: use a 2-layer MLP + BatchNorm (example below).
#
# NOTE(review): the `backbone` built below is a standalone example; nothing
# else in this section uses it - confirm before relying on it.

out_dim = 256  

# ResNet50 backbone without pretrained weights
backbone = resnet50(weights=None)

dim_mlp = backbone.fc.in_features  # width of the final feature layer

# Replace the classifier with Linear -> BatchNorm -> ReLU -> Linear(out_dim).
backbone.fc = nn.Sequential(
    nn.Linear(dim_mlp, dim_mlp*2),
    nn.BatchNorm1d(dim_mlp*2),
    nn.ReLU(inplace=True),
    nn.Linear(dim_mlp*2, out_dim)
)



In [None]:
# Sanity check: list the class ids present in each side of the split.
for split_name, split_labels in (("Train", Ytr), ("Val", Yte)):
    print(f"{split_name} unique labels:", np.unique(split_labels))
