# Self-supervised learning part of the project

In [None]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch import cat
import torch.nn.init as init


#Define the network
class Network(nn.Module):
    """Shared-weight CNN that predicts which permutation was applied to 9 patches.

    Every patch of a sample is passed through the same convolutional trunk
    (``self.conv``) and the same ``fc6`` layer. The nine 200-dimensional
    results are concatenated, passed through ``fc7`` and finally through
    ``classifier``, which emits one logit per permutation class.
    """

    @staticmethod
    def _named_sequential(named_layers):
        """Return an ``nn.Sequential`` with every layer registered under its name."""
        container = nn.Sequential()
        for layer_name, layer in named_layers:
            container.add_module(layer_name, layer)
        return container

    def __init__(self, classes=500):
        """Create all layers.

        Args:
            classes: Number of output logits (= number of permutations).
        """
        super().__init__()

        # Convolutional trunk applied to each patch independently.
        # E.g. a 3x75x75 patch leaves this trunk as a 56x3x3 tensor.
        self.conv = self._named_sequential([
            ('conv1_s1', nn.Conv2d(3, 16, kernel_size=5)),
            ('relu1_s1', nn.ReLU(inplace=True)),
            ('pool1_s1', nn.MaxPool2d(kernel_size=6, stride=2)),
            ('conv2_s1', nn.Conv2d(16, 32, kernel_size=5)),
            ('relu2_s1', nn.ReLU(inplace=True)),
            ('conv3_s1', nn.Conv2d(32, 64, kernel_size=3)),
            ('relu3_s1', nn.ReLU(inplace=True)),
            ('pool3_s1', nn.MaxPool2d(kernel_size=2, stride=2)),
            ('conv4_s1', nn.Conv2d(64, 128, kernel_size=3)),
            ('relu4_s1', nn.ReLU(inplace=True)),
            ('pool4_s1', nn.MaxPool2d(kernel_size=2, stride=2)),
            ('conv5_s1', nn.Conv2d(128, 56, kernel_size=3)),
            ('relu5_s1', nn.ReLU(inplace=True)),
        ])

        # Per-patch embedding: flattened trunk output (56*3*3) -> 200 features
        self.fc6 = self._named_sequential([
            ('fc6_s1', nn.Linear(56 * 3 * 3, 200)),
            ('relu6_s1', nn.ReLU(inplace=True)),
            ('drop6_s1', nn.Dropout(p=0.5)),
        ])

        # Joint layer over the 9 concatenated patch embeddings
        self.fc7 = self._named_sequential([
            ('fc7', nn.Linear(9 * 200, 1400)),
            ('relu7', nn.ReLU(inplace=True)),
            ('drop7', nn.Dropout(p=0.5)),
        ])

        # Output layer: one logit per permutation class
        self.classifier = self._named_sequential([
            ('fc8', nn.Linear(1400, classes)),
        ])

    def forward(self, x):
        """Compute permutation logits for a batch of patch sets.

        Args:
            x: 5-D tensor unpacked as ``(B, T, C, H, W)``; the patches at
                indices 0..8 along dimension 1 are used.

        Returns:
            Tensor of shape ``(B, classes)``.
        """
        # Unpacking also rejects inputs that are not 5-dimensional
        B, T, C, H, W = x.size()

        patch_features = []
        for patch_idx in range(9):  # the 9 patches share the trunk and fc6
            feats = self.conv(x[:, patch_idx])
            feats = self.fc6(feats.view(B, -1))
            patch_features.append(feats.view([B, 1, -1]))

        # (B, 9, 200) -> (B, 1800) for the fully connected part
        merged = cat(patch_features, 1)
        hidden = self.fc7(merged.view(B, -1))
        return self.classifier(hidden)
    

In [None]:
from torchsummary import summary
# Print a layer-by-layer summary for one sample made of nine 3x75x75 patches
model = Network(classes=500)
summary(model, input_size=(9, 3, 75, 75))

In [None]:
# Select the compute device: a CUDA GPU if present, otherwise Apple-silicon
# MPS, otherwise the CPU. The getattr guard keeps this working on torch
# builds that do not ship the `torch.backends.mps` module at all.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps:0")
else:
    device = torch.device("cpu")
print("Device:", device)    # Print the device

In [None]:
import os, sys, numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.autograd import Variable

from JigsawImageLoader import DataLoader    #Use the Dataloader from the original paper

classes = 500       # number of permutations (= output classes of the network)
batch_size = 68

