# Transfer Learning – ImageNet-R

## Instalacja zależności

In [None]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

!pip install -q torchmetrics codecarbon pytorch_lightning

## Przygotowanie środowiska i danych

In [None]:
# Colab-only setup cell: fetches credentials, mounts Google Drive and unpacks
# both datasets under /content. The steps depend on each other, so the order
# of the commands matters.
from google.colab import drive, files
files.upload() # Used to upload the kaggle.json file
drive.mount('/content/drive')

# Unpack the ImageNet-R archive stored on Drive into /content/transfer.
!mkdir -p /content/transfer
!tar -xvf /content/drive/MyDrive/imagenet-r.tar -C /content/transfer

# Move the uploaded Kaggle credentials into ~/.kaggle, make the file readable
# and writable by its owner only, then download and unzip the validation set.
!mkdir -p ~/.kaggle
!mv /content/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download sautkin/imagenet1kvalid
!unzip /content/imagenet1kvalid.zip -d /content/imagenet1kv1_valid

## Importy

In [None]:
import os, time, subprocess
import numpy as np, pandas as pd
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
from torchvision.models import (
    efficientnet_v2_s, EfficientNet_V2_S_Weights,
    resnet50, ResNet50_Weights,
    densenet201, DenseNet201_Weights,
    convnext_tiny, ConvNeXt_Tiny_Weights,
    mobilenet_v3_large, MobileNet_V3_Large_Weights
)
import pytorch_lightning as pl
from pytorch_lightning import LightningDataModule, LightningModule
from sklearn.metrics import accuracy_score, f1_score
from codecarbon import EmissionsTracker

# Turn on cuDNN's benchmark mode globally for this session.
# NOTE(review): per the PyTorch docs this lets cuDNN time several convolution
# algorithms and reuse the fastest; it pays off when input shapes stay fixed.
torch.backends.cudnn.benchmark = True

## Konfiguracja modeli

In [None]:
# Registry of the architectures to compare. Each entry maps a display name to
# a (model constructor, pretrained weights) pair; iteration order is the
# order in which the experiments run.
MODEL_CONFIGS = dict(
    efficientnet_v2_s=(efficientnet_v2_s, EfficientNet_V2_S_Weights.IMAGENET1K_V1),
    resnet50=(resnet50, ResNet50_Weights.IMAGENET1K_V1),
    densenet201=(densenet201, DenseNet201_Weights.IMAGENET1K_V1),
    convnext_tiny=(convnext_tiny, ConvNeXt_Tiny_Weights.IMAGENET1K_V1),
    mobilenet_v3_large=(mobilenet_v3_large, MobileNet_V3_Large_Weights.IMAGENET1K_V1),
)

## LightningDataModule

In [None]:
class TLDataModule(LightningDataModule):
    """Serve a seeded 80/20 train/validation split of an ``ImageFolder`` dataset.

    Args:
        data_dir: Root directory handed to ``ImageFolder``.
        transform: Transform applied to every image, in both splits.
        batch_size: Batch size used by both dataloaders.
        num_workers: Worker processes per dataloader. Defaults to 4, the
            value that used to be hard-coded.
    """

    def __init__(self, data_dir, transform, batch_size, num_workers=4):
        super().__init__()
        self.data_dir = data_dir
        self.transform = transform
        self.batch_size = batch_size
        self.num_workers = num_workers
        # Filled in by setup(); None means "not split yet".
        self.train_ds = None
        self.val_ds = None

    def setup(self, stage=None):
        """Build the train/validation subsets once; later calls are no-ops.

        ``setup`` may be invoked several times for the same instance (by hand
        and again by the Trainer for each stage). The split is seeded, so
        repeating it would only re-scan the image directory for the same
        result.
        """
        if self.train_ds is not None and self.val_ds is not None:
            return

        full_dataset = ImageFolder(self.data_dir, transform=self.transform)
        total_len = len(full_dataset)
        train_len = int(0.8 * total_len)
        val_len = total_len - train_len
        # Fixed seed keeps the split identical across runs and across models.
        self.train_ds, self.val_ds = random_split(
            full_dataset,
            [train_len, val_len],
            generator=torch.Generator().manual_seed(42),
        )

    def _make_loader(self, dataset, shuffle):
        """Create a DataLoader with the settings shared by both splits."""
        loader_kwargs = {
            "batch_size": self.batch_size,
            "shuffle": shuffle,
            "num_workers": self.num_workers,
            "pin_memory": True,
        }
        # prefetch_factor is only accepted when worker processes are used.
        if self.num_workers > 0:
            loader_kwargs["prefetch_factor"] = 2
        return DataLoader(dataset, **loader_kwargs)

    def train_dataloader(self):
        """Return the shuffled loader over the training subset."""
        return self._make_loader(self.train_ds, shuffle=True)

    def val_dataloader(self):
        """Return the non-shuffled loader over the validation subset."""
        return self._make_loader(self.val_ds, shuffle=False)

