<a href="https://colab.research.google.com/github/Adi-204/DeepFake/blob/main/DeepFake_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used by later cells
# all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%matplotlib inline
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset
from torchvision import transforms, models
from torch import nn, optim
from PIL import Image
import random
import torch.nn.functional as F

In [None]:
import cv2
import os

# Path to the folder containing video files in your Google Drive
video_folder_path = '/content/drive/MyDrive/Celeb-DF-v2.zip (Unzipped Files)/Celeb-synthesis'

# Output folder for extracted frames
# NOTE(review): later cells read frames from '/content/drive/MyDrive/Colab Notebooks/Single_Extracted/...',
# which is a different location from this one — confirm which path is intended.
output_frames_path = '/content/drive/MyDrive/Single_Extracted/Celeb-synthesis'
os.makedirs(output_frames_path, exist_ok=True)

# Number of frames to extract from each video (spaced `step` frames apart)
num_frames_to_extract = 1

# Matched case-insensitively so files such as "clip.MP4" are not silently skipped
video_extensions = ('.mp4', '.avi', '.mov')  # Add other video formats if needed

# Iterate through all video files in the folder (sorted for a reproducible processing order)
for video_file in sorted(os.listdir(video_folder_path)):
    if not video_file.lower().endswith(video_extensions):
        continue

    video_path = os.path.join(video_folder_path, video_file)
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    try:
        # A capture that failed to open reports 0 frames and would otherwise be logged
        # as "Extracted 0 frames" without any hint that the file is unreadable.
        if not cap.isOpened():
            print(f"Could not open {video_file}; skipping")
            continue

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, total_frames // num_frames_to_extract)
        video_stem = os.path.splitext(video_file)[0]

        success, frame = cap.read()
        while success and frame_count < num_frames_to_extract:
            # Save frame as an image file; imwrite signals failure through its return value
            frame_filename = f"{video_stem}_frame{frame_count}.jpg"
            frame_path = os.path.join(output_frames_path, frame_filename)
            if not cv2.imwrite(frame_path, frame):
                print(f"Failed to write {frame_path}")
                break
            frame_count += 1

            # Seek to the next sampling position only if another frame is still wanted
            if frame_count < num_frames_to_extract:
                cap.set(cv2.CAP_PROP_POS_FRAMES, step * frame_count)
                success, frame = cap.read()
    finally:
        # Always free the decoder handle, even if reading or writing raised
        cap.release()

    print(f"Extracted {frame_count} frames from {video_file}")

print("Frame extraction completed.")

In [None]:
pip install dlib opencv-python imutils

In [None]:
!pip install wget  # Install the wget module
import wget
import os

# Download if file doesn't exist
if not os.path.exists('shape_predictor_68_face_landmarks.dat'):
    print("Downloading facial landmark predictor...")
    wget.download('http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2')
    !bzip2 -d shape_predictor_68_face_landmarks.dat.bz2
else:
    print("Facial landmark predictor file already exists")

In [None]:
import cv2
import dlib
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from imutils import face_utils

# Per-process cache of the dlib models, keyed by predictor path. Building them reads the
# landmark model file from disk, so it is done once instead of once per image.
_DLIB_MODELS = {}


def _get_dlib_models(predictor_path='shape_predictor_68_face_landmarks.dat'):
    """Return the (face detector, landmark predictor) pair, creating it on first use."""
    if predictor_path not in _DLIB_MODELS:
        _DLIB_MODELS[predictor_path] = (
            dlib.get_frontal_face_detector(),
            dlib.shape_predictor(predictor_path),
        )
    return _DLIB_MODELS[predictor_path]


