In [None]:
import os

# Make sure the dataset root exists; this is a no-op if it is already there
os.makedirs(name='/content/dataset', exist_ok=True)


# Split the dataset into train, validation and test sets

In [None]:
import os
import shutil
import random

# Define paths
dataset_path = "/content/dataset"
preprocessed_real_path = os.path.join(dataset_path, "Preprocessed-images", "Celeb-real")
preprocessed_fake_path = os.path.join(dataset_path, "Preprocessed-images", "Celeb-synthesis")

# Output directories
train_dir = os.path.join(dataset_path, "train")
val_dir = os.path.join(dataset_path, "validation")
test_dir = os.path.join(dataset_path, "test")

# Ensure train/val/test directories exist (one "real" and one "fake" folder per split)
for split in [train_dir, val_dir, test_dir]:
    os.makedirs(os.path.join(split, "real"), exist_ok=True)
    os.makedirs(os.path.join(split, "fake"), exist_ok=True)

# Get list of images. os.listdir() returns entries in arbitrary order, so sort
# first; otherwise even a seeded shuffle could differ between runs or machines.
all_real_images = sorted(os.listdir(preprocessed_real_path))
all_fake_images = sorted(os.listdir(preprocessed_fake_path))

# Shuffle with a dedicated, seeded RNG so the split is reproducible and the
# global `random` state is left untouched.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)
split_rng.shuffle(all_real_images)
split_rng.shuffle(all_fake_images)

# NOTE(review): the split is done per image. Face crops are named
# "<video>_<frame>.jpg", so frames of the same video can end up in different
# splits; consider splitting by source video to avoid train/test leakage.

# Splitting ratio: 80% train, 10% validation, the remainder goes to test
train_ratio, val_ratio = 0.8, 0.1
train_real = int(len(all_real_images) * train_ratio)
val_real = int(len(all_real_images) * val_ratio)

train_fake = int(len(all_fake_images) * train_ratio)
val_fake = int(len(all_fake_images) * val_ratio)

# Function to move images
def move_images(image_list, src_folder, dest_folder, label):
    """Move the named files from ``src_folder`` into ``dest_folder/label``.

    Args:
        image_list: File names (not full paths) located in ``src_folder``.
        src_folder: Directory the files currently live in.
        dest_folder: Root directory of the target split.
        label: Sub-directory of ``dest_folder`` to move the files into.

    The target directory is created when missing, so the function does not
    depend on another cell having prepared it beforehand.
    """
    target_dir = os.path.join(dest_folder, label)
    os.makedirs(target_dir, exist_ok=True)
    for img in image_list:
        src = os.path.join(src_folder, img)
        dst = os.path.join(target_dir, img)
        shutil.move(src, dst)

# Distribute each class over train / validation / test: the first `n_train`
# shuffled names go to train, the next `n_val` to validation, the rest to test.
for images, n_train, n_val, src_folder, label in (
    (all_real_images, train_real, val_real, preprocessed_real_path, "real"),
    (all_fake_images, train_fake, val_fake, preprocessed_fake_path, "fake"),
):
    val_end = n_train + n_val
    move_images(images[:n_train], src_folder, train_dir, label)
    move_images(images[n_train:val_end], src_folder, val_dir, label)
    move_images(images[val_end:], src_folder, test_dir, label)

print("Dataset organized successfully!")

Dataset organized successfully!


# Resize all images to the model's input format

In [None]:
import cv2
import numpy as np

