In [17]:
import os
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

# Użycie biblioteki Lightly do transformacji i komponentów SSL
import pytorch_lightning as pl
from lightly.transforms import SimCLRTransform, DINOTransform, MAETransform
from lightly.transforms.moco_transform import MoCoV2Transform

In [18]:
# Wykrycie urządzenia do trenowania (GPU jeśli dostępne)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

Using device: cuda


In [19]:
pl.seed_everything(42)  # Ustawienie ziarna dla powtarzalności

Seed set to 42


42

# 1. Przygotowanie danych

In [20]:
def compute_cifar_stats(name="cifar100", root="data", batch_size=5000):
    ds_class = getattr(torchvision.datasets, name.upper())
    ds = ds_class(root, train=True, download=True, transform=transforms.ToTensor())
    loader = torch.utils.data.DataLoader(ds, batch_size=batch_size, num_workers=2, shuffle=False)
    ch_sum = torch.zeros(3)
    ch_sum_sq = torch.zeros(3)
    n_pixels = 0
    for imgs, _ in loader:
        b, c, h, w = imgs.shape
        n_pixels += b * h * w
        ch_sum    += imgs.sum(dim=[0,2,3])
        ch_sum_sq += (imgs**2).sum(dim=[0,2,3])
    mean = ch_sum / n_pixels
    std  = torch.sqrt(ch_sum_sq / n_pixels - mean**2)
    return mean.tolist(), std.tolist()

In [21]:
cifar_mean, cifar_std = compute_cifar_stats()

In [22]:
# --- Przygotowanie zbiorów danych CIFAR10 i CIFAR100 ---
# Transformacje dla trenowania nadzorowanego (baseline i linear probe): 
# losowe przycięcie i odbicie (augmentacja) + normalizacja.
train_transform_supervised = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=cifar_mean, std=cifar_std)
])

# Transformacja dla zbioru testowego (tylko skalowanie do tensoru i normalizacja).
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=cifar_mean, std=cifar_std)
])

# Transformacje dla metod samonadzorowanych:
# - Dla SimCLR/MoCo/BYOL: dwie zaugmentowane wersje obrazu.
transform_simclr = SimCLRTransform(input_size=32)   # input_size=32 dla CIFAR
transform_moco = MoCoV2Transform(input_size=32)  # input_size=32 dla CIFAR
# - Dla DINO: transformacja generująca 2 widoki globalne i 6 lokalnych (domyślnie).
transform_dino = DINOTransform(global_crop_size=32, local_crop_size=16,  # dopasowanie do mniejszych obrazków
                               global_crop_scale=(0.5, 1.0), local_crop_scale=(0.2, 0.5))
# - Dla MAE/SimMIM: jedna widok z losowymi augmentacjami (proste augmentacje).
transform_mae = MAETransform()

# Ładowanie danych CIFAR100 (train i test)
train_dataset_cifar100 = torchvision.datasets.CIFAR100(root='./data', train=True, download=True,
                                                      transform=None)  # transform ustawimy później per metoda
test_dataset_cifar100 = torchvision.datasets.CIFAR100(root='./data', train=False, download=True,
                                                     transform=test_transform)
# Ładowanie danych CIFAR10 (train i test)
train_dataset_cifar10 = torchvision.datasets.CIFAR10(root='./data', train=True, download=True,
                                                    transform=None)
test_dataset_cifar10 = torchvision.datasets.CIFAR10(root='./data', train=False, download=True,
                                                   transform=test_transform)

# Dataloader dla ewaluacji (testy) – tutaj wykorzystamy go do obliczania cech i ewaluacji
test_loader_cifar100 = torch.utils.data.DataLoader(test_dataset_cifar100, batch_size=256, shuffle=False)
test_loader_cifar10 = torch.utils.data.DataLoader(test_dataset_cifar10, batch_size=256, shuffle=False)

# (Opcjonalnie) Przygotowanie zbioru ImageNet-1k, jeżeli dostępny na dysku:
imagenet_train_dir = '/path/to/ImageNet/train'  # <-- Uwaga: ustawić poprawną ścieżkę jeśli dane dostępne
imagenet_val_dir = '/path/to/ImageNet/val'
imagenet_train_dataset = None
imagenet_val_dataset = None
if os.path.exists(imagenet_train_dir):
    # Transformacje dla ImageNet: wymiary 224x224 jak w standardowych modelach
    transform_simclr_imagenet = SimCLRTransform(input_size=224)
    transform_dino_imagenet = DINOTransform()  # domyślne parametry dla DINO (224 global, 96 lokal)
    transform_mae_imagenet = MAETransform()
    # transformacje dla baseline i linear eval na ImageNet (przycięcie centralne dla val)
    train_transform_supervised_imnet = transforms.Compose([
        transforms.RandomResizedCrop(224, scale=(0.2, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.485, 0.456, 0.406),
                             std=(0.229, 0.224, 0.225))
    ])
    val_transform_imnet = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.485, 0.456, 0.406),
                             std=(0.229, 0.224, 0.225))
    ])
    # Używamy ImageFolder do wczytania danych z katalogu
    imagenet_train_dataset = torchvision.datasets.ImageFolder(root=imagenet_train_dir,
                                                              transform=None)  # transform ustawimy dynamicznie
    imagenet_val_dataset = torchvision.datasets.ImageFolder(root=imagenet_val_dir,
                                                            transform=val_transform_imnet)
    print("ImageNet datasets prepared.")
else:
    print("ImageNet data not found, skipping ImageNet training in this run.")




ImageNet data not found, skipping ImageNet training in this run.


# 2. Definicje modeli i trenowanie metod self supervised

In [23]:
from timm.models.vision_transformer import vit_base_patch32_224

