In [None]:
!pip3 install face_recognition

In [None]:
# Checking if videos are corrupted or not
import glob
import torch
import torchvision
from torchvision import transforms
import os
import numpy as np
import cv2
import random

def validate_video(vid_path, train_transforms):
    """Decode the leading frames of a video to check that it is readable.

    Args:
        vid_path: Path of the video file to probe.
        train_transforms: Callable applied to every decoded frame. It must
            return a tensor so that the frames can be stacked.

    Returns:
        A tensor stacking at most 20 transformed frames (fewer when the
        video ends earlier).

    Raises:
        RuntimeError: If not a single frame could be decoded.
    """
    count = 20  # number of leading frames to probe
    frames = []
    frame_iter = frame_extract(vid_path)
    try:
        for frame in frame_iter:
            frames.append(train_transforms(frame))
            if len(frames) == count:
                break
    finally:
        # Close the generator right away instead of waiting for garbage
        # collection, so its clean-up code (if any) runs deterministically.
        frame_iter.close()

    if not frames:
        # torch.stack([]) would raise a RuntimeError with an opaque message;
        # raise the same exception type with the offending path instead.
        raise RuntimeError("no frames could be decoded from %r" % (vid_path,))
    return torch.stack(frames)

def frame_extract(path):
    """Yield the frames of the video at ``path`` one at a time.

    Frames are produced in the order ``cv2.VideoCapture.read`` returns them,
    and iteration stops at the first read that reports failure.

    Args:
        path: Path of the video file to open.

    Yields:
        The frame object returned by ``cap.read()`` for each successful read.
    """
    cap = cv2.VideoCapture(path)
    try:
        while True:
            status, frame = cap.read()
            if not status:
                break
            yield frame
    finally:
        # Runs on exhaustion, on an exception, and when the consumer stops
        # early (generator closed), so the capture handle is always released.
        cap.release()

# Frame size and per-channel normalisation statistics handed to
# transforms.Normalize below.
im_size = 112
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

video_fil = glob.glob('./dataset/videos/*.mp4')
video_fil += glob.glob('./dataset/deepfakes/*.mp4')
video_fil += glob.glob('./dataset/real_videos/*.mp4')

print("Total videos found:", len(video_fil))

# NOTE: despite its name, valid_count counts every video processed so far
# (readable or not); it is what the "Processed/Remaining" message reports.
valid_count = 0
corrupted_list = []
for video_path in video_fil:
    valid_count += 1
    try:
        validate_video(video_path, train_transforms)
    except Exception as exc:
        # Any failure while decoding or transforming marks the video as
        # corrupted. `Exception` (not a bare except) keeps Ctrl-C / kernel
        # interrupts working during a long scan.
        print("Processed:", valid_count, "Remaining:", (len(video_fil) - valid_count))
        print("Corrupted video:", video_path)
        print("Reason:", repr(exc))
        corrupted_list.append(video_path)

print("Total corrupted videos:", len(corrupted_list))

In [None]:
import json
import glob
import numpy as np
import cv2
import copy
import random

video_files = glob.glob('./dataset/videos/*.mp4')
video_files += glob.glob('./dataset/deepfakes/*.mp4')
video_files += glob.glob('./dataset/real_videos/*.mp4')

random.shuffle(video_files)

# Videos reporting fewer frames than this are dropped.
MIN_FRAMES = 100

frame_count = []
valid_videos = []

for video_file in video_files:
    cap = cv2.VideoCapture(video_file)
    try:
        frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release on every path; the short-video `continue` below used to
        # skip the release and leak the capture handle.
        cap.release()

    if frames < MIN_FRAMES:
        continue

    frame_count.append(frames)
    valid_videos.append(video_file)

# Make the filter effective: later cells build the train/validation split from
# `video_files` and rebind `valid_videos`, so without this assignment the
# short videos filtered out above would still end up in the datasets.
video_files = list(valid_videos)

print("Frame counts:", frame_count)
print("Total valid videos:", len(frame_count))
if frame_count:
    print('Average frames per video:', np.mean(frame_count))
    print('Min frames:', min(frame_count))
    print('Max frames:', max(frame_count))
else:
    # min()/max() raise on an empty list; report the situation instead.
    print("No videos with at least", MIN_FRAMES, "frames were found.")

In [None]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt

class VideoDataset(Dataset):
    """Dataset returning a clip of leading frames and an integer label per video.

    The label text found in the metadata table is encoded as 0 for ``'FAKE'``
    and 1 for ``'REAL'``.
    """

    # Label text in the metadata table -> integer class index.
    LABEL_TO_INDEX = {'FAKE': 0, 'REAL': 1}

    def __init__(self, video_names, labels, sequence_length=60, transform=None):
        """Store the dataset configuration.

        Args:
            video_names: Sequence of video file paths.
            labels: pandas DataFrame with a ``"file"`` column holding the video
                file name and the label text in its second column.
            sequence_length: Maximum number of leading frames returned per video.
            transform: Optional callable applied to each decoded frame; it must
                return a tensor. When ``None`` the raw frame is converted with
                ``torch.from_numpy``.
        """
        self.video_names = video_names
        self.labels = labels
        self.transform = transform
        self.count = sequence_length

    def __len__(self):
        """Return the number of videos."""
        return len(self.video_names)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` for the video at position ``idx``.

        Returns:
            frames: Tensor stacking at most ``sequence_length`` transformed
                frames (fewer when the video is shorter).
            label: 0 for a ``'FAKE'`` video, 1 for a ``'REAL'`` one.

        Raises:
            KeyError: If the video's file name is not listed in ``labels``.
            ValueError: If the label text is neither ``'FAKE'`` nor ``'REAL'``.
            RuntimeError: If no frame could be decoded from the video.
        """
        video_path = self.video_names[idx]
        label = self._lookup_label(os.path.basename(video_path))

        frames = []
        frame_iter = self.frame_extract(video_path)
        try:
            for frame in frame_iter:
                if self.transform is not None:
                    frames.append(self.transform(frame))
                else:
                    frames.append(torch.from_numpy(frame))
                if len(frames) == self.count:
                    break
        finally:
            # Stop the generator now so that its `finally` releases the
            # capture even though we usually stop before the video ends.
            frame_iter.close()

        if not frames:
            raise RuntimeError("no frames could be decoded from %r" % (video_path,))

        return torch.stack(frames), label

    def _lookup_label(self, video_name):
        """Map a video file name to its integer class index."""
        matches = self.labels.loc[self.labels["file"] == video_name]
        if len(matches) == 0:
            raise KeyError("no label found for video %r" % (video_name,))
        # Purely positional access: first matching row, second column. This
        # does not depend on the DataFrame having a default 0..n-1 index.
        label_text = matches.iloc[0, 1]
        if label_text not in self.LABEL_TO_INDEX:
            raise ValueError(
                "unsupported label %r for video %r" % (label_text, video_name)
            )
        return self.LABEL_TO_INDEX[label_text]

    def frame_extract(self, path):
        """Yield the frames of the video at ``path`` until a read fails."""
        cap = cv2.VideoCapture(path)
        try:
            while True:
                status, frame = cap.read()
                if not status:
                    break
                yield frame
        finally:
            # Also runs when the consumer stops early and closes the generator.
            cap.release()

def display_frame(tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show one normalised frame tensor as an image.

    Args:
        tensor: Channel-first frame tensor (it is transposed with ``(1, 2, 0)``
            and must have three channels). The channel order is reversed for
            display, as the original b/g/r swap did - this assumes the frame
            kept OpenCV's BGR order; confirm if frames come from elsewhere.
        mean: Per-channel mean used when the tensor was normalised. The default
            equals the ``mean`` this notebook passes to ``transforms.Normalize``.
        std: Per-channel standard deviation used when the tensor was normalised.
            The default equals the ``std`` passed to ``transforms.Normalize``.
    """
    image = tensor.detach().cpu().numpy().transpose(1, 2, 0)

    # Undo the normalisation in the tensor's own channel order, *before* the
    # channels are reordered, so every channel gets its matching statistics.
    image = image * np.asarray(std) + np.asarray(mean)
    # Keep values in the displayable range; de-normalised pixels can overshoot
    # slightly and would otherwise wrap or trigger clipping warnings.
    image = np.clip(image, 0.0, 1.0)

    # Reverse the channel order for matplotlib (same effect as the former
    # cv2.split / cv2.merge swap of the first and last channel).
    image = image[..., ::-1]

    plt.imshow((image * 255.0).astype(np.uint8))
    plt.axis('off')
    plt.show()

In [None]:
# Count the number of fake and real videos in dataset
import pandas as pd

def count_real_fake_videos(video_list, metadata_path='./dataset/metadata.csv'):
    """Count how many videos in ``video_list`` are labelled REAL and FAKE.

    The metadata CSV is read with the column names ``file`` and ``label``
    supplied explicitly, so every row of the file is treated as data. A video
    is matched by the last ``/``-separated component of its path.

    Args:
        video_list: Iterable of video file paths.
        metadata_path: Path of the metadata CSV.

    Returns:
        ``(real_count, fake_count)``. Videos that are missing from the metadata
        or carry any other label are only reported in the printed summary.
    """
    header_list = ["file", "label"]
    labels_df = pd.read_csv(metadata_path, names=header_list)

    # Build the file -> label mapping once instead of scanning the whole table
    # for every video. When a file name is listed more than once, the first
    # row wins, which is what the previous per-video lookup selected.
    first_rows = labels_df.drop_duplicates(subset="file", keep="first")
    label_by_file = dict(zip(first_rows["file"], first_rows["label"]))

    fake_count = 0
    real_count = 0
    unknown_count = 0

    for video_path in video_list:
        video_name = video_path.split('/')[-1]
        label = label_by_file.get(video_name)

        if label == 'FAKE':
            fake_count += 1
        elif label == 'REAL':
            real_count += 1
        else:
            # Either not listed in the metadata or an unexpected label value.
            unknown_count += 1

    print(f"Real videos: {real_count}")
    print(f"Fake videos: {fake_count}")

    if unknown_count > 0:
        print(f"Unknown labels: {unknown_count}")

    return real_count, fake_count

In [None]:
# Load labels and videos
import random
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torchvision import transforms

# Label table: column names are supplied explicitly, so every CSV row is data.
header_list = ["file", "label"]
labels = pd.read_csv('./dataset/metadata.csv', names=header_list)

# First 80% of the video list is used for training, the rest for validation.
split_at = int(0.8*len(video_files))
train_videos, valid_videos = video_files[:split_at], video_files[split_at:]

print("Training set size:", len(train_videos))
print("Validation set size:", len(valid_videos))

# Class balance of each split (each call also prints its own summary).
train_real, train_fake = count_real_fake_videos(train_videos)
valid_real, valid_fake = count_real_fake_videos(valid_videos)

print(f"TRAINING SET: Real: {train_real}, Fake: {train_fake}, Total: {train_real + train_fake}")
print(f"VALIDATION SET: Real: {valid_real}, Fake: {valid_fake}, Total: {valid_real + valid_fake}")

# Frame size and normalisation statistics shared by both pipelines.
im_size = 112
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Two separate but identically configured pipelines: PIL image -> resize ->
# tensor -> normalise.
train_transforms, test_transforms = (
    transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((im_size, im_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])
    for _ in range(2)
)

# Both datasets return clips of at most 10 frames.
train_dataset = VideoDataset(train_videos, labels, sequence_length=10, transform=train_transforms)
valid_dataset = VideoDataset(valid_videos, labels, sequence_length=10, transform=test_transforms)

# Only the training loader shuffles; everything else is identical.
loader_options = dict(batch_size=4, num_workers=4)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
valid_loader = DataLoader(valid_dataset, shuffle=False, **loader_options)

# Sanity check: show the first frame of the first training clip and its label.
if len(train_dataset) > 0:
    sample_frames, sample_label = train_dataset[0]
    display_frame(sample_frames[0])
    print(f"Label: {'REAL' if sample_label == 1 else 'FAKE'}")

In [None]:
import torch
from torch import nn
from torchvision import models

class DeepfakeDetector(nn.Module):
    """Per-frame ResNeXt-50 features, an LSTM over the frames, then a linear classifier.

    Args:
        num_classes: Number of output classes.
        latent_dim: Size of the per-frame feature vector fed to the LSTM. It
            has to equal the channel count produced by the feature extractor;
            the previous code reshaped to exactly 2048 features, so keep 2048
            for this backbone.
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: LSTM hidden size.
        bidirectional: Whether the LSTM runs in both directions.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(DeepfakeDetector, self).__init__()
        # NOTE(review): `pretrained=True` is the older torchvision spelling;
        # newer releases prefer `weights=...` - confirm against the pinned version.
        backbone = models.resnext50_32x4d(pretrained=True)
        # Keep everything except the last two child modules of the backbone
        # (its pooling and fully-connected head), so spatial feature maps remain.
        self.feature_extractor = nn.Sequential(*list(backbone.children())[:-2])
        self.lstm = nn.LSTM(
            input_size=latent_dim,
            hidden_size=hidden_dim,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=bidirectional
        )
        # Not used in forward(); kept so the module's attributes stay unchanged.
        self.relu = nn.LeakyReLU()
        self.dropout = nn.Dropout(0.4)
        # A bidirectional LSTM concatenates both directions, doubling the
        # feature size seen by the classifier.
        lstm_out_dim = hidden_dim * (2 if bidirectional else 1)
        self.classifier = nn.Linear(lstm_out_dim, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape ``(batch, seq_length, channels, height, width)``.

        Returns:
            ``(feature_maps, prediction)``: the backbone feature maps for all
            ``batch * seq_length`` frames, and the class scores of shape
            ``(batch, num_classes)``.
        """
        batch_size, seq_length, c, h, w = x.shape
        # reshape (not view) so non-contiguous inputs are accepted as well.
        x = x.reshape(batch_size * seq_length, c, h, w)
        feature_maps = self.feature_extractor(x)
        x = self.avgpool(feature_maps)
        # Infer the feature size instead of hard-coding 2048.
        x = x.view(batch_size, seq_length, -1)
        x_lstm, _ = self.lstm(x)
        # Average the LSTM outputs over time before classifying.
        prediction = self.classifier(self.dropout(torch.mean(x_lstm, dim=1)))
        return feature_maps, prediction

In [None]:
# Smoke test: build the detector and push one dummy clip (1 video, 20 frames,
# 3x112x112) through it. The visible model class is DeepfakeDetector; `Model`
# is not defined in the cells shown here and would raise a NameError.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DeepfakeDetector(2).to(device)
# Zeros instead of uninitialised memory so the check is deterministic.
a, b = model(torch.zeros((1, 20, 3, 112, 112), dtype=torch.float32, device=device))

In [None]:
import torch
from torch.autograd import Variable
import time
import os
import sys
import os
def train_epoch(epoch, num_epochs, data_loader, model, criterion, optimizer,
                checkpoint_file='/content/checkpoint.pt'):
    """Train ``model`` for one pass over ``data_loader`` and save its weights.

    NOTE: this notebook defines ``train_epoch`` again in a later cell; after
    running all cells the later definition is the one in effect.

    Args:
        epoch: Current epoch number (only used in the progress line).
        num_epochs: Total number of epochs (only used in the progress line).
        data_loader: Iterable of ``(inputs, targets)`` batches.
        model: Module returning ``(feature_maps, outputs)``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        checkpoint_file: Where the model ``state_dict`` is written after the
            epoch. Defaults to the previously hard-coded path.

    Returns:
        ``(average_loss, average_accuracy)`` over the epoch, both weighted by
        batch size.
    """
    model.train()
    losses = AverageMeter()
    accuracies = AverageMeter()
    # Pick the device once; the former unconditional torch.cuda.LongTensor
    # casts failed on machines without CUDA.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    for i, (inputs, targets) in enumerate(data_loader):
        inputs = inputs.to(device)
        targets = targets.to(device=device, dtype=torch.long)
        _, outputs = model(inputs)
        loss = criterion(outputs, targets)
        acc = calculate_accuracy(outputs, targets)
        losses.update(loss.item(), inputs.size(0))
        accuracies.update(acc, inputs.size(0))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        sys.stdout.write(
                "\r[Epoch %d/%d] [Batch %d / %d] [Loss: %f, Acc: %.2f%%]"
                % (
                    epoch,
                    num_epochs,
                    i,
                    len(data_loader),
                    losses.avg,
                    accuracies.avg))
    torch.save(model.state_dict(), checkpoint_file)
    return losses.avg, accuracies.avg
def test(epoch, model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` without updating its weights.

    Args:
        epoch: Unused; kept so existing calls keep working.
        model: Module returning ``(feature_maps, outputs)``.
        data_loader: Iterable of ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(outputs, targets)``. Its result
            is averaged, so an unreduced (per-sample) loss works too.

    Returns:
        ``(true, pred, average_loss, average_accuracy)`` where ``true`` and
        ``pred`` are flat Python lists of class indices.
    """
    print('Testing')
    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()
    pred = []
    true = []
    # Pick the device once; the former unconditional torch.cuda.* casts failed
    # on machines without CUDA.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for i, (inputs, targets) in enumerate(data_loader):
            inputs = inputs.to(device)
            targets = targets.to(device=device, dtype=torch.long)
            _, outputs = model(inputs)
            loss = torch.mean(criterion(outputs, targets))
            acc = calculate_accuracy(outputs, targets)
            _, p = torch.max(outputs, 1)
            true += targets.detach().cpu().reshape(-1).tolist()
            pred += p.detach().cpu().reshape(-1).tolist()
            losses.update(loss.item(), inputs.size(0))
            accuracies.update(acc, inputs.size(0))
            sys.stdout.write(
                    "\r[Batch %d / %d]  [Loss: %f, Acc: %.2f%%]"
                    % (
                        i,
                        len(data_loader),
                        losses.avg,
                        accuracies.avg
                        )
                    )
        print('\nAccuracy {}'.format(accuracies.avg))
    return true, pred, losses.avg, accuracies.avg

class AverageMeter(object):
    """Keeps the latest value of a metric together with its running weighted mean.

    NOTE: this notebook defines ``AverageMeter`` again in a later cell; after
    running all cells the later definition is the one in effect.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every statistic back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` observed over ``n`` samples and refresh the mean."""
        self.count += n
        self.sum += val * n
        self.val = val
        self.avg = self.sum / self.count
        
def calculate_accuracy(outputs, targets):
    """Return the top-1 accuracy of ``outputs`` against ``targets``, in percent.

    NOTE: this notebook defines ``calculate_accuracy`` again in a later cell;
    after running all cells the later definition is the one in effect.
    """
    n_samples = targets.size(0)

    # Index of the highest score per row, as a (batch, 1) tensor.
    top1 = outputs.topk(1, 1, True)[1]
    hits = top1.t().eq(targets.view(1, -1))
    return 100* hits.float().sum().item() / n_samples

In [None]:
import torch
import time
import os
import sys
from sklearn.metrics import confusion_matrix, classification_report

def train_epoch(epoch, num_epochs, data_loader, model, criterion, optimizer, checkpoint_path='./checkpoints'):
    """Run one training epoch and write a per-epoch checkpoint.

    Args:
        epoch: Current epoch number; shown in the progress line and used in
            the checkpoint file name.
        num_epochs: Total number of epochs (progress line only).
        data_loader: Iterable of ``(inputs, targets)`` batches.
        model: Module returning ``(feature_maps, outputs)``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        checkpoint_path: Directory receiving ``model_epoch_<epoch>.pt``; it is
            created when missing.

    Returns:
        ``(average_loss, average_accuracy)``, both weighted by batch size.
    """
    progress_template = "\r[Epoch %d/%d] [Batch %d/%d] [Loss: %.4f, Acc: %.2f%%]"

    model.train()
    loss_meter = AverageMeter()
    acc_meter = AverageMeter()
    os.makedirs(checkpoint_path, exist_ok=True)

    for batch_idx, batch in enumerate(data_loader):
        inputs, targets = batch

        # Batches are moved to the GPU only when one is available.
        if torch.cuda.is_available():
            targets = targets.type(torch.cuda.LongTensor)
            inputs = inputs.cuda()

        feature_maps, outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_acc = calculate_accuracy(outputs, targets)

        # Weight the running averages by the number of samples in the batch.
        n_samples = inputs.size(0)
        loss_meter.update(batch_loss.item(), n_samples)
        acc_meter.update(batch_acc, n_samples)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        progress_values = (
            epoch,
            num_epochs,
            batch_idx,
            len(data_loader),
            loss_meter.avg,
            acc_meter.avg,
        )
        sys.stdout.write(progress_template % progress_values)

    checkpoint_file = os.path.join(checkpoint_path, f'model_epoch_{epoch}.pt')
    torch.save(model.state_dict(), checkpoint_file)
    print(f"\nCheckpoint saved to {checkpoint_file}")

    return loss_meter.avg, acc_meter.avg

def validate(epoch, model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` with gradients disabled.

    Args:
        epoch: Not used by the body.
        model: Module returning ``(feature_maps, outputs)``.
        data_loader: Iterable of ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(outputs, targets)``.

    Returns:
        ``(true_labels, predictions, average_loss, average_accuracy)`` where
        the first two are flat Python lists collected over all batches.
    """
    progress_template = "\r[Batch %d/%d] [Loss: %.4f, Acc: %.2f%%]"

    print('\nValidating model...')

    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()

    # Ground truth and predicted class of every sample, in loader order.
    true_labels, predictions = [], []

    with torch.no_grad():
        for batch_idx, batch in enumerate(data_loader):
            inputs, targets = batch

            # Batches are moved to the GPU only when one is available.
            if torch.cuda.is_available():
                targets = targets.cuda().type(torch.cuda.LongTensor)
                inputs = inputs.cuda()

            feature_maps, outputs = model(inputs)
            batch_loss = criterion(outputs, targets)
            batch_acc = calculate_accuracy(outputs, targets)

            # Predicted class = index of the highest score in each row.
            top_scores, preds = torch.max(outputs, 1)

            true_labels += targets.cpu().numpy().tolist()
            predictions += preds.cpu().numpy().tolist()

            # Weight the running averages by the batch size.
            n_samples = inputs.size(0)
            losses.update(batch_loss.item(), n_samples)
            accuracies.update(batch_acc, n_samples)

            progress_values = (batch_idx, len(data_loader), losses.avg, accuracies.avg)
            sys.stdout.write(progress_template % progress_values)

    print(f'\nValidation Accuracy: {accuracies.avg:.2f}%')

    return true_labels, predictions, losses.avg, accuracies.avg

class AverageMeter(object):
    """Stores the latest value of a metric and its running weighted average.

    Attributes are ``val`` (last value passed to ``update``), ``sum``
    (weighted sum of all values), ``count`` (total weight) and ``avg``
    (``sum / count``, or 0 while ``count`` is 0).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset every statistic to zero."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Add ``val`` with weight ``n`` (e.g. the batch size) to the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        # Guard against a zero total weight (e.g. an empty first batch), which
        # used to raise ZeroDivisionError.
        self.avg = self.sum / self.count if self.count else 0

