In [None]:
# Reference: https://github.com/ipazc/mtcnn

In [None]:
import numpy as np
import random
import pickle
import os
from random import sample
import torch

In [None]:
# Mount Google Drive at /content/gdrive; the video paths used later in this
# notebook live under this mount point. This import only exists on Colab.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
!pip install facenet-pytorch

Collecting facenet-pytorch
  Downloading facenet_pytorch-2.5.2-py3-none-any.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 5.2 MB/s 
Installing collected packages: facenet-pytorch
Successfully installed facenet-pytorch-2.5.2


In [None]:
import os
import glob
import time
import torch
import cv2
from PIL import Image
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm

from facenet_pytorch import MTCNN, extract_face

# Use the first CUDA device when a GPU is visible, otherwise fall back to the CPU.
# The string has to be exactly 'cuda:<index>' (no spaces) for torch to parse it.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print(f'Running on device: {device}')

Running on device: cuda:0


In [None]:
import torchvision.models as models

In [None]:
# MTCNN face detector from facenet-pytorch, placed in eval mode.
mtcnn = MTCNN(margin=15, keep_all=True, factor=0.5, device=device).eval()

# Pretrained ResNet-18 used only for inference. eval() makes BatchNorm use its
# stored running statistics, so a face's embedding does not depend on which
# other faces share its batch and the running statistics are not modified.
resnet = models.resnet18(pretrained=True).eval()

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


  0%|          | 0.00/44.7M [00:00<?, ?B/s]

In [None]:
# `device` is 'cpu' exactly when no GPU was visible at setup time, so the
# ResNet weights are moved only on a CUDA runtime.
if device != 'cpu':
    resnet.to(device)

In [None]:
class DetectionPipeline:
    """Sample frames from a video file and run a detector on them in batches.

    Parameters
    ----------
    detector : callable
        Called with a list of PIL images; the items of whatever iterable it
        returns are appended to the result of ``__call__``.
    n_frames : int or None
        Number of evenly spaced frames to sample. ``None`` samples every frame.
    batch_size : int
        Number of frames accumulated before ``detector`` is called.
    """

    def __init__(self, detector, n_frames=None, batch_size=20):

        self.detector = detector
        self.n_frames = n_frames
        self.batch_size = batch_size

    def __call__(self, filename):
        """Run the detector over the sampled frames of ``filename``.

        Frames are converted from BGR to RGB, wrapped as PIL images and
        downscaled to a quarter of their width and height before detection.
        Frames that fail to decode are skipped.

        Returns
        -------
        list
            The detector outputs for every sampled frame, in frame order.
        """
        v_cap = cv2.VideoCapture(filename)
        faces = []
        frames = []
        try:
            vid_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print("video length is ", vid_len)

            if self.n_frames is None:
                sample = np.arange(0, vid_len)
            else:
                sample = np.linspace(0, vid_len - 1, self.n_frames).astype(int)
            # A set gives O(1) membership tests instead of scanning the array
            # once per frame.
            wanted = set(sample.tolist())

            for j in range(vid_len):
                # grab() advances the stream for every frame; retrieve() is
                # only called for the frames that were sampled.
                v_cap.grab()
                if j not in wanted:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = Image.fromarray(frame)

                frame = frame.resize([int(d * 0.25) for d in frame.size])
                frames.append(frame)

                if len(frames) >= self.batch_size:
                    faces.extend(self.detector(frames))
                    frames = []

            # Flush the final partial batch here rather than when the last
            # sampled index is reached: if that last frame fails to decode,
            # the frames already buffered must still be processed.
            if frames:
                faces.extend(self.detector(frames))
        finally:
            # Release the capture handle even if decoding or detection raised.
            v_cap.release()

        return faces

In [None]:
import copy
def process_faces(faces, resnet, n_faces=15):
    """Turn the detected faces of one video into a fixed-length feature vector.

    Parameters
    ----------
    faces : list
        Per-frame detector outputs: tensors that are concatenated along dim 0,
        or ``None`` entries, which are ignored.
    resnet : callable
        Model applied to the stacked face batch to produce one embedding per
        face. If it exposes ``parameters()``, the batch is moved to the device
        of its first parameter before the forward pass.
    n_faces : int, optional
        Number of faces used per video (default 15, the value that was
        previously hard-coded). Videos with fewer faces are rejected.

    Returns
    -------
    numpy.ndarray or list
        A 1-D array of length ``n_faces`` holding each embedding's distance
        to the centroid of the embeddings, or an empty list when fewer than
        ``n_faces`` faces are available.
    """
    faces = [f for f in faces if f is not None]
    if len(faces) == 0:
        return []

    faces = torch.cat(faces)
    # Every video must contribute the same number of features, so videos with
    # too few faces are skipped and longer ones are truncated.
    if len(faces) < n_faces:
        return []
    faces = faces[:n_faces]

    # Run on whatever device the model lives on instead of relying on a
    # notebook-level global that may not match it.
    try:
        target = next(resnet.parameters()).device
    except (AttributeError, StopIteration):
        target = None
    if target is not None:
        faces = faces.to(target)

    # Inference only: without no_grad the result would carry autograd history
    # and .numpy() would raise when called outside a no_grad context.
    with torch.no_grad():
        embeddings = resnet(faces)
        centroid = embeddings.mean(dim=0)
        distances = (embeddings - centroid).norm(dim=1)

    return distances.cpu().numpy()

In [None]:
# Root folder of the mounted Google Drive.
# NOTE(review): no other visible cell reads this variable; the video glob
# spells out the same prefix as a literal — confirm whether it is still needed.
root_data_file = "/content/gdrive/My Drive"

