# Self-Supervised Contrastive Deep Learning with SimCLR

### SimCLR: a Simple framework for Contrastive Learning of visual Representations




**Self-supervised:** the labels used in the pre-training phase are generated automatically rather than annotated by humans,

**Contrastive:** the model learns by comparing input samples and distinguishing similar ones from dissimilar ones,

**Deep:** a deep neural network is used for the learning.



**Overview:**

Producing ground-truth labels for an entire collected dataset is expensive in money, time and effort.
This problem is widely known as label scarcity. It can prevent machine learning engineers and data scientists
from building accurate models, because only a small amount of ground truth is at their disposal.

Self-supervised learning is a learning paradigm that has gained popularity in research in recent years.
The principle is to pre-train, then fine-tune.
When only a small portion of the collected dataset is annotated and the larger portion is left unannotated,
self-supervised learning makes it possible to learn from the unannotated portion as well.
Briefly, the steps are:

    1. Split the dataset into 1) a labeled subset and 2) an unlabeled subset,
    
    2.1 Use the unlabeled subset in a pre-training phase and train a model according to some criterion (e.g. SimCLR),
    
    2.2 During pre-training, produce the labels automatically using an augmentation module,
    
    3. Save the parameters obtained through the pre-training phase,
   
    4. Perform transfer learning,
    
    5. Retrain a new model on the labeled (ground truth) subset, initialized with the parameters obtained previously.
    
Amongst many pre-training frameworks, SimCLR is one of the most prominent examples.



**Dataset:**

We use a garbage collection image dataset that contains 4,661 labeled images of different classes (battery, bio, clothes, metal, paper, plastic, ... ),
and 10,854 unlabeled images of the same classes.

## First Experiment:
### Train a supervised classifier on the labeled dataset only.

### 1. Import necessary libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

### 2. Load and prepare the dataset

In [2]:
# Define data transforms (adjust as needed for grayscale images)
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to match model input size
    transforms.Grayscale(num_output_channels=3),  # Convert to grayscale, replicated over 3 channels (color is discarded)
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet normalization
])

# Load the dataset
path_to_data = '/kaggle/input/garbage-collection/Garbage Labeled/Garbage Labeled'
all_dataset = datasets.ImageFolder(root=path_to_data, transform=transform)

# Define the split ratio
train_ratio = 0.7
test_ratio = 0.3
total_size = len(all_dataset)
train_size = int(train_ratio * total_size)
test_size = total_size - train_size

# Split the dataset
train_dataset, test_dataset = random_split(all_dataset, [train_size, test_size])

# Create DataLoaders for training and testing
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
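
The split sizes and batch counts produced by the cell above can be checked by hand. The following is a minimal sketch that assumes `len(all_dataset)` equals the 4,661 labeled images quoted in the dataset description; the real count may differ if some files are missing or unreadable.

```python
import math

# Assumed size, taken from the dataset description (not read from disk).
total_size = 4661
train_ratio = 0.7
batch_size = 32

train_size = int(train_ratio * total_size)  # int() truncates 3262.7 down to 3262
test_size = total_size - train_size         # the remainder goes to the test set

# DataLoader keeps the last partial batch by default (drop_last=False),
# so the number of batches per epoch is rounded up.
train_batches = math.ceil(train_size / batch_size)
test_batches = math.ceil(test_size / batch_size)

print(train_size, test_size)        # 3262 1399
print(train_batches, test_batches)  # 102 44
```

Under that assumption, `len(train_loader)`, which the training loop uses to average the loss, would be 102.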

### 3. Construct Model (CNN): a Residual Network of 18 layers.

In [3]:
# Load ResNet-18 without pre-trained weights
model_notPretrained = models.resnet18(weights=None)
# Get the number of input features of the last fully connected layer
num_features = model_notPretrained.fc.in_features
# Replace the output layer with a 12-class classifier
model_notPretrained.fc = nn.Linear(num_features, 12)



### 4. Training Loop:

In [5]:
## %%time
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_notPretrained.parameters(), lr=0.001)

num_epochs = 20  
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device: ', device)
model_notPretrained.to(device)

for epoch in range(num_epochs):
    model_notPretrained.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model_notPretrained(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {running_loss / len(train_loader)}")

print("Training completed.")


Device:  cuda
Epoch [1/20] Loss: 1.839085805649851
Epoch [2/20] Loss: 1.5969772759605856
Epoch [3/20] Loss: 1.482143078364578
Epoch [4/20] Loss: 1.3593986501880722
Epoch [5/20] Loss: 1.3123501007463418
Epoch [6/20] Loss: 1.2593569796459347
Epoch [7/20] Loss: 1.1642034398574455
Epoch [8/20] Loss: 1.1042927176344628
Epoch [9/20] Loss: 1.02481443975486
Epoch [10/20] Loss: 0.9450821318462783
Epoch [11/20] Loss: 0.879255511597091
Epoch [12/20] Loss: 0.8098385836563858
Epoch [13/20] Loss: 0.7338304326814764
Epoch [14/20] Loss: 0.6454079872837254
Epoch [15/20] Loss: 0.596867490048502
Epoch [16/20] Loss: 0.48953332357546864
Epoch [17/20] Loss: 0.35638870627564545
Epoch [18/20] Loss: 0.28485892683851954
Epoch [19/20] Loss: 0.2668719331131262
Epoch [20/20] Loss: 0.1911693909662027
Training completed.


### 5. Testing Loop:

In [6]:
model_notPretrained.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_notPretrained(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy_before_transfer_learning = np.mean(np.array(all_preds) == np.array(all_labels))
conf_matrix = confusion_matrix(all_labels, all_preds)
class_report = classification_report(all_labels, all_preds)

print(f"Accuracy: {accuracy_before_transfer_learning:.4f}")

Accuracy: 0.5054
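
The cell above also computes `conf_matrix` and `class_report` without printing them. As a rough illustration of how the overall accuracy relates to a confusion matrix, here is a small sketch in plain Python. The 3-class matrix is hypothetical and is not taken from the garbage dataset; rows are true classes and columns are predicted classes, which is the layout `sklearn.metrics.confusion_matrix` uses.

```python
# Hypothetical 3-class confusion matrix (rows: true class, columns: predicted class).
toy_conf_matrix = [
    [8, 1, 1],
    [2, 6, 2],
    [0, 3, 7],
]

def overall_accuracy(matrix):
    """Fraction of samples on the diagonal (correct predictions)."""
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total

def per_class_recall(matrix):
    """For each true class, the fraction of its samples predicted correctly."""
    return [row[i] / sum(row) if sum(row) else 0.0 for i, row in enumerate(matrix)]

print(overall_accuracy(toy_conf_matrix))  # 0.7
print(per_class_recall(toy_conf_matrix))  # [0.8, 0.6, 0.7]
```

Per-class recall can reveal classes that the model confuses even when the overall accuracy looks acceptable.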


## Second Experiment: 
### Pre-train a self-supervised model using SimCLR on the unlabeled dataset,
### then perform transfer learning, and retrain the model on the labeled dataset.

### 1. Import necessary libraries

In [9]:
import pytorch_lightning as pl
import torchvision
from PIL import Image
from sklearn.preprocessing import normalize
from lightly.data import LightlyDataset
from lightly.transforms import SimCLRTransform, utils

# If lightly is not installed, run: !pip install lightly

### 2. Necessary configurations

In [10]:
num_workers = 8
batch_size = 256
seed = 1
max_epochs = 20
input_size = 224
num_ftrs = 32
pl.seed_everything(seed)

1

### 3. Load and prepare unlabeled dataset

In [11]:
path_to_data = '/kaggle/input/garbage-collection/Garbage Unlabeled/Garbage Unlabeled'

# The following transform will return two augmented images per input image.
transform = SimCLRTransform(input_size=input_size, vf_prob=0.5, rr_prob=0.5)
# vf_prob: Probability that vertical flip is applied.
# rr_prob: Probability that random rotation is applied.

# We create a torchvision transformation for embedding the dataset after training
test_transform = torchvision.transforms.Compose(
    [
        torchvision.transforms.Resize((input_size, input_size)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            mean=utils.IMAGENET_NORMALIZE["mean"],
            std=utils.IMAGENET_NORMALIZE["std"],
        ),
    ]
)


dataset_train_simclr = LightlyDataset(input_dir=path_to_data, transform=transform)

dataloader_train_simclr = torch.utils.data.DataLoader(
    dataset_train_simclr,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=num_workers,
)
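
Because the loader above sets `drop_last=True`, only full batches are used in each epoch. A quick sketch of the arithmetic follows, assuming the unlabeled folder holds the 10,854 images quoted in the dataset description:

```python
# Assumed size, taken from the dataset description (not read from disk).
unlabeled_images = 10854
batch_size = 256

# drop_last=True keeps only full batches; the leftover images are skipped each epoch.
full_batches = unlabeled_images // batch_size
dropped_images = unlabeled_images - full_batches * batch_size

print(full_batches, dropped_images)  # 42 102
```

The result of 42 batches per epoch agrees with the batch count that PyTorch Lightning reports in its logging-interval warning during pre-training. Since the loader shuffles, a different set of leftover images is skipped in each epoch.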



### 4. Construct SimCLR Model:

In [12]:
from lightly.loss import NTXentLoss
from lightly.models.modules.heads import SimCLRProjectionHead


class SimCLRModel(pl.LightningModule):
    def __init__(self):
        super().__init__()

        # create a ResNet backbone and remove the classification head
        resnet = torchvision.models.resnet18()
        self.backbone = nn.Sequential(*list(resnet.children())[:-1])

        hidden_dim = resnet.fc.in_features
        self.projection_head = SimCLRProjectionHead(hidden_dim, hidden_dim, 128)
        
        # use a criterion for self-supervised learning # (normalized temperature-scaled cross entropy loss)
        self.criterion = NTXentLoss(temperature=0.5)

    def forward(self, x):
        h = self.backbone(x).flatten(start_dim=1)
        z = self.projection_head(h)
        return z

    def training_step(self, batch, batch_idx):
        (x0, x1), _, _ = batch
        z0 = self.forward(x0)
        z1 = self.forward(x1)
        loss = self.criterion(z0, z1)
        self.log("train_loss_ssl", loss)
        return loss

    def configure_optimizers(self):
        # get a PyTorch optimizer
        optim = torch.optim.SGD(self.parameters(), lr=6e-2, momentum=0.9, weight_decay=5e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, max_epochs)
        return [optim], [scheduler]
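
To make the criterion less of a black box, here is a minimal pure-Python sketch of the NT-Xent idea: each embedding should be closer to the other view of the same image than to every other embedding in the batch. It is an illustration only, not lightly's `NTXentLoss` implementation, and the helper names and toy vectors are hypothetical.

```python
import math

def _cosine(u, v):
    """Cosine similarity between two vectors given as lists of floats."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

def nt_xent(z0, z1, temperature=0.5):
    """NT-Xent over N positive pairs (z0[i], z1[i]); the other 2N - 2 embeddings act as negatives."""
    z = z0 + z1
    n = len(z0)
    total = 0.0
    for i in range(2 * n):
        pos = (i + n) % (2 * n)  # index of the other view of the same image
        logits = [_cosine(z[i], z[k]) / temperature for k in range(2 * n) if k != i]
        pos_logit = _cosine(z[i], z[pos]) / temperature
        # Cross entropy with the positive pair as the target class.
        total += math.log(sum(math.exp(l) for l in logits)) - pos_logit
    return total / (2 * n)

# Toy 2-D embeddings for two images, each with two augmented views.
views_a = [[1.0, 0.0], [0.0, 1.0]]
aligned = nt_xent(views_a, [[0.9, 0.1], [0.1, 0.9]])  # views of the same image agree
swapped = nt_xent(views_a, [[0.1, 0.9], [0.9, 0.1]])  # views of the same image disagree
print(aligned < swapped)  # True
```

A lower temperature sharpens the softmax, so hard negatives weigh more heavily in the loss.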

### 5. Train the SimCLR Model

In [13]:
model = SimCLRModel()
trainer = pl.Trainer(max_epochs=max_epochs, devices=1, accelerator="gpu")
trainer.fit(model, dataloader_train_simclr)
print('Pre-training is complete.')

/opt/conda/lib/python3.10/site-packages/pytorch_lightning/loops/fit_loop.py:298: The number of training batches (42) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.



Pre-training is complete.
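
The learning-rate schedule set up in `configure_optimizers` can be sketched in closed form. This assumes the scheduler is stepped once per epoch (Lightning's default for schedulers returned this way) and that `eta_min` keeps its default of 0. The function name is hypothetical.

```python
import math

def cosine_annealing_lr(epoch, base_lr=6e-2, t_max=20, eta_min=0.0):
    """Closed-form cosine annealing: decays from base_lr at epoch 0 to eta_min at epoch t_max."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max))

# Print the learning rate at a few epochs of the 20-epoch pre-training run.
for epoch in (0, 5, 10, 15, 20):
    print(epoch, round(cosine_annealing_lr(epoch), 4))
```

The rate starts at 0.06, passes through half of that value at the midpoint, and reaches zero at the final epoch.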


### 6. Save the Pre-trained Model

In [14]:
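# The pre-trained backbone can be reused to train a classifier on top.
pretrained_resnet_backbone = model.backbone

# Store the backbone weights so they can be used in other code.
# Note: the weights are nested under the "resnet18_parameters" key, and because the backbone
# is an nn.Sequential, its parameter names start with indices ("0.weight", "1.weight", ...).
state_dict = {"resnet18_parameters": pretrained_resnet_backbone.state_dict()}
torch.save(state_dict, "simclr_garbage.pth")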
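
Since the backbone is saved as an `nn.Sequential`, its state-dict keys are prefixed with positional indices rather than the layer names a torchvision `resnet18` expects (`conv1`, `bn1`, `layer1`, ...). One way to bridge the two naming schemes is to rename the keys before loading. The sketch below works on plain dictionaries, so it runs without PyTorch. The helper name and the toy checkpoint are hypothetical, and the strings stand in for tensors.

```python
# Child-module order of torchvision's ResNet before the final `fc` layer.
RESNET_CHILD_NAMES = ["conv1", "bn1", "relu", "maxpool",
                      "layer1", "layer2", "layer3", "layer4", "avgpool"]

def remap_sequential_keys(sequential_state, child_names=RESNET_CHILD_NAMES):
    """Rename "<index>.<rest>" keys to "<child_name>.<rest>"."""
    remapped = {}
    for key, value in sequential_state.items():
        index, _, rest = key.partition(".")
        remapped[f"{child_names[int(index)]}.{rest}"] = value
    return remapped

# Toy stand-in for torch.load("simclr_garbage.pth"); the values are placeholders for tensors.
toy_checkpoint = {"resnet18_parameters": {
    "0.weight": "conv1 kernel",
    "1.running_mean": "bn1 statistics",
    "4.0.conv1.weight": "first residual block kernel",
}}
remapped = remap_sequential_keys(toy_checkpoint["resnet18_parameters"])
print(sorted(remapped))
# ['bn1.running_mean', 'conv1.weight', 'layer1.0.conv1.weight']
```

With a real checkpoint, the remapped dictionary could be passed to `load_state_dict(..., strict=False)` on a ResNet-18. The call returns the lists of missing and unexpected keys, and ideally only the `fc.*` entries would be reported as missing.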

### 7. Perform Transfer Learning

In [15]:
# model_1: randomly initialized ResNet-18 with a 12-class output layer
model_1 = models.resnet18(weights=None)
num_features = model_1.fc.in_features  # Number of input features of the last fully connected layer
model_1.fc = nn.Linear(num_features, 12)

# model_2: ResNet-18 initialized with ImageNet weights (this triggers the download shown below)
model_2 = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model_2.fc = nn.Linear(model_2.fc.in_features, 12)
# Load the saved SimCLR checkpoint.
# Caution: strict=False silently skips every key that does not match a parameter name in model_2.
# The checkpoint nests the weights under "resnet18_parameters" with nn.Sequential-style names,
# so inspect the returned missing_keys / unexpected_keys to confirm what was actually loaded.
path_to_pretrained = 'simclr_garbage.pth'
model_2.load_state_dict(torch.load(path_to_pretrained), strict=False)


# Set model_2 to evaluation mode (affects layers such as dropout and batch normalization during forward passes)
model_2.eval()


# Transfer weights from model2 to model1, excluding the last layer (output layer)
model_1_dict = model_1.state_dict()
model_2_dict = model_2.state_dict()

# Filter out unnecessary keys from model2_dict
model_2_dict = {k: v for k, v in model_2_dict.items() if k in model_1_dict and 'fc' not in k}

# Update model_1_dict with values from model_2_dict
model_1_dict.update(model_2_dict)

# Load the updated state_dict into model_1
model_1.load_state_dict(model_1_dict)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 148MB/s] 


<All keys matched successfully>

### 8. Retrain the Supervised Model with the Weights Obtained through Pre-training

In [16]:
%%time
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_1.parameters(), lr=0.001)

num_epochs = 20  
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device: ', device)
model_1.to(device)

for epoch in range(num_epochs):
    model_1.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model_1(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {running_loss / len(train_loader)}")

print("Training completed.")


Device:  cuda
Epoch [1/20] Loss: 1.161772913792554
Epoch [2/20] Loss: 0.7451676957163156
Epoch [3/20] Loss: 0.5213552676871711
Epoch [4/20] Loss: 0.44773638058526843
Epoch [5/20] Loss: 0.30860307169895546
Epoch [6/20] Loss: 0.24355591450105696
Epoch [7/20] Loss: 0.24272805711656226
Epoch [8/20] Loss: 0.2018017700380262
Epoch [9/20] Loss: 0.17227554308506204
Epoch [10/20] Loss: 0.12211893288893443
Epoch [11/20] Loss: 0.13374521726669342
Epoch [12/20] Loss: 0.08304232521000884
Epoch [13/20] Loss: 0.04813294818022234
Epoch [14/20] Loss: 0.08096590420737972
Epoch [15/20] Loss: 0.14850555443405813
Epoch [16/20] Loss: 0.0978841343255458
Epoch [17/20] Loss: 0.11204620728762273
Epoch [18/20] Loss: 0.06582138011478544
Epoch [19/20] Loss: 0.027853412582201188
Epoch [20/20] Loss: 0.03252619634640366
Training completed.
CPU times: user 15min 5s, sys: 24.7 s, total: 15min 29s
Wall time: 8min 35s


### 9. Test Again and Report Accuracy

In [17]:
model_1.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_1(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy_after_transfer_learning = np.mean(np.array(all_preds) == np.array(all_labels))
conf_matrix = confusion_matrix(all_labels, all_preds)
class_report = classification_report(all_labels, all_preds)

print(f"Accuracy: {accuracy_after_transfer_learning:.4f}")

Accuracy: 0.8313


### 10. Compare Accuracy Before and After Transfer Learning

In [31]:
print(f"Accuracy before Transfer Learning: {(accuracy_before_transfer_learning)*100:.0f}%")

print(f"Accuracy After  Transfer Learning: {(accuracy_after_transfer_learning)*100:.0f}%")


Accuracy before Transfer Learning: 51%
Accuracy After  Transfer Learning: 83%


In [33]:
# Relative improvement over the baseline: (after - before) / before
improvement = (accuracy_after_transfer_learning - accuracy_before_transfer_learning) / accuracy_before_transfer_learning * 100
print(f"The classification accuracy has improved by {improvement:.0f}% relative to the baseline thanks to pre-training using SimCLR while keeping the size of the labeled data unchanged.")

The classification accuracy has improved by 64% relative to the baseline thanks to pre-training using SimCLR while keeping the size of the labeled data unchanged.