from lightly.loss import NTXentLoss, DINOLoss
from lightly.models.modules.heads import SimCLRProjectionHead, DINOProjectionHead, BYOLProjectionHead, BYOLPredictionHead,\
    MoCoProjectionHead
from lightly.models import utils
from lightly.models.utils import update_momentum, deactivate_requires_grad,\
    batch_shuffle, batch_unshuffle

from lightly.models.modules import MAEDecoderTIMM, MaskedVisionTransformerTIMM

## 2.1. Trenowanie masked autoencoder (MAE/SimMIM) na zbiorze nieoznaczonym

In [24]:
def pretrain_masked_autoencoder(train_dataset, epochs=20, batch_size=128, lr=1.5e-4):
    """
    Trenuje model typu Masked Autoencoder na podanym zbiorze danych.
    Zwraca wytrenowany encoder (backbone) oraz cały model (encoder+decoder).
    """
    # Ustawiamy transformację dla datasetu (MAETransform przygotowuje random crop i normalizację)
    train_dataset.transform = transform_mae
    train_loader = torch.utils.data.DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        drop_last=True,
        num_workers=4,
    )
    # Tworzymy model - ViT jako encoder, prosta warstwa liniowa jako decoder (SimMIM styl)
    vit = vit_base_patch32_224(pretrained=False)
    # Dostosowanie: zmieniamy rozmiar wejścia patch (CIFAR obraz 32x32, patch 16 -> 2x2 patchy, to za mało)
    # Alternatywnie: powiększamy obrazy CIFAR do 224 wewnątrz transformacji by użyć ViT patch16.
    # (Tutaj zakładamy, że transformacja MAETransform może wewnętrznie robić resize do 224; jeśli nie, warto dodać Resize(224) do transformacji dla CIFAR.)
    # Budujemy model maskowanego autoenkodera:
    class MAE(pl.LightningModule):
        def __init__(self, vit, lr=1.5e-4):
            super().__init__()

            decoder_dim = 512
            self.mask_ratio = 0.75
            self.patch_size = vit.patch_embed.patch_size[0]
            self.lr = lr
            self.backbone = MaskedVisionTransformerTIMM(vit=vit)
            self.sequence_length = self.backbone.sequence_length
            self.decoder = MAEDecoderTIMM(
                num_patches=vit.patch_embed.num_patches,
                patch_size=self.patch_size,
                embed_dim=vit.embed_dim,
                decoder_embed_dim=decoder_dim,
                decoder_depth=1,
                decoder_num_heads=16,
                mlp_ratio=4.0,
                proj_drop_rate=0.0,
                attn_drop_rate=0.0,
            )
            self.criterion = nn.MSELoss()

        def forward_encoder(self, images, idx_keep=None):
            return self.backbone.encode(images=images, idx_keep=idx_keep)

        def forward_decoder(self, x_encoded, idx_keep, idx_mask):
            # build decoder input
            batch_size = x_encoded.shape[0]
            x_decode = self.decoder.embed(x_encoded)
            x_masked = utils.repeat_token(
                self.decoder.mask_token, (batch_size, self.sequence_length)
            )
            x_masked = utils.set_at_index(x_masked, idx_keep, x_decode.type_as(x_masked))

            # decoder forward pass
            x_decoded = self.decoder.decode(x_masked)

            # predict pixel values for masked tokens
            x_pred = utils.get_at_index(x_decoded, idx_mask)
            x_pred = self.decoder.predict(x_pred)
            return x_pred

        def training_step(self, batch, batch_idx):
            views = batch[0]
            images = views[0]  # views contains only a single view
            batch_size = images.shape[0]
            idx_keep, idx_mask = utils.random_token_mask(
                size=(batch_size, self.sequence_length),
                mask_ratio=self.mask_ratio,
                device=images.device,
            )
            x_encoded = self.forward_encoder(images=images, idx_keep=idx_keep)
            x_pred = self.forward_decoder(
                x_encoded=x_encoded, idx_keep=idx_keep, idx_mask=idx_mask
            )

            # get image patches for masked tokens
            patches = utils.patchify(images, self.patch_size)
            # must adjust idx_mask for missing class token
            target = utils.get_at_index(patches, idx_mask - 1)

            loss = self.criterion(x_pred, target)
            # Log metrics
            self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True)
            return loss

        def configure_optimizers(self):
            optim = torch.optim.AdamW(self.parameters(), lr=self.lr)
            return optim


    # Inicjalizacja modelu i ustawienie na urządzenie
    mae_model = MAE(vit=vit, lr=lr)

    accelerator = 'gpu' if device == 'cuda' else 'cpu'

    trainer = pl.Trainer(
        max_epochs=epochs,
        accelerator=accelerator,
        devices=1,
        precision='16-mixed' if device == 'cuda' else 32,
        log_every_n_steps=20,
        enable_checkpointing= True,
    )

    print(">>> Trenowanie Masked Autoencoder przez {} epok...".format(epochs))
    trainer.fit(mae_model, train_loader)

    return mae_model.backbone, mae_model 


## 2.2. Kontrastywne

### 2.2.1. Trenowanie metody SimCLR (kontrastywna) na zbiorze nieoznaczonym