In [None]:
# Sample 20 evenly spaced frames per video. batch_size (60) is larger than
# n_frames, so a video's frames normally reach the detector as one batch.
detection_pipeline = DetectionPipeline(
    detector=mtcnn,
    n_frames=20,
    batch_size=60,
)

# Every .mp4 in the training-sample folder on the mounted Drive.
video_pattern = '/content/gdrive/My Drive/deepfake-detection/train_sample_videos/*.mp4'
filenames = glob.glob(video_pattern)
total_files = len(filenames)


In [None]:
# Display how many video files the glob matched.
total_files

400

In [None]:
# Per-video labels, hard-coded because the label file could not be loaded in Colab.
# The feature-extraction loop reads data[i] for the i-th entry of `filenames`, so
# the order here has to match the order returned by glob.
# NOTE(review): glob does not guarantee a stable order — confirm the alignment.
# NOTE(review): the meaning of 1 vs 0 is not stated anywhere in this notebook
# (presumably fake vs real) — confirm.
data = [1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,0,1,1,1,1,
1,0,1,1,1,1,1,1,1,1,1,1,1,0,1,1,0,1,1,1,0,1,1,1,1,
1,1,0,1,0,1,0,1,1,1,0,0,1,1,0,1,1,1,1,0,1,1,0,1,1,1,
1,0,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,0,
1,1,1,1,1,1,0,0,1,1,1,1,1,1,0,1,1,1,1,1,1,1,0,1,0,1,
1,0,1,0,1,0,1,1,1,1,1,0,1,1,1,1,1,1,1,0,0,0,1,1,1,1,
1,1,0,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,0,1,1,1,0,0,1,1,
1,0,1,0,1,1,1,1,1,0,1,0,0,0,1,1,1,0,1,1,1,1,0,0,1,1,
1,0,0,1,1,1,1,1,1,0,1,1,1,1,1,0,1,1,1,1,1,1,0,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,0,1,0,1,1,1,1,1,1,0,1,1,1,1,0,
1,1,1,1,1,0,1,0,0,1,1,1,1,0,1,1,0,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,
1,1,1,1,0,1,1,0,1,1,0,1,0,1,1,1,1,1,1,1,1,1,0,0,0,1,
1,1,1,1,1,1,1,1,0,1,1,1,0,1,0,1,1,1,0,1,1,1,1,1,1,1,
1,1,0,1,1,1,1,1,1,0,1,1,1,1,1,1,0,0,1,1,1,1,1,1,1,0,
1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,0]

In [None]:
# Build the feature matrix X (one distance vector per usable video) and the
# matching label list y. X and y are only ever appended to together, so they
# stay index-aligned.
X = []
y = []
start = time.time()
n_processed = 0  # total number of per-frame detector outputs seen so far

with torch.no_grad():
    for i, filename in tqdm(enumerate(filenames), total=len(filenames)):
        try:
            print("training ", i)
            faces = detection_pipeline(filename)
            # Count right after detection so a failure further down cannot
            # re-count the previous video's faces.
            n_processed += len(faces)

            z = process_faces(faces, resnet)
            if len(z) != 0:
                # Look the label up before touching X: if data[i] is missing,
                # neither list is modified.
                label = 1 if data[i] == 1 else 0
                X.append(z)
                y.append(label)

        except Exception as e:
            # Skip the video entirely. Appending a placeholder to X without a
            # label would misalign X and y and break the classifier fit later.
            print(f"skipping video {i} ({filename}): {e!r}")

assert len(X) == len(y), "features and labels must stay aligned"

  0%|          | 0/400 [00:00<?, ?it/s]

training  0
video length is  300


  batch_boxes, batch_points = np.array(batch_boxes), np.array(batch_points)
  boxes = np.array(boxes)
  probs = np.array(probs)
  points = np.array(points)


training  1
video length is  300
training  2
video length is  300
training  3
video length is  300
training  4
video length is  300
training  5
video length is  300
training  6
video length is  300
training  7
video length is  300
training  8
video length is  298
training  9
video length is  300
training  10
video length is  300
training  11
video length is  300
training  12
video length is  300
training  13
video length is  300
training  14
video length is  300
training  15
video length is  300
training  16
video length is  300
training  17
video length is  300
training  18
video length is  300
training  19
video length is  300
training  20
video length is  300
training  21
video length is  300
training  22
video length is  300
training  23
video length is  300
training  24
video length is  300
training  25
video length is  300
training  26
video length is  300
training  27
video length is  300
training  28
video length is  298
training  29
video length is  298
training  30
video leng

In [None]:
# Discard the first collected sample together with its label.
# The message printed for the first video was:
#   VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences
#   (which is a list-or-tuple of lists-or-tuples-or ndarrays with different
#   lengths or shapes) is deprecated. If you meant to do this, you must specify
#   'dtype=object' when creating the ndarray
#     points = np.array(points)
# NOTE(review): that is a warning rather than an error, so this sample may not
# actually need to be removed — confirm.
# NOTE(review): re-running this cell drops one more sample each time.
X, y = X[1:], y[1:]

[1, 1, 1, 1, 1, 0, 1, 1, 1]

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 60% of the samples for testing; the fixed random_state keeps the
# split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.6,
    random_state=105,
)

In [None]:
from sklearn.linear_model import LogisticRegression

# Fit a logistic-regression classifier on the training features, then predict
# labels for the held-out set.
clf = LogisticRegression(random_state=0)
clf.fit(X_train, y_train)
y_pred_lr = clf.predict(X_test)

In [None]:
from sklearn.metrics import accuracy_score
# Accuracy of the logistic-regression predictions on the held-out set.
# NOTE(review): the hard-coded labels are mostly 1s, so compare this number
# against a majority-class baseline before drawing conclusions.
test_accuracy = accuracy_score(y_test, y_pred_lr)
print(test_accuracy)

0.7368421052631579


303