# Evaluation of SSL methods

In this session we are going to implement the following evaluation modalities of SSL methods:
- kNN classifier
- linear probe
- MLP

We are going to use the ImageNet pretrained model of pytorch as pretrained model and test their performance of CIFAR10 (as we have already done in the CNN lab classes before in the course).

In [19]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from sklearn.neighbors import KNeighborsClassifier

In [2]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])


batch_size = 64

device = torch.device('cuda')

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:12<00:00, 13.2MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [3]:
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.eval()
    model.fc.train()
    train_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader): #per ogni batch di dati e target
        data, target = data.to(device), target.to(device) #sposto i dati e i target sulla GPU
        optimizer.zero_grad() #azzera i gradienti
        output = model(data) #calcola l'output della rete (viene eseguito il forward) per ogni dato del batch
        loss = criterion(output, target) #calcola la loss
        train_loss += loss.item()
        loss.backward() #calcola il gradiente
        optimizer.step() #aggiorna i pesi
        if batch_idx % 500 == 0: #se il numero del batch è divisibile per 500
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
    train_loss /= len(train_loader)
    return train_loss

In [4]:
def test(model, device, test_loader, criterion):
    model.eval() #setta il modello in evaluation mode
    test_loss = 0 #inizializza la loss totale a 0
    correct = 0 #inizializza il conteggio delle predizioni corrette a 0
    with torch.no_grad(): #non vengono calcolati i gradienti, infatti ciò non serve in fase di testing
        for data, target in test_loader: #per ogni batch di dati e target
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # somma la loss del batch alla loss totale
            pred = output.argmax(dim=1, keepdim=True)  # ottiene gli indici delle classi predette per ogni dato del batch
            correct += pred.eq(target.view_as(pred)).sum().item() #somma il numero di predizioni corrette del batch e
                                                                  #lo aggiunge al conteggio totale di predizioni corrette

    test_loss /= len(test_loader.dataset) #calcola la loss media sul dataset di test

    test_accuracy = correct / len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * test_accuracy))
    return test_loss, test_accuracy

## Exercise 0

Create a linear probe on CIFAR10 using the pretrained model of torchvision, models.resnet18(pretrained=True).

Keep attention to set the model in evaluation mode with model.eval() during the training of the linear layer, since all the layers of the pretrained model should be used in eval mode (eg the batch norm/dropout layers, if present).

In [None]:
net = models.resnet18(pretrained=True)
#net = torch.nn.Sequential(*(list(backbone.children())[:-1]), torch.nn.Flatten())
for param in net.parameters():
    param.requires_grad = False  # Congela i pesi del modello pre-addestrato

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 166MB/s]


In [None]:
fc1 = nn.Linear(512, 10)

net.fc = fc1

net.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
# the main loop
train_losses = []
test_losses = []
test_accuracies = []
model_state_dict = None
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.fc.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-04)
epochs = 10

for epoch in range(1, epochs + 1):
    train_loss = train(net, device, trainloader, optimizer, criterion, epoch)
    train_losses.append(train_loss)
    test_loss, test_acc = test(net, device, testloader, criterion)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)


Test set: Average loss: 0.0560, Accuracy: 4807/10000 (48%)


Test set: Average loss: 0.0464, Accuracy: 5419/10000 (54%)


Test set: Average loss: 0.0455, Accuracy: 5151/10000 (52%)


Test set: Average loss: 0.0472, Accuracy: 5224/10000 (52%)


Test set: Average loss: 0.0397, Accuracy: 5508/10000 (55%)


Test set: Average loss: 0.0524, Accuracy: 4914/10000 (49%)


Test set: Average loss: 0.0559, Accuracy: 5121/10000 (51%)


Test set: Average loss: 0.0462, Accuracy: 5325/10000 (53%)


Test set: Average loss: 0.0621, Accuracy: 4551/10000 (46%)


Test set: Average loss: 0.0479, Accuracy: 4968/10000 (50%)



## Exercise 1

Compare the linear evaluation with the MLP strategy.

Implement now a MLP on top of the model and train just this part.
Also in this case keep attention to set the model in evaluation mode with model.eval() during the training of the linear layer, since all the layers of the pretrained model should be used in eval mode (eg the batch norm/dropout layers, if present).

In [None]:
net = models.resnet18(pretrained=True)

fc1 = nn.Linear(512, 256)
fc2 = nn.Linear(256, 128)
fc3 = nn.Linear(128, 10)

# Replace the model's classifier with a new sequential layer
# that includes the new fc1 and the modified fc
net.fc = nn.Sequential(
    fc1,
    nn.ReLU(),   # Optional: Add an activation function like ReLU
    fc2,
    nn.ReLU(),
    fc3
)
net.to(device)



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
for param in net.parameters():
    param.requires_grad = False  # Congela tutte le parti del modello

# Sblocca solo i parametri del nuovo MLP
for param in net.fc.parameters():
    param.requires_grad = True

In [None]:
# the main loop
train_losses = []
test_losses = []
test_accuracies = []
model_state_dict = None
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.fc.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-04)
epochs = 10

for epoch in range(1, epochs + 1):
    train_loss = train(net, device, trainloader, optimizer, criterion, epoch)
    train_losses.append(train_loss)
    test_loss, test_acc = test(net, device, testloader, criterion)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)


Test set: Average loss: 0.0228, Accuracy: 5146/10000 (51%)


Test set: Average loss: 0.0219, Accuracy: 5316/10000 (53%)


Test set: Average loss: 0.0221, Accuracy: 5485/10000 (55%)


Test set: Average loss: 0.0213, Accuracy: 5501/10000 (55%)


Test set: Average loss: 0.0216, Accuracy: 5554/10000 (56%)


Test set: Average loss: 0.0226, Accuracy: 5445/10000 (54%)


Test set: Average loss: 0.0220, Accuracy: 5569/10000 (56%)


Test set: Average loss: 0.0217, Accuracy: 5482/10000 (55%)


Test set: Average loss: 0.0225, Accuracy: 5504/10000 (55%)


Test set: Average loss: 0.0214, Accuracy: 5545/10000 (55%)



## Exercise 2

Implement the KNN classifier. Test several values of k.

In [15]:
net = models.resnet18(pretrained=True)
net.fc = nn.Identity()
for param in net.parameters():
    param.requires_grad = False  # Congela i pesi del modello pre-addestrato

net.to(device)



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [25]:
features=[]
labels=[]

for batch_idx, (data, target) in enumerate(trainloader):
    data=data.to(device)
    with torch.no_grad():
        feature_batch=net(data)
    features.append(feature_batch.cpu())
    labels.append(target.cpu())


features=torch.cat(features, dim=0)
labels=torch.cat(labels, dim=0).numpy()

mean = features.mean(dim=0)
std = features.std(dim=0)

# Normalizza le feature
features = ((features - mean) / std).numpy()

In [30]:
k=30
test_features=[]
test_labels=[]


knn=KNeighborsClassifier(n_neighbors=k, metric='cosine')
knn.fit(features, labels)

for batch_idx, (data, target) in enumerate(testloader):
    data=data.to(device)
    with torch.no_grad():
        feature_batch=net(data)
    test_features.append(feature_batch.cpu())
    test_labels.append(target.cpu())

test_features=torch.cat(test_features, dim=0)
test_labels=torch.cat(test_labels, dim=0).numpy()

mean = test_features.mean(dim=0)
std = test_features.std(dim=0)

# Normalizza le feature
test_features = ((test_features - mean) / std).numpy()

test_accuracy=knn.score(test_features, test_labels)

print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Test Accuracy: 35.47%
