In [None]:
import cv2
import dlib

In [None]:
!pip install opendatasets --upgrade --quiet
import opendatasets as od

In [None]:
dataset_url="https://www.kaggle.com/datasets/sorokin/faceforensics"

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set(style="whitegrid")
import os
import glob as gb
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import time
import random
import torchvision.transforms as t
import torchvision.models as models
import albumentations as A

In [None]:
od.download(dataset_url)

Skipping, found downloaded files in "./faceforensics" (use force=True to force download)


In [None]:
detector = dlib.get_frontal_face_detector()

In [None]:
from google.colab.patches import cv2_imshow

In [None]:
import os

In [None]:
fake = '/content/faceforensics/manipulated_sequences/Deepfakes/c23/videos/'
original = '/content/faceforensics/original_sequences/youtube/c23/videos/'

In [None]:
os.mkdir("images")

os.mkdir("images/real")
os.mkdir("images/fake")

os.mkdir("images/fake/train")
os.mkdir("images/fake/test")
os.mkdir("images/fake/validation")

os.mkdir("images/real/train")
os.mkdir("images/real/test")
os.mkdir("images/real/validation")

In [None]:
def save(img,file_path, folder_name, name, bbox, width=180,height=227):
    x, y, w, h = bbox
    imgCrop = img[y:h, x: w]
    if imgCrop.any():
      imgCrop = cv2.resize(imgCrop, (width, height)) #we need this line to reshape the images
      cv2.imwrite(file_path+folder_name+name+".jpg", imgCrop)

def faces(vid_path,file_path):
    i=0
    vids = 0
    for file in os.listdir(vid_path):
      path=os.path.join(vid_path, file)
      cap = cv2.VideoCapture(path)
      _, frame = cap.read()
      gray =cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)
      faces = detector(gray)
      if vids<720:
        folder_name = "train/"
      elif vids<860:
        folder_name = "test/"
      else:
        folder_name = "validation/"
    # detect the face
      if len(faces):
        for counter,face in enumerate(faces):
            x1, y1 = face.left(), face.top()
            x2, y2 = face.right(), face.bottom()
            save(frame,file_path,folder_name,str(i),(x1,y1,x2,y2))
            i=i+1
      vids+=1
    cap.release()


In [None]:
faces(fake,'/content/images/fake/')

In [None]:
faces(original,'/content/images/real/')

In [None]:
import torch.utils.data as data
from torchvision import datasets

In [None]:
x_real = []
y_real = []
x_fake=[]
y_fake=[]

In [None]:
augmentation = A.Compose(
    [
        A.Downscale(p=0.5),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.HueSaturationValue(p=0.5),
        A.GaussNoise(p=0.5),
        A.JpegCompression(p=0.5)
    ]
)



In [None]:
for folder in  os.listdir("/content/images/") :
    files = gb.glob(pathname= str( "/content/images/" + folder + "/train" + '/*.jpg'))
    for file in files:
      image = cv2.imread(file)
      image = np.array(cv2.resize(image,(224,224)))
      aug = augmentation(image=image)
      image = aug["image"]
      image = (torch.tensor(image).permute(2,0,1) )/255
      if folder=="real":
        y_real.append(0)
        x_real.append(image)
      else:
        y_fake.append(1)
        x_fake.append(image)

In [None]:
from torch.utils.data import Dataset

class customDataset(Dataset):
    def __init__(self, positive_data,negative_data, positive_labels,negative_labels, transform = t.Resize(224)):
        self.positive_data = positive_data
        self.negative_data=negative_data
        self.positive_labels = torch.LongTensor(positive_labels)
        self.negative_labels=torch.LongTensor(negative_labels)
        self.transform = transform

    def __getitem__(self, index):
        anchor_img = self.positive_data[index]
        positive_img=random.choice(self.positive_data)
        #while positive_img==anchor_img:
          #positive_img=random.choice(self.positive_data)
        negative_img=random.choice(self.negative_data)
        anchor_label = self.positive_labels[index]
        if self.transform:
          anchor_img = self.transform(anchor_img)
          positive_img = self.transform(positive_img)
          negative_img = self.transform(negative_img)
          return anchor_img,positive_img,negative_img,anchor_label

    def __len__(self):
        return len(self.positive_data)

