In [None]:
# Mount Google Drive (Colab only): the dataset and weight paths used in this
# notebook all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install timm

In [None]:
import numpy as np
import pandas as pd
from PIL import Image

import os
import csv

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

import torchvision as TV
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import torchvision.models as models

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report

import timm

import seaborn as sns
import matplotlib.pyplot as plt

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Root folder of the classification dataset on the mounted Drive; the CSV file
# names and the image paths read from them are appended to this prefix
# (hence the trailing slash).
data_dir_path = '/content/drive/MyDrive/VISIOPE_Project/DATASET/CLASSIFICATION_dataset/'

# Dataset statistics

In [None]:
# Number of images per class, summed over the train, validation and test CSVs.
data_dic = {}
dict_label = {0: 'biodegradable', 1 : 'glass', 2 : 'paper', 3 : 'plastic', 4 : 'special', 5 : 'trash'}
# CSV index files of the three splits (relative to data_dir_path)
csv_file_name = ['train_class_balanced.csv', 'val_class_balanced.csv', 'test_class_balanced.csv']

# NOTE(review): ClassificationDataset loads these same files with pd.read_csv,
# which treats the first row as a header, while csv.reader yields every row.
# Confirm whether the CSVs carry a header row.
for file in csv_file_name:
  with open(data_dir_path+file, 'r', newline='') as csvfile:
      csv_reader = csv.reader(csvfile)
      for row in csv_reader:
        # Column 1 holds the integer class id; translate it to the class name.
        class_name = dict_label[int(row[1])]
        # Start every class at 0 so that its first image counts exactly once.
        data_dic[class_name] = data_dic.get(class_name, 0) + 1

# Bar chart of the per-class counts, sorted from the rarest to the most common class.
data_df= pd.Series(data_dic)
plt.figure(figsize = (15, 6))
data_df.sort_values().plot(kind = 'bar')
plt.title('Images per class (train + val + test)')
plt.xlabel('Classes')
plt.ylabel('Number of images')
plt.show()

# Build the chart by reading the CSV files

# DATASET

In [None]:
# Use this if we do NOT want the fixed (CSV-based) dataset split

# class CustomImageFolder(ImageFolder):
#   def __init__(self, root, transform=None, target_transform=None):
#     super(CustomImageFolder, self).__init__(root, transform, target_transform)

#   def __getitem__(self, index):
#     path, target = self.samples[index]

#     # Print the image path and label during loading
#     #print(f"Image path: {path}, Label: {target}")

#     # Load the image using the parent class's __getitem__ method
#     img = super(CustomImageFolder, self).__getitem__(index)

#     # Additional processing or modifications can be done here if needed

#     return img

In [None]:
class ClassificationDataset(Dataset):
  """Image-classification dataset driven by a CSV index file.

  The CSV is loaded with ``pd.read_csv`` (its first row is therefore treated
  as the header). Column 0 holds the image path relative to ``root_dir`` and
  column 1 holds the integer class label.

  Args:
    csv_file: Path of the CSV index file.
    transform: Optional callable applied to the loaded RGB PIL image.
    root_dir: Folder the image paths are relative to. When omitted, the
      project's Drive dataset folder (``DEFAULT_ROOT_DIR``) is used.
  """

  # Default image root, used when the caller does not pass ``root_dir``.
  DEFAULT_ROOT_DIR = '/content/drive/MyDrive/VISIOPE_Project/DATASET/CLASSIFICATION_dataset/'

  def __init__(self, csv_file, transform=None, root_dir=None):
    self.data_frame = pd.read_csv(csv_file)
    self.transform = transform
    self.root_dir = self.DEFAULT_ROOT_DIR if root_dir is None else root_dir

  def __len__(self):
    """Return the number of rows (samples) in the CSV index."""
    return len(self.data_frame)

  def __getitem__(self, idx):
    """Return ``(image, label)`` for row ``idx`` of the CSV index.

    The image is converted to RGB and passed through ``transform`` when one
    was given; the label is returned as a Python ``int``.
    """
    img_path = self.data_frame.iloc[idx, 0]
    label = int(self.data_frame.iloc[idx, 1])

    # convert() returns a new image, so the file handle opened by Image.open
    # can be released as soon as the conversion is done.
    with Image.open(os.path.join(self.root_dir, img_path)) as raw_image:
      image = raw_image.convert("RGB")

    # Apply transformations
    if self.transform is not None:
      image = self.transform(image)

    return image, label