def detect_and_crop_features(image_path):
    """Detect the first face in an image and crop the face, eye and nose regions.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        A dict with keys 'face', 'eyes' and 'nose' (in that insertion order) holding PIL
        images in RGB, or None when the detector finds no face.

    Raises:
        ValueError: If the image cannot be loaded.
    """
    detector, predictor = _get_dlib_models()

    # Read the image
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError("Unable to load image")

    # OpenCV loads BGR; convert to RGB for detection, cropping and display
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_height, img_width = img_rgb.shape[:2]

    # Detect faces
    faces = detector(img_rgb)
    if len(faces) == 0:
        print("No faces detected in the image")
        return None

    # Only the first detected face is processed
    face = faces[0]

    # 68 landmarks as an array of (x, y) points
    landmarks = face_utils.shape_to_np(predictor(img_rgb, face))

    def get_bbox_with_padding(points, padding_percent=10):
        """Bounding box of `points`, grown by a percentage of its size and clamped to the image."""
        x1, y1 = (int(v) for v in points.min(axis=0))
        x2, y2 = (int(v) for v in points.max(axis=0))

        padding_x = int((x2 - x1) * padding_percent / 100)
        padding_y = int((y2 - y1) * padding_percent / 100)

        return (
            max(0, x1 - padding_x),
            max(0, y1 - padding_y),
            min(img_width, x2 + padding_x),
            min(img_height, y2 + padding_y),
        )

    # Wrap the array once and reuse it for every crop
    pil_img = Image.fromarray(img_rgb)
    crops = {}

    # Crop face. The detector rectangle is clamped to the image like the landmark boxes are;
    # PIL pads out-of-range crop regions instead of rejecting them.
    face_box = (
        max(0, face.left()),
        max(0, face.top()),
        min(img_width, face.right()),
        min(img_height, face.bottom()),
    )
    crops['face'] = pil_img.crop(face_box)

    # Both eyes in one crop: landmark rows 36-41 and 42-47 are contiguous
    eyes_bbox = get_bbox_with_padding(landmarks[36:48], padding_percent=30)
    crops['eyes'] = pil_img.crop(eyes_bbox)

    # Nose: landmark rows 27-35
    nose_bbox = get_bbox_with_padding(landmarks[27:36], padding_percent=20)
    crops['nose'] = pil_img.crop(nose_bbox)

    return crops

def display_crops(crops):
    """Show every crop in `crops` side by side in a single-row figure.

    Args:
        crops: Mapping of feature name to image (anything `plt.imshow` accepts), or None.
            None and an empty mapping both print a message and draw nothing.
    """
    if not crops:
        print("No crops to display")
        return

    # One column per crop, so the layout follows the mapping instead of assuming three entries
    num_crops = len(crops)
    plt.figure(figsize=(15, 5))
    for idx, (feature, crop_img) in enumerate(crops.items(), 1):
        plt.subplot(1, num_crops, idx)
        plt.imshow(crop_img)
        plt.title(f"Cropped {feature}")
        plt.axis('off')
    plt.tight_layout()
    plt.show()

# Demo: crop one extracted frame and show the result; any failure is reported, not raised
try:
    # Point this at any extracted frame on your Drive
    image_path = '/content/drive/MyDrive/Colab Notebooks/Single_Extracted/Celeb-real/00066_frame0.jpg'
    crops = detect_and_crop_features(image_path)
    display_crops(crops)
except Exception as err:
    print(f"Error: {str(err)}")

In [None]:
class DeepFakeDataset(Dataset):
    """Dataset of extracted frames stored in a single directory.

    Each item is `(eye_img, nose_img, face_img, label)`, where label 1 means real and 0 fake.

    Args:
        data_dir: Directory containing .jpg/.jpeg/.png frames.
        transform: Optional callable applied to every PIL image before it is returned.

    Raises:
        ValueError: If `data_dir` is not a directory or holds no readable image.
    """

    IMAGE_EXTENSIONS = (".jpg", ".png", ".jpeg")

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Validate data directory
        if not os.path.isdir(data_dir):
            raise ValueError(f"Invalid directory: {data_dir}")

        # Collect image paths and labels (sorted so indices are stable between runs)
        for filename in sorted(os.listdir(data_dir)):
            if not filename.lower().endswith(self.IMAGE_EXTENSIONS):
                continue

            full_path = os.path.join(data_dir, filename)
            # Validate image file; unreadable files are reported and left out
            try:
                with Image.open(full_path) as img:
                    img.verify()
            except Exception as e:
                print(f"Skipping invalid image {full_path}: {str(e)}")
                continue

            self.image_paths.append(full_path)
            self.labels.append(self._infer_label(data_dir, filename))

        if not self.image_paths:
            raise ValueError(f"No valid images found in {data_dir}")

    @staticmethod
    def _infer_label(data_dir, filename):
        """Return 1 (real) if the folder name or the file name contains "real", else 0.

        The folder name is checked as well because a frame name such as
        "00066_frame0.jpg" does not contain the class, while its folder ("Celeb-real") does.
        """
        folder_name = os.path.basename(os.path.normpath(data_dir)).lower()
        return 1 if "real" in folder_name or "real" in filename.lower() else 0

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Looked up outside the try block so an out-of-range index raises IndexError normally
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        try:
            # Detect and crop features
            crops = detect_and_crop_features(img_path)

            if crops is None:
                # No face found: use the whole frame for all three inputs. It is converted to
                # RGB so it has the same mode as the crops, and the file is closed right away.
                with Image.open(img_path) as raw:
                    img = raw.convert("RGB")

                if self.transform:
                    img = self.transform(img)

                return img, img, img, label

            parts = [crops.get('eyes'), crops.get('nose'), crops.get('face')]

            # Apply transforms if specified
            if self.transform:
                parts = [self.transform(part) for part in parts]

            eye_img, nose_img, face_img = parts
            return eye_img, nose_img, face_img, label

        except Exception as e:
            print(f"Error in __getitem__: {str(e)}")
            # Return black placeholder images
            placeholder = torch.zeros(3, 224, 224)
            return placeholder, placeholder, placeholder, label