In [25]:
def pretrain_simclr(train_dataset, epochs=20, batch_size=128, lr=6e-2):
    """
    Trenuje model SimCLR (ResNet18 + projection head) na podanym zbiorze danych.
    Zwraca wytrenowany backbone (ResNet bez klasyfikatora).
    """
    # Ustawienie transformacji dwóch widoków na dataset
    train_dataset.transform = transform_simclr
    train_loader = torch.utils.data.DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        drop_last=True,
        num_workers=4,
    )
    
    class SimCLR(pl.LightningModule):
        def __init__(self):
            super().__init__()

            # create a ResNet backbone and remove the classification head
            resnet = torchvision.models.resnet18(pretrained=False)
            self.backbone = nn.Sequential(*list(resnet.children())[:-1])

            hidden_dim = resnet.fc.in_features
            self.projection_head = SimCLRProjectionHead(hidden_dim, hidden_dim, 128)

            self.criterion = NTXentLoss()

        def forward(self, x):
            h = self.backbone(x).flatten(start_dim=1)
            z = self.projection_head(h)
            return z

        def training_step(self, batch, batch_idx):
            (x0, x1), _, _ = batch
            z0 = self.forward(x0)
            z1 = self.forward(x1)
            loss = self.criterion(z0, z1)
            self.log("train_loss", loss)
            return loss

        def configure_optimizers(self):
            optim = torch.optim.SGD(
                self.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4
            )
            scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, epochs)
            return [optim], [scheduler]
        

    simclr_model = SimCLR()
    accelerator = 'gpu' if device == 'cuda' else 'cpu'

    trainer = pl.Trainer(
        max_epochs=epochs,
        devices=1,
        accelerator=accelerator,
        precision='16-mixed' if device == 'cuda' else 32,
        log_every_n_steps=20,
        enable_checkpointing=True,
    )

    print(">>> Trenowanie SimCLR przez {} epok...".format(epochs))
    trainer.fit(simclr_model, train_loader)
    return simclr_model.backbone

### 2.2.2. Trenowanie metody MoCo (Momentum Contrast) na zbiorze nieoznaczonym

In [26]:
def pretrain_moco(train_dataset, epochs=20, batch_size=128, lr=0.06, memory_bank_size=4096):
    """
    Trenuje model MoCo v2 (ResNet18 z encoderem kluczy i kolejką) na podanym zbiorze danych.
    Zwraca wytrenowany backbone (encoder zapytań).
    """
    # Ustawienie transformacji dwóch widoków (tak jak SimCLR)
    train_dataset.transform = transform_moco
    train_loader = torch.utils.data.DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        drop_last=True,
        num_workers=4,
    )
 
    class MoCo(pl.LightningModule):
        def __init__(self):
            super().__init__()
            resnet = torchvision.models.resnet18()
            self.backbone = nn.Sequential(*list(resnet.children())[:-1])
            hidden_dim = resnet.fc.in_features
            self.projection_head = MoCoProjectionHead(hidden_dim, hidden_dim, 128)

            self.backbone_momentum = copy.deepcopy(self.backbone)
            self.projection_head_momentum = copy.deepcopy(self.projection_head)

            deactivate_requires_grad(self.backbone_momentum)
            deactivate_requires_grad(self.projection_head_momentum)

            self.criterion = NTXentLoss(memory_bank_size=(memory_bank_size, 128))

        def forward(self, x):
            query = self.backbone(x).flatten(start_dim=1)
            query = self.projection_head(query)
            return query

        def forward_momentum(self, x):
            key = self.backbone_momentum(x).flatten(start_dim=1)
            key = self.projection_head_momentum(key).detach()
            return key

        def training_step(self, batch, batch_idx):
                (x_q, x_k), _, _ = batch

                # update momentum
                update_momentum(self.backbone, self.backbone_momentum, 0.99)
                update_momentum(self.projection_head, self.projection_head_momentum, 0.99)

                # get queries
                q = self.backbone(x_q).flatten(start_dim=1)
                q = self.projection_head(q)

                # get keys
                k, shuffle = batch_shuffle(x_k)
                k = self.backbone_momentum(k).flatten(start_dim=1)
                k = self.projection_head_momentum(k)
                k = batch_unshuffle(k, shuffle)

                loss = self.criterion(q, k)
                self.log("train_loss", loss)
                return loss

        def on_train_epoch_end(self):
            self.custom_histogram_weights()

        # We provide a helper method to log weights in tensorboard
        # which is useful for debugging.
        def custom_histogram_weights(self):
            for name, params in self.named_parameters():
                self.logger.experiment.add_histogram(name, params, self.current_epoch)

        def configure_optimizers(self):
            optim = torch.optim.SGD(
                self.parameters(),
                lr=lr,
                momentum=0.9,
                weight_decay=5e-4,
            )
            scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, epochs)
            return [optim], [scheduler]        

    moco_model = MoCo()
    accelerator = 'gpu' if device == 'cuda' else 'cpu'

    trainer = pl.Trainer(
        max_epochs=epochs,
        devices=1,
        accelerator=accelerator,
        precision='16-mixed' if device == 'cuda' else 32,
        log_every_n_steps=20,
        enable_checkpointing=True,
    )

    print(">>> Trenowanie MoCo v2 przez {} epok...".format(epochs))
    trainer.fit(moco_model, train_loader)
    return moco_model.backbone


## 2.3. Self-distillation

### 2.3.1. Trenowanie metody BYOL (Bootstrap Your Own Latent) na zbiorze nieoznaczonym