In [None]:
# Preprocessing shared by every split: resize to 224x224, convert to a tensor
# and normalise each channel with the mean/std values given below.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Alternative kept for reference: random 70/15/15 split of an ImageFolder
# instead of the fixed CSV-based split.
#custom_dataset = CustomImageFolder(root=data_dir_path, transform=transform)
# train_size = int(0.7 * len(custom_dataset))
# val_size = int(0.15 * len(custom_dataset))
# test_size = len(custom_dataset) - train_size - val_size
# train_dataset, val_dataset, test_dataset = random_split(custom_dataset, [train_size, val_size, test_size])

# One dataset per fixed split, each backed by its own CSV index file.
train_dataset, val_dataset, test_dataset = (
    ClassificationDataset(data_dir_path + split_csv, transform)
    for split_csv in (
        'train_class_balanced.csv',
        'val_class_balanced.csv',
        'test_class_balanced.csv',
    )
)

# Batches of 64 images; only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader, test_loader = (
    DataLoader(eval_split, batch_size=64, shuffle=False)
    for eval_split in (val_dataset, test_dataset)
)

In [None]:
# Number of batches in the train, validation and test loaders (one per line).
print(len(train_loader), len(val_loader), len(test_loader), sep='\n')

# MODEL

In [None]:
class CustomClassifierModel(nn.Module):
  """Pretrained backbone whose last layer is replaced by a ``num_classes``-way linear head.

  Args:
    num_classes: Number of output classes of the new linear head.
    model_type: Backbone to use, one of ``SUPPORTED_MODEL_TYPES``
      ('eff' = timm EfficientNet-B0, 'resnet18', 'resnet50', 'vgg16').

  Raises:
    ValueError: If ``model_type`` is not one of the supported backbones.
  """

  SUPPORTED_MODEL_TYPES = ('eff', 'resnet18', 'resnet50', 'vgg16')

  def __init__(self, num_classes, model_type):
    super(CustomClassifierModel, self).__init__()

    # Fail early: an unknown type would otherwise create a module without any
    # backbone, whose forward() silently returns None.
    if model_type not in self.SUPPORTED_MODEL_TYPES:
      raise ValueError(
          f"Unsupported model_type {model_type!r}; expected one of {self.SUPPORTED_MODEL_TYPES}"
      )
    self.model_type = model_type

    # The attribute names (`model` for EfficientNet/VGG, `resnet` for the
    # ResNets) are kept as they are because they determine the state_dict keys
    # of checkpoints saved from this class.
    if model_type == 'eff':
      self.model = timm.create_model("efficientnet_b0", pretrained=True)
      in_features = self.model.classifier.in_features
      self.model.classifier = nn.Linear(in_features, num_classes)

    elif model_type == 'resnet18':
      self.resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
      in_features = self.resnet.fc.in_features
      self.resnet.fc = nn.Linear(in_features, num_classes)

    elif model_type == 'resnet50':
      self.resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
      in_features = self.resnet.fc.in_features
      self.resnet.fc = nn.Linear(in_features, num_classes)

    else:  # 'vgg16'
      self.model = models.vgg16(weights=models.VGG16_Weights.DEFAULT, progress = True)
      # Read the input width from the layer being replaced instead of hard-coding it.
      in_features = self.model.classifier[6].in_features
      self.model.classifier[6] = nn.Linear(in_features, num_classes)

  def forward(self, x):
    """Run ``x`` through the selected backbone and return the output of the new linear head."""
    if self.model_type in ('resnet18', 'resnet50'):
      return self.resnet(x)
    return self.model(x)

In [None]:
# --- Model configuration ---
num_classes = 6
# Backbones handled by CustomClassifierModel: 'eff', 'resnet18', 'resnet50', 'vgg16'
model_type = 'vgg16'

# Build the network and move it to the target device before the optimizer is
# created, so that the optimizer is built from the parameters already on that device.
model = CustomClassifierModel(num_classes, model_type).to(device)

# Loss. Ideas to try: other loss functions, or a class-weighted cross-entropy
# for unbalanced data.
criterion = nn.CrossEntropyLoss()
#criterion = TV.ops.sigmoid_focal_loss() #FocalLoss(input, target)

# Optimizer. Adam is an alternative worth trying in general, and with EfficientNet in particular.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
#optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning-rate schedule: reduce on plateau, with the library's default parameters.
#scheduler = optim.lr_scheduler.LinearLR(optimizer, start_factor = 0.001, end_factor = 0.3, total_iters = min(1000, len(train_loader)-1))
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer)

## Training

In [None]:
num_epochs = 5

# Per-epoch history (one entry per epoch), consumed by the plotting cell.
train_loss = []
val_loss = []
accuracy = []
accuracy_v = []

