# EfficientNet implementation

## Sources


https://docs.pytorch.org/vision/main/models/efficientnet.html \\
https://github.com/lukemelas/EfficientNet-PyTorch \\
https://github.com/pytorch/vision/blob/main/torchvision/models/efficientnet.py

# Code

## Imports

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from timm import create_model
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random
from google.colab import drive
import shutil
import os

# Install the Weights & Biases client into the notebook environment (-q: quiet output).
%pip install wandb -q
import wandb
import random
import math
# Log this session in to Weights & Biases before any run is started.
wandb.login()

# Mount Google Drive at /content/drive; the dataset paths in this notebook live under it.
drive.mount('/content/drive')

Mounted at /content/drive


## Parameters

### Paths

In [2]:
def merge_folders(folder1, folder2, folder3):
    """Merge the contents of two folders into a third one.

    Every entry of ``folder1`` is copied into ``folder3`` first, then every
    entry of ``folder2``. Sub-directories are copied recursively and merged
    with a same-named directory already present in the destination; files
    with the same name are overwritten, so on a name clash the copy coming
    from ``folder2`` wins.

    Args:
        folder1: Path of the first source folder.
        folder2: Path of the second source folder.
        folder3: Path of the destination folder (created if it is missing).
    """
    # exist_ok=True creates the destination only when needed, without the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs(folder3, exist_ok=True)

    # Order matters: folder2 is processed last so its files take precedence.
    for source_dir in (folder1, folder2):
        for item in os.listdir(source_dir):
            source = os.path.join(source_dir, item)
            destination = os.path.join(folder3, item)
            if os.path.isdir(source):
                shutil.copytree(source, destination, dirs_exist_ok=True)
            else:
                shutil.copy2(source, destination)

In [3]:
# #Training set
# t_fake = '/content/drive/MyDrive/CV_project/Dataset/Samples/train/fake'
# t_real = '/content/drive/MyDrive/CV_project/Dataset/Samples/train/real'
# t_all = '/content/drive/MyDrive/CV_project/Dataset/Samples/train/all'

# #Test set
# tst_fake = '/content/drive/MyDrive/CV_project/Dataset/Samples/test/fake'
# tst_real = '/content/drive/MyDrive/CV_project/Dataset/Samples/test/real'
# tst_all = '/content/drive/MyDrive/CV_project/Dataset/Samples/test/all'
# #merge_folders(tst_fake, tst_real, tst_all)

# #Validation set
# v_fake = '/content/drive/MyDrive/CV_project/Dataset/Samples/validation/fake'
# v_real = '/content/drive/MyDrive/CV_project/Dataset/Samples/validation/real'
# v_all = '/content/drive/MyDrive/CV_project/Dataset/Samples/validation/all'
# #merge_folders(v_fake, v_real, v_all)

# Folder on the mounted Google Drive that holds one sub-folder per split.
_SAMPLES_ROOT = '/content/drive/MyDrive/CV_project/Dataset/Samples'

# Split key -> image folder. Note that the 'eval' key points at the folder
# named "validation".
DATASET_PATHS = {
    split: f'{_SAMPLES_ROOT}/{folder}'
    for split, folder in (
        ('train', 'train'),
        ('test', 'test'),
        ('eval', 'validation'),
    )
}

In [4]:
# def count_files(folder_path):
#   """Counts the number of files in a specified folder.

#   Args:
#     folder_path: The path to the folder.

#   Returns:
#     The number of files in the folder.
#   """
#   num_files = len([f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))])
#   return num_files

# folder_path = DATASET_PATHS['train']
# num_files_t = count_files(folder_path)
# print(f"Number of files in folder '{folder_path}': {num_files_t}")

# folder_path = DATASET_PATHS['test']
# num_files_tst = count_files(folder_path)
# print(f"Number of files in folder '{folder_path}': {num_files_tst}")

# folder_path = DATASET_PATHS['eval']
# num_files_v = count_files(folder_path)
# print(f"Number of files in folder '{folder_path}': {num_files_v}")

### Hyperparameters

In [5]:
# Settings read by the data loaders, the optimiser and the training loop.
BATCH_SIZE, NUM_EPOCHS = 32, 10
LEARNING_RATE = 1e-4
NUM_CLASSES = 2  # binary problem: deepfake detection

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

### Code Reproducibility

In [6]:
# Seed every random number generator this notebook touches (Python, NumPy,
# PyTorch CPU and PyTorch CUDA) with the same value, in that order.
seed = 42
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_rng(seed)

# cuDNN flags: disable benchmark mode and request deterministic behaviour.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

## Dataset preprocessing

In [7]:
# Preprocessing applied to every image of every split: resize to 224x224,
# convert to a tensor, then normalise each channel with fixed mean/std values.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the three splits from their folders, all with the same transform.
train_dataset, test_dataset, eval_dataset = (
    datasets.ImageFolder(DATASET_PATHS[split], transform=transform)
    for split in ('train', 'test', 'eval')
)

# Build the loaders; only the training loader shuffles its samples.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)
eval_loader = DataLoader(eval_dataset, shuffle=False, **_loader_kwargs)

n_train, n_test, n_eval = len(train_dataset), len(test_dataset), len(eval_dataset)
total_samples = n_train + n_test + n_eval
print(f"Training samples: {n_train}, Test samples: {n_test}, Evaluation samples: {n_eval}")