In [27]:
def pretrain_byol(train_dataset, epochs=20, batch_size=128, lr=1e-3):
    """
    Trenuje model BYOL (ResNet18 online + target) na podanym zbiorze danych.
    Zwraca wytrenowany backbone (online network).
    """
    # Ustawienie transformacji dwóch widoków (BYOL używa podobnych augmentacji jak SimCLR, ewentualnie dodając solarization, tutaj korzystamy z SimCLRTransform)
    train_dataset.transform = transform_simclr
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    # Definiujemy backbone sieci online i tworzymy kopię do sieci target
    resnet = torchvision.models.resnet18(pretrained=False)
    online_backbone = nn.Sequential(*list(resnet.children())[:-1])
    backbone_output_dim = resnet.fc.in_features  # 512
    target_backbone = copy.deepcopy(online_backbone)
    deactivate_requires_grad(target_backbone)  # sieć target nie ma gradientów
    # Projekcja (MLP) i predykcja dla sieci online, projekcja dla sieci target
    online_proj = BYOLProjectionHead(input_dim=backbone_output_dim, hidden_dim=backbone_output_dim, output_dim=256)
    online_pred = BYOLPredictionHead(input_dim=256, hidden_dim=256, output_dim=256)
    target_proj = copy.deepcopy(online_proj)
    deactivate_requires_grad(target_proj)
    # Optymalizujemy tylko parametry online (backbone, proj, pred)
    optimizer = torch.optim.Adam(list(online_backbone.parameters()) + list(online_proj.parameters()) + list(online_pred.parameters()), lr=lr)
    online_backbone.to(device); online_proj.to(device); online_pred.to(device)
    target_backbone.to(device); target_proj.to(device)
    online_backbone.train(); online_proj.train(); online_pred.train()
    target_backbone.eval(); target_proj.eval()
    momentum = 0.996  # współczynnik momentum do uaktualniania target network
    print(">>> Trenowanie BYOL przez {} epok...".format(epochs))
    for epoch in range(epochs):
        total_loss = 0.0
        for (views, _) in train_loader:
            x_a, x_b = views[0].to(device), views[1].to(device)  # dwie augmentacje
            # Forward przez online network dla obu widoków
            feat_a = online_backbone(x_a).flatten(start_dim=1)
            feat_b = online_backbone(x_b).flatten(start_dim=1)
            proj_a = online_proj(feat_a)
            proj_b = online_proj(feat_b)
            pred_a = online_pred(proj_a)  # predykcja dla a
            pred_b = online_pred(proj_b)  # predykcja dla b
            # Forward przez target network (bez grad)
            with torch.no_grad():
                # momentum update target sieci
                update_momentum(online_backbone, target_backbone, m=momentum)
                update_momentum(online_proj, target_proj, m=momentum)
                # (target_pred nie ma, bo target sieć kończy na projekcji)
                target_feat_a = target_backbone(x_a).flatten(start_dim=1)
                target_feat_b = target_backbone(x_b).flatten(start_dim=1)
                target_proj_a = target_proj(target_feat_a)
                target_proj_b = target_proj(target_feat_b)
            # Normalizacja wektorów projekcji i predykcji
            pred_a_norm = F.normalize(pred_a, dim=1)
            pred_b_norm = F.normalize(pred_b, dim=1)
            target_a_norm = F.normalize(target_proj_b.detach(), dim=1)  # UWAGA: pred_a porównujemy z target z drugiego widoku
            target_b_norm = F.normalize(target_proj_a.detach(), dim=1)
            # Obliczenie straty MSE pomiędzy znormalizowanymi predykcjami online a docelowymi reprezentacjami target
            loss = 2 - 2 * (pred_a_norm * target_a_norm).sum(dim=1).mean() - 2 * (pred_b_norm * target_b_norm).sum(dim=1).mean()
            # (powyższe to równoważnik: loss = MSE(pred_a_norm, target_a_norm) + MSE(pred_b_norm, target_b_norm))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        print(f"[BYOL] Epoka {epoch+1}/{epochs}, średni loss: {avg_loss:.4f}")
    return online_backbone

### 2.3.2. Trenowanie metody DINO (Distillation with No Labels) na zbiorze nieoznaczonym

In [28]:
def pretrain_dino(train_dataset, epochs=20, batch_size=128, lr=1e-3):
    """
    Trenuje model DINO (ResNet18 student + momentum teacher) na podanym zbiorze danych.
    Zwraca wytrenowany backbone (student backbone).
    """
    # Ustawienie transformacji DINO (wiele widoków) na dataset
    train_dataset.transform = transform_dino
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    # Definiujemy backbone (ResNet18) dla studenta i kopiujemy dla nauczyciela
    resnet = torchvision.models.resnet18(pretrained=False)
    student_backbone = nn.Sequential(*list(resnet.children())[:-1])
    backbone_output_dim = resnet.fc.in_features  # 512
    teacher_backbone = copy.deepcopy(student_backbone)
    deactivate_requires_grad(teacher_backbone)
    # Głowice projekcyjne DINO dla studenta i nauczyciela.
    # Używamy DINOProjectionHead: parametry (in_dim, hidden_dim, bottleneck_dim, out_dim, [opcje])
    student_head = DINOProjectionHead(input_dim=backbone_output_dim, hidden_dim=512, bottleneck_dim=256, output_dim=2048, freeze_last_layer=1)
    teacher_head = copy.deepcopy(student_head)
    # teacher_head nie zamrażamy w całości, ale podczas optymalizacji nie będziemy go aktualizować (nie jest w optimizer)
    deactivate_requires_grad(teacher_head)
    # Funkcja kosztu DINO – porównuje wyjścia nauczyciela i studenta (zawiera mechanizm centrowania i temperatury)
    criterion = DINOLoss(output_dim=2048, warmup_teacher_temp_epochs=5)
    # Optimizer tylko dla sieci studenta (backbone + head)
    optimizer = torch.optim.Adam(list(student_backbone.parameters()) + list(student_head.parameters()), lr=lr)
    # Harmonogram zmiany współczynnika momentum (od nieco mniejszego do 1)
    # Będziemy liniowo zwiększać momentum nauczyciela od 0.996 do 1.0 w trakcie epok
    initial_momentum = 0.996
    final_momentum = 1.0
    student_backbone.to(device); student_head.to(device)
    teacher_backbone.to(device); teacher_head.to(device)
    student_backbone.train(); student_head.train()
    teacher_backbone.eval(); teacher_head.eval()
    print(">>> Trenowanie DINO przez {} epok...".format(epochs))
    for epoch in range(epochs):
        total_loss = 0.0
        # Wyznacz wartość momentum dla bieżącej epoki (cosine schedule)
        momentum_val = initial_momentum + (final_momentum - initial_momentum) * (epoch / (epochs - 1))
        for batch in train_loader:
            views = batch[0]  # lista widoków augmentowanych (list length = 2 + n_local_views, domyślnie 8)
            # Uaktualnienie wag nauczyciela (backbone i head) - momentum update przed forward
            update_momentum(student_backbone, teacher_backbone, m=momentum_val)
            update_momentum(student_head, teacher_head, m=momentum_val)
            # Przenieś wszystkie widoki na GPU
            views = [v.to(device) for v in views]
            # DINO: nauczyciel otrzymuje tylko 2 globalne widoki (zwykle 224x224), student wszystkie
            # Zakładamy, że transform_dino generuje 2 pierwsze widoki jako "globalne"
            global_views = views[:2]
            # Obliczenia forward:
            # - Teacher output dla globalnych widoków (zatrzymujemy gradient)
            with torch.no_grad():
                teacher_out = [teacher_head(teacher_backbone(v).flatten(start_dim=1)) for v in global_views]
            # - Student output dla wszystkich widoków
            student_out = [student_head(student_backbone(v).flatten(start_dim=1)) for v in views]
            # Oblicz stratę DINO (porównuje rozkłady wyjściowe teacher vs student)
            loss = criterion(teacher_out, student_out, epoch=epoch)
            optimizer.zero_grad()
            loss.backward()
            # DINO zaleca zablokowanie gradientów ostatniej warstwy projekcyjnej studenta na wczesnych epokach (freeze_last_layer)
            student_head.cancel_last_layer_gradients(current_epoch=epoch)
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        print(f"[DINO] Epoka {epoch+1}/{epochs}, średni loss: {avg_loss:.4f}")
    return student_backbone


