In [27]:
import os
import glob
import random
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import seaborn as sn
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from facenet_pytorch import InceptionResnetV1

In [28]:
# 1. Dataset Class
class VideoDataset(Dataset):
    """Dataset yielding a fixed-length clip of frames plus a real/fake label per video.

    Each item is ``(frames, label)``: ``frames`` stacks ``sequence_length``
    frame tensors drawn at random (kept in temporal order) from one video, and
    ``label`` is a 0-dim integer tensor, 1 for real and 0 for fake.

    Args:
        video_paths: Sequence of paths to video files readable by OpenCV.
        sequence_length: Number of frames returned per video.
        transform: Optional callable applied to each RGB ``H x W x 3`` frame;
            its results are stacked with ``torch.stack``, so it must return
            tensors of a common shape. When omitted, frames are converted to
            float ``C x H x W`` tensors scaled to ``[0, 1]``.
        labels: Optional sequence of integer labels aligned with
            ``video_paths``. When omitted, a video is labelled real (1) if the
            substring ``'real'`` occurs anywhere in its lower-cased path and
            fake (0) otherwise.

    Raises:
        ValueError: If ``labels`` is given with a different length than
            ``video_paths``.
    """

    def __init__(self, video_paths, sequence_length=10, transform=None, labels=None):
        if labels is not None and len(labels) != len(video_paths):
            raise ValueError(
                f"labels has {len(labels)} entries but video_paths has {len(video_paths)}"
            )
        self.video_paths = video_paths
        self.transform = transform
        self.sequence_length = sequence_length
        self.labels = labels

    def __len__(self):
        return len(self.video_paths)

    @staticmethod
    def _sample_indices(frame_count, sequence_length):
        """Return up to ``sequence_length`` distinct frame indices, ascending.

        A non-positive ``frame_count`` (OpenCV reports 0 or a negative number
        for streams it cannot index) yields an empty list instead of letting
        ``random.sample`` raise.
        """
        population = max(int(frame_count), 0)
        num_samples = max(min(sequence_length, population), 0)
        return sorted(random.sample(range(population), num_samples))

    def _label_for(self, idx, video_path):
        """Return the integer label for item ``idx``."""
        if self.labels is not None:
            return int(self.labels[idx])
        # NOTE: substring match over the WHOLE path, so any folder or file name
        # containing "real" marks the video as real. Pass ``labels`` explicitly
        # when the directory layout makes that ambiguous.
        return 1 if 'real' in video_path.lower() else 0

    def _prepare_frame(self, frame):
        """Convert one decoded BGR frame into the tensor stored in the clip."""
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if self.transform:
            return self.transform(frame)
        # No transform supplied: produce a stackable float tensor ourselves
        # (channels first, scaled to [0, 1]) rather than a raw numpy array.
        pixels = torch.from_numpy(np.ascontiguousarray(frame))
        return pixels.permute(2, 0, 1).float().div(255.0)

    def __getitem__(self, idx):
        """Load one video and return ``(frames, label)``.

        Raises:
            RuntimeError: If no frame at all could be decoded from the video.
        """
        video_path = self.video_paths[idx]
        label = self._label_for(idx, video_path)

        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            for frame_idx in self._sample_indices(frame_count, self.sequence_length):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                ret, frame = cap.read()
                if ret:
                    frames.append(self._prepare_frame(frame))
        finally:
            # Always free the decoder, even if a read or transform raised.
            cap.release()

        if not frames:
            raise RuntimeError(f"No frames could be read from video: {video_path!r}")

        # Pad short clips by repeating the last decoded frame.
        frames.extend([frames[-1]] * (self.sequence_length - len(frames)))

        return torch.stack(frames), torch.tensor(label)