# Jigsaw datasets, each built from an image directory and an annotation CSV
train_data = DataLoader(
    "/Users/stefanocarotti/Supervised/Project/train_set",
    "/Users/stefanocarotti/Supervised/Project/annot/train_info.csv",
    classes=classes,
)
val_data = DataLoader(
    "/Users/stefanocarotti/Supervised/Project/val_set",
    "/Users/stefanocarotti/Supervised/Project/annot/val_info.csv",
    classes=classes,
)

# Both loaders are configured identically
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=4)
train_loader = torch.utils.data.DataLoader(dataset=train_data, **loader_options)
val_loader = torch.utils.data.DataLoader(dataset=val_data, **loader_options)

# Network, loss function and optimizer
net = Network(classes=classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.0002)
     

In [None]:
    ############## TRAINING ###############
# Report the learning rate the optimizer actually uses instead of a duplicated constant
learning_rate = optimizer.param_groups[0]['lr']
print(('Start training: lr %f, batch size %d, classes %d'%(learning_rate,batch_size,classes)))

# Train the model
epochs = 50
# Per-batch losses over the whole run (kept for plotting)
train_losses = []
for epoch in range(epochs):
    # ---- Training ----
    net.train()
    epoch_losses = []   # losses of the current epoch only
    prog_bar = tqdm(train_loader, total=len(train_loader))
    for images, labels, original in prog_bar:
        images = images.to(device)
        labels = labels.to(device)

        # Forward + Backward + Optimize
        optimizer.zero_grad()
        outputs = net(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        epoch_losses.append(batch_loss)
        train_losses.append(batch_loss)

    # Average over this epoch's batches only (not over all previous epochs)
    avg_loss = np.mean(epoch_losses)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # ---- Validation ----
    net.eval()
    correct = 0
    total = 0
    prog_bar = tqdm(val_loader, total=len(val_loader))
    with torch.no_grad():   # no autograd graph is needed for evaluation
        for images, labels, original in prog_bar:
            images = images.to(device)
            labels = labels.to(device)
            outputs = net(images)
            predicted = outputs.argmax(dim=1)   # predicted permutation index
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    # Named val_accuracy so it cannot shadow the accuracy() function when cells are re-run
    val_accuracy = 100 * correct / total if total else float('nan')
    print(f"Validation Accuracy: {val_accuracy:.2f}%")
    
        

In [None]:
# Save only the weights (state_dict) of the jigsaw network to 'convnet.pth'
torch.save(net.state_dict(), 'convnet.pth')

In [None]:
# Rebuild the network and restore the saved weights.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on another device (e.g. MPS) also loads on a CPU-only machine.
net = Network(classes=500).to(device)
net.load_state_dict(torch.load('convnet.pth', map_location=device))

In [None]:
# Plot the training losses collected in train_losses (one value per batch)
plt.plot(train_losses)

In [None]:
def accuracy(prediction, y):
    """Return the fraction of entries of ``prediction`` equal to ``y`` as a Python float."""
    n_correct = prediction.eq(y).sum()
    fraction = n_correct / len(y)
    return fraction.item()

def evaluate(model, testloader):
    """Evaluate ``model`` on every batch yielded by ``testloader``.

    Uses the module-level ``device`` and ``criterion``.

    Args:
        model: Network called on each image batch.
        testloader: Loader yielding ``(images, labels, original)`` batches.

    Returns:
        Tuple ``(y_true, y_pred, epoch_loss, epoch_acc)``: the true labels,
        the predicted labels, the mean loss per sample (float) and the
        fraction of correctly classified samples (float).

    Raises:
        ValueError: If ``testloader`` yields no samples.
    """
    model.eval()  # Set the model to evaluation mode
    y_true = []
    y_pred = []
    loss_sum = 0.0
    n_correct = 0
    n_samples = 0
    prog_bar = tqdm(testloader, total=len(testloader))
    with torch.no_grad():
        for images, labels, original in prog_bar:
            images = images.to(device)
            labels = labels.to(device)
            prediction = model(images)
            predicted_labels = torch.argmax(prediction, dim=1)
            n_batch = labels.size(0)

            # Weight by batch size so a smaller last batch does not skew the
            # averages (assumes criterion returns the batch mean — TODO confirm).
            # .item() keeps the accumulator a plain float instead of a device tensor.
            loss_sum += criterion(prediction, labels).item() * n_batch
            n_correct += (predicted_labels == labels).sum().item()
            n_samples += n_batch

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted_labels.cpu().numpy())

    if n_samples == 0:
        raise ValueError("testloader yielded no samples")

    epoch_loss = loss_sum / n_samples   # mean loss per sample
    epoch_acc = n_correct / n_samples   # overall accuracy
    return y_true, y_pred, epoch_loss, epoch_acc