for epoch in range(num_epochs):

  # Accumulators for the current epoch
  running_loss_train = 0.0
  correct_predictions_train = 0
  total_samples_train = 0

  running_loss_val = 0.0
  correct_predictions_val = 0
  total_samples_val = 0

  # True / predicted class ids of the current epoch only. After the loop these
  # lists therefore describe the LAST epoch, which is what the reports at the
  # bottom of this cell (and the confusion-matrix cell) are computed from.
  true_label_train = []
  true_label_val = []

  pred_label_train = []
  pred_label_val = []

  print('TRAIN')

  # Switch back to training mode at the start of every epoch: the validation
  # pass below calls model.eval(), which would otherwise stay in force for all
  # following epochs (dropout disabled, BatchNorm statistics frozen).
  model.train()
  for c, (images, labels) in enumerate(train_loader, start=1):
    if c % 10 == 0: print(c)   # progress: batch index within the epoch
    x = images.to(device)
    y = labels.to(device)
    optimizer.zero_grad()
    outputs = model(x)
    loss = criterion(outputs, y)
    loss.backward()
    optimizer.step()

    # Update statistics
    running_loss_train += loss.item()
    _, predicted = torch.max(outputs.detach(), 1)
    total_samples_train += labels.size(0)
    correct_predictions_train += (predicted == y).sum().item()

    # Store plain Python ints (tolist() also copies from the GPU to the host)
    true_label_train.extend(y.tolist())
    pred_label_train.extend(predicted.tolist())

  # Calculate accuracy and average loss for the epoch
  accuracy_train = correct_predictions_train / total_samples_train
  average_loss_train = running_loss_train / len(train_loader)

  print('VAL')

  model.eval()
  with torch.no_grad():
    for k, (images, labels) in enumerate(val_loader, start=1):
      if k % 5 == 0: print(k)   # progress: batch index within the epoch
      x = images.to(device)
      y = labels.to(device)
      outputs = model(x)
      loss = criterion(outputs, y)

      # Update statistics
      running_loss_val += loss.item()
      _, predicted = torch.max(outputs, 1)
      total_samples_val += labels.size(0)
      correct_predictions_val += (predicted == y).sum().item()

      true_label_val.extend(y.tolist())
      pred_label_val.extend(predicted.tolist())

  # Once per epoch, let the plateau scheduler look at the validation loss
  # (the summed loss over the validation batches).
  scheduler.step(running_loss_val)

  # Calculate accuracy and average loss for the epoch
  accuracy_val = correct_predictions_val / total_samples_val
  average_loss_val = running_loss_val / len(val_loader)

  train_loss.append(average_loss_train)
  val_loss.append(average_loss_val)
  accuracy.append(accuracy_train)
  accuracy_v.append(accuracy_val)

  print(f"Epoch {epoch + 1}, Loss train: {average_loss_train:.4f}, Accuracy train: {accuracy_train * 100:.2f}%, Loss val: {average_loss_val:.4f}, Accuracy val: {accuracy_val * 100:.2f}%")

# Reports for the last epoch only (see the note on the label lists above)
print("TRAINING REPORT")
print(classification_report(true_label_train, pred_label_train, digits = 4))

print("VALIDATION REPORT")
print(classification_report(true_label_val, pred_label_val, digits = 4))

In [None]:
# x axis: epoch numbers starting from 1
epochs = range(1, num_epochs + 1)

# Loss and accuracy curves (training and validation) on a single figure
plt.figure(figsize=(10, 5))

for curve_values, curve_name, curve_color in (
    (train_loss, 'Training Loss', 'blue'),
    (val_loss, 'Validation Loss', 'red'),
    (accuracy, 'Training Accuracy', 'green'),
    (accuracy_v, 'Validation Accuracy', 'black'),
):
  plt.plot(epochs, curve_values, label=curve_name, color=curve_color)