In [29]:
# 2. Model Class
class DeepFakeDetector(nn.Module):
    """Frame-sequence classifier: frozen FaceNet features -> LSTM -> linear head.

    Args:
        num_classes: Number of output logits (default 2).

    Input to ``forward`` is a 5-D tensor ``(batch, seq, channels, height,
    width)``; the output is ``(batch, num_classes)`` logits.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        self.facenet = InceptionResnetV1(pretrained='vggface2')
        # Freeze FaceNet parameters so only the LSTM and the head are trained.
        for param in self.facenet.parameters():
            param.requires_grad = False
        self.facenet.eval()

        # The LSTM is sized for 512-dimensional per-frame features.
        self.lstm = nn.LSTM(512, 256, 2, batch_first=True)
        self.fc = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.5)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen backbone in eval mode.

        ``requires_grad = False`` does not stop normalisation layers from
        updating running statistics or dropout layers from firing when the
        module is in training mode, so the backbone is pinned to ``eval()``.
        """
        super().train(mode)
        self.facenet.eval()
        return self

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.shape
        # reshape (not view) so non-contiguous inputs are accepted too.
        x = x.reshape(batch_size * seq_length, c, h, w)

        # Per-frame FaceNet embeddings, regrouped back into sequences.
        embeddings = self.facenet(x)
        embeddings = embeddings.reshape(batch_size, seq_length, -1)

        # Temporal model; average the LSTM outputs over the time axis.
        lstm_out, _ = self.lstm(embeddings)
        pooled = torch.mean(lstm_out, dim=1)

        # Classifier head.
        return self.fc(self.dropout(pooled))

In [30]:
# 3. Training Functions
class AverageMeter:
    """Running weighted average of a scalar metric.

    Exposes ``val`` (most recent value), ``sum`` (weighted total), ``count``
    (total weight) and ``avg`` (``sum / count``).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every statistic back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the running average."""
        weighted_total = self.sum + val * n
        total_weight = self.count + n

        self.val = val
        self.sum = weighted_total
        self.count = total_weight
        self.avg = weighted_total / total_weight