In [None]:
# Evaluate the jigsaw network on the validation loader
y_true, y_pred, test_loss, test_acc = evaluate(net, val_loader)

In [None]:
# Accuracy value returned by evaluate()
print(test_acc)

In [None]:
#Fetch one batch from the training loader; one of its samples is used to illustrate jigsaw puzzle solving in the report
images, labels, original = next(iter(train_loader))


In [None]:
#Show the nine patches of batch sample 2, in the order they are fed to the network, on a 3x3 grid
shuffled_patches = images[2]
plt.figure(figsize=(10, 10))
for position in range(9):
    axis = plt.subplot(3, 3, position + 1)
    axis.axis('off')    # no axes
    axis.grid(False)    # no grid
    # channels-first (C, H, W) -> channels-last (H, W, C) as expected by imshow
    patch_hwc = shuffled_patches[position].cpu().numpy().transpose(1, 2, 0)
    axis.imshow(patch_hwc)

#Save the shuffled image
plt.savefig('shuffled_image.png')
    


In [None]:
#Feed one sample of the batch to the network in order to recognize its permutation
sample_idx = 2  # which sample of the batch to illustrate

#Pass the images to device
images = images.to(device)

net.eval()  # make sure dropout is disabled for the prediction
with torch.no_grad():
    output = net(images[sample_idx].unsqueeze(0))
    output = nn.Softmax(dim=1)(output)
predicted_class = torch.argmax(output, dim=1)
print(predicted_class)  #Print the prediction

#Print the label of the image
print(labels[sample_idx])

perm = np.load('permutations_500.npy')
# Use the permutation the network actually predicted rather than a hard-coded index
predicted_perm = perm[predicted_class.item()]
print(predicted_perm)  #Print the recognized permutation

#Visualize the sample with its patches put back in order.
#argsort inverts the permutation; computed once because it does not depend on the loop.
#NOTE(review): assumes patch t of the network input is original tile perm[t] - confirm against JigsawImageLoader
idx = np.argsort(predicted_perm)
plt.figure(figsize=(10,10))
for i in range(9):
    plt.subplot(3,3,i+1)
    #No axes
    plt.axis('off')
    #No grid
    plt.grid(False)
    plt.imshow(images[sample_idx][idx[i]].cpu().numpy().transpose(1,2,0))

#Save the unshuffled image
plt.savefig('unshuffled_image.png')

In [None]:
#Use Fooddataset class from the Supervised part for the linear classifier


import glob
from PIL import Image
import numpy as np
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import pandas as pd
import scipy
import seaborn as sns


class FoodDataset(Dataset):
    """Image dataset backed by a CSV whose rows are ``(image name, label)``.

    The rows are shuffled with a fixed seed; ``split`` selects which part of
    the shuffled rows this instance exposes.
    """

    def __init__(self, root_dir, filelist, transform, split):
        """Load the annotation file and keep the rows belonging to ``split``.

        Args:
            root_dir: Directory that contains the image files.
            filelist: Path of the comma-separated annotation file.
            transform: Callable applied to each loaded PIL image.
            split: ``"train"`` keeps the first 80% of the shuffled rows,
                ``"val"`` the remaining 20%; any other value keeps all rows.
        """
        self.split = split
        self.root_dir = root_dir  # Root directory of the images
        # NOTE(review): read_csv treats the first line as a header — confirm
        # the annotation file has one, otherwise its first sample is dropped.
        self.data = pd.read_csv(filelist, sep=",")
        self.transform = transform

        total_data_len = len(self.data)
        idx = np.arange(total_data_len)
        # A private, fixed-seed generator yields the same permutation as
        # np.random.seed(41) + np.random.shuffle(idx) but does not reset the
        # global NumPy RNG every time a dataset is constructed.
        np.random.RandomState(41).shuffle(idx)
        print(f"Shuffled indices (first 5): {idx[:5]}")  # Print first 5 shuffled indices

        # Select data based on split
        n_train = int(total_data_len * 0.8)
        if split == "train":
            self.data = self.data.iloc[idx[:n_train]]  # 80% of the data for training
        elif split == "val":
            self.data = self.data.iloc[idx[n_train:]]  # 20% of the data for validation
        # Any other split value (e.g. "test") keeps every row.

    def __len__(self):
        """Return the number of rows in the selected split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load and transform the image at position ``idx``.

        Returns:
            Tuple ``(sample, img_name)`` where ``sample`` is
            ``{'image': transformed image, 'label': label}``.
        """
        img_name, label = self.data.iloc[idx]  # row holds exactly (image name, label)
        # The context manager closes the file handle. convert("RGB") forces
        # three channels so grayscale / RGBA / palette images also work with
        # 3-channel transforms.
        with Image.open(os.path.join(self.root_dir, img_name)) as img:
            image = img.convert("RGB")
        image = self.transform(image)

        sample = {'image': image, 'label': label}
        return sample, img_name