## 2.4. Model nadzorowany (supervised) na CIFAR100 (baseline)

In [29]:
def train_supervised_classifier(train_dataset, num_classes=100, epochs=20, batch_size=128, lr=0.1):
    """
    Trenuje model klasyfikacyjny (ResNet18) w sposób nadzorowany na podanym zbiorze (z etykietami).
    Zwraca wytrenowany model (backbone + klasyfikator).
    """
    # Ustawiamy transformacje augmentacyjne dla treningu nadzorowanego
    train_dataset.transform = train_transform_supervised
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    # Tworzymy model ResNet18 z random inicjalizacją (num_classes wyjściowych)
    model = torchvision.models.resnet18(pretrained=False, num_classes=num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    model.train()
    print(">>> Trenowanie modelu nadzorowanego (ResNet18) przez {} epok...".format(epochs))
    for epoch in range(epochs):
        total_loss = 0.0
        correct = 0
        total = 0
        for (images, labels) in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            # Obliczanie dokładności bieżącej partii (dla monitorowania)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
        avg_loss = total_loss / len(train_loader)
        acc = 100.0 * correct / total
        print(f"[Supervised] Epoka {epoch+1}/{epochs}, loss: {avg_loss:.4f}, accuracy: {acc:.2f}%")
        scheduler.step()
    return model

## 2.5. Trenowanie

In [30]:
# ---- Wykonanie treningów dla poszczególnych metod na CIFAR100 ----
print("\n=== Rozpoczęcie treningów SSL na CIFAR100 ===")
# Etap podstawowy:
backbone_mae_cifar100, mae_model = pretrain_masked_autoencoder(train_dataset_cifar100, epochs=10, batch_size=256)
backbone_simclr_cifar100 = pretrain_simclr(train_dataset_cifar100, epochs=10, batch_size=256)
# Etap pośredni (dodatkowo trening MoCo i supervised baseline na CIFAR100):
backbone_moco_cifar100 = pretrain_moco(train_dataset_cifar100, epochs=10, batch_size=256)
# supervised_model_cifar100 = train_supervised_classifier(train_dataset_cifar100, num_classes=100, epochs=10, batch_size=128)
# backbone_supervised_cifar100 = nn.Sequential(*list(supervised_model_cifar100.children())[:-1])  # ekstrakcja backbone z modelu nadzorowanego
# Etap zaawansowany (self-distillation metody BYOL i DINO na CIFAR100):
# backbone_byol_cifar100 = pretrain_byol(train_dataset_cifar100, epochs=10, batch_size=128)
# backbone_dino_cifar100 = pretrain_dino(train_dataset_cifar100, epochs=10, batch_size=128)
print("=== Zakończono pretraining SSL na CIFAR100 ===\n")

# (Opcjonalnie) Trenowanie na ImageNet-1k dla etapów pośredniego/zaawansowanego
if imagenet_train_dataset is not None:
    print("=== Rozpoczęcie treningów SSL na ImageNet-1k (skala demonstracyjna) ===")
    # Ustawiamy odpowiednie transformacje dla ImageNet i tworzymy DataLoader
    imagenet_train_dataset.transform = transform_simclr_imagenet
    imnet_loader = torch.utils.data.DataLoader(imagenet_train_dataset, batch_size=256, shuffle=True, drop_last=True)
    # Dla przykładu trenujemy SimCLR i BYOL na ImageNet kilka epok (w praktyce potrzeba znacznie więcej)
    backbone_simclr_imnet = pretrain_simclr(imagenet_train_dataset, epochs=2, batch_size=256, lr=0.1)
    backbone_moco_imnet = pretrain_moco(imagenet_train_dataset, epochs=2, batch_size=256, lr=0.1, memory_bank_size=65536)
    # backbone_byol_imnet = pretrain_byol(imagenet_train_dataset, epochs=2, batch_size=256, lr=1e-3)
    # backbone_dino_imnet = pretrain_dino(imagenet_train_dataset, epochs=2, batch_size=256, lr=1e-3)
    # Dla ImageNet również można by przeprowadzić linear probing lub ewaluację na CIFAR, ale pomijamy dalsze szczegóły w tym kodzie demonstracyjnym.
    print("=== Zakończono pretraining SSL na ImageNet-1k ===\n")



=== Rozpoczęcie treningów SSL na CIFAR100 ===


Using 16bit Automatic Mixed Precision (AMP)
Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


>>> Trenowanie Masked Autoencoder przez 10 epok...



  | Name      | Type                        | Params | Mode 
------------------------------------------------------------------
0 | backbone  | MaskedVisionTransformerTIMM | 88.2 M | train
1 | decoder   | MAEDecoderTIMM              | 5.1 M  | train
2 | criterion | MSELoss                     | 0      | train
------------------------------------------------------------------
93.3 M    Trainable params
64.0 K    Non-trainable params
93.4 M    Total params
373.497   Total estimated model params size (MB)
292       Modules in train mode
0         Modules in eval mode


Epoch 9: 100%|██████████| 195/195 [01:08<00:00,  2.84it/s, v_num=9, train_loss_step=0.281, train_loss_epoch=0.292]

`Trainer.fit` stopped: `max_epochs=10` reached.


Epoch 9: 100%|██████████| 195/195 [01:22<00:00,  2.37it/s, v_num=9, train_loss_step=0.281, train_loss_epoch=0.292]


Using 16bit Automatic Mixed Precision (AMP)
Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name            | Type                 | Params | Mode 
-----------------------------------------------------------------
0 | backbone        | Sequential           | 11.2 M | train
1 | projection_head | SimCLRProjectionHead | 328 K  | train
2 | criterion       | NTXentLoss           | 0      | train
-----------------------------------------------------------------
11.5 M    Trainable params
0         Non-trainable params
11.5 M    Total params
46.022    Total estimated model params size (MB)
77        Modules in train mode
0         Modules in eval mode


>>> Trenowanie SimCLR przez 10 epok...
Epoch 0:   0%|          | 0/195 [00:00<?, ?it/s] 

ValueError: not enough values to unpack (expected 3, got 2)

Epoch 3:   0%|          | 0/390 [27:32<?, ?it/s, v_num=9, train_loss_step=0.347, train_loss_epoch=0.355]
Epoch 4:   0%|          | 0/195 [18:32<?, ?it/s, v_num=9, train_loss_step=0.330, train_loss_epoch=0.360]

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x0000026D744B1260>
Traceback (most recent call last):
  File "e:\programowanie\studia\sem6\WB2\.venv\Lib\site-packages\torch\utils\data\dataloader.py", line 1663, in __del__
    self._shutdown_workers()
  File "e:\programowanie\studia\sem6\WB2\.venv\Lib\site-packages\torch\utils\data\dataloader.py", line 1621, in _shutdown_workers
    if self._persistent_workers or self._workers_status[worker_id]:
                                   ^^^^^^^^^^^^^^^^^^^^
AttributeError: '_MultiProcessingDataLoaderIter' object has no attribute '_workers_status'





# 3. Ewaluacja reprezentacji -- linear probing i KNN

In [None]:
# Funkcja do ewaluacji linear probing: trenowanie liniowego klasyfikatora na zamrożonym backbone
def evaluate_linear(backbone, train_dataset, test_dataset, num_classes, epochs=5, lr=0.01):
    """
    Trenuje liniowy klasyfikator (1 warstwa) na cechach zadanego backbone (który pozostaje zamrożony).
    Zwraca accuracy [%] na zbiorze testowym.
    """
    # Zamrażamy backbone
    backbone.eval()
    for param in backbone.parameters():
        param.requires_grad = False
    # Definiujemy prosty model: backbone (zamrożony) + linear layer
    class LinearModel(nn.Module):
        def __init__(self, backbone, num_classes):
            super().__init__()
            self.backbone = backbone
            # Warstwa liniowa biorąca wektor cech (np. 512) na klasy
            self.fc = nn.Linear(backbone_output_dim, num_classes)
        def forward(self, x):
            with torch.no_grad():
                feats = self.backbone(x).flatten(start_dim=1)  # cechy ze backbone
            out = self.fc(feats)
            return out
    # Określamy wymiar wyjściowy backbone (zakładamy że znamy, np. 512 dla ResNet18, lub pobieramy automatycznie)
    backbone_output_dim = None
    # Spróbujmy wyznaczyć wymiar cech backbone poprzez przepuszczenie jednego batcha testowego
    backbone_output_dim = None
    for (images, _) in test_loader_cifar100:  # używamy CIFAR100 test loader by nie modyfikować datasetu
        with torch.no_grad():
            feat = backbone(images.to(device)).flatten(start_dim=1)
        backbone_output_dim = feat.shape[1]
        break
    if backbone_output_dim is None:
        backbone_output_dim = 512  # domyślnie załóż 512
    model = LinearModel(backbone, num_classes).to(device)
    # DataLoader z normalizacją (użyjemy test_transform bo backbone wymaga takich samych normalizacji, a train_transform_supervised zawiera augmentacje które nie są potrzebne przy trenowaniu linear probe)
    train_dataset.transform = test_transform  # stosujemy tylko normalizację do cech wejściowych backbone
    test_dataset.transform = test_transform
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=256, shuffle=False)
    # Optymalizator i strata
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.fc.parameters(), lr=lr, momentum=0.9)
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        for (images, labels) in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)  # backbone jest zamrożony, tylko fc się uczy
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        print(f"[Linear Probing] Epoka {epoch+1}/{epochs}, średni loss: {avg_loss:.4f}")
    # Ewaluacja na test
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for (images, labels) in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    accuracy = 100.0 * correct / total
    print(f"Linear probe accuracy: {accuracy:.2f}%")
    return accuracy

