In [1]:
import copy

import pytorch_lightning as pl
import torch
import torch.nn as nn
import torchvision

from lightly.data import LightlyDataset, SimCLRCollateFunction, collate
from lightly.loss import NTXentLoss
from lightly.models import ResNetGenerator
from lightly.models.modules.heads import MoCoProjectionHead
from lightly.models.utils import (
    batch_shuffle,
    batch_unshuffle,
    deactivate_requires_grad,
    update_momentum,
)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# --- Reproducibility -------------------------------------------------------
seed = 1

# --- Data loading ----------------------------------------------------------
input_size = 32  # side length (pixels) of the square model input
batch_size = 512
num_workers = 8

# --- Training --------------------------------------------------------------
max_epochs = 1
memory_bank_size = 4096

In [3]:
# Image folders, given relative to the notebook's working directory.
path_to_test = "../Datasets/cifar10/test/"
path_to_train = "../Datasets/cifar10/train/"

In [4]:
# Seed the random number generators through Lightning so runs are repeatable.
pl.seed_everything(seed=seed)

Global seed set to 1


1

In [5]:
# SimCLR augmentation pipeline applied at batch-collation time.
# Gaussian blur is switched off by passing gaussian_blur=0.0.
collate_fn = SimCLRCollateFunction(input_size=input_size, gaussian_blur=0.0)

In [6]:
# Normalisation statistics shared by both pipelines below.
_imagenet_stats = collate.imagenet_normalize

# Augmentations typically used to train on cifar-10:
# padded random crop + horizontal flip, then tensor conversion and normalisation.
train_classifier_transforms = torchvision.transforms.Compose([
    torchvision.transforms.RandomCrop(input_size, padding=4),
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        mean=_imagenet_stats["mean"], std=_imagenet_stats["std"]
    ),
])

# Test-time pipeline: no random augmentation, only resize + tensor conversion
# + the same normalisation.
test_transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize((input_size, input_size)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        mean=_imagenet_stats["mean"], std=_imagenet_stats["std"]
    ),
])
 




In [7]:

# Dataset for self-supervised pre-training. No transform is attached here;
# the SimCLR augmentations are applied by `collate_fn` in the dataloader.
dataset_train_simclr = LightlyDataset(input_dir=path_to_train)

# Dataset for the supervised stage. It reads the same training images but uses
# the light crop/flip augmentations from `train_classifier_transforms`, since
# the strong contrastive augmentations usually reduce accuracy of models that
# are trained with cross entropy on the dataset labels.
dataset_train_classifier = LightlyDataset(
    input_dir=path_to_train,
    transform=train_classifier_transforms,
)

# Held-out images, loaded with the deterministic test-time pipeline.
dataset_test = LightlyDataset(
    input_dir=path_to_test,
    transform=test_transforms,
)

In [16]:
# Loader for self-supervised pre-training: large shuffled batches, augmented
# views produced by `collate_fn`, incomplete final batch dropped.
dataloader_train_simclr = torch.utils.data.DataLoader(
    dataset_train_simclr,
    collate_fn=collate_fn,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=True,
    drop_last=True,
)

# Loader over the lightly-augmented training images, one sample per step.
# NOTE(review): a larger batch size would speed up downstream feature
# extraction -- confirm the consuming loops handle batched outputs first.
dataloader_train_classifier = torch.utils.data.DataLoader(
    dataset_train_classifier,
    batch_size=1,
    num_workers=num_workers,
    shuffle=True,
    drop_last=True,
)

# Loader over the test images: fixed order, one sample per step, nothing dropped.
dataloader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    num_workers=num_workers,
    shuffle=False,
    drop_last=False,
)

In [18]:
from lightly.loss import NTXentLoss
from lightly.models.modules.heads import SimCLRProjectionHead


class SimCLRModel(pl.LightningModule):
    """ResNet-18 backbone plus SimCLR projection head, trained with NT-Xent loss."""

    def __init__(self):
        super().__init__()

        # Build a torchvision ResNet-18 and keep every child module except the
        # last one, i.e. drop the classification head.
        resnet = torchvision.models.resnet18()
        trunk_layers = list(resnet.children())[:-1]
        self.backbone = nn.Sequential(*trunk_layers)

        # The projection head maps backbone features (width taken from the
        # removed fc layer's input size) to a 128-dimensional output.
        feature_dim = resnet.fc.in_features
        self.projection_head = SimCLRProjectionHead(feature_dim, feature_dim, 128)

        self.criterion = NTXentLoss()

    def forward(self, x):
        """Return the projection-head output for a batch of images ``x``."""
        features = self.backbone(x).flatten(start_dim=1)
        return self.projection_head(features)

    def training_step(self, batch, batch_idx):
        """Compute and log the contrastive loss between the two views in ``batch``."""
        views, _, _ = batch
        x0, x1 = views
        loss = self.criterion(self.forward(x0), self.forward(x1))
        self.log("train_loss_ssl", loss)
        return loss

    def configure_optimizers(self):
        """SGD with momentum and weight decay, cosine-annealed over ``max_epochs``."""
        optim = torch.optim.SGD(
            self.parameters(),
            lr=6e-2,
            momentum=0.9,
            weight_decay=5e-4,
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, max_epochs)
        return [optim], [scheduler]