In [None]:
def oversample_dataset(real_dataset, fake_dataset, seed=None):
    """Balance the classes by resampling real frame paths to the number of fake frames.

    The real paths are repeated until there are at least as many as fake paths, shuffled,
    and trimmed to exactly `len(fake_dataset)`. If there are already more real than fake
    paths, the real paths are therefore cut down to the fake count.

    Args:
        real_dataset: DeepFakeDataset holding the real frames.
        fake_dataset: DeepFakeDataset holding the fake frames.
        seed: Optional seed for a private shuffle RNG. With the default None the
            module-level `random` state is used.

    Returns:
        `(combined_frames, combined_labels)`: the resampled real paths followed by all fake
        paths, and a parallel list with 1 for real and 0 for fake.

    Raises:
        ValueError: If an input is not a DeepFakeDataset, or if the real dataset has no
            paths while the fake dataset does.
    """
    # Validate inputs
    if not isinstance(real_dataset, DeepFakeDataset) or not isinstance(fake_dataset, DeepFakeDataset):
        raise ValueError("Inputs must be DeepFakeDataset instances")

    real_paths = list(real_dataset.image_paths)
    fake_paths = list(fake_dataset.image_paths)
    fake_count = len(fake_dataset)

    # Repeating an empty list can never reach the target size, so fail instead of looping forever
    if fake_count and not real_paths:
        raise ValueError("Cannot oversample an empty real dataset")

    # Smallest number of whole copies giving at least fake_count paths (ceiling division)
    repeats = -(-fake_count // len(real_paths)) if real_paths else 0
    oversampled_real_frames = real_paths * repeats

    # Shuffle and trim to exact count
    rng = random.Random(seed) if seed is not None else random
    rng.shuffle(oversampled_real_frames)
    oversampled_real_frames = oversampled_real_frames[:fake_count]

    # Combine frames and labels
    combined_frames = oversampled_real_frames + fake_paths
    combined_labels = [1] * len(oversampled_real_frames) + [0] * len(fake_paths)

    return combined_frames, combined_labels

# Preprocessing applied to every image: resize to 224x224, convert to a tensor, normalize per channel
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One dataset per class folder
real_dataset = DeepFakeDataset(
    data_dir="/content/drive/MyDrive/Colab Notebooks/Single_Extracted/Celeb-real",
    transform=transform,
)
fake_dataset = DeepFakeDataset(
    data_dir="/content/drive/MyDrive/Colab Notebooks/Single_Extracted/Celeb-synthesis",
    transform=transform,
)

# Resample the real frames so both classes contribute the same number of paths
combined_frames, combined_labels = oversample_dataset(real_dataset, fake_dataset)

# Summary of the balanced path list
print("\nDataset Structure:")
print(f"Total combined frames: {len(combined_frames)}")
print(f"Real frames (label 1): {combined_labels.count(1)}")
print(f"Fake frames (label 0): {combined_labels.count(0)}")
print("First 5 Combined Frames:")
for i, (frame_path, frame_label) in enumerate(zip(combined_frames[:5], combined_labels[:5])):
    print(f"{i+1}. {os.path.basename(frame_path)} - Label: {frame_label}")

In [None]:
class CombinedDataset(Dataset):
    """Dataset over explicit lists of frame paths and labels.

    Each item is `(eye_img, nose_img, face_img, label)`. Every item has this shape, including
    on failure, so a DataLoader can always batch the samples:

    * no face detected -> the whole frame (as RGB) is used for all three inputs;
    * any other error  -> three zero tensors of shape (3, 224, 224).

    Args:
        frames: Sequence of image file paths.
        labels: Sequence of labels parallel to `frames`.
        transform: Optional callable applied to each PIL image.
    """

    PLACEHOLDER_SHAPE = (3, 224, 224)

    def __init__(self, frames, labels, transform=None):
        self.frames = frames
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        # Looked up outside the try block: an out-of-range index must raise IndexError so that
        # plain iteration over the dataset terminates.
        img_path = self.frames[idx]
        label = self.labels[idx]

        try:
            crops = detect_and_crop_features(img_path)

            if crops is None:
                print(f"Warning: crops is None for index {idx}. Falling back to the full frame.")
                # Open the frame only when it is needed, and close the file immediately
                with Image.open(img_path) as raw:
                    full_frame = raw.convert("RGB")
                parts = [full_frame, full_frame, full_frame]
            else:
                parts = [crops.get('eyes'), crops.get('nose'), crops.get('face')]
                if any(part is None for part in parts):
                    raise ValueError("missing eye, nose or face crop")

            if self.transform:
                parts = [self.transform(part) for part in parts]

            eye_img, nose_img, face_img = parts
            return eye_img, nose_img, face_img, label

        except Exception as e:
            print(f"Error at index {idx}: {e}")
            # Returning None here would make batching fail; use a black placeholder instead
            placeholder = torch.zeros(*self.PLACEHOLDER_SHAPE)
            return placeholder, placeholder, placeholder, label

# Create a new combined dataset with oversampling applied
combined_dataset = CombinedDataset(combined_frames, combined_labels, transform=transform)

# Dataset splitting: 70% training, 15% validation, 15% testing
total_length = len(combined_dataset)
train_len = int(0.7 * total_length)
val_len = int(0.15 * total_length)
test_len = total_length - train_len - val_len

# Seed the split so the same samples land in train/val/test on every run. Without this, a
# restarted kernel that reloads the saved checkpoint would evaluate on a freshly drawn test
# split that overlaps the data the checkpoint was trained on.
# NOTE(review): oversampling repeats real frame paths before this split, so copies of one
# frame can fall into different splits — consider splitting the unique paths first.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    combined_dataset, [train_len, val_len, test_len], generator=split_generator
)