In [None]:
train_realanchor_set=customDataset(x_real,x_fake,y_real,y_fake)
train_fakeanchor_set=customDataset(x_fake,x_real,y_fake,y_real)

In [None]:
train_realanchor_loader=DataLoader(train_realanchor_set, batch_size=6, shuffle=True)
train_fakeanchor_loader=DataLoader(train_fakeanchor_set, batch_size=6, shuffle=True)

In [None]:
b4 = models.efficientnet_b4(pretrained=False)

class efficientnet_b4_att(nn.Module):
    def __init__(self):
        super(efficientnet_b4_att, self).__init__()
        self.first = b4.features[0:4]
        self.attention = nn.Sequential(
            nn.Conv2d(56,1,1),
            nn.Sigmoid(),
            )
        self.last = b4.features[4:]

    def forward(self, x):
        x = self.last(self.first(x) * self.attention(self.first(x)))
        return x

In [None]:
class EfficientViT(nn.Module):
  def __init__(self, config, channels=512, selected_efficient_net = 0):
        super().__init__()


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = efficientnet_b4_att().to(device)

In [None]:
print(model)

In [None]:
class TripletLoss(nn.Module):
    def __init__(self, margin=2.5):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        #print('x1:',x1.size())
        return (x1 - x2).pow(2).sum(1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        distance_positive = self.calc_euclidean(anchor, positive)
        distance_negative = self.calc_euclidean(anchor, negative)
        #print('p:',distance_positive)
        #print('n',distance_negative)
        losses = torch.relu(distance_positive - distance_negative + self.margin)
       # print('loss:',losses.size())
       # print('anchor:',anchor.size())
        return losses.mean()

In [None]:
import math

In [None]:
class LosslessTripletLoss(nn.Module):
    def __init__(self,beta,epsilon,n):
        super(LosslessTripletLoss, self).__init__()
        self.beta=beta
        self.epsilon=epsilon
        self.dim=n

    def positive_dist(self, x1, x2):
        print('root:',(x1 - x2).pow(2))
        return (-math.log((-((x1 - x2).pow(2)))/self.beta)+1+self.epsilon).sum(1)

    def negative_dist(self,x1,x2):
        return (-math.log((-(self.dim-((x1 - x2).pow(2))))/self.beta)+1+self.epsilon).sum(1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        distance_positive = self.positive_dist(anchor, positive)
        distance_negative = self.negative_dist(anchor, negative)
        print('p:',distance_positive)
        print('n',distance_negative)
        losses = distance_positive + distance_negative
       # print('loss:',losses.size())
       # print('anchor:',anchor.size())
        return losses.mean()

In [None]:
import torch.optim as optim

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = torch.jit.script(LosslessTripletLoss(1792,1e-8,1792))

In [None]:
from tqdm.notebook import tqdm
epochs=50

In [None]:
model.train()
outputs=[]
for epoch in tqdm(range(epochs), desc="Epochs"):
    running_loss1 = []
    running_loss2 = []
    for step1, (anchor_img1, positive_img1, negative_img1, anchor_label1) in enumerate(tqdm(train_realanchor_loader, desc="Training", leave=False)):
        anchor_img1 = anchor_img1.to(device)
        positive_img1 = positive_img1.to(device)
        negative_img1 = negative_img1.to(device)

        optimizer.zero_grad()
        anchor_out1 = model.forward(anchor_img1)
        positive_out1 = model.forward(positive_img1)
        negative_out1= model.forward(negative_img1)

        loss = criterion(anchor_out1, positive_out1, negative_out1)
        loss.backward()
        optimizer.step()

        running_loss1.append(loss.cpu().detach().numpy())
    print("Epoch: {}/{} - Loss1: {:.4f}".format(epoch+1, epochs, np.mean(running_loss1)))


    for step2, (anchor_img2, positive_img2, negative_img2, anchor_label2) in enumerate(tqdm(train_fakeanchor_loader, desc="Training", leave=False)):
        anchor_img2 = anchor_img2.to(device)
        positive_img2 = positive_img2.to(device)
        negative_img2 = negative_img2.to(device)

        optimizer.zero_grad()
        anchor_out2 = model.forward(anchor_img2)
        positive_out2 = model.forward(positive_img2)
        negative_out2= model.forward(negative_img2)

        loss = criterion(anchor_out2, positive_out2, negative_out2)
        loss.backward()
        optimizer.step()

        running_loss2.append(loss.cpu().detach().numpy())
    print("Epoch: {}/{} - Loss2: {:.4f}".format(epoch+1, epochs, np.mean(running_loss2)))

In [None]:
for param in model.parameters():
    param.requires_grad = False

In [None]:
x_train=[]
y_train=[]
for i in range(0,len(x_real)):
  x_train.append(x_real[i])
  y_train.append(y_real[i])
for i in range(0,len(x_fake)):
  x_train.append(x_fake[i])
  y_train.append(y_fake[i])

In [None]:
# x_test=[]
# y_test=[]
# for i in range(0,len(x_real_val)):
#   x_test.append(x_real_val[i])
#   y_test.append(y_real_val[i])
# for i in range(0,len(x_fake)):
#   x_test.append(x_fake_val[i])
#   y_test.append(y_fake_val[i])

In [None]:
from torch.utils.data import Dataset

class customDataset(Dataset):
    def __init__(self, data, labels, transform = None):
        self.data = data
        self.labels = torch.FloatTensor(labels)
        self.transform = transform

    def __getitem__(self, index):
        x = self.data[index]
        if len(self.labels):
          y = self.labels[index]
        else:
          y = None
        if self.transform:
            x = self.transform(x)
        return x, y

    def __len__(self):
        return len(self.data)

In [None]:
train_finetuning = customDataset(x_train,y_train)
# test_dataset=customDataset(x_test,y_test)

In [None]:
train_finetuning_loader=DataLoader(train_finetuning, batch_size=64, shuffle=True)
# test_loader=DataLoader(test_dataset,batch_size=64,shuffle=False)

In [None]:
class finetuning_layer(nn.Module):
  def __init__(self):
    super(finetuning_layer, self).__init__()
    self.classifier=nn.Linear(1792,1)
  def forward(self,x):
    x = model.forward(x).squeeze()
    return self.classifier(x)

In [None]:
final_layer_model=finetuning_layer()

In [None]:
final_layer_model.to(device)

In [None]:
epochs = 50
lr = 0.001
optimizer = optim.Adam
criterion = torch.nn.functional.binary_cross_entropy_with_logits


def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def accuracy(y_pred, y):
   # predicted = torch.max(y_pred.data)
    predicted=torch.round(y_pred)
    total = y.size(0)
    correct = (predicted == y).sum().item()
    return correct/total

def train(model, dataset, opt_fn, criterion,epoch,learning_rate):

    optimizer=opt_fn(model.parameters(),learning_rate)
    final_layer_model.train()
    train_loss=[]
    train_acc=[]
    for batch_idx,(data,target) in enumerate(dataset):

        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output=final_layer_model(data)
        output=torch.squeeze(output,1)
        #loss=criterion(output,target)
        logloss=torch.nn.functional.cross_entropy(output,targets)
        logloss.backward()
        optimizer.step()
        acc = accuracy(output, target)
        train_acc.append(acc)
        train_loss.append(logloss.item())
        print('\repoch:{}({:.0f}%)\tloss:{:.3f}\ttrain_accuracy:{:.2f}%'.format(epoch+1,100*batch_idx/len(dataset),
        np.mean(train_loss),100*np.mean(train_acc)),end='')

def eval(model, dataset, criterion):

    model.eval()
    val_acc=[]
    for batch_idx,(data,target) in enumerate(dataset):
        output=model(data)
        acc = accuracy(output, target)
        val_acc.append(acc)
    print('val_accuracy:{:.2f}%'.format(100*np.mean(val_acc)))
    return np.mean(val_acc)

In [None]:
for epoch in range(epochs):
      start_time = time.monotonic()
      train(final_layer_model, train_finetuning_loader, optimizer, criterion,epoch,lr)
      # eval_accuracy = eval(model, test_loader, criterion)
      end_time = time.monotonic()
      epoch_mins, epoch_secs = epoch_time(start_time, end_time)

      print("TIME TAKEN FOR THE EPOCH: {} mins and {} seconds\n".format(epoch_mins, epoch_secs))
print("OVERALL TRAINING COMPLETE")