In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install face_recognition

In [None]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
import face_recognition
import pandas as pd


class VideoDataset(Dataset):
    def __init__(self, video_names, labels, sequence_length=20, transform=None):
        """
        Initializes the VideoDataset.

        Args:
        - video_names (list): List of video file names.
        - labels (pandas.DataFrame): DataFrame containing video labels.
        - sequence_length (int): Length of the sequence to be extracted.
        - transform (callable): Optional transform to be applied on a sample.
        """
        self.video_names = video_names
        self.labels = labels
        self.transform = transform
        self.sequence_length = sequence_length

    def __len__(self):
        """
        Returns the length of the dataset.
        """
        return len(self.video_names)

    def __getitem__(self, idx):
      """
      Gets a sample from the dataset.

      Args:
      - idx (int): Index of the sample to retrieve.

      Returns:
      - frames (torch.Tensor): Extracted frames from the video.
      - label (int): Label of the video (0 for REAL, 1 for FAKE).
      """
      video_path = self.video_names[idx]
      # print("Processing video:", video_path)  # Add this line for debugging
      frames = []

      # Load frames from video
      for i, frame in enumerate(self.frame_extract(video_path)):
          frames.append(self.transform(frame))
          if len(frames) == self.sequence_length:
              break

      # Pad frames to ensure consistent sequence length
      while len(frames) < self.sequence_length:
          if len(frames) > 0:
              frames.append(torch.zeros_like(frames[0]))  # Pad with zeros
          else:
              # If frames is empty, just add a zero tensor of the appropriate shape
              frames.append(torch.zeros(3, 256, 256))  # Modify dimensions as needed

      frames = torch.stack(frames)
      frames = frames[:self.sequence_length]

      # Get label
      temp_video = video_path.split('/')[-1]
      label = self.labels.loc[self.labels["file"] == temp_video, "label"].values[0]
      label = 0 if label == 'REAL' else 1

      return frames, label


    def frame_extract(self, path):
        """
        Extracts frames from the video at the given path.

        Args:
        - path (str): Path to the video file.

        Yields:
        - image (numpy.ndarray): Extracted frame from the video.
        """
        vidObj = cv2.VideoCapture(path)
        success = 1

        while success:
            success, image = vidObj.read()
            if success:
                yield image

def im_plot(tensor):
    """
    Plots the image represented by the given tensor.

    Args:
    - tensor (torch.Tensor): Tensor representing the image.
    """
    image = tensor.cpu().numpy().transpose(1, 2, 0)
    b, g, r = cv2.split(image)
    image = cv2.merge((r, g, b))
    image = image * [0.22803, 0.22145, 0.216989] + [0.43216, 0.394666, 0.37645]
    image = image * 255.0
    plt.imshow(image.astype(int))
    plt.show()


In [None]:
import pandas as pd
path_csv = "/content/drive/MyDrive/metadata files/metadata-global.csv"
read_csv = pd.read_csv(path_csv)
print(read_csv.head(5))

In [None]:
#count the number of fake and real videos
def number_of_real_and_fake_videos(data_list):
  header_list = ["file","label", "split"]
  lab = pd.read_csv('/content/drive/MyDrive/metadata files/metadata-global.csv',names=header_list)
  fake = 0
  real = 0
  for i in data_list:
    temp_video = i.split('/')[-1]
    label = lab.iloc[(labels.loc[labels["file"] == temp_video].index.values[0]),1]
    if(label == 'FAKE'):
      fake+=1
    if(label == 'REAL'):
      real+=1
  return real,fake

In [None]:
import glob

# Get paths of train and test videos
train_videos = glob.glob('/content/drive/MyDrive/preprocessed dataset/train/*.mp4')
test_videos = glob.glob('/content/drive/MyDrive/preprocessed dataset/test/*.mp4')

In [None]:
print("train : " , len(train_videos))
print("test : " , len(test_videos))

In [None]:
import pandas as pd

# Define the header list for the DataFrame
header_list = ["file", "label", "split"]

# Read the metadata CSV file into a DataFrame
labels = pd.read_csv('/content/drive/MyDrive/metadata files/metadata-global.csv', names=header_list)

In [None]:
labels

In [None]:
print("TRAIN: ", "Real:",number_of_real_and_fake_videos(train_videos)[0]," Fake:",number_of_real_and_fake_videos(train_videos)[1])
print("TEST: ", "Real:",number_of_real_and_fake_videos(test_videos)[0]," Fake:",number_of_real_and_fake_videos(test_videos)[1])

In [None]:
import torchvision.transforms as transforms

# Define image size and normalization parameters
im_size = 256
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Define transforms for training data
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Resize((im_size, im_size)),
    transforms.Normalize(mean, std)
])

# Define transforms for test/validation data
test_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Resize((im_size, im_size)),
    transforms.Normalize(mean, std)
])

In [None]:
# Create VideoDataset instances for training and testing data
train_data = VideoDataset(train_videos, labels, sequence_length=20, transform=train_transforms)
test_data = VideoDataset(test_videos, labels, sequence_length=20, transform=test_transforms)

In [None]:
from torch.utils.data import DataLoader

# Create DataLoader instances for training and validation data
train_loader = DataLoader(train_data, batch_size=2, shuffle=True, num_workers=4)
valid_loader = DataLoader(test_data, batch_size=2, shuffle=True, num_workers=4)

