In [None]:
# Mount Google Drive so the dataset folders under /content/drive are readable
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Dataset locations: one folder per split under a shared Drive root.
_DATA_ROOT = '/content/drive/MyDrive/data_split'
TRAIN_DIR = _DATA_ROOT + '/Train'
TEST_DIR = _DATA_ROOT + '/Test'
VALID_DIR = _DATA_ROOT + '/Validation'

# Pipeline settings (passed to DetectionPipeline further down).
BATCH_SIZE = 1   # frames handed to the face detector per call
SCALE = 0.25     # factor applied to both frame dimensions before detection
N_FRAMES = None  # frames sampled per video; None means use every available frame

In [None]:
# Install facenet-pytorch
!pip install facenet-pytorch

from facenet_pytorch.models.inception_resnet_v1 import get_torch_home
torch_home = get_torch_home()


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting facenet-pytorch
  Downloading facenet_pytorch-2.5.2-py3-none-any.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m26.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: facenet-pytorch
Successfully installed facenet-pytorch-2.5.2


In [None]:
import os
import glob
import json
import torch
import cv2
from PIL import Image
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from facenet_pytorch import MTCNN, InceptionResnetV1

# Prefer the first CUDA GPU when one is visible to torch; otherwise run on CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(f'Running on device: {device}')

Running on device: cuda:0


In [None]:
# Source: https://www.kaggle.com/timesler/facial-recognition-model-in-pytorch
class DetectionPipeline:
    """Sample frames from a video file and run a face detector on them in batches.

    The detector is any callable that accepts a list of PIL images; whatever it
    returns for each batch is concatenated, in frame order, into the result.
    """

    def __init__(self, detector, n_frames=None, batch_size=60, resize=None):
        """Store the pipeline configuration.

        Args:
            detector: Callable invoked with a list of PIL images. Its return
                value is added to the output with ``list.extend``.
            n_frames: Number of evenly spaced frames to sample from each video,
                or ``None`` to use every frame.
            batch_size: Number of frames handed to ``detector`` per call.
            resize: Optional scale factor applied to both frame dimensions
                before detection; ``None`` keeps the decoded size.
        """
        self.detector = detector
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize

    def __call__(self, filename):
        """Run the detector over the sampled frames of one video.

        Args:
            filename: Path of the video, passed to ``cv2.VideoCapture``.

        Returns:
            list: Detector outputs for all processed frames, in frame order.
            Frames that fail to decode are skipped.
        """
        v_cap = cv2.VideoCapture(filename)
        try:
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Frame indices to keep. None means "every frame"; otherwise a set
            # gives O(1) membership tests instead of scanning an array per frame.
            if self.n_frames is None:
                wanted = None
            else:
                wanted = set(
                    np.linspace(0, v_len - 1, self.n_frames).astype(int).tolist()
                )

            faces = []
            frames = []
            for j in range(v_len):
                # grab() advances the stream cheaply; decoding only happens in
                # retrieve(), which is called for sampled frames only.
                v_cap.grab()
                if wanted is not None and j not in wanted:
                    continue

                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = Image.fromarray(frame)

                # Resize frame to desired size
                if self.resize is not None:
                    frame = frame.resize(
                        tuple(int(d * self.resize) for d in frame.size)
                    )
                frames.append(frame)

                # When batch is full, detect faces and reset frame list
                if len(frames) >= self.batch_size:
                    faces.extend(self.detector(frames))
                    frames = []

            # Flush the final partial batch. Doing this after the loop (instead
            # of when the last sampled index is reached) keeps already-decoded
            # frames from being dropped when the last sampled frame cannot be
            # read, e.g. when the reported frame count exceeds the real one.
            if frames:
                faces.extend(self.detector(frames))
        finally:
            # Always free the capture handle, even if decoding/detection raises.
            v_cap.release()

        return faces

In [None]:
# Source: https://www.kaggle.com/timesler/facial-recognition-model-in-pytorch
def process_faces(faces, feature_extractor):
    """Measure how far each detected face's embedding lies from the video's mean embedding.

    Args:
        faces: Per-frame detector outputs; entries that are ``None`` (no face
            found) are ignored. The remaining entries are tensors that are
            concatenated with ``torch.cat``.
        feature_extractor: Callable mapping the concatenated face batch to one
            embedding row per face.

    Returns:
        ``None`` when no entry contains a face; otherwise a NumPy array holding,
        for each face, the norm of (embedding - mean embedding).
    """
    detected = list(filter(lambda face: face is not None, faces))
    if not detected:
        return None

    # Stack every detected face into one batch on the module-level `device`
    batch = torch.cat(detected).to(device)
    embeddings = feature_extractor(batch)

    # Offset of each embedding from the centroid, reduced to a distance per face
    offsets = embeddings - embeddings.mean(dim=0)
    return offsets.norm(dim=1).cpu().numpy()

In [None]:
# Face detector (MTCNN), put into eval mode
face_detector = MTCNN(
    margin=14,
    keep_all=True,
    factor=0.5,
    device=device,
)
face_detector.eval()