# Funkcja do ewaluacji k-NN (k sąsiadów) na cechach backbone
def evaluate_knn(backbone, train_dataset, test_dataset, k=5):
    """
    Ocena jakości reprezentacji za pomocą klasyfikacji k-NN.
    Zwraca accuracy [%] na zbiorze testowym, wykorzystując k najbliższych sąsiadów z train_dataset.
    """
    backbone.eval()
    # Ekstrahujemy wszystkie cechy ze zbioru treningowego
    train_dataset.transform = test_transform  # upewnij się, że te same przekształcenia co dla testu
    test_dataset.transform = test_transform
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=False)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=256, shuffle=False)
    train_features = []
    train_labels = []
    with torch.no_grad():
        for (images, labels) in train_loader:
            images = images.to(device)
            feats = backbone(images).flatten(start_dim=1)
            # Normalizujemy cechy, aby użyć odległości kosinusowej (opcjonalnie)
            feats = F.normalize(feats, dim=1)
            train_features.append(feats.cpu())
            train_labels.append(labels.cpu())
    train_features = torch.cat(train_features, dim=0)  # [N_train, feature_dim]
    train_labels = torch.cat(train_labels, dim=0)      # [N_train]
    # Obliczamy cechy dla testu
    test_features = []
    test_labels_list = []
    with torch.no_grad():
        for (images, labels) in test_loader:
            images = images.to(device)
            feats = backbone(images).flatten(start_dim=1)
            feats = F.normalize(feats, dim=1)
            test_features.append(feats.cpu())
            test_labels_list.append(labels.cpu())
    test_features = torch.cat(test_features, dim=0)  # [N_test, feature_dim]
    test_labels = torch.cat(test_labels_list, dim=0)
    # K-NN: dla każdego testowego punktu znajdujemy k najbliższych w train_features
    total = test_features.shape[0]
    correct = 0
    # Dla efektywności, liczymy w partiach
    batch_size_eval = 100  # batch dla pętli testowej KNN, by nie przekroczyć pamięci
    for i in range(0, total, batch_size_eval):
        end = min(i + batch_size_eval, total)
        test_batch = test_features[i:end]  # [batch_eval, feature_dim]
        # Obliczamy macierz odległości (kosinusowych) między test_batch a wszystkimi train_features
        # Ponieważ cechy są znormalizowane, odległość kosinusowa ~ 1 - cos(sim) => najbliższy = największy cos
        # Można więc znaleźć top-k największych iloczynów skalarnych:
        similarities = torch.matmul(test_batch, train_features.T)  # [batch_eval, N_train]
        # Pobieramy indeksy top-k najbliższych train dla każdego test
        topk_vals, topk_idxs = torch.topk(similarities, k, dim=1)
        # Głosowanie większościowe
        for idxs, true_label in zip(topk_idxs, test_labels[i:end]):
            neigh_labels = train_labels[idxs]  # etykiety k najbliższych
            # najczęściej występująca etykieta
            predicted_label = torch.mode(neigh_labels).values.item()
            if predicted_label == true_label.item():
                correct += 1
    accuracy = 100.0 * correct / total
    print(f"k-NN accuracy (k={k}): {accuracy:.2f}%")
    return accuracy