#Same transform and setting as the Supervised part

# Define data transformations (augmentations for training and normalization)
# Steps shared by both pipelines: tensor conversion followed by channel-wise normalization
_tensor_and_normalize = [
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]

# Training pipeline: resize to 256x256, random horizontal flip (p=0.5), then the shared steps
transform_train = transforms.Compose(
    [transforms.Resize((256, 256)), transforms.RandomHorizontalFlip(p=0.5)]
    + _tensor_and_normalize
)

# Validation pipeline: same resize and shared steps, without the random flip
transform_val = transforms.Compose(
    [transforms.Resize((256, 256))] + _tensor_and_normalize
)

# Set batch size
bs = 64

# Create datasets for training, validation, and testing.
# Train and val are the 80/20 split of the train_set annotations; test uses all of val_set.
trainset = FoodDataset("/Users/stefanocarotti/Supervised/Project/train_set",
                       "/Users/stefanocarotti/Supervised/Project/annot/train_info.csv", transform_train, "train")

valset = FoodDataset("/Users/stefanocarotti/Supervised/Project/train_set",
                     "/Users/stefanocarotti/Supervised/Project/annot/train_info.csv", transform_val, "val")

testset = FoodDataset("/Users/stefanocarotti/Supervised/Project/val_set",
                      "/Users/stefanocarotti/Supervised/Project/annot/val_info.csv", transform_val, "test")

# Create data loaders
trainloader = torch.utils.data.DataLoader(trainset, batch_size=bs, shuffle=True)
valloader = torch.utils.data.DataLoader(valset, batch_size=1, shuffle=False)
testloader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=False)

# Print dataset sizes. Count samples on the datasets themselves:
# len(trainloader) * bs over-counts whenever the last batch is not full.
print(f"Number of training samples: {len(trainset)}")
print(f"Number of validation samples: {len(valset)}")
print(f"Number of test samples: {len(testset)}")

# visualize one example
sample, name = trainset[0]
print(sample['image'].shape)
# Undo the normalization applied by the transform so imshow receives values
# in [0, 1]; otherwise it clips the normalized tensor and distorts the colors.
_display_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
_display_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
_display_image = (sample['image'] * _display_std + _display_mean).clamp(0, 1)
plt.imshow(_display_image.permute(1, 2, 0))
print(name, sample['label'])


In [None]:
#Evaluation functions from the Supervised part
def accuracy(prediction, y):
    """Compute the share of positions where ``prediction`` matches ``y`` (Python float)."""
    hits = torch.sum(prediction == y)
    return torch.div(hits, len(y)).item()


def evaluate_classifier(classif_model, testloader):
    """Evaluate a classification head on features from the jigsaw trunk.

    Each image batch is passed through the module-level ``net.conv``,
    flattened and fed to ``classif_model``. Also uses the module-level
    ``device`` and ``criterion``.

    Args:
        classif_model: Classifier applied to the flattened trunk features.
        testloader: Loader yielding ``(sample, name)`` where ``sample`` is a
            dict with ``'image'`` and ``'label'`` entries.

    Returns:
        Tuple ``(y_true, y_pred, epoch_loss, epoch_acc)``: the true labels,
        the predicted labels, the mean loss per sample (float) and the
        fraction of correctly classified samples (float).

    Raises:
        ValueError: If ``testloader`` yields no samples.
    """
    classif_model.eval()  # Set the model to evaluation mode
    y_true = []
    y_pred = []
    loss_sum = 0.0
    n_correct = 0
    n_samples = 0
    prog_bar = tqdm(testloader, total=len(testloader))
    with torch.no_grad():
        for sample, _ in prog_bar:
            image = sample['image'].to(device)
            label = sample['label'].to(device)
            features = torch.flatten(net.conv(image), 1)  # trunk output -> (batch, features)
            prediction = classif_model(features)
            predicted_labels = torch.argmax(prediction, dim=1)
            n_batch = label.size(0)

            # Weight by batch size so a smaller last batch does not skew the
            # averages (assumes criterion returns the batch mean — TODO confirm).
            # .item() keeps the accumulator a plain float instead of a device tensor.
            loss_sum += criterion(prediction, label).item() * n_batch
            n_correct += (predicted_labels == label).sum().item()
            n_samples += n_batch

            y_true.extend(label.cpu().numpy())
            y_pred.extend(predicted_labels.cpu().numpy())

    if n_samples == 0:
        raise ValueError("testloader yielded no samples")

    epoch_loss = loss_sum / n_samples   # mean loss per sample
    epoch_acc = n_correct / n_samples   # overall accuracy
    return y_true, y_pred, epoch_loss, epoch_acc