# Facial recognition model with the 'vggface2' pretrained weights, put into eval mode
feature_extractor = InceptionResnetV1(pretrained='vggface2', device=device)
feature_extractor.eval()

# Face detection pipeline wired to the detector and the notebook-level settings
detection_pipeline = DetectionPipeline(
    detector=face_detector,
    n_frames=N_FRAMES,
    batch_size=BATCH_SIZE,
    resize=SCALE,
)

  0%|          | 0.00/107M [00:00<?, ?B/s]

In [None]:
# Collect the .mp4 paths of every split. glob returns matches in arbitrary
# order, so sort them to make the processing order (and therefore the row order
# of the saved CSVs) reproducible between runs.
all_train_videos = sorted(glob.glob(os.path.join(TRAIN_DIR, '*.mp4')))
all_test_videos = sorted(glob.glob(os.path.join(TEST_DIR, '*.mp4')))
all_valid_videos = sorted(glob.glob(os.path.join(VALID_DIR, '*.mp4')))

# Load the label metadata. The path is relative, so 'metadata.json' has to be in
# the current working directory. Later cells read it as
# metadata[<video file name>]['label'].
with open('metadata.json', 'r') as f:
    metadata = json.load(f)

In [None]:
# Build the per-face distance table for the validation videos.
# Rows are accumulated in a plain list and converted to a DataFrame once at the
# end; growing a DataFrame with `df.loc[len(df)] = row` pays for an enlargement
# on every single insert.
rows = []

with torch.no_grad():  # inference only, no gradients needed
    for path in tqdm(all_valid_videos):
        file_name = os.path.basename(path)

        # Detect all faces that occur in the video
        faces = detection_pipeline(path)

        # Calculate the distances of all faces' feature vectors to the centroid
        distances = process_faces(faces, feature_extractor)
        if distances is None:
            # No face was detected anywhere in this video
            continue

        # The label is per video, so look it up once rather than once per face
        label = 1 if metadata[file_name]['label'] == 'FAKE' else 0
        rows.extend((file_name, distance, label) for distance in distances)

df = pd.DataFrame(rows, columns=['filename', 'distance', 'label'])

  0%|          | 0/79 [00:00<?, ?it/s]

In [None]:
# Preview the first rows of the per-face distance table
df.head()

Unnamed: 0,filename,distance,label
0,hnfwagcxdf.mp4,0.587933,1
1,hnfwagcxdf.mp4,1.167863,1
2,hnfwagcxdf.mp4,0.601006,1
3,hnfwagcxdf.mp4,0.606903,1
4,hnfwagcxdf.mp4,0.559757,1


In [None]:
# Spot-check 10 randomly drawn rows (no seed is set, so the selection changes per run)
df.sample(10)

Unnamed: 0,filename,distance,label
57539,caifxvsozs.mp4,0.691494,0
24350,dmmvuaikkv.mp4,0.557556,0
134208,txnmkabufs.mp4,0.28229,0
17934,cekwtyxdoo.mp4,0.782703,1
115985,edyncaijwx.mp4,0.627255,0
144383,yljecirelf.mp4,0.396901,0
34019,bggsurpgpr.mp4,0.536438,1
92452,dkdwxmtpuo.mp4,0.314569,1
109479,prwsfljdjo.mp4,0.758,0
60570,cfxkpiweqt.mp4,0.336001,0


In [None]:
# Write the table to CSV without the index column.
# NOTE(review): in this notebook `df` is built by the loop over `all_valid_videos`,
# yet it is saved as 'train.csv' — confirm which split/filename is intended.
df.to_csv('train.csv', index=False)

In [None]:
# Get the paths of all test videos.
# Bound to `all_test_videos` (the name the following loop iterates) so the list
# of train videos is not overwritten with test paths; sorted because glob's
# order is arbitrary.
all_test_videos = sorted(glob.glob(os.path.join(TEST_DIR, '*.mp4')))

In [None]:
# Build the per-face distance table for the test videos.
# Rows are accumulated in a plain list and converted to a DataFrame once at the
# end; growing a DataFrame with `df.loc[len(df)] = row` pays for an enlargement
# on every single insert.
rows = []

with torch.no_grad():  # inference only, no gradients needed
    for path in tqdm(all_test_videos):
        file_name = os.path.basename(path)

        # Detect all faces that occur in the video
        faces = detection_pipeline(path)

        # Calculate the distances of all faces' feature vectors to the centroid
        distances = process_faces(faces, feature_extractor)
        if distances is None:
            # No face was detected anywhere in this video
            continue

        # The label is per video, so look it up once rather than once per face
        label = 1 if metadata[file_name]['label'] == 'FAKE' else 0
        rows.extend((file_name, distance, label) for distance in distances)

df = pd.DataFrame(rows, columns=['filename', 'distance', 'label'])

  0%|          | 0/170 [00:00<?, ?it/s]

In [None]:
# Save the test-split distance table to CSV; index=False leaves out the index column
df.to_csv('test.csv', index=False)