def preprocess_image(image_path, target_size=(299, 299)):
    """Load an image, convert it to RGB, resize it and scale pixels to [0, 1].

    Args:
        image_path: Path of the image file to read.
        target_size: Size handed to ``cv2.resize`` (OpenCV order: width, height).

    Returns:
        The resized RGB image as a float array with values in [0, 1].

    Raises:
        OSError: If the file is missing or cannot be decoded as an image.
    """
    img = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising, which
    # would otherwise surface as an obscure error inside cvtColor.
    if img is None:
        raise OSError(f"Could not read image (missing or undecodable): {image_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    img = img / 255.0  # Normalize to [0, 1]
    return img


# Error handling for missing files


In [None]:
def count_images(directory):
    """Return how many entries (files or sub-directories) `directory` contains."""
    entries = os.listdir(directory)
    return len(entries)

# Report the number of real / fake entries in every split
for split_name, split_dir in (("Training", train_dir), ("Validation", val_dir), ("Test", test_dir)):
    real_count = count_images(f"{split_dir}/real")
    fake_count = count_images(f"{split_dir}/fake")
    print(f"{split_name} set: Real:", real_count, " Fake:", fake_count)


Training set: Real: 7745  Fake: 4229
Validation set: Real: 968  Fake: 528
Test set: Real: 969  Fake: 530


In [None]:
import os

# Folders that hold the raw real videos
real_videos_path = ["/content/dataset/Celeb-real", "/content/dataset/Youtube-real"]

# Print the number of entries per folder, or a warning when a folder is absent
for video_dir in real_videos_path:
    if not os.path.exists(video_dir):
        print(f"{video_dir} does not exist!")
        continue
    video_count = len(os.listdir(video_dir))
    print(f"{video_dir}: {video_count} videos")


/content/dataset/Celeb-real: 590 videos
/content/dataset/Youtube-real: 300 videos


In [None]:
import shutil

# Folder to archive and the archive file it should end up in
dataset_path = "/content/dataset"
zip_path = "/content/dataset.zip"

# make_archive() appends the ".zip" extension itself, so pass the name without it
archive_base = zip_path.replace(".zip", "")
shutil.make_archive(archive_base, format='zip', root_dir=dataset_path)

print(f"Dataset zipped successfully at {zip_path}")

Dataset zipped successfully at /content/dataset.zip


In [None]:
from google.colab import files

# Download the zip file through the browser (Colab helper).
# `zip_path` must already be defined by the cell that created the archive.
files.download(zip_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# CNN Processing setup


In [None]:
import torch
# Report whether PyTorch can see a CUDA-capable GPU in this runtime
cuda_available = torch.cuda.is_available()
print("CUDA Available:", cuda_available)


CUDA Available: True


In [None]:
# %pip installs into the environment of the running kernel.
# onnxruntime-gpu supplies the CUDAExecutionProvider that the face-detection
# cell requests; the recorded run logged "Applied providers:
# ['CPUExecutionProvider']", i.e. detection silently ran on the CPU.
# NOTE(review): presumably caused by a CPU-only onnxruntime build; confirm the
# log shows CUDAExecutionProvider after installing, and consider pinning versions.
%pip install -q insightface onnxruntime-gpu torchvision tqdm


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.5.1->torchvision)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.5.1->torchvision)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.5.1->torchvision)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.5.1->torchvision)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch==2.5.1->torchvision)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch==2.5.1->torchvision)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86

# RetinaFace CNN for face detection of real videos

In [None]:
import os
import cv2
import torch
import torchvision.transforms as T
import numpy as np
from insightface.app import FaceAnalysis
from tqdm import tqdm
from PIL import Image

# Check if CUDA (GPU) is available, else fallback to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Initialize the InsightFace detector on the same kind of device as PyTorch.
# The CPU provider is always listed last so ONNX Runtime has an explicit
# fallback, and ctx_id follows `device` instead of being hard-coded to GPU 0.
# Note: torch seeing CUDA does not guarantee ONNX Runtime can use it; check the
# "Applied providers" lines printed below to see what is actually used.
use_gpu = device.type == "cuda"
onnx_providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if use_gpu else ["CPUExecutionProvider"]
face_detector = FaceAnalysis(name="buffalo_l", providers=onnx_providers)
face_detector.prepare(ctx_id=0 if use_gpu else -1)

# Define target size (Xception requires 299x299)
TARGET_SIZE = (299, 299)

# Torchvision transform for resizing and padding
def resize_with_padding(image, target_size):
    """Resize a PIL image to ``target_size`` keeping its aspect ratio, then pad.

    The image is scaled so it fits inside ``target_size`` without distortion
    and the remaining border is filled with black, centred on both axes.
    (Resizing straight to ``target_size`` would stretch non-square face crops.)

    Args:
        image: PIL image to resize.
        target_size: ``(height, width)`` of the output, or a single int for a
            square output.

    Returns:
        The tensor produced by ``T.ToTensor()`` for the padded image, whose
        spatial size is exactly ``target_size``.

    NOTE: crops saved by an older, stretching version of this function are not
    comparable with the padded ones; regenerate them for a consistent dataset.
    """
    if isinstance(target_size, int):
        target_size = (target_size, target_size)
    target_h, target_w = target_size
    src_w, src_h = image.size  # PIL reports (width, height)

    # Largest scale at which the whole image still fits inside the target
    scale = min(target_w / src_w, target_h / src_h)
    new_w = min(target_w, max(1, round(src_w * scale)))
    new_h = min(target_h, max(1, round(src_h * scale)))
    resized = T.Resize((new_h, new_w), interpolation=T.InterpolationMode.BILINEAR)(image)

    # Split the leftover space evenly; the odd pixel goes to the right/bottom
    pad_left = (target_w - new_w) // 2
    pad_top = (target_h - new_h) // 2
    pad_right = target_w - new_w - pad_left
    pad_bottom = target_h - new_h - pad_top
    padded = T.Pad((pad_left, pad_top, pad_right, pad_bottom), fill=0)(resized)

    return T.ToTensor()(padded)