def train_epoch(model, dataloader, criterion, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module mapping a batch of inputs to class logits.
        dataloader: Iterable of ``(inputs, targets)`` tensor batches.
        criterion: Loss callable taking ``(outputs, targets)``.
        optimizer: Optimizer stepping the model's trainable parameters.

    Returns:
        tuple: ``(average loss, average accuracy)``, both weighted by batch
        size. Both are 0 when ``dataloader`` yields nothing.
    """
    model.train()
    losses = AverageMeter()
    accuracies = AverageMeter()

    # Follow whatever device the model lives on, so the same loop works on CPU
    # and on an accelerator. Models without parameters leave batches untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    for inputs, targets in dataloader:
        if device is not None:
            inputs = inputs.to(device)
            targets = targets.to(device)

        # Forward / backward / step.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Metrics are computed on detached outputs; they take no part in autograd.
        _, preds = torch.max(outputs.detach(), 1)
        acc = (preds == targets).float().mean()

        batch_size = inputs.size(0)
        losses.update(loss.item(), batch_size)
        accuracies.update(acc.item(), batch_size)

    return losses.avg, accuracies.avg

def validate(model, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Module mapping a batch of inputs to class logits.
        dataloader: Iterable of ``(inputs, targets)`` tensor batches.
        criterion: Loss callable taking ``(outputs, targets)``.

    Returns:
        tuple: ``(average loss, average accuracy, all_preds, all_targets)``.
        The averages are weighted by batch size; the two lists hold one
        predicted / true class index per sample, in iteration order.
    """
    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()
    all_preds = []
    all_targets = []

    # Follow whatever device the model lives on; models without parameters
    # leave batches untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    with torch.no_grad():
        for inputs, targets in dataloader:
            if device is not None:
                inputs = inputs.to(device)
                targets = targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            _, preds = torch.max(outputs, 1)
            acc = (preds == targets).float().mean()

            batch_size = inputs.size(0)
            losses.update(loss.item(), batch_size)
            accuracies.update(acc.item(), batch_size)

            # Collected on the CPU so they can be fed straight to scikit-learn.
            all_preds.extend(preds.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())

    return losses.avg, accuracies.avg, all_preds, all_targets

In [None]:
# 4. Plotting Functions
def plot_metrics(train_metrics, val_metrics, metric_name):
    """Plot the training and validation curves of one metric on shared axes.

    Args:
        train_metrics: Per-epoch values measured on the training set.
        val_metrics: Per-epoch values measured on the validation set.
        metric_name: Human-readable metric name used in the title, the y-axis
            label and the legend entries.
    """
    curves = (
        (train_metrics, f'Training {metric_name}'),
        (val_metrics, f'Validation {metric_name}'),
    )

    plt.figure(figsize=(10, 6))
    for values, legend_label in curves:
        plt.plot(values, label=legend_label)

    plt.title(f'Training and Validation {metric_name}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.legend()
    plt.show()

def plot_confusion_matrix(y_true, y_pred):
    """Render a 2x2 Fake/Real confusion-matrix heatmap.

    Args:
        y_true: True class indices (0 = Fake, 1 = Real).
        y_pred: Predicted class indices, aligned with ``y_true``.
    """
    class_names = ['Fake', 'Real']
    # Pin the label set: without it scikit-learn only reports the classes that
    # actually occur, so a single-class validation split would produce a 1x1
    # matrix and the 2x2 DataFrame construction below would raise.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    plt.figure(figsize=(10, 8))
    df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
    sn.heatmap(df_cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

# Now let's create a function to load and prepare the data
def prepare_data(real_dir, fake_dir, sequence_length=10, batch_size=1,
                 test_size=0.2, random_state=42, num_workers=0):
    """Build training and validation loaders from two folders of ``.mp4`` files.

    Args:
        real_dir: Directory whose ``*.mp4`` files are the real videos.
        fake_dir: Directory whose ``*.mp4`` files are the fake videos.
        sequence_length: Frames sampled per video by ``VideoDataset``.
        batch_size: Videos per batch for both loaders.
        test_size: Fraction (or count) of videos held out for validation,
            passed to ``train_test_split``.
        random_state: Seed for the train/validation split.
        num_workers: Worker processes for both loaders.

    Returns:
        tuple: ``(train_loader, val_loader)``.

    Raises:
        ValueError: If neither directory contains any ``.mp4`` file.
    """
    # glob order is filesystem-dependent; sort so that the seeded split below
    # selects the same videos on every machine.
    real_videos = sorted(glob.glob(os.path.join(real_dir, "*.mp4")))
    fake_videos = sorted(glob.glob(os.path.join(fake_dir, "*.mp4")))
    all_videos = real_videos + fake_videos
    if not all_videos:
        raise ValueError(f"No .mp4 files found in {real_dir!r} or {fake_dir!r}")

    # Split data
    train_videos, val_videos = train_test_split(
        all_videos, test_size=test_size, random_state=random_state
    )

    # Per-frame preprocessing: resize to 160x160 and normalise with the
    # ImageNet channel statistics.
    # NOTE(review): facenet-pytorch's pretrained weights are commonly paired
    # with its own fixed standardisation rather than ImageNet statistics.
    # Confirm before changing, and keep the inference transform in sync.
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((160, 160)),  # FaceNet required size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    # Create datasets
    train_dataset = VideoDataset(train_videos, sequence_length=sequence_length,
                                 transform=transform)
    val_dataset = VideoDataset(val_videos, sequence_length=sequence_length,
                               transform=transform)

    # Create dataloaders (only the training loader is shuffled)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=num_workers)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False,
                            num_workers=num_workers)

    return train_loader, val_loader

# Main training function
def train_model(train_loader, val_loader, num_epochs=20, lr=1e-4,
                save_path='deepfake_detector.pth'):
    """Train a fresh ``DeepFakeDetector``, plot its curves and save its weights.

    Args:
        train_loader: Loader of ``(inputs, targets)`` training batches.
        val_loader: Loader of ``(inputs, targets)`` validation batches.
        num_epochs: Number of passes over ``train_loader``.
        lr: Learning rate for the Adam optimizer.
        save_path: File the final ``state_dict`` is written to.

    Returns:
        The trained model.
    """
    # Initialize model and training components
    model = DeepFakeDetector()
    criterion = nn.CrossEntropyLoss()
    # Only hand the optimizer the parameters that are actually trainable.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=lr)

    # Per-epoch history
    train_losses = []
    train_accs = []
    val_losses = []
    val_accs = []
    # Predictions/targets of the most recent validation pass; predefined so
    # that num_epochs=0 does not hit an undefined name further down.
    preds, targets = [], []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")

        # Train
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        # Validate
        val_loss, val_acc, preds, targets = validate(model, val_loader, criterion)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Plot results
    plot_metrics(train_losses, val_losses, 'Loss')
    plot_metrics(train_accs, val_accs, 'Accuracy')
    # The confusion matrix reflects the LAST epoch's validation pass; skip it
    # when there is nothing to show.
    if len(targets) > 0:
        plot_confusion_matrix(targets, preds)

    # Save model
    torch.save(model.state_dict(), save_path)

    return model

# Usage example
# Training entry point. Inside a notebook __name__ is "__main__", so this cell
# runs the full pipeline (data loading, training, plotting, checkpoint save)
# whenever it is executed.
if __name__ == "__main__":
    # Dataset locations: machine-specific absolute paths, edit before running.
    # prepare_data() picks up the *.mp4 files located directly inside each folder.
    REAL_DIR = "D:/editor/DFD/process/real"  # Folder holding the real videos
    FAKE_DIR = "D:/editor/DFD/process/fake"  # Folder holding the fake videos
    
    # Build the training / validation DataLoaders
    train_loader, val_loader = prepare_data(REAL_DIR, FAKE_DIR)
    
    # Train with train_model()'s default settings; the returned model stays
    # available to later cells as the global `model`
    model = train_model(train_loader, val_loader)

Epoch 1/20


In [None]:
def predict_video(model, video_path, sequence_length=10, transform=None):
    """
    Predict if a video is real or fake.

    Frames are taken at evenly spaced positions across the video, so repeated
    calls on the same file give the same answer.

    Args:
        model (nn.Module): Trained deepfake detection model.
        video_path (str): Path to the video file.
        sequence_length (int): Number of frames to sample from the video.
        transform (callable, optional): Transformation applied to each RGB
            frame; must return a tensor. When omitted, frames are converted to
            float ``C x H x W`` tensors scaled to ``[0, 1]``.

    Returns:
        str: "Real" or "Fake".

    Raises:
        RuntimeError: If no frame could be read from the video.
    """
    model.eval()  # Set model to evaluation mode

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        # OpenCV may report 0 or a negative count for unreadable streams.
        frame_count = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
        num_samples = max(min(sequence_length, frame_count), 0)

        # Evenly spaced, strictly increasing indices in [0, frame_count).
        step = frame_count / num_samples if num_samples else 0
        frame_indices = [int(i * step) for i in range(num_samples)]

        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            if transform:
                frame = transform(frame)
            else:
                # No transform supplied: build a stackable float tensor.
                pixels = torch.from_numpy(np.ascontiguousarray(frame))
                frame = pixels.permute(2, 0, 1).float().div(255.0)
            frames.append(frame)
    finally:
        # Always free the decoder, even if a read or transform raised.
        cap.release()

    if not frames:
        raise RuntimeError(f"No frames could be read from video: {video_path!r}")

    # Pad sequence if necessary by repeating the last decoded frame
    frames.extend([frames[-1]] * (sequence_length - len(frames)))

    # Convert to tensor and add the batch dimension
    frames_tensor = torch.stack(frames).unsqueeze(0)

    # Run on whatever device the model lives on
    first_param = next(model.parameters(), None)
    if first_param is not None:
        frames_tensor = frames_tensor.to(first_param.device)

    # Perform inference
    with torch.no_grad():
        outputs = model(frames_tensor)
        _, prediction = torch.max(outputs, 1)

    # Map prediction to label
    label_map = {0: "Fake", 1: "Real"}
    return label_map[prediction.item()]

In [None]:
# Inference entry point: load the saved weights and classify a single video.
if __name__ == "__main__":
    # Video to classify: machine-specific absolute path, edit before running.
    video_path = "C:/Users/Dilshan/Desktop/11.mp4"
    
    # Define the same transformations used during training
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((160, 160)),  # FaceNet required size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225])
    ])
    
    # Load the trained model. map_location="cpu" lets a checkpoint that was
    # saved from a GPU load on a CPU-only machine; the model itself is built
    # on the CPU here.
    # NOTE: torch.load unpickles the file, so only load checkpoints you trust.
    model = DeepFakeDetector()
    model.load_state_dict(torch.load("deepfake_detector.pth", map_location="cpu"))
    model.eval()
    
    # Predict
    result = predict_video(model, video_path, sequence_length=10, transform=transform)
    print(f"The video is predicted to be: {result}")