In [None]:
# Linear classifier for SSL
from torch.optim.lr_scheduler import StepLR
n_classes = 251  # Food recognition task

# Push one training image through the convolutional trunk to find out how
# many features the classifier will receive
sample, name = trainset[0]
image = sample['image'].to(device)
output = net.conv(image)
print(output.shape)

# The trunk output for a single (unbatched) image has three dimensions
channels, height, width = output.shape
latent_dim = channels * height * width
print(latent_dim)
def construct_classifier(latent_dim, n_classes):
    """Build the classification head placed on top of the flattened features.

    Args:
        latent_dim: Number of input features.
        n_classes: Number of output logits.

    Returns:
        ``nn.Sequential``: Linear(latent_dim, 1024) -> ReLU -> Dropout(0.5)
        -> Linear(1024, n_classes).
    """
    hidden_units = 1024
    layers = [
        nn.Linear(latent_dim, hidden_units),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(hidden_units, n_classes),
    ]
    return nn.Sequential(*layers)

# Build the classification head for the probed feature size and move it to the selected device
linear_classifier = construct_classifier(latent_dim, n_classes).to(device)  # Initialize the linear classifier
# Show the layer structure of the head
print(linear_classifier)



In [None]:
# Train the classification head on features from the frozen convolutional trunk
lin_optim = torch.optim.Adam(linear_classifier.parameters(), lr=0.0001) # Define the optimizer
scheduler = StepLR(lin_optim, step_size=15, gamma=0.5)  # Halve the learning rate every 15 epochs
criterion = nn.CrossEntropyLoss()  # Define the loss function (Cross-Entropy loss)
epochs = 20
net.eval()  # Set the convolutional part to evaluation mode

#Initialization
loss_list = []        # per-batch training losses over the whole run
epoch_loss = []       # mean training loss of each epoch
test_acc_list = []
test_loss_list = []
# Start at +inf so the first epoch is always saved and the checkpoint file always exists
best_loss = float('inf')

for epoch in range(epochs):
    linear_classifier.train()  # Set the linear classifier to training mode
    batch_losses = []          # losses of the current epoch only
    # A tqdm bar is closed once exhausted, so a fresh one is created every epoch
    prog_bar = tqdm(trainloader, total=len(trainloader))
    for sample, _ in prog_bar:
        images = sample['image'].to(device)
        labels = sample['label'].to(device)

        lin_optim.zero_grad()
        # The trunk is frozen (its parameters are not in the optimizer), so
        # no autograd graph is built through it
        with torch.no_grad():
            z = torch.flatten(net.conv(images), 1)
        outputs = linear_classifier(z)     # Forward pass through the linear classifier
        loss = criterion(outputs, labels)  # Calculate the loss
        loss.backward()
        lin_optim.step()
        batch_losses.append(loss.item())

    loss_list.extend(batch_losses)
    epoch_loss.append(np.mean(batch_losses))  # mean over this epoch's batches only
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss[-1]:.4f}")

    # Evaluate the linear classifier
    # NOTE(review): the best model is selected on `testloader`; `valloader` is
    # created but never used — confirm which split is meant for model selection.
    y_true, y_pred, test_loss, test_acc = evaluate_classifier(linear_classifier, testloader)
    test_loss = float(test_loss)  # plain float whether a tensor or a float was returned

    test_acc_list.append(test_acc)
    test_loss_list.append(test_loss)

    print(f"epoch: {epoch+1}, Validation Accuracy: {test_acc*100}%", f"Validation Loss: {test_loss}")
    scheduler.step()  # StepLR advances once per epoch and does not look at the loss

    #Save the best model
    if test_loss < best_loss:
        best_loss = test_loss
        torch.save(linear_classifier.state_dict(), "best_linear_classifier.pth")

In [None]:
#Load the best model
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on another device also loads on this machine
linear_classifier = construct_classifier(latent_dim, n_classes).to(device)
linear_classifier.load_state_dict(torch.load('best_linear_classifier.pth', map_location=device))

In [None]:
# Evaluate the restored best classifier on the test loader
y_true, y_pred, test_loss, test_acc = evaluate_classifier(linear_classifier, testloader)