def detect_and_save_faces(frame, frame_index, video_path, save_dir):
    """Detect faces in one frame and save every face crop as a JPEG.

    Args:
        frame: RGB frame (PIL image or array convertible via ``np.array``).
        frame_index: Index of the frame inside the video; used in file names.
        video_path: Path of the source video; its base name prefixes file names.
        save_dir: Existing directory the crops are written to.

    The first face of a frame is stored as ``<video>_<frame>.jpg``. Additional
    faces of the same frame get a ``_<n>`` suffix so they no longer overwrite
    each other.
    """
    frame_np = np.array(frame)
    h, w, _ = frame_np.shape

    # Callers pass RGB frames, but InsightFace follows the OpenCV convention
    # and expects BGR input, so flip the channel order for detection only.
    frame_bgr = np.ascontiguousarray(frame_np[:, :, ::-1])
    faces = face_detector.get(frame_bgr)

    video_name = os.path.basename(video_path)
    saved = 0
    for face in faces:
        x1, y1, x2, y2 = face.bbox.astype(int)

        # Ensure bounding box stays within frame
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)

        # Skip boxes that collapse to nothing after clamping
        if x2 <= x1 or y2 <= y1:
            continue

        # Crop from the RGB frame so the saved image keeps correct colours
        face_crop = frame_np[y1:y2, x1:x2]
        face_pil = Image.fromarray(face_crop)
        face_tensor = resize_with_padding(face_pil, TARGET_SIZE)
        face_pil = T.ToPILImage()(face_tensor)

        # Save face image under a name that is unique per face of this frame
        suffix = "" if saved == 0 else f"_{saved}"
        face_path = os.path.join(save_dir, f"{video_name}_{frame_index}{suffix}.jpg")
        face_pil.save(face_path)
        saved += 1

def extract_faces(video_path, save_dir, frames_per_video=10):
    """Sample frames evenly from a video and save the faces found in them.

    Args:
        video_path: Path of the video file to read.
        save_dir: Directory for the face crops (created when missing).
        frames_per_video: Maximum number of frames to sample; must be >= 1.

    Raises:
        ValueError: If ``frames_per_video`` is smaller than 1.

    A video that cannot be opened is reported on stdout and skipped.
    """
    if frames_per_video < 1:
        raise ValueError("frames_per_video must be >= 1")

    os.makedirs(save_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open {video_path}")
        cap.release()
        return

    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, frame_count // frames_per_video)

        # Integer division makes range() overshoot (19 frames with 10 requested
        # gives step 1, i.e. 19 samples), so cap the indices explicitly.
        sample_indices = list(range(0, frame_count, step))[:frames_per_video]

        for i in sample_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret:
                continue

            # OpenCV decodes to BGR; hand an RGB PIL image to the detector step
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)

            detect_and_save_faces(pil_img, i, video_path, save_dir)

    finally:
        cap.release()  # Ensure video file is released

# Input folders with real videos and the single folder all face crops go to
dataset_folder = "/content/dataset"
video_folders = ["Celeb-real", "Youtube-real"]
output_folder = os.path.join(dataset_folder, "Preprocessed-images", "Celeb-real")
video_extensions = {".mp4", ".avi", ".mov", ".mkv"}

# Collect every video that still has to be processed
video_files = []
for folder in video_folders:
    folder_path = os.path.join(dataset_folder, folder)

    if not os.path.exists(folder_path):
        print(f"Skipping {folder}: Path does not exist!")
        continue

    for entry in os.listdir(folder_path):
        # A crop for frame 0 is taken as the marker that the video was handled
        already_done = os.path.exists(os.path.join(output_folder, f"{entry}_0.jpg"))
        if already_done:
            continue

        candidate = os.path.join(folder_path, entry)
        has_video_ext = entry.lower().endswith(tuple(video_extensions))
        if os.path.isfile(candidate) and has_video_ext:
            video_files.append(candidate)