def calculate_accuracy(outputs, targets):
    """Return the top-1 accuracy of ``outputs`` against ``targets``, in percent.

    Args:
        outputs: Score tensor with one row per sample and one column per class.
        targets: Tensor holding the true class index of every sample.

    Returns:
        A float between 0 and 100; 0.0 for an empty batch.
    """
    batch_size = targets.size(0)
    if batch_size == 0:
        # An empty batch used to end in a division by zero.
        return 0.0
    # Index of the highest score per row, shape (batch, 1).
    _, pred = outputs.topk(1, 1, True)
    pred = pred.t()
    correct = pred.eq(targets.view(1, -1))
    n_correct_elems = correct.float().sum().item()
    return 100 * n_correct_elems / batch_size

def print_metrics(true_labels, predictions):
    """Print a classification report, the confusion matrix and summary scores.

    Class index 0 is reported as FAKE and 1 as REAL. Precision, recall and F1
    are computed with class 1 (REAL) as the positive class.

    Args:
        true_labels: Sequence of true class indices (0 or 1).
        predictions: Sequence of predicted class indices (0 or 1).
    """
    # Fix the class set explicitly: without it, data containing a single class
    # yields a 1x1 confusion matrix (breaking the 4-way unpack below) and a
    # target_names length mismatch in the report.
    class_ids = [0, 1]

    print("\nDetailed Classification Report:")
    print(classification_report(true_labels, predictions, labels=class_ids, target_names=['FAKE', 'REAL']))

    cm = confusion_matrix(true_labels, predictions, labels=class_ids)
    print("\nConfusion Matrix:")
    print(cm)

    # Rows are true classes, columns predicted classes, in class_ids order.
    tn, fp, fn, tp = cm.ravel()
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    print(f"\nPrecision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

In [None]:
def plot_loss(train_loss_avg,test_loss_avg,num_epochs):
    """Plot training and validation loss against the epoch number.

    Args:
        train_loss_avg: Sequence with one training loss per epoch.
        test_loss_avg: Sequence with one validation loss per epoch.
        num_epochs: Number of epochs; the x axis runs from 1 to ``num_epochs``,
            so both sequences need that many entries.
    """
    # The stray debug print of num_epochs was removed; the x axis already
    # shows the epoch range.
    epochs = range(1, num_epochs + 1)
    plt.plot(epochs, train_loss_avg, 'g', label='Training loss')
    plt.plot(epochs, test_loss_avg, 'b', label='validation loss')
    plt.title('Training and Validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
  
def plot_accuracy(train_accuracy,test_accuracy,num_epochs):
    """Plot training and validation accuracy against the epoch number.

    The x axis runs from 1 to ``num_epochs``; training accuracy is drawn in
    green and validation accuracy in blue.
    """
    epoch_axis = range(1, num_epochs + 1)
    curves = (
        (train_accuracy, 'g', 'Training accuracy'),
        (test_accuracy, 'b', 'validation accuracy'),
    )
    for series, colour, legend_label in curves:
        plt.plot(epoch_axis, series, colour, label=legend_label)
    plt.title('Training and Validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()