In [19]:
class Classifier(pl.LightningModule):
    """Linear classification layer trained on top of a frozen, pretrained backbone."""

    def __init__(self, backbone):
        super().__init__()
        # Pretrained feature extractor, reused as-is.
        self.backbone = backbone

        # Disable gradients for the backbone; only `self.fc` is handed to the
        # optimizer in `configure_optimizers`.
        deactivate_requires_grad(backbone)

        # Linear layer for the downstream task: 512 input features -> 10 outputs.
        self.fc = nn.Linear(512, 10)

        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        """Return the linear layer's outputs for a batch of images ``x``."""
        features = self.backbone(x).flatten(start_dim=1)
        return self.fc(features)

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss for one labelled batch."""
        x, y, _ = batch
        loss = self.criterion(self.forward(x), y)
        self.log("train_loss_fc", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Return ``(batch size, number of correct predictions)`` for one batch."""
        x, y, _ = batch
        probs = torch.nn.functional.softmax(self.forward(x), dim=1)

        # Index of the highest-scoring class per sample.
        predicted = torch.max(probs, 1)[1]
        num = predicted.shape[0]
        correct = (predicted == y).float().sum()

        return num, correct

    def configure_optimizers(self):
        """Plain SGD on the linear layer only, cosine-annealed over ``max_epochs``."""
        optim = torch.optim.SGD(self.fc.parameters(), lr=30.0)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, max_epochs)
        return [optim], [scheduler]

In [20]:
# Self-supervised pre-training of the SimCLR model.
# accelerator="auto" lets Lightning pick the available hardware instead of
# failing on machines without a GPU; this matches the CPU fallback used by the
# feature-extraction cell later in the notebook.
model = SimCLRModel()
trainer = pl.Trainer(max_epochs=max_epochs, devices=1, accelerator="auto")
trainer.fit(model, dataloader_train_simclr)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name            | Type                 | Params
---------------------------------------------------------
0 | backbone        | Sequential           | 11.2 M
1 | projection_head | SimCLRProjectionHead | 328 K 
2 | criterion       | NTXentLoss           | 0     
---------------------------------------------------------
11.5 M    Trainable params
0         Non-trainable params
11.5 M    Total params
46.022    Total estimated model params size (MB)


Epoch 0: 100%|███████████████████████████████████████████████████████████████| 97/97 [00:33<00:00,  2.91it/s, v_num=22]

`Trainer.fit` stopped: `max_epochs=1` reached.


Epoch 0: 100%|███████████████████████████████████████████████████████████████| 97/97 [00:33<00:00,  2.88it/s, v_num=22]


array([ 0.4850587 ,  0.69055575,  0.57068247, ..., -1.8044444 ,
       -1.8044444 , -1.8044444 ], dtype=float32)

In [None]:
from sklearn import svm
import numpy as np
import torch
from tqdm import tqdm

# Run inference on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.eval()
model.to(device)

# Collect per-batch outputs in Python lists and concatenate once at the end.
# Growing a NumPy array with vstack/hstack inside the loop copies the whole
# array on every iteration (quadratic in the number of samples).
train_feature_batches = []
train_label_batches = []

# No gradients are needed for feature extraction.
with torch.no_grad():
    # Wrap the dataloader with tqdm for a progress bar
    for image, target, fname in tqdm(dataloader_train_classifier):
        # `model(...)` calls SimCLRModel.forward, i.e. the projection-head output.
        # NOTE(review): the backbone output (model.backbone) may be the better
        # representation for downstream classification -- confirm before changing.
        output = model(image.to(device))
        # Keep the batch dimension (one row per sample) so this works for any
        # dataloader batch size, not only batch_size=1.
        train_feature_batches.append(output.flatten(start_dim=1).cpu().numpy())
        train_label_batches.append(target.cpu().numpy().reshape(-1))

# An empty dataloader yields empty arrays, as before.
X_train = np.concatenate(train_feature_batches, axis=0) if train_feature_batches else np.array([])
y_train = np.concatenate(train_label_batches, axis=0) if train_label_batches else np.array([])

# One label per feature row.
assert X_train.shape[0] == y_train.shape[0]
X_train.shape

In [45]:
# Fit a support vector classifier (scikit-learn defaults) on the extracted
# training features and their labels.
from sklearn import svm

clf = svm.SVC()
clf.fit(X=X_train, y=y_train)

In [52]:
# NOTE(review): in the visible notebook, X_test is first assigned in the
# test feature-extraction cell further down, so this cell raises NameError on
# a fresh kernel (Restart & Run All). Consider moving it below that cell.
X_test.shape

(2166, 128)

In [None]:
from sklearn import svm
from sklearn.metrics import accuracy_score

# Collect per-batch outputs in Python lists and concatenate once at the end.
# Growing a NumPy array with vstack/hstack inside the loop copies the whole
# array on every iteration (quadratic in the number of samples).
test_feature_batches = []
test_label_batches = []

# No gradients are needed for feature extraction.
with torch.no_grad():
    for image, target, fname in tqdm(dataloader_test):
        # `model(...)` calls SimCLRModel.forward, i.e. the projection-head output.
        output = model(image.to(device))
        # Keep the batch dimension (one row per sample) so this works for any
        # dataloader batch size, not only batch_size=1.
        test_feature_batches.append(output.flatten(start_dim=1).cpu().numpy())
        test_label_batches.append(target.cpu().numpy().reshape(-1))

# An empty dataloader yields empty arrays, as before.
X_test = np.concatenate(test_feature_batches, axis=0) if test_feature_batches else np.array([])
y_test = np.concatenate(test_label_batches, axis=0) if test_label_batches else np.array([])

# One label per feature row.
assert X_test.shape[0] == y_test.shape[0]
X_test.shape


In [53]:

# Classify the extracted test features with the fitted SVM.
X_test_predicted = clf.predict(X_test)

# Fraction of test samples whose predicted label matches the true label.
accuracy = accuracy_score(y_true=y_test, y_pred=X_test_predicted)

print(f"Model accuracy: {accuracy}")


Model accuracy: 0.4242843951985226