# Ewaluacja wszystkich metod na CIFAR100 i CIFAR10
print("\n=== Ewaluacja reprezentacji (CIFAR100 i CIFAR10) ===")
results = {}  # słownik na wyniki accuracy
# Linear probe i kNN dla każdej metody:
methods = [
    ("MAE (masked)", backbone_mae_cifar100),
    ("SimCLR", backbone_simclr_cifar100),
    ("MoCo", backbone_moco_cifar100),
    ("BYOL", backbone_byol_cifar100),
    ("DINO", backbone_dino_cifar100),
    ("Supervised", backbone_supervised_cifar100)
]
for name, backbone in methods:
    print(f"\nOcena dla {name}:")
    acc_cifar100_linear = evaluate_linear(backbone, torchvision.datasets.CIFAR100(root='./data', train=True, transform=None),
                                         torchvision.datasets.CIFAR100(root='./data', train=False, transform=None),
                                         num_classes=100, epochs=5, lr=0.01)
    acc_cifar100_knn = evaluate_knn(backbone, torchvision.datasets.CIFAR100(root='./data', train=True, transform=None),
                                    torchvision.datasets.CIFAR100(root='./data', train=False, transform=None), k=5)
    acc_cifar10_linear = evaluate_linear(backbone, torchvision.datasets.CIFAR10(root='./data', train=True, transform=None),
                                        torchvision.datasets.CIFAR10(root='./data', train=False, transform=None),
                                        num_classes=10, epochs=5, lr=0.01)
    acc_cifar10_knn = evaluate_knn(backbone, torchvision.datasets.CIFAR10(root='./data', train=True, transform=None),
                                   torchvision.datasets.CIFAR10(root='./data', train=False, transform=None), k=5)
    results[name] = {
        "CIFAR100_linear": acc_cifar100_linear,
        "CIFAR100_kNN": acc_cifar100_knn,
        "CIFAR10_linear": acc_cifar10_linear,
        "CIFAR10_kNN": acc_cifar10_knn
    }
print("\n=== Podsumowanie wyników (accuracy) ===")
for name, res in results.items():
    print(f"{name}: CIFAR100 linear={res['CIFAR100_linear']:.2f}%, CIFAR100 kNN={res['CIFAR100_kNN']:.2f}%, "
          f"CIFAR10 linear={res['CIFAR10_linear']:.2f}%, CIFAR10 kNN={res['CIFAR10_kNN']:.2f}%")


