In [None]:
# Mount Google Drive at /content/drive. This cell depends on the google.colab
# package, so it only runs inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install opencv-python-headless
!pip install mtcnn


In [None]:
import os
import cv2
import numpy as np
from mtcnn import MTCNN
from concurrent.futures import ThreadPoolExecutor
import tensorflow as tf
import json

In [None]:
from zipfile import ZipFile

# Unpack the sample-video archive into ./train_sample_videos.
# The context manager closes the archive even if extraction fails part-way,
# which the previous explicit close() call did not guarantee.
with ZipFile('train_sample_videos-20240524T065131Z-001.zip', 'r') as zf:
    zf.extractall('train_sample_videos')

In [None]:
# Directory the archive is extracted into.
DATA_FOLDER = "train_sample_videos/"
# Sub-folder of DATA_FOLDER holding the clips and metadata.json.
# NOTE(review): assumes the archive contains a top-level
# "train_sample_videos" folder — confirm against the extracted tree.
TRAIN_FOLDER = "train_sample_videos"
# Alias for the older name of the same folder, which was referenced but never
# defined (NameError); kept so any code using that name resolves.
TRAIN_COLAB_FOLDER = TRAIN_FOLDER

# Sorted so the processing order does not depend on the arbitrary order
# returned by os.listdir.
video_files = sorted(
    file
    for file in os.listdir(os.path.join(DATA_FOLDER, TRAIN_FOLDER))
    if file.endswith('.mp4')
)
# Print a summary instead of dumping the full file list into the output.
print(f"Found {len(video_files)} .mp4 files; first few: {video_files[:5]}")

In [None]:
# Global OpenCV runtime switches, set before any video is decoded.
# Ask OpenCV to use its optimized code paths.
cv2.setUseOptimized(True)
# Number of threads OpenCV may use for its own internal parallelism.
cv2.setNumThreads(4)
# Allow OpenCL acceleration.
# NOTE(review): whether this has any effect depends on the runtime having an OpenCL device — confirm.
cv2.ocl.setUseOpenCL(True)

In [None]:
# Output size handed to cv2.resize for every cropped face (cv2.resize takes (width, height)).
TARGET_SIZE = (300, 300)
# Intended sampling stride for preprocess_video's `frame_skip` argument
# (a frame is processed when its index is a multiple of the stride).
# NOTE(review): confirm the caller actually passes this value.
FRAME_SKIP = 5
# Used as max_workers for the ThreadPoolExecutor that processes the videos.
NUM_WORKERS = 4

In [None]:
# Single MTCNN face detector, created once and reused by preprocess_video
# for every sampled frame.
detector = MTCNN()