In [None]:
#Model with feature visualization

from torch import nn
from torchvision import models
class Model(nn.Module):
    def __init__(self, num_classes,latent_dim= 2048, lstm_layers=1 , hidden_dim = 2048, bidirectional = False):
        super(Model, self).__init__()
        model = models.resnext50_32x4d(pretrained = True) #Residual Network CNN
        self.model = nn.Sequential(*list(model.children())[:-2])
        self.lstm = nn.LSTM(latent_dim,hidden_dim, lstm_layers,  bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048,num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
    def forward(self, x):
        batch_size,seq_length, c, h, w = x.shape
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        x = self.avgpool(fmap)
        x = x.view(batch_size,seq_length,2048)
        x_lstm,_ = self.lstm(x,None)
        return fmap,self.dp(self.linear1(torch.mean(x_lstm,dim = 1)))

In [None]:
model = Model(2).cuda()
a,b = model(torch.from_numpy(np.empty((1,20,3,256,256))).type(torch.cuda.FloatTensor))

In [None]:
import torch
from torch.autograd import Variable
import time
import os
import sys
import os
import time

def train_epoch(epoch, num_epochs, data_loader, model, criterion, optimizer):
    model.train()
    losses = AverageMeter()
    accuracies = AverageMeter()
    start_time = time.time()  # Record the start time of the epoch
    for i, (inputs, targets) in enumerate(data_loader):
        if torch.cuda.is_available():
            targets = targets.type(torch.cuda.LongTensor)
            inputs = inputs.cuda()
        _, outputs = model(inputs)
        loss = criterion(outputs, targets.type(torch.cuda.LongTensor))
        acc = calculate_accuracy(outputs, targets.type(torch.cuda.LongTensor))
        losses.update(loss.item(), inputs.size(0))
        accuracies.update(acc, inputs.size(0))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate remaining time
        elapsed_time = time.time() - start_time
        average_time_per_epoch = elapsed_time / (epoch * len(data_loader) + i + 1)
        remaining_epochs = num_epochs - epoch
        remaining_batches = len(data_loader) - i - 1
        remaining_time = remaining_epochs * remaining_batches * average_time_per_epoch

        sys.stdout.write(
            "\r[Epoch %d/%d] [Batch %d/%d] [Loss: %f, Acc: %.2f%%] [ETA: %s]"
            % (
                epoch,
                num_epochs,
                i + 1,
                len(data_loader),
                losses.avg,
                accuracies.avg,
                format_time(remaining_time)
            )
        )
        sys.stdout.flush()

    torch.save(model.state_dict(), '/content/deepfakenet.pt')
    return losses.avg, accuracies.avg

def format_time(seconds):
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return "{:02}:{:02}:{:02}".format(int(hours), int(minutes), int(seconds))

def test(epoch,model, data_loader ,criterion):
    print('Testing')
    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()
    pred = []
    true = []
    count = 0
    with torch.no_grad():
        for i, (inputs, targets) in enumerate(data_loader):
            if torch.cuda.is_available():
                targets = targets.cuda().type(torch.cuda.FloatTensor)
                inputs = inputs.cuda()
            _,outputs = model(inputs)
            loss = torch.mean(criterion(outputs, targets.type(torch.cuda.LongTensor)))
            acc = calculate_accuracy(outputs,targets.type(torch.cuda.LongTensor))
            _,p = torch.max(outputs,1)
            true += (targets.type(torch.cuda.LongTensor)).detach().cpu().numpy().reshape(len(targets)).tolist()
            pred += p.detach().cpu().numpy().reshape(len(p)).tolist()
            losses.update(loss.item(), inputs.size(0))
            accuracies.update(acc, inputs.size(0))
            sys.stdout.write(
                    "\r[Batch %d / %d]  [Loss: %f, Acc: %.2f%%]"
                    % (
                        i,
                        len(data_loader),
                        losses.avg,
                        accuracies.avg
                        )
                    )
        print('\nAccuracy {}'.format(accuracies.avg))
    return true,pred,losses.avg,accuracies.avg
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()
    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
def calculate_accuracy(outputs, targets):
    batch_size = targets.size(0)

    _, pred = outputs.topk(1, 1, True)
    pred = pred.t()
    correct = pred.eq(targets.view(1, -1))
    n_correct_elems = correct.float().sum().item()
    return 100* n_correct_elems / batch_size

In [None]:
from sklearn.metrics import confusion_matrix
#learning rate
lr = 1e-5 #0.001
#number of epochs
num_epochs = 50

optimizer = torch.optim.Adam(model.parameters(), lr= lr,weight_decay = 1e-5)

#class_weights = torch.from_numpy(np.asarray([1,15])).type(torch.FloatTensor).cuda()
#criterion = nn.CrossEntropyLoss(weight = class_weights).cuda()
criterion = nn.CrossEntropyLoss().cuda()
train_loss_avg =[]
train_accuracy = []
test_loss_avg = []
test_accuracy = []
for epoch in range(1,num_epochs+1):
    l, acc = train_epoch(epoch,num_epochs,train_loader,model,criterion,optimizer)
    train_loss_avg.append(l)
    train_accuracy.append(acc)
    true,pred,tl,t_acc = test(epoch,model,valid_loader,criterion)
    test_loss_avg.append(tl)
    test_accuracy.append(t_acc)

In [None]:
import torch
torch.cuda.empty_cache()