# 4. Klasyczne metodt reprezentacji: PCA, t-SNE, UMAP

In [None]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

try:
    import umap
    from umap import UMAP
except ImportError:
    UMAP = None

In [None]:
# Funkcja pomocnicza do trenowania i ewaluacji linear classifier na podanych cechach
def train_and_eval_on_features(train_feats, train_labels, test_feats, test_labels, epochs=100, lr=0.1):
    """
    Trenuje prostą regresję logistyczną (lub perceptron) na dostarczonych cechach i etykietach.
    Zwraca accuracy na zbiorze testowym.
    """
    # Zamieniamy dane na tensory PyTorch
    X_train = torch.tensor(train_feats, dtype=torch.float32)
    y_train = torch.tensor(train_labels, dtype=torch.long)
    X_test = torch.tensor(test_feats, dtype=torch.float32)
    y_test = torch.tensor(test_labels, dtype=torch.long)
    # Definicja prostej sieci: wejście -> num_classes (softmax w cross entropy)
    model = nn.Linear(X_train.shape[1], len(torch.unique(y_train)))
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    model.train()
    dataset = torch.utils.data.TensorDataset(X_train, y_train)
    loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=True)
    for epoch in range(epochs):
        total_loss = 0.0
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # (opcjonalnie można dodać early stopping, tu pominieto dla prostoty)
    # Ewaluacja
    model.eval()
    with torch.no_grad():
        X_test = X_test.to(device)
        y_test = y_test.to(device)
        outputs = model(X_test)
        _, preds = torch.max(outputs, 1)
        acc = 100.0 * (preds == y_test).sum().item() / y_test.size(0)
    return acc

# 4.1 PCA jako metoda ekstrakcji cech
print("\n=== PCA: trenowanie na pikselach CIFAR100 ===")
# Przygotuj dane treningowe CIFAR100 (obrazy spłaszczone do wektora 3072)
train_data = torchvision.datasets.CIFAR100(root='./data', train=True, transform=test_transform)
test_data = torchvision.datasets.CIFAR100(root='./data', train=False, transform=test_transform)
X_train = train_data.data.reshape(train_data.data.shape[0], -1) / 255.0  # normalizujemy 0-1
y_train = np.array(train_data.targets)
X_test = test_data.data.reshape(test_data.data.shape[0], -1) / 255.0
y_test = np.array(test_data.targets)
# Uczymy PCA na treningowych i transformujemy do 50 wymiarów
pca = PCA(n_components=50)
pca.fit(X_train)
X_train_pca = pca.transform(X_train)
X_test_pca = pca.transform(X_test)
print("Wariancja wyjaśniana przez 50 składowych PCA: {:.2f}%".format(100 * np.sum(pca.explained_variance_ratio_)))
# Trenujemy classifier na cechach PCA
acc_pca_cifar100 = train_and_eval_on_features(X_train_pca, y_train, X_test_pca, y_test, epochs=100, lr=0.1)
print(f"Accuracy klasyfikatora liniowego na 50 wymiarach PCA (CIFAR100): {acc_pca_cifar100:.2f}%")

# 4.2 t-SNE i UMAP - redukcja do 2D dla wizualizacji i ew. kNN
print("\n=== t-SNE i UMAP: wizualizacja klastrów ===")
# Weźmy reprezentacje wybranego modelu, np. DINO lub SimCLR, dla testowych danych CIFAR10 do wizualizacji
backbone_for_viz = backbone_dino_cifar100  # np. backbone DINO wytrenowany na CIFAR100
# Ekstrahujemy cechy dla wszystkich obrazów testowych CIFAR10
test_features = []
test_labels = []
backbone_for_viz.eval()
with torch.no_grad():
    for (images, labels) in test_loader_cifar10:
        images = images.to(device)
        feats = backbone_for_viz(images).flatten(start_dim=1)
        feats = feats.cpu().numpy()
        test_features.append(feats)
        test_labels.extend(labels.numpy())
test_features = np.vstack(test_features)  # [10000, feature_dim]
test_labels = np.array(test_labels)
# t-SNE redukcja do 2 wymiarów (może chwilę potrwać)
tsne = TSNE(n_components=2, init='pca', random_state=42, perplexity=30)
test_feats_2d_tsne = tsne.fit_transform(test_features[:2000])  # próbka 2000 punktów dla szybkości
labels_sample = test_labels[:2000]
print("t-SNE ukończone dla 2000 punktów.")
# (Opcjonalnie) UMAP redukcja do 2D
if UMAP is not None:
    reducer = UMAP(n_components=2, random_state=42)
    test_feats_2d_umap = reducer.fit_transform(test_features[:2000])
    print("UMAP ukończony dla 2000 punktów.")
# Można w tym miejscu wykonać prostą wizualizację scatter plot (np. matplotlib),
# ale z uwagi na środowisko tekstowe, pominiemy rysowanie grafiki.
# Zamiast tego, dokonamy oceny kNN w przestrzeni 2D t-SNE jako ciekawostkę:
knn_acc_tsne = None
if test_feats_2d_tsne is not None:
    # policzmy kNN (k=5) w przestrzeni t-SNE dla próbkowanych danych
    tree = None
    try:
        from sklearn.neighbors import KNeighborsClassifier
        knn = KNeighborsClassifier(n_neighbors=5)
        knn.fit(test_feats_2d_tsne, labels_sample)
        preds = knn.predict(test_feats_2d_tsne)
        knn_acc_tsne = 100.0 * np.mean(preds == labels_sample)
        print(f"k-NN accuracy w przestrzeni t-SNE (na próbie 2000, k=5): {knn_acc_tsne:.2f}%")
    except ImportError:
        pass
