<a href="https://colab.research.google.com/github/Daksh-14/DeepFake-Detection/blob/main/DeepFake_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset
from torchvision import transforms, models
from torch import nn, optim
from PIL import Image
import random

In [None]:
import cv2
import os

# Path to the folder containing video files in your Google Drive
video_folder_path = '/content/drive/MyDrive/Celeb-DF-v2.zip (Unzipped Files)/Celeb-synthesis'

# Output folder for extracted frames
output_frames_path = '/content/drive/MyDrive/Single_Extracted/Celeb-synthesis'
os.makedirs(output_frames_path, exist_ok=True)

# Number of frames to extract from each video
num_frames_to_extract = 1

# Iterate through all video files in the folder
for video_file in os.listdir(video_folder_path):
    if video_file.endswith(('.mp4', '.avi', '.mov')):  # Add other video formats if needed
        video_path = os.path.join(video_folder_path, video_file)
        cap = cv2.VideoCapture(video_path)

        frame_count = 0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, total_frames // num_frames_to_extract)

        success, frame = cap.read()
        while success and frame_count < num_frames_to_extract:
            # Save frame as an image file
            frame_filename = f"{os.path.splitext(video_file)[0]}_frame{frame_count}.jpg"
            cv2.imwrite(os.path.join(output_frames_path, frame_filename), frame)

            # Move to the next frame based on the step
            cap.set(cv2.CAP_PROP_POS_FRAMES, step * (frame_count + 1))
            frame_count += 1
            success, frame = cap.read()

        cap.release()
        print(f"Extracted {frame_count} frames from {video_file}")

print("Frame extraction completed.")

In [None]:
#  Custom Dataset class to load frames
class FrameDataset(Dataset):
    def __init__(self, folder_path, transform=None):
        self.folder_path = folder_path
        self.transform = transform
        self.frames = [os.path.join(folder_path, frame) for frame in os.listdir(folder_path) if frame.endswith(".jpg")]

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        img_path = self.frames[idx]
        image = Image.open(img_path)
        label = 1 if 'real' in img_path else 0  # Label as 1 for real, 0 for fake
        if self.transform:
            image = self.transform(image)
        return image, label

In [None]:
def oversample_dataset(real_dataset, fake_dataset):
    # Calculate the number of samples needed to balance the dataset
    real_count = len(real_dataset)
    fake_count = len(fake_dataset)

    # Create a list to hold the oversampled real images
    oversampled_real_frames = []

    # Add real images to the list until we reach the count of fake images
    while len(oversampled_real_frames) < fake_count:
        oversampled_real_frames.extend(real_dataset.frames)

    # Randomly shuffle the oversampled frames and trim to the exact number of fake images needed
    random.shuffle(oversampled_real_frames)
    oversampled_real_frames = oversampled_real_frames[:fake_count]

    # Create a new dataset with the oversampled real frames and original fake frames
    combined_frames = oversampled_real_frames + fake_dataset.frames

    # Create labels for the combined dataset (1 for real, 0 for fake)
    combined_labels = [1] * len(oversampled_real_frames) + [0] * len(fake_dataset.frames)

    return combined_frames, combined_labels

# Data transforms (e.g., resizing and normalization)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load datasets for real and fake frames
real_dataset = FrameDataset(folder_path="/content/drive/MyDrive/Colab Notebooks/Single_Extracted/Celeb-real", transform=transform)
fake_dataset = FrameDataset(folder_path="/content/drive/MyDrive/Colab Notebooks/Single_Extracted/Celeb-synthesis", transform=transform)

# Perform oversampling on the real dataset to match the size of the fake dataset
combined_frames, combined_labels = oversample_dataset(real_dataset, fake_dataset)

In [None]:
# Create a new dataset class to hold the combined data
class CombinedDataset(Dataset):
    def __init__(self, frames, labels, transform=None):
        self.frames = frames
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        img_path = self.frames[idx]
        image = Image.open(img_path)
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Create a new combined dataset with oversampling applied
combined_dataset = CombinedDataset(combined_frames, combined_labels, transform=transform)

# Dataset splitting: 70% training, 15% validation, 15% testing
total_length = len(combined_dataset)
train_len = int(0.7 * total_length)
val_len = int(0.15 * total_length)
test_len = total_length - train_len - val_len

train_dataset, val_dataset, test_dataset = random_split(combined_dataset, [train_len, val_len, test_len])

# Create DataLoaders for training, validation, and testing
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True,num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False,num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False,num_workers=2)