plt.title('Metrics')
plt.xlabel('Epochs')
plt.ylabel('Loss / Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Destination of the trained weights on the mounted Drive
model_path = '/content/drive/MyDrive/VISIOPE_Project/weights/pesi_classification/4_ep_vgg_64b_FIX_SETS_bal_lr.pth'
# Create the target folder if needed, so that saving cannot fail on a missing
# directory after the (expensive) training has already finished.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# Only the parameters (state_dict) are stored, not the model object itself.
torch.save(model.state_dict(), model_path)

### CONFUSION MATRIX TRAINING

In [None]:
# Class names, listed in class-id order (index i is the name of class id i)
label = ['biodegradable','glass','paper','plastic', 'special', 'trash']
class_ids = list(range(len(label)))

# Evaluation metrics on the training predictions collected by the training cell.
# The accuracy goes into its own variable: reusing `accuracy` would overwrite
# the per-epoch accuracy list that the plotting cell needs.
train_accuracy = accuracy_score(true_label_train, pred_label_train)
precision = precision_score(true_label_train, pred_label_train, average = 'weighted')
recall = recall_score(true_label_train, pred_label_train, average = 'weighted')
f1 = f1_score(true_label_train, pred_label_train, average = 'weighted')
# Passing `labels` pins the matrix to one row/column per class, in class-id
# order, even if some class never occurs among the labels or predictions.
conf_matrix = confusion_matrix(true_label_train, pred_label_train, labels = class_ids)

print(f"Accuracy: {train_accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Draw the confusion matrix as a seaborn heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=label, yticklabels=label)

plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

## Testing

In [None]:
# Image-by-image evaluation on the samples listed in test_class.csv
csv_file_path = data_dir_path+'test_class.csv'
df_test = pd.read_csv(csv_file_path)

# Class names in class-id order, used for the heatmap ticks. They are kept in a
# local name so that the notebook-level `label` list is not overwritten.
class_names = ['biodegradable', 'glass', 'paper', 'plastic', 'special', 'trash']
class_ids = list(range(len(class_names)))

true_lab = []
pred_lab = []

model.eval()
with torch.no_grad():
  for i in range(len(df_test)):
    img_path = df_test.iloc[i, 0]
    true_class = int(df_test.iloc[i, 1])   # plain int: it never has to go to the device

    # Load the image and apply the same preprocessing used for training
    image = Image.open(data_dir_path+img_path).convert("RGB")
    image = transform(image)

    # The model works on batches: add a leading batch dimension of size 1
    x = image.unsqueeze(0).to(device)
    outputs = model(x)
    _, predicted = torch.max(outputs, 1)

    # Record the label of THIS image together with its prediction
    true_lab.append(true_class)
    pred_lab.append(predicted.item())

# Metrics are stored under test-specific names so that the per-epoch `accuracy`
# list used by the plotting cell is left untouched.
test_accuracy = accuracy_score(true_lab, pred_lab)
precision = precision_score(true_lab, pred_lab, average = 'weighted')
recall = recall_score(true_lab, pred_lab, average = 'weighted')
f1 = f1_score(true_lab, pred_lab, average = 'weighted')
# `labels` pins the matrix to one row/column per class, in class-id order
conf_matrix = confusion_matrix(true_lab, pred_lab, labels = class_ids)

print(f"Accuracy: {test_accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
#print(f"Confusion Matrix:\n{conf_matrix}")

# Draw the confusion matrix as a seaborn heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=class_names, yticklabels=class_names)

plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

print("Test Report")
print(classification_report(true_lab, pred_lab, digits = 4))

In [None]:
# TEST: batched evaluation on test_loader (the test_class_balanced.csv split)

dir_label = {0 : 'biodegradable', 1 : 'glass', 2 : 'paper', 3 : 'plastic', 4 : 'special', 5 : 'trash'}
# Class ids and names in matching order, for the confusion matrix and its ticks.
# They are derived from dir_label rather than from the notebook-level `label`
# variable, which other cells may have reassigned.
class_ids = sorted(dir_label)
class_names = [dir_label[class_id] for class_id in class_ids]

true_lab = []
pred_lab = []

model.eval()
with torch.no_grad():
  for inputs, labels in test_loader:
    x = inputs.to(device)
    outputs = model(x)
    _, predicted = torch.max(outputs, 1)

    # Store plain Python ints (tolist() also copies from the GPU to the host)
    true_lab.extend(labels.tolist())
    pred_lab.extend(predicted.tolist())

# Metrics are stored under test-specific names so that the per-epoch `accuracy`
# list used by the plotting cell is left untouched.
test_accuracy = accuracy_score(true_lab, pred_lab)
precision = precision_score(true_lab, pred_lab, average = 'weighted')
recall = recall_score(true_lab, pred_lab, average = 'weighted')
f1 = f1_score(true_lab, pred_lab, average = 'weighted')
# `labels` pins the matrix to one row/column per class, in class-id order
conf_matrix = confusion_matrix(true_lab, pred_lab, labels = class_ids)

print(f"Accuracy: {test_accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
#print(f"Confusion Matrix:\n{conf_matrix}")

# Draw the confusion matrix as a seaborn heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

print("Test Report")
print(classification_report(true_lab, pred_lab, digits = 4))