In [None]:
# useful libraries
import copy
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.datasets
from PIL import Image
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader

In [None]:
# Pick the compute device: the first CUDA GPU when one is visible, otherwise the CPU
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')
device = torch.device("cuda:0") if train_on_gpu else torch.device("cpu")
print(device)

In [None]:
# creation of the training set of resized images
size = 128  # side length (pixels) of the square images produced by Resize

# Normalization-free pipeline: tensor conversion and resize only, so that the
# raw pixel statistics of the training images can be measured.
fake_transforms = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Resize((size, size))
    ])


fake_training_set = torchvision.datasets.ImageFolder(root='/kaggle/input/supervised-sets/processedData/processedData/processed_train_set', transform=fake_transforms)
# create a "fake" loader in order to compute mean and std for normalization.
# No shuffling: the loader only feeds aggregate statistics, and a fixed order
# keeps the floating-point accumulation identical from one call to the next.
fake_train_loader = DataLoader(fake_training_set, batch_size=64, shuffle=False, num_workers=2)

In [None]:
# compute the mean and standard deviation for the normalization
def tot_mean_std(loader):
    """Compute the per-channel mean and standard deviation of all images in a loader.

    The statistics are pooled over every pixel of every image, so the returned
    std is the dataset-level standard deviation (it includes the variation
    between images), not the average of per-image standard deviations.

    Args:
        loader: iterable yielding ``(batch, labels)`` pairs. ``batch`` is a
            floating-point tensor whose first dimension indexes samples and
            whose second dimension indexes channels; all remaining dimensions
            are treated as pixels. Labels are ignored.

    Returns:
        tuple: ``(mean, std)``, two 1-D tensors with one entry per channel, in
        the dtype of the input batches.

    Raises:
        ValueError: if the loader yields no pixels at all.
    """
    pixel_sum = 0
    pixel_sq_sum = 0
    pixel_count = 0
    out_dtype = None

    with torch.no_grad():
        for batch, _ in loader:
            batch_samples = batch.size(0)
            if batch_samples == 0:
                continue
            # flatten the spatial dimensions: [batch_samples, channels, pixels]
            flat = batch.reshape(batch_samples, batch.size(1), -1)
            out_dtype = flat.dtype
            # accumulate in float64 so the sum of squares keeps its precision
            flat = flat.double()
            pixel_sum = pixel_sum + flat.sum(dim=(0, 2))
            pixel_sq_sum = pixel_sq_sum + (flat * flat).sum(dim=(0, 2))
            pixel_count = pixel_count + batch_samples * flat.size(2)

    if pixel_count == 0:
        raise ValueError("tot_mean_std: the loader did not yield any sample")

    mean = pixel_sum / pixel_count
    # Var[x] = E[x^2] - E[x]^2; clamp guards against tiny negative round-off
    variance = (pixel_sq_sum / pixel_count - mean * mean).clamp(min=0)
    std = variance.sqrt()

    return mean.to(out_dtype), std.to(out_dtype)

In [None]:
size = 128  # side length (pixels) of the square images produced by Resize
# Per-channel statistics measured on the un-normalized training images
mean, std = tot_mean_std(fake_train_loader)

# Final preprocessing: tensor conversion, resize, then normalization with the
# training-set statistics (the same statistics are applied to the validation set).
transforms = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Resize((size, size)),
        torchvision.transforms.Normalize(mean = mean, std = std)
    ])


training_set = torchvision.datasets.ImageFolder(root='/kaggle/input/supervised-sets/processedData/processedData/processed_train_set', transform=transforms)
validation_set = torchvision.datasets.ImageFolder(root='/kaggle/input/supervised-sets/processedData/processedData/processed_val_set', transform=transforms)

# "true" loaders
train_loader = DataLoader(training_set, batch_size=64, shuffle=True, num_workers=2)
# Validation is not shuffled: a fixed batch composition keeps the per-epoch
# mean of batch losses comparable from one epoch to the next.
val_loader = DataLoader(validation_set, batch_size=64, shuffle=False, num_workers=2)

In [None]:
# particular transforms for the test set, including the conversion to RGB (already made for training and validation during the preprocessing)
# `mean` and `std` are the training-set statistics computed in the cell that
# builds the training/validation transforms; they are reused here so the test
# images are normalized with exactly the same values (and without a second
# full pass over the training set).
transforms_test = torchvision.transforms.Compose([
        torchvision.transforms.Lambda(lambda image: image.convert('RGB')),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Resize((size, size)),
        torchvision.transforms.Normalize(mean = mean, std = std)
    ])

test_set = torchvision.datasets.ImageFolder(root='/kaggle/input/supervised-sets/processedData/processedData/processed_test_set', transform=transforms_test)
# One image per batch; the evaluation does not depend on the order, so no shuffling
test_loader = DataLoader(test_set, batch_size = 1, shuffle=False, num_workers=2)

In [None]:
# Numeric class id of every test folder, in the order ImageFolder indexes them
# (folder names are not sorted numerically, so position != class id)
test_classes_id = np.array([int(folder_name) for folder_name in test_set.class_to_idx])

