In [1]:
import cv2
import os
import shutil
import numpy as np
import pandas as pd
import random
# import dlib
#from mtcnn import MTCNN

import torch
import torchvision.transforms.v2 as transforms
from torchvision import datasets
from torch.utils.data import Dataset, DataLoader, ConcatDataset, SubsetRandomSampler
from torchvision.models import alexnet, AlexNet_Weights, vgg16, VGG16_Weights
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.transforms import Compose, Resize, Normalize, ToTensor

from PIL import Image
import matplotlib.pyplot as plt
#import seaborn as sns

from sklearn.metrics import classification_report, confusion_matrix

from scipy.stats import entropy


from ray import tune
from ray import train
from ray.tune.search.hebo import HEBOSearch

In [5]:
source_real_path = '/teamspace/studios/deepfake-image-detection-resnet/dataset/real'
dest_real_path = '/videos/Real'

source_fake_path = '/teamspace/studios/deepfake-image-detection-resnet/dataset/fake'
dest_fake_path = '/videos/Fake'

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [6]:
import os
if os.path.exists(source_real_path):
    print("Dataset found!")
else:
    print("Dataset not found!")

Dataset found!


In [None]:
def plot_training_history(history_dic):

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))
    
    ax1.plot(history_dic['train_loss'], label='Training Loss', color='blue')
    ax1.plot(history_dic['val_loss'], label='Validation Loss', color='red')
    ax1.set_title('Training and Validation Loss')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.legend()
    
    ax2.plot(history_dic['train_acc'], label='Training Accuracy', color='blue')
    ax2.plot(history_dic['val_acc'], label='Validation Accuracy', color='red')
    ax2.set_title('Training and Validation Accuracy')
    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    
    plt.tight_layout()
    plt.show()

def evaluate(model, test_ds):
    model.eval()
    y_true = torch.tensor([], dtype=torch.int64).to(device)
    y_pred = torch.tensor([], dtype=torch.int64).to(device)
    
    with torch.no_grad():
        for i, data in enumerate(test_ds):
            x, y = data
            x = x.to(device)
            y = y.to(device)
            output = model(x)
            predictions = torch.where(output >= 0.5, 1, 0)
    
            # Accumulate true labels and predictions
            y_true = torch.cat((y_true, y), dim=0)
            y_pred = torch.cat((y_pred, predictions), dim=0)

        y_true = y_true.cpu()
        y_pred = y_pred.cpu()
            
            
    
    # Generate classification report
    report = classification_report(y_true, y_pred, target_names=["Real", "Fake"])
    print(report)

    matrix = confusion_matrix(y_true, y_pred)
    ax = sns.heatmap(matrix, annot=True, fmt='.2f', cmap='crest')
    
    # Set custom tick labels
    ax.set_xticklabels(['Real', 'Fake'], rotation=0)
    ax.set_yticklabels(['Real', 'Fake'], rotation=0)

    plt.title('Confussion Matrix')
    plt.show()

In [8]:
dataset = torch.load('train_dataset.pt')
val_dataset = torch.load('test_dataset.pt')
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)
dataloaders = {'train':dataloader, 
               'val': val_dataloader}

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold
import os

# Load pre-saved datasets
dataset = torch.load('train_dataset.pt')
val_dataset = torch.load('test_dataset.pt')

# Create DataLoaders
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Combine both train and validation datasets for K-Fold Cross-Validation
dataset_concat = torch.utils.data.ConcatDataset([dataset, val_dataset])

# Set K-Fold parameters
k_folds = 3
batch_size = 32
num_workers = 4

# Dictionary to hold train and validation DataLoaders for each fold
folds = {'train': [], 'val': []}

# Initialize K-Fold Cross-Validation
kfold = KFold(n_splits=k_folds, shuffle=True)

# Split the dataset and create DataLoaders for each fold
for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset_concat)):
    train_subset = Subset(dataset_concat, train_idx)
    val_subset = Subset(dataset_concat, val_idx)

    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    folds['train'].append(train_loader)
    folds['val'].append(val_loader)

# Print number of folds
print(f"Total train folds: {len(folds['train'])}")
print(f"Total val folds: {len(folds['val'])}")

def train_kfold(model, folds, optim, criterion, epochs, device='cuda', display_prints=True, tuning=False):
    fold_results = []

    for fold in range(len(folds['train'])):
        print(f"Training fold {fold+1}/{len(folds['train'])}")
        train_loader = folds['train'][fold]
        val_loader = folds['val'][fold]
        model.apply(reset_weights)
        optim.zero_grad()

        best_val_loss = float('inf')  # Track best validation loss for saving the model

        for epoch in range(epochs):
            if display_prints:
                print(f'Epoch {epoch}/{epochs - 1}')
                print('-'*10)
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()
                else:
                    model.eval()

                running_loss = 0.0
                running_correct = 0.0
                for inputs, labels in (train_loader if phase == 'train' else val_loader):
                    if device == 'cuda':
                        inputs = inputs.to(device)
                        labels = labels.to(device)
                    labels = labels.float()
                    optim.zero_grad()
                    with torch.set_grad_enabled(phase == 'train'):
                        output = model(inputs)
                        loss = criterion(output, labels)
                        acc = (torch.where(output >= 0.5, 1, 0) == labels).sum()

                        if phase == 'train':
                            loss.backward()
                            optim.step()
                    del inputs, labels, output
                    running_loss += loss.item()
                    running_correct += acc.item()
                epoch_loss = running_loss / len(train_loader if phase == 'train' else val_loader)
                epoch_acc = running_correct / len(train_loader.dataset if phase == 'train' else val_loader.dataset)
                if display_prints:
                    print(f'{phase} Loss: {epoch_loss:.3f} Acc: {epoch_acc:.3f}')
                if tuning and phase == "val":
                    train.report({"loss": epoch_loss.item(), "accuracy": epoch_acc.item()})

            # Save the best model for this fold based on validation loss
            if phase == 'val' and epoch_loss < best_val_loss:
                best_val_loss = epoch_loss
                torch.save(model.state_dict(), f'best_model_fold_{fold+1}.pth')
                print(f"Best model for fold {fold+1} saved!")

        fold_results.append({
            'fold': fold,
            'train_loss': epoch_loss,
            'train_accuracy': epoch_acc
        })

    # Calculate average loss and accuracy across all folds
    avg_train_loss = sum([result['train_loss'] for result in fold_results]) / len(fold_results)
    avg_train_accuracy = sum([result['train_accuracy'] for result in fold_results]) / len(fold_results)

    print(f"Average Train Loss: {avg_train_loss:.3f}")
    print(f"Average Train Accuracy: {avg_train_accuracy:.3f}")

    # Save the final model after all folds
    torch.save(model.state_dict(), 'final_model.pth')
    print("Final model saved!")

    return model, fold_results
# Function to reset the weights of the model
def reset_weights(m):
    if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear):
        m.reset_parameters()


Total train folds: 3
Total val folds: 3


In [15]:
from models.resnet50 import ResNet50DeepFake
model = ResNet50DeepFake(hidden_dim=32, reduced_feature_dim=512)
model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=0.0001)
criterion = nn.BCELoss()
out_model, result = train_kfold(model, folds, optimizer, criterion, 10)
result

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /home/zeus/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 289MB/s]


Training fold 1/3
Epoch 0/9
----------


RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx