1. Setup and Libraries Installation

In [None]:
!pip install opencv-python-headless mtcnn tensorflow keras scikit-learn

2. Import Libraries

In [None]:
import cv2
import os
import json
import numpy as np
from mtcnn import MTCNN
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

3. Load Metadata

In [None]:
metadata_path = 'train_sample_videos/metadata.json'

with open(metadata_path, 'r') as file:
    metadata = json.load(file)

4. Extract Frames from Videos

In [None]:
def extract_frames(video_path, output_folder, frame_rate=1):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    cap = cv2.VideoCapture(video_path)
    count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if int(cap.get(cv2.CAP_PROP_POS_FRAMES)) % frame_rate == 0:
            cv2.imwrite(os.path.join(output_folder, f"frame{count:05d}.jpg"), frame)
            count += 1
    cap.release()

train_videos_path = 'train_sample_videos'
test_videos_path = 'test_videos'
output_frames_path = 'extracted_frames'

# Process train videos
for video_file in os.listdir(train_videos_path):
    if video_file.endswith('.mp4'):
        video_path = os.path.join(train_videos_path, video_file)
        extract_frames(video_path, os.path.join(output_frames_path, 'train', video_file.split('.')[0]))

# Process test videos
for video_file in os.listdir(test_videos_path):
    if video_file.endswith('.mp4'):
        video_path = os.path.join(test_videos_path, video_file)
        extract_frames(video_path, os.path.join(output_frames_path, 'test', video_file.split('.')[0]))

5. Face Detection and Cropping

In [None]:
def detect_and_crop_faces(image_path, output_folder, frame_number):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    detector = MTCNN()
    img = cv2.imread(image_path)
    faces = detector.detect_faces(img)
    if not faces:
        print(f"No faces detected in {image_path}")
    count = 0
    for face in faces:
        x, y, width, height = face['box']
        cropped_face = img[y:y+height, x:x+width]
        cv2.imwrite(os.path.join(output_folder, f"frame{frame_number:05d}_face{count:05d}.jpg"), cropped_face)
        count += 1

output_faces_path = 'cropped_faces'

# Process train frames
for video_folder in os.listdir(os.path.join(output_frames_path, 'train')):
    frame_folder_path = os.path.join(output_frames_path, 'train', video_folder)
    face_output_folder = os.path.join(output_faces_path, 'train', video_folder)
    for frame_number, frame_file in enumerate(os.listdir(frame_folder_path)):
        frame_path = os.path.join(frame_folder_path, frame_file)
        detect_and_crop_faces(frame_path, face_output_folder, frame_number)

# Process test frames
for video_folder in os.listdir(os.path.join(output_frames_path, 'test')):
    frame_folder_path = os.path.join(output_frames_path, 'test', video_folder)
    face_output_folder = os.path.join(output_faces_path, 'test', video_folder)
    for frame_number, frame_file in enumerate(os.listdir(frame_folder_path)):
        frame_path = os.path.join(frame_folder_path, frame_file)
        detect_and_crop_faces(frame_path, face_output_folder, frame_number)

6. Data Augmentation

In [None]:
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

def augment_images(image_folder, output_folder, num_augmented_images=5):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    for img in os.listdir(image_folder):
        img_path = os.path.join(image_folder, img)
        image = cv2.imread(img_path)
        x = image.reshape((1,) + image.shape)
        i = 0
        for batch in datagen.flow(x, batch_size=1, save_to_dir=output_folder, save_prefix='aug', save_format='jpg'):
            i += 1
            if i >= num_augmented_images:
                break

output_augmented_path = 'augmented_faces'

# Augment train faces
for video_folder in os.listdir(os.path.join(output_faces_path, 'train')):
    face_folder_path = os.path.join(output_faces_path, 'train', video_folder)
    augmented_output_folder = os.path.join(output_augmented_path, 'train', video_folder)
    augment_images(face_folder_path, augmented_output_folder)

# Augment test faces
for video_folder in os.listdir(os.path.join(output_faces_path, 'test')):
    face_folder_path = os.path.join(output_faces_path, 'test', video_folder)
    augmented_output_folder = os.path.join(output_augmented_path, 'test', video_folder)
    augment_images(face_folder_path, augmented_output_folder)

7. Normalization and Resizing

In [None]:
def preprocess_image(image_path, target_size=(224, 224)):
    img = cv2.imread(image_path)
    img = cv2.resize(img, target_size)
    img = img.astype('float32') / 255.0  # Normalize to 0-1
    return img

# Preprocess train images
preprocessed_train_images = []
train_labels = []
for video_folder in os.listdir(os.path.join(output_augmented_path, 'train')):
    augmented_folder_path = os.path.join(output_augmented_path, 'train', video_folder)
    label = 1 if metadata[video_folder + '.mp4']['label'] == 'FAKE' else 0
    for img_file in os.listdir(augmented_folder_path):
        img_path = os.path.join(augmented_folder_path, img_file)
        preprocessed_train_images.append(preprocess_image(img_path))
        train_labels.append(label)

# Preprocess test images
preprocessed_test_images = []
for video_folder in os.listdir(os.path.join(output_augmented_path, 'test')):
    augmented_folder_path = os.path.join(output_augmented_path, 'test', video_folder)
    for img_file in os.listdir(augmented_folder_path):
        img_path = os.path.join(augmented_folder_path, img_file)
        preprocessed_test_images.append(preprocess_image(img_path))

8. Create Datasets

In [None]:
train_images, val_images, train_labels, val_labels = train_test_split(preprocessed_train_images, train_labels, test_size=0.1, random_state=42)

9. Batch Processing and Data Loading

In [None]:
train_datagen = ImageDataGenerator()
val_datagen = ImageDataGenerator()
test_datagen = ImageDataGenerator()

train_generator = train_datagen.flow(np.array(train_images), np.array(train_labels), batch_size=32)
val_generator = val_datagen.flow(np.array(val_images), np.array(val_labels), batch_size=32)
test_generator = test_datagen.flow(np.array(preprocessed_test_images), batch_size=32, shuffle=False)

Visualization and Validation

In [None]:
# Example of visualizing a preprocessed image
plt.imshow(preprocessed_train_images[0])
plt.show()