## TransferModel (LightningModule)

In [None]:
class TransferModel(LightningModule):
    """Pretrained classifier with frozen weights and a new trainable linear head.

    Args:
        model_fn: Callable that builds the network; called as
            ``model_fn(weights=weights)``.
        weights: Pretrained weights passed through to ``model_fn``.
        num_classes: Output size of the replacement classification layer.

    Raises:
        ValueError: If the network exposes none of the supported head
            attributes (``classifier``, ``fc``, ``head``) or its
            ``classifier`` is neither ``Sequential`` nor ``Linear``.
    """

    def __init__(self, model_fn, weights, num_classes):
        super().__init__()
        self.model = model_fn(weights=weights)
        # Freeze all pretrained parameters. The head created afterwards is a
        # fresh Linear layer, so it is the only part the optimizer updates.
        # NOTE(review): this only stops gradient updates; normalization layers
        # in the backbone still run in train mode during fitting - confirm
        # that is intended.
        for param in self.model.parameters():
            param.requires_grad = False

        self._replace_head(num_classes)

        self.loss_fn = torch.nn.CrossEntropyLoss()
        # Per-batch predictions and targets collected during validation.
        self.preds, self.labels = [], []

    def _replace_head(self, num_classes):
        """Swap the final classification layer for a new ``num_classes``-way Linear."""
        if hasattr(self.model, "classifier"):
            if isinstance(self.model.classifier, torch.nn.Sequential):
                # Only the last layer of the stack is replaced.
                in_feat = self.model.classifier[-1].in_features
                self.model.classifier[-1] = torch.nn.Linear(in_feat, num_classes)
            elif isinstance(self.model.classifier, torch.nn.Linear):
                in_feat = self.model.classifier.in_features
                self.model.classifier = torch.nn.Linear(in_feat, num_classes)
            else:
                raise ValueError(f"Unsupported classifier type: {type(self.model.classifier)}")
        elif hasattr(self.model, "fc"):
            in_feat = self.model.fc.in_features
            self.model.fc = torch.nn.Linear(in_feat, num_classes)
        elif hasattr(self.model, "head"):
            in_feat = self.model.head.in_features
            self.model.head = torch.nn.Linear(in_feat, num_classes)
        else:
            raise ValueError("Nieobsługiwana architektura – brak standardowej głowy klasyfikatora")

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.model(x)

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss for one ``(inputs, targets)`` batch."""
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        return loss

    def validation_step(self, batch, batch_idx):
        """Store the arg-max predictions and targets of one batch on the CPU."""
        x, y = batch
        logits = self(x)
        preds = logits.argmax(1)
        self.preds.append(preds.cpu())
        self.labels.append(y.cpu())

    def on_validation_epoch_end(self):
        """Log accuracy and macro F1 over the collected batches, then reset."""
        # torch.cat rejects an empty list; with no validation batches there
        # is nothing to report.
        if not self.preds:
            return

        # Hand plain NumPy arrays to scikit-learn rather than tensors.
        preds = torch.cat(self.preds).numpy()
        labels = torch.cat(self.labels).numpy()
        acc = accuracy_score(labels, preds)
        # zero_division=0 yields the same score as the default while
        # suppressing the undefined-metric warning for classes that never
        # get predicted (common on small validation runs).
        f1 = f1_score(labels, preds, average="macro", zero_division=0)
        self.log("val_accuracy", acc)
        self.log("val_f1", f1)
        self.preds, self.labels = [], []

    def configure_optimizers(self):
        """Return AdamW (lr=1e-3) over the parameters that still require gradients."""
        return torch.optim.AdamW(filter(lambda p: p.requires_grad, self.parameters()), lr=1e-3)

## Funkcja `run_transfer_learning()`

In [None]:
def run_transfer_learning(model_name, model_fn, weights, data_dir, batch_size):
    """Train one pretrained model for a single epoch and report its resource usage.

    Builds the data module and the transfer model, then runs one training
    epoch followed by a validation pass while a CodeCarbon tracker is active.
    Parameter counts, emissions, timings and peak GPU memory are printed and
    also returned.

    Args:
        model_name: Label used in the printout and in the tracker project name.
        model_fn: Model constructor forwarded to ``TransferModel``.
        weights: Pretrained weights; also supplies the preprocessing transform
            through ``weights.transforms()``.
        data_dir: Root directory of the image dataset.
        batch_size: Batch size for both dataloaders.

    Returns:
        dict: The measurements that are printed, plus whatever metrics
        ``trainer.validate`` reported for the validation dataloader.
    """
    use_cuda = torch.cuda.is_available()

    transform = weights.transforms()
    dm = TLDataModule(data_dir, transform, batch_size)
    # Set up eagerly: the number of classes is read from the dataset below.
    dm.setup()

    module = TransferModel(model_fn, weights, num_classes=len(dm.train_ds.dataset.classes))

    params_total = sum(p.numel() for p in module.parameters())
    params_train = sum(p.numel() for p in module.parameters() if p.requires_grad)

    # Remove the CodeCarbon lock file if one exists (presumably left behind by
    # an earlier tracker); a missing file is the normal case.
    try:
        os.remove("/tmp/.codecarbon.lock")
    except FileNotFoundError:
        pass
    tracker = EmissionsTracker(project_name=f"TL_{model_name}", log_level="error")

    # The CUDA memory statistics only exist when a GPU is present.
    if use_cuda:
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()

    tracker.start()
    try:
        t0 = time.time()
        trainer = pl.Trainer(
            max_epochs=1,
            accelerator="gpu" if use_cuda else "cpu",
            devices=1,
            # Mixed 16-bit precision is kept for the GPU; the CPU fallback
            # uses full precision.
            precision="16-mixed" if use_cuda else "32-true",
            logger=False,
            enable_checkpointing=False,
        )
        trainer.fit(module, dm)
        train_time = time.time() - t0

        t0 = time.time()
        val_results = trainer.validate(module, datamodule=dm)
        val_time = time.time() - t0
    finally:
        # Always stop the tracker, even when training fails, so it does not
        # keep running in the background.
        co2 = tracker.stop()

    peak_mem = torch.cuda.max_memory_allocated() / 1024**2 if use_cuda else 0.0
    # NOTE(review): stop() may yield None when tracking could not be started;
    # avoid crashing on the float format in that case.
    co2_text = f"{co2:.4f} kg" if co2 is not None else "n/a"

    print(f"=== {model_name} ===")
    print(f"Total params:     {params_total:,}")
    print(f"Trainable params: {params_train:,}")
    print(f"CO₂:               {co2_text}")
    print(f"Train time:        {train_time:.1f} s")
    print(f"Val time:          {val_time:.1f} s")
    print(f"Peak GPU memory:   {peak_mem:.0f} MiB")

    if use_cuda:
        torch.cuda.empty_cache()

    results = {
        "model": model_name,
        "params_total": params_total,
        "params_trainable": params_train,
        "co2_kg": co2,
        "train_time_s": train_time,
        "val_time_s": val_time,
        "peak_gpu_mem_mib": peak_mem,
    }
    # validate() returns one metrics dict per validation dataloader.
    if val_results:
        results.update(val_results[0])
    return results

## Uruchomienie eksperymentu

In [None]:
# Shared settings for every run in the comparison.
data_dir = "/content/transfer/imagenet-r"
batch_size = 128

# Run the experiment once per registered architecture, in registry order.
for name in MODEL_CONFIGS:
    fn, weights = MODEL_CONFIGS[name]
    run_transfer_learning(name, fn, weights, data_dir, batch_size)