print(f"Total videos to process: {len(video_files)}")

# Run face extraction video by video, with a progress bar
for video_file in tqdm(video_files, desc="Processing Videos"):
    extract_faces(video_file, output_folder)


Using device: cuda
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/w600k_r50.onnx recognition ['None', 3, 112, 112] 127.5 127.5
set det-size: (640

Processing Videos: 100%|██████████| 890/890 [2:32:58<00:00, 10.31s/it]


In [None]:
import shutil

# Folder with the extracted face crops and the archive to pack it into
source_folder = "/content/dataset/Preprocessed-images/Celeb-real"
output_zip = "/content/Celeb-real.zip"

# make_archive() adds the ".zip" extension itself, so strip it from the name
archive_base = output_zip.replace(".zip", "")
shutil.make_archive(archive_base, format='zip', root_dir=source_folder)

print(f"Zipped folder saved as: {output_zip}")

Zipped folder saved as: /content/Celeb-real.zip


In [None]:
from google.colab import files
# Download the archive through the browser (Colab helper); the path must match
# the zip file written by the archiving cell.
files.download("/content/Celeb-real.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Do Not Execute
## CNN code for face detection of CelebDF dataset of synthetic videos

In [None]:
import os
import cv2
import torch
import torchvision.transforms as T
import numpy as np
from insightface.app import FaceAnalysis
from tqdm import tqdm
from PIL import Image

# Always use the CPU here: no MPS/GPU availability check is performed.
device = torch.device("cpu")  # Must use CPU due to MPS limitations

# Initialize the InsightFace detector with the CPU execution provider only
face_detector = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])  # "buffalo_l" model pack
face_detector.prepare(ctx_id=-1)  # ctx_id=-1 -> run on CPU

# Define target size (Xception requires 299x299)
TARGET_SIZE = (299, 299)

# Torchvision transform for resizing and padding
def resize_with_padding(image, target_size):
    """Resize a PIL image to ``target_size`` keeping its aspect ratio, then pad.

    The image is scaled so it fits inside ``target_size`` without distortion
    and the remaining border is filled with black, centred on both axes.
    (Resizing straight to ``target_size`` would stretch non-square face crops.)

    Args:
        image: PIL image to resize.
        target_size: ``(height, width)`` of the output, or a single int for a
            square output.

    Returns:
        The tensor produced by ``T.ToTensor()`` for the padded image, whose
        spatial size is exactly ``target_size``.
    """
    if isinstance(target_size, int):
        target_size = (target_size, target_size)
    target_h, target_w = target_size
    src_w, src_h = image.size  # PIL reports (width, height)

    # Largest scale at which the whole image still fits inside the target
    scale = min(target_w / src_w, target_h / src_h)
    new_w = min(target_w, max(1, round(src_w * scale)))
    new_h = min(target_h, max(1, round(src_h * scale)))
    resized = T.Resize((new_h, new_w), interpolation=T.InterpolationMode.BILINEAR)(image)

    # Split the leftover space evenly; the odd pixel goes to the right/bottom
    pad_left = (target_w - new_w) // 2
    pad_top = (target_h - new_h) // 2
    pad_right = target_w - new_w - pad_left
    pad_bottom = target_h - new_h - pad_top
    padded = T.Pad((pad_left, pad_top, pad_right, pad_bottom), fill=0)(resized)

    return T.ToTensor()(padded)