In [None]:
def _crop_first_face(frame):
    """Return the crop of the first face MTCNN reports in `frame`, or None."""
    if frame.ndim == 2:
        # Promote single-channel frames so every crop has three channels.
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

    # OpenCV decodes frames as BGR, whereas MTCNN expects RGB input; only the
    # copy given to the detector is converted, the crop keeps OpenCV's order.
    results = detector.detect_faces(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if not results:
        return None

    x, y, w, h = results[0]['box']
    frame_h, frame_w = frame.shape[:2]
    # The detector can report boxes that start outside the image. A negative
    # start index would wrap around in NumPy slicing and yield a wrong or
    # empty crop, so clamp the box to the frame first.
    x1, y1 = max(x, 0), max(y, 0)
    x2, y2 = min(x + w, frame_w), min(y + h, frame_h)
    if x2 <= x1 or y2 <= y1:
        # Nothing of the box lies inside the frame; resizing would fail.
        return None
    return frame[y1:y2, x1:x2]


def preprocess_video(video_path, frame_skip=None):
    """Extract resized face crops from a sampled subset of a video's frames.

    Frames whose index is a multiple of `frame_skip` are run through the
    module-level MTCNN `detector`; for each such frame with at least one
    detection, the first reported face is cropped and resized to TARGET_SIZE.

    Args:
        video_path: Path of the video file handed to cv2.VideoCapture.
        frame_skip: Sampling stride. Defaults to the module-level FRAME_SKIP
            when omitted or None.

    Returns:
        A list of face crops (arrays resized to TARGET_SIZE, in OpenCV's
        channel order). Empty when no frame could be read or no face was
        found.
    """
    if frame_skip is None:
        frame_skip = FRAME_SKIP

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_skip == 0:
                face = _crop_first_face(frame)
                if face is not None:
                    frames.append(cv2.resize(face, TARGET_SIZE))
            frame_count += 1
    finally:
        # Always give the decoder handle back, even if detection raised.
        cap.release()
    return frames

In [None]:
# Read metadata.json from the training folder. The resulting dict is indexed
# by video file name, and each record's "label" entry is looked up when the
# videos are processed.
json_file_path = os.path.join(DATA_FOLDER, TRAIN_FOLDER, "metadata.json")
with open(json_file_path, "r") as json_file:
    metadata = json.loads(json_file.read())

In [None]:
# Shared accumulators filled while the videos are processed: `data` collects
# the face crops and `labels` the matching label entries. Re-running this
# cell resets both.
data, labels = [], []

In [None]:
import threading
from concurrent.futures import ThreadPoolExecutor
import numpy as np


# Maps the "label" strings read from the metadata to integer class ids.
label_mapping = {'FAKE': 0, 'REAL': 1}

# Guards the paired update of `data` and `labels`. Without it, two workers can
# interleave their extend() calls and leave frames and labels misaligned.
_results_lock = threading.Lock()


def process_video(video_path, video_file):
    """Extract face crops from one video and append them, with labels, to the
    shared `data` / `labels` lists.

    Args:
        video_path: Path of the video file to decode.
        video_file: Key of the video in `metadata` (its file name).

    Raises:
        KeyError: If `video_file` is missing from `metadata`, or its label is
            not a key of `label_mapping`.

    Side effects:
        Extends `data` with the video's face crops and `labels` with one entry
        per crop. Each entry is a list holding the video's class id repeated
        once per extracted crop; the later padding cell consumes these
        sequences.
    """
    # Resolve the label first so an unknown video/label fails before anything
    # is appended, keeping `data` and `labels` the same length.
    info = metadata[video_file]
    class_id = label_mapping[info["label"]]

    # NOTE(review): preprocess_video uses the single module-level MTCNN
    # detector from several worker threads — confirm it is safe to call
    # concurrently.
    frames = preprocess_video(video_path, FRAME_SKIP)

    video_label_sequence = [class_id] * len(frames)
    # Give every frame its own copy rather than N references to one list.
    frame_labels = [list(video_label_sequence) for _ in frames]

    with _results_lock:
        data.extend(frames)
        labels.extend(frame_labels)


failed_videos = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = {}
    for video_file in video_files:
        video_path = os.path.join(DATA_FOLDER, TRAIN_FOLDER, video_file)
        futures[executor.submit(process_video, video_path, video_file)] = video_file

    # An exception raised in a worker is stored on its future and stays
    # invisible unless result() is called, so collect and report each one.
    for future, submitted_file in futures.items():
        try:
            future.result()
        except Exception as exc:
            failed_videos.append(submitted_file)
            print(f"Failed to process {submitted_file}: {exc!r}")

if failed_videos:
    print(f"{len(failed_videos)} of {len(video_files)} videos could not be processed.")

In [None]:
from keras.utils import to_categorical

# Every label sequence is truncated or padded to exactly this many entries.
max_sequence_length = 8


def _fit_label_sequence(sequence, length):
    """Return `sequence` as a list of exactly `length` class ids.

    Longer sequences are truncated. Shorter ones are padded with their own
    last class id: padding with a literal 0 would relabel the tail of a REAL
    sequence as FAKE, because 0 is the FAKE class id. An empty sequence has
    no label to repeat and falls back to 0.
    """
    fitted = list(sequence)[:length]
    pad_value = fitted[-1] if fitted else 0
    fitted.extend([pad_value] * (length - len(fitted)))
    return fitted


# list(...) inside the helper lets this cell be re-run after `labels` has
# already been converted to an array.
labels = [_fit_label_sequence(sequence, max_sequence_length) for sequence in labels]


data = np.array(data)
labels = np.array(labels)

# Each face crop must have exactly one label sequence.
assert len(data) == len(labels), "data and labels are misaligned"


print("Data shape:", data.shape)
print("Labels shape:", labels.shape)


num_classes = len(label_mapping)
# One-hot encode every class id, which adds a trailing axis of size num_classes.
one_hot_labels = to_categorical(labels, num_classes=num_classes)


print("One-hot Labels shape:", one_hot_labels.shape)

In [None]:
# Sanity check: the number of face crops and the number of label rows
# (the two values are expected to match).
print(len(data), len(labels))

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the samples for testing; random_state fixes the shuffle so
# the split is repeatable for the same input order.
# NOTE(review): samples are individual frames, so crops from one video can end
# up in both the train and the test set — confirm this is acceptable or split
# by video instead.
X_train, X_test, y_train, y_test = train_test_split(data, one_hot_labels, test_size=0.2, random_state=42)