# Create DataLoaders for training, validation, and testing
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

num_images_to_display = 5


def display_image(img, title):
    """Undo the normalization applied by `transform` on a CHW tensor and show it."""
    img_np = img.cpu().numpy().transpose(1, 2, 0)  # CHW -> HWC for matplotlib
    # Same per-channel std/mean as the Normalize step of the transform
    img_np = (img_np * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406]))
    img_np = np.clip(img_np, 0, 1)
    plt.imshow(img_np)
    plt.title(title)
    plt.axis('off')  # Turn off axis labels and ticks
    plt.show()


# Never index past the end of the dataset
for i in range(min(num_images_to_display, len(combined_dataset))):
    sample = combined_dataset[i]

    # The dataset may signal a failed sample with None (or a tuple of Nones); skip those
    if sample is None or any(part is None for part in sample):
        print(f"Skipping index {i}: no usable sample")
        continue

    eye_img, nose_img, face_img, label = sample

    display_image(eye_img, f"Eye - Label: {label}")
    display_image(nose_img, f"Nose - Label: {label}")
    display_image(face_img, f"Face - Label: {label}")

In [None]:
# `pretrained=True` is deprecated in favour of the `weights=` argument; IMAGENET1K_V1 is the
# weight set that `pretrained=True` selected.
resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Backbone model with branches for eye, nose, and face detection
class BackboneWithBranches(nn.Module):
    """Shared convolutional backbone followed by three independent two-class heads.

    Each of the eye, nose and face inputs is passed through the same backbone and then
    through its own conv -> flatten -> fully-connected -> classifier head.

    Args:
        backbone: Optional module whose children, minus the last two, form the shared feature
            extractor. Defaults to the module-level `resnet`. The heads expect the extractor
            to output 2048 channels on a 7x7 grid (pooled to 3x3 before the linear layer).
    """

    def __init__(self, backbone=None):
        super().__init__()
        if backbone is None:
            backbone = resnet
        # Drop the last two children and keep the rest as the feature extractor
        self.shared_backbone = nn.Sequential(*list(backbone.children())[:-2])

        # Attribute names are part of the checkpoint format (state-dict keys); keep them stable
        self.eye_conv, self.eye_fc, self.eye_classifier = self._make_branch()
        self.nose_conv, self.nose_fc, self.nose_classifier = self._make_branch()
        self.face_conv, self.face_fc, self.face_classifier = self._make_branch()

    @staticmethod
    def _make_branch():
        """Build one head: (conv block, fully-connected block, 2-way classifier)."""
        conv = nn.Sequential(
            nn.Conv2d(2048, 512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        fc = nn.Sequential(
            nn.Linear(512 * 3 * 3, 256),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        classifier = nn.Linear(256, 2)
        return conv, fc, classifier

    def _run_branch(self, img, conv, fc, classifier):
        """Run one input through the shared backbone and the given head."""
        features = conv(self.shared_backbone(img))
        features = features.view(features.size(0), -1)  # Flatten
        return classifier(fc(features))

    def forward(self, eye_img, nose_img, face_img):
        """Return `(eye_output, nose_output, face_output)`, one (batch, 2) tensor per head."""
        eye_output = self._run_branch(eye_img, self.eye_conv, self.eye_fc, self.eye_classifier)
        nose_output = self._run_branch(nose_img, self.nose_conv, self.nose_fc, self.nose_classifier)
        face_output = self._run_branch(face_img, self.face_conv, self.face_fc, self.face_classifier)

        return eye_output, nose_output, face_output


In [None]:
def majority_voting(eye_output, nose_output, face_output):
    """Fuse the three branch outputs into one tensor by element-wise averaging.

    Despite the name, no discrete vote is taken: the three tensors are stacked along a new
    leading dimension and their mean over that dimension is returned, so the result has the
    same shape as each input.
    """
    branch_outputs = (eye_output, nose_output, face_output)
    return torch.stack(branch_outputs).mean(dim=0)

In [None]:
pip install --upgrade torch ultralytics

In [None]:
import warnings
# Silence every FutureWarning from here on; the filter is process-wide, so it also applies
# to all later cells.
warnings.filterwarnings("ignore", category=FutureWarning)


def train(model, train_loader, criterion, optimizer, num_epochs=10, val_loader=None, save_path="model_checkpoint.pth"):
    """Train `model` and overwrite one checkpoint file after every epoch.

    Each batch is `(eye_img, nose_img, face_img, labels)`. The three model outputs are fused
    with `majority_voting` and the fused output is passed to `criterion`.

    Args:
        model: Module called as `model(eye_img, nose_img, face_img)`, returning three outputs.
        train_loader: Iterable of training batches.
        criterion: Loss called as `criterion(output, labels)`.
        optimizer: Optimizer over the model parameters.
        num_epochs: Number of passes over `train_loader`.
        val_loader: Optional iterable of validation batches, evaluated after each epoch.
        save_path: File the checkpoint is written to (overwritten each epoch).

    The checkpoint stores 'epoch', 'model_state_dict', 'optimizer_state_dict', 'loss'
    (mean training loss), plus 'train_acc' and 'val_loss' (None without `val_loader`).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        loss_sum = 0.0
        correct = 0
        total = 0
        batches_used = 0  # batches that actually contributed to loss_sum

        for batch_idx, (eye_img, nose_img, face_img, labels) in enumerate(train_loader):
            if eye_img is None or nose_img is None or face_img is None or labels is None:
                print(f"Warning: Skipping batch {batch_idx} due to missing data.")
                continue  # Skip this batch and move to the next

            eye_img, nose_img, face_img, labels = eye_img.to(device), nose_img.to(device), face_img.to(device), labels.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            eye_output, nose_output, face_output = model(eye_img, nose_img, face_img)
            final_output = majority_voting(eye_output, nose_output, face_output)
            loss = criterion(final_output, labels)
            loss.backward()
            optimizer.step()

            loss_sum += loss.item()
            predicted = final_output.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            batches_used += 1

        # Average over the batches that were processed: skipped batches must not dilute the
        # mean, and an empty loader must not divide by zero.
        running_loss = loss_sum / max(1, batches_used)
        train_acc = correct / max(1, total)
        epoch_summary = f"Epoch {epoch+1}/{num_epochs}, Train Loss: {running_loss:.4f}"

        # Validation loop (if validation data is provided)
        val_loss = None
        if val_loader is not None:
            model.eval()
            val_loss_sum = 0.0
            val_batches = 0
            val_correct = 0
            val_total = 0
            with torch.no_grad():
                for val_batch in val_loader:
                    eye_img_val, nose_img_val, face_img_val, labels_val = val_batch
                    eye_img_val, nose_img_val, face_img_val, labels_val = (
                        eye_img_val.to(device), nose_img_val.to(device), face_img_val.to(device), labels_val.to(device)
                    )

                    eye_output, nose_output, face_output = model(eye_img_val, nose_img_val, face_img_val)
                    final_output_val = majority_voting(eye_output, nose_output, face_output)
                    val_loss_sum += criterion(final_output_val, labels_val).item()
                    val_batches += 1
                    val_correct += (final_output_val.argmax(dim=1) == labels_val).sum().item()
                    val_total += labels_val.size(0)
            val_loss = val_loss_sum / max(1, val_batches)
            val_acc = val_correct / max(1, val_total)
            epoch_summary += f", Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}"
        else:
            epoch_summary += f", Train Acc: {train_acc:.4f}"

        # Log every epoch, with or without validation data
        print(epoch_summary)

        # Save model checkpoint after each epoch
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': running_loss,
            'train_acc': train_acc,
            'val_loss': val_loss,
        }
        torch.save(checkpoint, save_path)
        print(f"Model saved at epoch {epoch+1} to {save_path}")

# Build the network, the loss and the optimizer used for training
model = BackboneWithBranches()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-5)
num_epochs = 3

# Run training with validation after every epoch; the checkpoint is written to Drive
train(
    model,
    train_loader,
    criterion,
    optimizer,
    num_epochs,
    val_loader=val_loader,
    save_path="/content/drive/MyDrive/Colab Notebooks/deepfake_detector_check.pth",
)

In [None]:
def load_model_checkpoint(model, optimizer, checkpoint_path):
    """Restore model and optimizer state from a checkpoint file.

    Args:
        model: Module whose parameters are overwritten in place.
        optimizer: Optimizer whose state is overwritten in place.
        checkpoint_path: File containing a dict with the keys 'model_state_dict',
            'optimizer_state_dict', 'epoch' and 'loss'.

    Returns:
        `(model, optimizer, epoch, loss)`, where `epoch` and `loss` are the values stored in
        the checkpoint.
    """
    # Load onto the CPU first so a checkpoint written on a GPU runtime can also be opened on a
    # CPU-only runtime; load_state_dict then copies the values into the existing parameters.
    checkpoint = torch.load(checkpoint_path, map_location="cpu")

    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    epoch = checkpoint['epoch']
    loss = checkpoint['loss']

    # The value under 'loss' is whatever the training loop stored there, so it is reported
    # without claiming that it is a validation loss.
    print(f"Checkpoint loaded. Resuming from epoch {epoch} with recorded loss {loss:.4f}")
    # Report the number of entries instead of dumping every key name
    print(f"Model state dict entries: {len(model.state_dict())}")

    return model, optimizer, epoch, loss

# Same file the training cell writes its checkpoint to.
checkpoint_path = "/content/drive/MyDrive/Colab Notebooks/deepfake_detector_check.pth"
# Requires `model` and `optimizer` to exist already (they are created in the training cell);
# both are updated in place. The fourth value is the checkpoint's 'loss' entry.
model, optimizer, start_epoch, best_val_loss = load_model_checkpoint(model, optimizer, checkpoint_path)

In [None]:
import torch
import numpy as np
import time
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc, precision_recall_curve
import matplotlib.pyplot as plt
import seaborn as sns

def evaluate_model_with_timing(model, data_loader):
    """Run the model over `data_loader`, collecting predictions and forward-pass timing.

    Args:
        model: Module called as `model(eye_img, nose_img, face_img)`, returning three outputs
            that are fused with `majority_voting`.
        data_loader: Iterable of `(eye_img, nose_img, face_img, labels)` batches.

    Returns:
        `(preds, labels, probs, total_time, avg_time_per_batch, avg_time_per_image)`: the
        first three are numpy arrays (predicted class, true label, softmax probabilities),
        the times are in seconds and cover only the forward pass and fusion. The averages
        are 0.0 when the loader yields nothing.
    """
    model.eval()

    # Follow the device the model lives on instead of assuming CUDA whenever it is available;
    # inputs on a different device than the weights would make the forward pass fail.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    on_cuda = device.type == "cuda"

    all_preds = []
    all_labels = []
    all_probs = []
    total_time = 0.0
    total_images = 0
    num_batches = 0

    with torch.no_grad():
        for eye_img, nose_img, face_img, labels in data_loader:
            total_images += labels.size(0)
            num_batches += 1

            eye_img, nose_img, face_img = eye_img.to(device), nose_img.to(device), face_img.to(device)

            # CUDA kernels run asynchronously: wait for pending work before starting and
            # before stopping the clock, otherwise only the launch overhead is measured.
            if on_cuda:
                torch.cuda.synchronize(device)
            start_time = time.perf_counter()
            eye_output, nose_output, face_output = model(eye_img, nose_img, face_img)
            final_output = majority_voting(eye_output, nose_output, face_output)
            if on_cuda:
                torch.cuda.synchronize(device)
            total_time += time.perf_counter() - start_time

            preds = final_output.argmax(dim=1)
            probs = torch.softmax(final_output, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate average time per batch and per image (guarding against an empty loader)
    avg_time_per_batch = total_time / num_batches if num_batches else 0.0
    avg_time_per_image = total_time / total_images if total_images else 0.0

    return np.array(all_preds), np.array(all_labels), np.array(all_probs), total_time, avg_time_per_batch, avg_time_per_image

def plot_confusion_matrix(cm, classes, title='Confusion Matrix'):
    """Draw `cm` as an annotated heatmap with `classes` as tick labels on both axes."""
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=classes,
        yticklabels=classes,
        ax=ax,
    )
    ax.set_title(title)
    ax.set_ylabel('Actual Label')
    ax.set_xlabel('Predicted Label')
    plt.show()

def plot_roc_curve(y_true, y_scores):
    """Plot the ROC curve for `y_scores` against `y_true`, with the AUC in the legend."""
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(false_pos_rate, true_pos_rate)

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.plot(false_pos_rate, true_pos_rate, color='darkorange', lw=2,
            label='ROC curve (area = %0.2f)' % roc_auc)
    # Diagonal reference line
    ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic (ROC) Curve')
    ax.legend(loc="lower right")
    plt.show()

def plot_precision_recall_curve(y_true, y_scores):
    """Plot precision against recall for `y_scores` measured against `y_true`."""
    precision_values, recall_values, _ = precision_recall_curve(y_true, y_scores)

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.plot(recall_values, precision_values, color='b', lw=2)
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall Curve')
    plt.show()

# Run the model over the test split, collecting predictions, labels, probabilities and timing
(
    preds,
    labels,
    probs,
    total_time,
    avg_time_per_batch,
    avg_time_per_image,
) = evaluate_model_with_timing(model, test_loader)

# Classification metrics; precision, recall and F1 are support-weighted across classes
accuracy = accuracy_score(labels, preds)
precision, recall, f1 = (
    metric(labels, preds, average='weighted')
    for metric in (precision_score, recall_score, f1_score)
)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")

# Inference timing
print(f"Total time taken for predictions: {total_time:.4f} seconds")
print(f"Average time per batch: {avg_time_per_batch:.4f} seconds")
print(f"Average time per image: {avg_time_per_image:.6f} seconds")

# Confusion matrix; class order follows the label encoding (0 = fake, 1 = real)
cm = confusion_matrix(labels, preds)
plot_confusion_matrix(cm, classes=['Fake', 'Real'])

# The curves need both classes to be present in the ground truth
if len(np.unique(labels)) != 2:
    print("ROC and Precision-Recall curves are only plotted for binary classification.")
else:
    y_scores = probs[:, 1]  # probability assigned to class 1
    plot_roc_curve(labels, y_scores)
    plot_precision_recall_curve(labels, y_scores)