In [None]:
# Define the model architecture with shared backbone and separate branches (same as before)
class DeepFakeDetector(nn.Module):
    def __init__(self, pretrained=True):
        super(DeepFakeDetector, self).__init__()

        resnet = models.resnet50(pretrained=pretrained)
        self.shared_backbone = nn.Sequential(*(list(resnet.children())[:-2]))

        # Eye Detection Branch
        self.eye_conv = nn.Sequential(
            nn.Conv2d(2048, 512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.eye_fc = nn.Linear(512 * 3 * 3, 256)
        self.eye_classifier = nn.Linear(256, 2)

        # Nose Detection Branch
        self.nose_conv = nn.Sequential(
            nn.Conv2d(2048, 512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.nose_fc = nn.Linear(512 * 3 * 3, 256)
        self.nose_classifier = nn.Linear(256, 2)

    def forward(self, x):
        features = self.shared_backbone(x)
        eye_features = self.eye_conv(features)
        eye_features = eye_features.view(eye_features.size(0), -1)
        eye_output = self.eye_classifier(self.eye_fc(eye_features))
        nose_features = self.nose_conv(features)
        nose_features = nose_features.view(nose_features.size(0), -1)
        nose_output = self.nose_classifier(self.nose_fc(nose_features))

        final_output = (eye_output + nose_output) / 2
        return final_output

In [None]:


def train_model(model, optimizer, criterion, num_epochs=2, save_path="model_checkpoint.pth"):
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        # Training phase
        for batch in train_loader:
            inputs, labels = batch

            if torch.cuda.is_available():
                inputs, labels = inputs.cuda(), labels.cuda()

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            del inputs, labels, outputs, loss

        running_loss /= len(train_loader)

        # Evaluation phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for val_batch in val_loader:
                inputs_val, labels_val = val_batch

                if torch.cuda.is_available():
                    inputs_val, labels_val = inputs_val.cuda(), labels_val.cuda()

                outputs_val = model(inputs_val)
                loss_val = criterion(outputs_val, labels_val)
                val_loss += loss_val.item()

        val_loss /= len(val_loader)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {running_loss:.4f}, Val Loss: {val_loss:.4f}")

        # Save model checkpoint after evaluation
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': val_loss,
        }
        torch.save(checkpoint, save_path)
        print(f"Model saved at epoch {epoch+1}")

# Assuming DeepFakeDetector is the model class
model = DeepFakeDetector(pretrained=True)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
criterion = nn.CrossEntropyLoss()

if torch.cuda.is_available():
    model.cuda()

train_model(model=model, optimizer=optimizer, criterion=criterion, num_epochs=20, save_path="/content/drive/MyDrive/Colab Notebooks/deepfake_detector_checkpoint.pth")

In [None]:
def load_model_checkpoint(model, optimizer, checkpoint_path):

    checkpoint = torch.load(checkpoint_path)

    model.load_state_dict(checkpoint['model_state_dict'])

    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    epoch = checkpoint['epoch']
    val_loss = checkpoint['loss']

    print(f"Checkpoint loaded. Resuming from epoch {epoch} with validation loss {val_loss:.4f}")
    print(f"Model state dict keys: {model.state_dict().keys()}")

    return model, optimizer, epoch, val_loss

model, optimizer, start_epoch, best_val_loss = load_model_checkpoint(model, optimizer, checkpoint_path)

In [None]:
import torch
import numpy as np
import time
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc, precision_recall_curve
import matplotlib.pyplot as plt
import seaborn as sns

def evaluate_model_with_timing(model, data_loader):
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []
    total_time = 0.0
    total_images = 0

    with torch.no_grad():
        for batch in data_loader:
            inputs, labels = batch
            batch_size = inputs.size(0)
            total_images += batch_size

            if torch.cuda.is_available():
                inputs, labels = inputs.cuda(), labels.cuda()

            # Measure the time taken for each batch prediction
            start_time = time.time()
            outputs = model(inputs)
            end_time = time.time()

            # Update total time
            total_time += (end_time - start_time)

            _, preds = torch.max(outputs, 1)
            probs = torch.softmax(outputs, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate average time per batch and per image
    avg_time_per_batch = total_time / len(data_loader)
    avg_time_per_image = total_time / total_images

    return np.array(all_preds), np.array(all_labels), np.array(all_probs), total_time, avg_time_per_batch, avg_time_per_image

def plot_confusion_matrix(cm, classes, title='Confusion Matrix'):
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
    plt.title(title)
    plt.ylabel('Actual Label')
    plt.xlabel('Predicted Label')
    plt.show()

def plot_roc_curve(y_true, y_scores):
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.show()

def plot_precision_recall_curve(y_true, y_scores):
    precision, recall, _ = precision_recall_curve(y_true, y_scores)

    plt.figure(figsize=(8, 6))
    plt.plot(recall, precision, color='b', lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.show()

# Get predictions, ground truth, and timing information
preds, labels, probs, total_time, avg_time_per_batch, avg_time_per_image = evaluate_model_with_timing(model, test_loader)

# Accuracy, Precision, Recall, F1-Score
accuracy = accuracy_score(labels, preds)
precision = precision_score(labels, preds, average='weighted')
recall = recall_score(labels, preds, average='weighted')
f1 = f1_score(labels, preds, average='weighted')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")

# Print timing information
print(f"Total time taken for predictions: {total_time:.4f} seconds")
print(f"Average time per batch: {avg_time_per_batch:.4f} seconds")
print(f"Average time per image: {avg_time_per_image:.6f} seconds")

# Confusion Matrix
cm = confusion_matrix(labels, preds)
plot_confusion_matrix(cm, classes=['Fake', 'Real'])

# ROC and Precision-Recall curves (for binary classification)
if len(np.unique(labels)) == 2:  # Check if it's binary classification
    y_scores = probs[:, 1]  # Get probabilities for class 1
    plot_roc_curve(labels, y_scores)
    plot_precision_recall_curve(labels, y_scores)
else:
    print("ROC and Precision-Recall curves are only plotted for binary classification.")