In [None]:
# CNN definition
class Net(nn.Module):
    """Four-stage convolutional classifier producing 251 class scores.

    Each stage is convolution -> leaky ReLU -> max pooling; the flattened
    feature map then goes through one hidden fully-connected layer with
    dropout and a linear output layer. `fc1` expects 1470 input features,
    which is what the convolutional stack yields for 3x128x128 inputs
    (30 channels of 7x7).
    """

    def __init__(self):
        super().__init__()
        # output_size = (input_size - kernel_size + 2*padding_size)/stride + 1
        self.conv1 = nn.Conv2d(3, 8, kernel_size=5, padding=1)    # 1st convolution
        self.pool = nn.MaxPool2d(3, 2)                            # shared 3x3 max pooling, stride 2
        self.conv2 = nn.Conv2d(8, 12, kernel_size=4, padding=1)   # 2nd convolution
        self.conv3 = nn.Conv2d(12, 16, kernel_size=3, padding=1)  # 3rd convolution
        self.conv4 = nn.Conv2d(16, 30, kernel_size=2, padding=1)  # 4th convolution
        self.fc1 = nn.Linear(1470, 550)                           # hidden fully-connected layer
        self.fc2 = nn.Linear(550, 251)                            # output layer (one score per class)
        self.drop1 = nn.Dropout(0.5)                              # dropout after the hidden layer

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        # Every convolutional stage: convolution -> leaky ReLU -> max pooling
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = self.pool(F.leaky_relu(conv(x)))
        # Flatten everything but the batch dimension
        x = x.view(x.shape[0], -1)
        # Hidden layer with leaky ReLU, followed by dropout
        hidden = F.leaky_relu(self.fc1(x))
        hidden = self.drop1(hidden)
        # Output layer: no activation, the loss works on logits
        return self.fc2(hidden)

# Build the network and move its parameters to the selected device
net = Net().to(device)
net  # last expression of the cell: displays the architecture

In [None]:
# SUMMARY AND NUMBER OF PARAMETERS
!pip install torchsummary
from torchsummary import summary
summary(net, input_size= (3,128,128))

In [None]:
# RECEPTIVE FIELD (WITHOUT DROPOUT)
#!git clone https://github.com/Fangyh09/pytorch-receptive-field.git
#!mv -v pytorch-receptive-field/torch_receptive_field ./
#from torch_receptive_field import receptive_field

#receptive_field(net, input_size=(3, 128, 128))

In [None]:
# Training hyper-parameters
epochs = 15  # number of passes over the training set

# Cross-entropy loss for multi-class classification
criterion = nn.CrossEntropyLoss()

# Stochastic gradient descent with momentum over all the network parameters
optimizer = optim.SGD(params=net.parameters(), lr=0.01, momentum=0.9)

# Cosine annealing of the learning rate: T_max is the maximum number of
# scheduler iterations, eta_min the minimum learning rate
scheduler1 = CosineAnnealingLR(optimizer=optimizer, T_max=epochs, eta_min=1e-8)

In [None]:
# TRAINING AND VALIDATION PHASE
best_model = None  # independent snapshot of the network at its best validation epoch
record_loss = []  # validation losses at which the best model was updated

avg_losses = []  # mean training loss per epoch (used for the final plot)
validation_errors = []  # mean validation loss per epoch (used for the final plot)

# Loop over the dataset multiple times (epochs)
for epoch in range(epochs):
    running_loss = []  # loss of every training mini-batch of this epoch

    net.train()  # training mode (dropout used)

    for inputs, labels in train_loader:
        # Move the images and labels to the device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear the gradients accumulated in the previous iteration
        optimizer.zero_grad()

        outputs = net(inputs)  # forward pass
        loss = criterion(outputs, labels)
        loss.backward()  # backpropagation
        optimizer.step()  # update weights and biases from the gradients
        running_loss.append(loss.item())

    # Advance the learning-rate schedule once per epoch
    scheduler1.step()

    train_loss = np.mean(running_loss)
    print('[%d] loss: %.4f' %(epoch + 1, train_loss))
    avg_losses.append(train_loss)

    # Validation set loop
    val_running_loss = []  # loss of every validation mini-batch

    net.eval()  # evaluation mode (dropout disabled)

    # No gradients are needed for validation
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = net(inputs)
            # Logits are (batch, classes) and labels (batch,): no squeeze needed,
            # and squeezing would drop the batch dimension of a size-1 batch.
            loss = criterion(outputs, labels)
            val_running_loss.append(loss.item())

    val_loss = np.mean(val_running_loss)
    print('Validation loss: %.6f' % (val_loss))
    validation_errors.append(val_loss)

    # Keep the model only when its validation loss beats every previous epoch
    if best_model is None or val_loss < min(record_loss):
        print(f"best model updated at epoch {epoch+1}")
        # deepcopy: a plain `best_model = net` would only alias the live network,
        # whose weights keep changing in the following epochs
        best_model = copy.deepcopy(net)
        torch.save(net.state_dict(), 'best_model.pth')
        record_loss.append(val_loss)


In [None]:
import matplotlib
import matplotlib.pyplot as plt
# Training and validation loss per epoch, drawn on the same axes
epochs_x = range(1, len(avg_losses) + 1)
for curve, curve_label in ((avg_losses, 'Training Loss'), (validation_errors, 'Validation Loss')):
    plt.plot(epochs_x, curve, label=curve_label)
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# TESTING PHASE
correct = 0
total = 0
labelsTest = []  # true labels (one CPU tensor per batch)
predictions = []  # predicted classes (one CPU tensor per batch)

# Evaluate the checkpoint saved at the best validation epoch rather than the
# weights left in `net` after the last epoch. It is loaded into a fresh
# network so that `net` itself is left untouched.
best_net = Net().to(device)
best_net.load_state_dict(torch.load('best_model.pth', map_location=device))
best_net.eval()  # evaluation mode (dropout disabled)

# Disable gradient calculation
with torch.no_grad():
    # Loop over the test loader
    for inputs, labels in test_loader:
        labels = labels.cpu()
        labelsTest.append(labels)

        # Network scores for the images of the batch
        outputs = best_net(inputs.to(device))

        # Class with the highest score
        _, predicted = torch.max(outputs.cpu(), 1)
        predictions.append(predicted)

        # Update the number of test images and of correct predictions
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Compute and print accuracy
print('Accuracy of the network on the test images: %d %%' % (
    100 * correct / total))


In [None]:
# true labels taken from iFood19 info
# NOTE(review): the "test" labels are read from val_labels.csv — presumably the
# labelled iFood19 validation split serves as this notebook's test set; confirm.
trainLabels = pd.read_csv('/kaggle/input/supervised-sets/train_labels.csv')
# Path written with a single leading slash: a doubled one ('//kaggle/...') is
# implementation-defined under POSIX.
testLabels = pd.read_csv('/kaggle/input/supervised-sets/val_labels.csv')

# Distinct class labels present in each table
trainClasses = trainLabels['label'].unique()
testClasses = testLabels['label'].unique()

In [None]:
# number of samples per class in the test set, ordered by ascending class label
testClasses = np.sort(testClasses)
n = [int((testLabels['label'] == class_label).sum()) for class_label in testClasses]

In [None]:
# Percentages of correctly classified samples in the test set for each class
# The fact that the folders are not sorted per class is taken into account by the class id

# ImageFolder index of every class id, built once (first occurrence wins)
class_id_to_index = {}
for folder_index, class_id in enumerate(test_classes_id):
    class_id_to_index.setdefault(int(class_id), folder_index)

# Flatten the per-batch tensors, then count the correct predictions per
# ImageFolder index in a single pass over the test samples
true_idx = torch.cat(labelsTest).numpy()
pred_idx = torch.cat(predictions).numpy()
correct_per_index = np.bincount(true_idx[true_idx == pred_idx], minlength=len(test_classes_id))

k = 0  # number of classes with a non-zero (integer) percentage
for i in range(0,251):
    # A class id with no test folder raises KeyError here
    corr = int(correct_per_index[class_id_to_index[i]])
    # n[i] is the number of test samples of class i (n is ordered by class label)
    percentage = int(corr/n[i]*100)
    if percentage > 0:
        print("Correctly classified samples for class", i)
        print(percentage,"%")
        k = k+1
print("Number of classes with non zero percentage of correctly classified samples")
print(k)

In [None]:
# Confusion matrix for two specific classes
import seaborn as sns
from sklearn.metrics import confusion_matrix
import matplotlib as mp

# Class ids (folder names) to compare
focus_class_ids = [12, 148]
# ImageFolder index of each of these class ids, looked up once instead of per sample
focus_indices = [int(np.where(test_classes_id == class_id)[0][0]) for class_id in focus_class_ids]

ls = []  # true labels of the retained samples
preds = []  # predictions of the retained samples

# Keep only the samples whose true AND predicted classes are both among the two classes
for true_label, predicted_label in zip(labelsTest, predictions):
    true_index = true_label.item()
    predicted_index = predicted_label.item()
    if true_index in focus_indices and predicted_index in focus_indices:
        ls.append(true_index)
        preds.append(predicted_index)

# labels= fixes the row/column order and keeps the matrix 2x2 even when one of
# the two classes never appears among the retained samples
conf_matr = confusion_matrix(ls, preds, labels=focus_indices)
# fmt='d' prints the counts as integers; ticks show the class ids
ax = sns.heatmap(conf_matr, annot=True, fmt='d', cmap='Oranges',
                 xticklabels=focus_class_ids, yticklabels=focus_class_ids)
ax.set_title('Confusion Matrix for Test set \n\n');
ax.set_xlabel('\nPredicted Values')
ax.set_ylabel('Actual Values ');
mp.pyplot.show()
mp.pyplot.clf()