def detect_and_save_faces(frames, frame_indices, video_path, save_dir):
    """Detect faces in several frames and save every face crop as a JPEG.

    Frames are processed one after another; ``frames[k]`` belongs to video
    frame number ``frame_indices[k]``.

    Args:
        frames: RGB frames (PIL images or arrays convertible via ``np.array``).
        frame_indices: Video frame number of each entry in ``frames``.
        video_path: Path of the source video; its base name prefixes file names.
        save_dir: Existing directory the crops are written to.

    The first face of a frame is stored as ``<video>_<frame>.jpg``. Additional
    faces of the same frame get a ``_<n>`` suffix so they no longer overwrite
    each other.
    """
    video_name = os.path.basename(video_path)

    for i, frame in enumerate(frames):
        frame_np = np.array(frame)  # Convert PIL image to NumPy
        h, w, _ = frame_np.shape

        # Callers pass RGB frames, but InsightFace follows the OpenCV
        # convention and expects BGR input; flip channels for detection only.
        frame_bgr = np.ascontiguousarray(frame_np[:, :, ::-1])
        faces = face_detector.get(frame_bgr)

        saved = 0
        for face in faces:
            x1, y1, x2, y2 = face.bbox.astype(int)

            # Ensure bounding box stays within frame
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)

            # Skip boxes that collapse to nothing after clamping
            if x2 <= x1 or y2 <= y1:
                continue

            # Crop from the RGB frame so the saved image keeps correct colours
            face_crop = frame_np[y1:y2, x1:x2]
            face_pil = Image.fromarray(face_crop)

            # Bring the crop to the model input size
            face_tensor = resize_with_padding(face_pil, TARGET_SIZE)

            # Convert back to PIL for saving
            face_pil = T.ToPILImage()(face_tensor)

            # Save face image under a name that is unique per face of this frame
            suffix = "" if saved == 0 else f"_{saved}"
            face_path = os.path.join(save_dir, f"{video_name}_{frame_indices[i]}{suffix}.jpg")
            face_pil.save(face_path)
            saved += 1

def extract_faces(video_path, save_dir, frames_per_video=10, batch_size=4):
    """Sample frames evenly from a video and save the faces found in them.

    Frames are collected in groups of ``batch_size`` before being handed to
    ``detect_and_save_faces``.

    Args:
        video_path: Path of the video file to read.
        save_dir: Directory for the face crops (created when missing).
        frames_per_video: Maximum number of frames to sample; must be >= 1.
        batch_size: Number of frames gathered before each detection call.

    Raises:
        ValueError: If ``frames_per_video`` is smaller than 1.

    A video that cannot be opened is reported on stdout and skipped.
    """
    if frames_per_video < 1:
        raise ValueError("frames_per_video must be >= 1")

    os.makedirs(save_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open {video_path}")
        cap.release()
        return

    # try/finally guarantees the capture is released even when decoding,
    # detection or saving raises part-way through the video.
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(1, frame_count // frames_per_video)

        # Integer division makes range() overshoot (19 frames with 10 requested
        # gives step 1, i.e. 19 samples), so cap the indices explicitly.
        sample_indices = list(range(0, frame_count, step))[:frames_per_video]

        frame_batch = []
        frame_indices = []

        for i in sample_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret:
                continue

            # Convert to RGB before face detection
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)

            frame_batch.append(pil_img)
            frame_indices.append(i)

            if len(frame_batch) >= batch_size:
                detect_and_save_faces(frame_batch, frame_indices, video_path, save_dir)
                frame_batch.clear()
                frame_indices.clear()

        # Flush the frames left over from an incomplete last batch
        if frame_batch:
            detect_and_save_faces(frame_batch, frame_indices, video_path, save_dir)
    finally:
        cap.release()

# Root folder with one sub-folder of videos per category, and the crop output folder
video_folder = os.path.abspath("CelebDF")
output_folder = "Preprocessing"
video_extensions = {".mp4", ".avi", ".mov", ".mkv"}

# Walk every sub-folder and extract faces from each video not handled yet
for subfolder in os.listdir(video_folder):
    subfolder_path = os.path.join(video_folder, subfolder)

    if os.path.isdir(subfolder_path):
        for video in tqdm(os.listdir(subfolder_path), desc=f"Processing {subfolder}", mininterval=2):
            video_path = os.path.join(subfolder_path, video)

            # A crop for frame 0 is taken as the marker that the video was handled
            already_done = os.path.exists(os.path.join(output_folder, f"{video}_0.jpg"))
            if already_done:
                continue

            has_video_ext = video.lower().endswith(tuple(video_extensions))
            if os.path.isfile(video_path) and has_video_ext:
                extract_faces(video_path, output_folder)




Delete the preprocessed folder


In [None]:
import os
import shutil

folder_path = "/content/dataset/Preprocessed-images/Celeb-real"  # Replace with the actual folder path

# Only delete when the folder is present, so re-running this cell after a
# successful deletion reports the situation instead of raising FileNotFoundError.
if os.path.isdir(folder_path):
    shutil.rmtree(folder_path)  # Deletes the folder and all its contents
    print(f"Deleted folder: {folder_path}")
else:
    print(f"Nothing to delete, folder not found: {folder_path}")

Deleted folder: /content/dataset/Preprocessed-images/Celeb-real