# Overview: total size and the rounded percentage taken by each split.
print("\nOverview:")
print(f"Total number of samples: {total_samples}")
print("Split:")
for _title, _count in (
    ("Training set", n_train),
    ("Test set", n_test),
    ("Validation set", n_eval),
):
    print(f"{_title}: {round((_count * 100) / total_samples)}%")

Training samples: 6998, Test samples: 1494, Evaluation samples: 1494

Overview:
Total number of samples: 9986
Split:
Training set: 70%
Test set: 15%
Validation set: 15%


## Model (EfficientNet)

In [8]:
# Build the 'efficientnetv2_rw_s' network through timm's create_model, without
# pretrained weights and with NUM_CLASSES outputs, then move it to DEVICE.
model = create_model(
    'efficientnetv2_rw_s',
    num_classes=NUM_CLASSES,
    pretrained=False,
).to(DEVICE)

## Training

In [9]:
# Loss function and optimiser used to train the classifier.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Start a Weights & Biases run and store the hyperparameters with it.
wandb.init(
    entity="CV_Leo_Fra",
    project="EfficientNet-Deepfake-Detection",
    job_type="training",
    config={
        "learning_rate": LEARNING_RATE,
        "batch_size": BATCH_SIZE,
        "epochs": NUM_EPOCHS,
        "architecture": "EfficientNetV2_s",
        "dataset": "DFFD distilled",
    },
)

# Training loop
train_losses = []  # one sample-weighted average loss per completed epoch

for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        # The criterion returns the mean over the batch, so weight it by the
        # batch size to accumulate a per-sample sum (the last batch may be
        # smaller than BATCH_SIZE).
        running_loss += batch_loss * inputs.size(0)
        # Log the loss of this batch, not the running sum, so the curve does
        # not grow with the number of batches seen in the epoch.
        wandb.log({"training_loss": batch_loss})

        if (i + 1) % 100 == 0:  # Print progress every 100 batches
            print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Step [{i+1}/{len(train_loader)}], Loss: {batch_loss:.4f}')

    # Average loss per training sample over the whole epoch.
    epoch_loss = running_loss / len(train_loader.dataset)
    train_losses.append(epoch_loss)
    wandb.log({"epoch_training_loss": epoch_loss, "epoch": epoch + 1})
    print(f'Epoch {epoch+1}/{NUM_EPOCHS} finished, Average Loss: {epoch_loss:.4f}')

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7f6ffcaf4540>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/torch/utils/data/dataloader.py", line 1618, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.11/dist-packages/torch/utils/data/dataloader.py", line 1582, in _shutdown_workers
    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/popen_fork.py", line 40, in wait
    if not wait([self.sentinel], timeout):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/connection.py", line 948, in wait
    ready = selector.select(timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
     

KeyboardInterrupt: 

## Evaluation on the test set

In [None]:
# Run the model over the test loader in eval mode, without gradient tracking,
# and collect the predicted and true class index of every sample.
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for inputs, labels in test_loader:
        logits = model(inputs.to(DEVICE))
        # Predicted class = index of the largest output along dimension 1.
        preds = torch.max(logits, dim=1).indices
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Accuracy = fraction of samples whose prediction matches the label.
_hits = np.array(all_preds) == np.array(all_labels)
accuracy = np.mean(_hits)
print(f'Accuratezza sul set di test: {accuracy:.4f}')
wandb.log({"test_accuracy": accuracy})

## Classification report


In [None]:
# Take the display names from the dataset instead of hard-coding them:
# ImageFolder assigns class indices by sorted sub-folder name, so a fixed
# ['Real', 'Fake'] list would label the metrics of each class with the name
# of the other one when the folders are called "fake" and "real".
class_names = list(test_dataset.classes)
class_ids = list(range(len(class_names)))

print("\nReport di classificazione:")
# Passing labels explicitly keeps names and rows aligned even if a class
# never occurs in the labels or predictions.
report = classification_report(all_labels, all_preds, labels=class_ids, target_names=class_names)
# The report is plain text; <pre> preserves its column alignment as HTML.
wandb.log({"classification_report": wandb.Html(f"<pre>{report}</pre>")})
conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)
wandb.log({"confusion_matrix": wandb.plot.confusion_matrix(probs=None, y_true=all_labels, preds=all_preds, class_names=class_names)})
print(report)

## Plotting

In [None]:
# Training-loss curve. The x axis follows the number of epochs actually
# recorded, so the plot still works when training stopped before NUM_EPOCHS.
plt.figure(figsize=(10,5))
epochs_done = range(1, len(train_losses) + 1)
plt.plot(epochs_done, train_losses, label='Loss di addestramento')
plt.xlabel('Epoca')
plt.ylabel('Loss')
plt.title('Andamento della Loss durante l\'addestramento')
plt.legend()
plt.show()

# Confusion-matrix heatmap. Tick labels come from the dataset so they follow
# the index order ImageFolder assigned (sorted sub-folder names) rather than
# a hard-coded order.
class_names = list(test_dataset.classes)
plt.figure(figsize=(10,8))
sns.heatmap(conf_matrix, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names, cmap='Blues')
plt.xlabel('Predetto')
plt.ylabel('Reale')
plt.title('Matrice di Confusione')
plt.show()

# Close the Weights & Biases run.
wandb.finish()