In [58]:
import os
import shutil
import random

In [61]:
def create_splits(root_dir,
                  test_list_filename="List_of_testing_videos.txt",
                  subfolders=["Celeb-synthesis", "Youtube-real", "Celeb-real"],
                  train_ratio=0.8):
    """
    Reads the test list file (ignoring the first number on each line) and
    copies videos from the specified subfolders into a 'test' folder if their
    relative path (e.g., "Youtube-real/00170.mp4") is in the list.
    Then, the remaining videos are randomly split into 'train' and 'val'
    folders according to the train_ratio.
    
    Parameters:
      root_dir (str): The path to the "celeb-df" folder.
      test_list_filename (str): Name of the text file containing test video paths.
      subfolders (list of str): The subfolders (relative to root_dir) to search.
      train_ratio (float): Fraction of the remaining (non-test) videos to go to train.
    """
    # Create output directories
    test_dir = os.path.join(root_dir, "test")
    train_dir = os.path.join(root_dir, "train")
    val_dir   = os.path.join(root_dir, "val")
    os.makedirs(test_dir, exist_ok=True)
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)

    # Read the test file and parse each line.
    # Each line is assumed to be: "<number> <relative_path>"
    test_list_path = os.path.join(root_dir, test_list_filename)
    test_paths = set()
    with open(test_list_path, 'r') as f:
        for line in f:
            # Split the line by whitespace
            parts = line.strip().split()
            if len(parts) >= 2:
                # Use the second token as the relative path
                rel_path = parts[1].strip()
                # For robust matching, store as lower-case
                test_paths.add(rel_path.lower())

    remaining_files = []  # List to store non-test video file paths

    # Loop through each specified subfolder
    for folder in subfolders:
        folder_path = os.path.join(root_dir, folder)
        if not os.path.isdir(folder_path):
            print(f"Folder '{folder_path}' not found. Skipping.")
            continue

        for file_name in os.listdir(folder_path):
            if file_name.lower().endswith(".mp4"):
                # Construct the relative path as it would appear in the test list.
                # For example: "Youtube-real/00170.mp4"
                relative_path = os.path.join(folder, file_name).replace("\\", "/")
                # Compare in lower-case for robustness.
                if relative_path.lower() in test_paths:
                    dest_file = os.path.join(test_dir, file_name)
                    print(f"Copying {os.path.join(folder_path, file_name)} to {dest_file} (test set)")
                    shutil.copy2(os.path.join(folder_path, file_name), dest_file)
                else:
                    remaining_files.append(os.path.join(folder_path, file_name))

    # Shuffle the remaining (non-test) videos
    random.shuffle(remaining_files)
    num_remaining = len(remaining_files)
    num_train = int(train_ratio * num_remaining)

    train_files = remaining_files[:num_train]
    val_files = remaining_files[num_train:]

    # Copy training files
    for source_file in train_files:
        file_name = os.path.basename(source_file)
        dest_file = os.path.join(train_dir, file_name)
        print(f"Copying {source_file} to {dest_file} (train set)")
        shutil.copy2(source_file, dest_file)

    # Copy validation files
    for source_file in val_files:
        file_name = os.path.basename(source_file)
        dest_file = os.path.join(val_dir, file_name)
        print(f"Copying {source_file} to {dest_file} (val set)")
        shutil.copy2(source_file, dest_file)

    print(f"Done. Total non-test videos: {num_remaining}. Train: {len(train_files)}, Val: {len(val_files)}.")

# Set your root directory (e.g., the folder "celeb-df")
root_directory = "Celeb-DF"
create_splits(root_directory)


Copying Celeb-DF\Celeb-synthesis\id10_id11_0001.mp4 to Celeb-DF\test\id10_id11_0001.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id11_0004.mp4 to Celeb-DF\test\id10_id11_0004.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id12_0001.mp4 to Celeb-DF\test\id10_id12_0001.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id12_0004.mp4 to Celeb-DF\test\id10_id12_0004.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id13_0001.mp4 to Celeb-DF\test\id10_id13_0001.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id13_0004.mp4 to Celeb-DF\test\id10_id13_0004.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id7_0001.mp4 to Celeb-DF\test\id10_id7_0001.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id10_id7_0004.mp4 to Celeb-DF\test\id10_id7_0004.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id11_id7_0008.mp4 to Celeb-DF\test\id11_id7_0008.mp4 (test set)
Copying Celeb-DF\Celeb-synthesis\id16_id0_0011.mp4 to Celeb-DF\test\id16_id0_0011.mp4 (test set)
Copying Celeb-DF\C

In [None]:
def extract_features(video_path):
    """
    Extract all features from a video in one pass.
    Returns a single feature vector combining all features.
    """
    # Initialize detectors and extractors
    sift = cv2.SIFT_create()
    face_detector = dlib.get_frontal_face_detector()
    landmark_predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh()
    
    # Initialize feature lists
    all_features = []
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    
    # Read first frame
    ret, prev_frame = cap.read()
    if not ret:
        return None
        
    # Convert to grayscale for initial frame
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    
    # 1. Basic Statistical Features
    all_features.extend([
        np.mean(prev_gray),
        np.std(prev_gray),
        np.max(prev_gray),
        np.min(prev_gray),
        skew(prev_gray.flatten()),
        kurtosis(prev_gray.flatten())
    ])
    
    # 2. Histogram Features
    hist = cv2.calcHist([prev_gray], [0], None, [256], [0, 256])
    hist = hist.flatten() / hist.sum()  # Normalize
    all_features.extend(hist)
    
    # 3. SIFT Features
    keypoints, descriptors = sift.detectAndCompute(prev_gray, None)
    if descriptors is not None:
        sift_features = [
            np.mean(descriptors),
            np.std(descriptors),
            skew(descriptors.flatten()),
            kurtosis(descriptors.flatten())
        ]
    else:
        sift_features = [0, 0, 0, 0]
    all_features.extend(sift_features)
    
    # 4. Edge and Texture Features
    blurred = cv2.GaussianBlur(prev_gray, (5,5), 0)
    all_features.extend([
        cv2.Laplacian(blurred, cv2.CV_64F).var(),
        cv2.Sobel(blurred, cv2.CV_64F, 1, 0, ksize=5).var(),
        cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=5).var()
    ])
    
    # Initialize motion and facial features accumulators
    motion_features = []
    facial_features = []
    quality_scores = []
    
    # Process first 30 frames (or less if video is shorter)
    frame_count = 0
    while cap.isOpened() and frame_count < 30:
        ret, curr_frame = cap.read()
        if not ret:
            break
            
        curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
        
        # 5. Motion Features
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        motion_features.append([
            np.mean(mag),
            np.std(mag),
            np.mean(ang),
            np.std(ang)
        ])
        
        # 6. Facial Landmarks
        faces = face_detector(curr_gray)
        if len(faces) > 0:
            face = faces[0]
            landmarks = landmark_predictor(curr_gray, face)
            points = np.array([[p.x, p.y] for p in landmarks.parts()])
            
            # Calculate facial features
            distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1))
            left_eye = points[36:42]
            right_eye = points[42:48]
            eye_asymmetry = np.mean(np.abs(left_eye - np.flip(right_eye, axis=0)))
            face_height = np.linalg.norm(points[8] - points[27])
            face_width = np.linalg.norm(points[0] - points[16])
            
            facial_features.append([
                np.mean(distances),
                np.std(distances),
                eye_asymmetry,
                face_height/face_width
            ])
        
        # 7. BRISQUE Quality Score
        try:
            quality_scores.append(brisque.score(curr_frame))
        except:
            quality_scores.append(0)
        
        prev_gray = curr_gray
        frame_count += 1
    
    cap.release()
    
    # Average temporal features
    if motion_features:
        all_features.extend(np.mean(motion_features, axis=0))
    else:
        all_features.extend([0, 0, 0, 0])
        
    if facial_features:
        all_features.extend(np.mean(facial_features, axis=0))
    else:
        all_features.extend([0, 0, 0, 0])
        
    if quality_scores:
        all_features.append(np.mean(quality_scores))
    else:
        all_features.append(0)
    
    return np.array(all_features)

In [71]:
import cv2
import os
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import glob
from tqdm import tqdm
from skimage.feature import local_binary_pattern
from scipy.stats import kurtosis, skew
import dlib
import mediapipe as mp
from imquality import brisque
from xgboost import XGBClassifier
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [50]:
def extract_features(video_path):
    """Extract features from a video using traditional CV techniques."""
    # Open the video
    cap = cv2.VideoCapture(video_path)
    
    # Initialize features
    features = []
    
    # Read first frame
    success, frame = cap.read()
    if not success:
        return None
    
    # Convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # Calculate basic statistical features
    features.extend([
        np.mean(gray),  # Mean intensity
        np.std(gray),   # Standard deviation of intensity
        np.max(gray),   # Maximum intensity
        np.min(gray),   # Minimum intensity
    ])
    
    # Calculate histogram features
    hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
    hist = hist.flatten()
    hist = hist / hist.sum()  # Normalize
    features.extend(hist)
    
    # Calculate Haralick texture features
    glcm = cv2.GaussianBlur(gray, (5,5), 0)
    features.extend([
        cv2.Laplacian(glcm, cv2.CV_64F).var(),  # Edge features
        cv2.Sobel(glcm, cv2.CV_64F, 1, 0, ksize=5).var(),  # Horizontal edges
        cv2.Sobel(glcm, cv2.CV_64F, 0, 1, ksize=5).var()   # Vertical edges
    ])
    
    cap.release()
    return np.array(features)

In [73]:
def process_dataset(base_path, file_limit=5):
    """Process all videos in the dataset and prepare for training, validation and testing."""
    X_train, y_train = [], []  # Training data
    X_val, y_val = [], []      # Validation data  
    X_test, y_test = [], []    # Test data
    
    # Process training data
    train_path = os.path.join(base_path, 'train', '*.mp4')
    for video_path in tqdm(glob.glob(train_path), desc="Processing training data"):
        filename = os.path.basename(video_path)
        label = 1 if filename.startswith('id') else 0
        features = extract_features(video_path)
        if features is not None:
            X_train.append(features)
            y_train.append(label)

    # Process validation data
    val_path = os.path.join(base_path, 'val', '*.mp4')
    for video_path in tqdm(glob.glob(val_path), desc="Processing validation data"):
        filename = os.path.basename(video_path)
        label = 1 if filename.startswith('id') else 0
        features = extract_features(video_path)
        if features is not None:
            X_val.append(features)
            y_val.append(label)
    
    # Process test data
    test_path = os.path.join(base_path, 'test', '*.mp4')
    for video_path in tqdm(glob.glob(test_path), desc="Processing test data"):
        filename = os.path.basename(video_path)
        label = 1 if filename.startswith('id') else 0
        features = extract_features(video_path)
        if features is not None:
            X_test.append(features)
            y_test.append(label)
    
    return (np.array(X_train), np.array(X_val), np.array(X_test),
            np.array(y_train), np.array(y_val), np.array(y_test))

In [91]:
def train_and_evaluate(X_train, X_val, X_test, y_train, y_val, y_test):
    """Train and evaluate model with validation set."""
    
    # Train model
    model = XGBClassifier(eval_metric='logloss', early_stopping_rounds=10)  
    # Train the model
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
    
    # Predict on test set
    y_pred = model.predict(X_test)
    
    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    print("\nTest Set Metrics:")
    print(f"Accuracy: {accuracy:.3f}")
    print(f"Precision: {precision:.3f}")
    print(f"Recall: {recall:.3f}") 
    print(f"F1 Score: {f1:.3f}")
    
    return model

In [62]:
def predict_video(model, video_path):
    """Predict whether a single video is real or fake."""
    features = extract_features(video_path)
    if features is not None:
        prediction = model.predict([features])[0]
        return "Fake" if prediction == 1 else "Real"
    return "Error processing video"

In [75]:

base_path = "Celeb-DF"  # Adjust this to your path

print("Processing dataset...")
X_train, X_test, X_val, y_train, y_val, y_test = process_dataset(base_path, file_limit=5)


# # Example of predicting a single video
# test_video = os.path.join(base_path, "test", "test_video.mp4")  # Adjust path as needed
# if os.path.exists(test_video):
#     result = predict_video(model, test_video)
#     print(f"\nTest video prediction: {result}")

Processing dataset...


Processing training data: 100%|██████████| 1063/1063 [00:23<00:00, 46.07it/s]
Processing validation data: 100%|██████████| 396/396 [00:12<00:00, 32.74it/s]
Processing test data: 100%|██████████| 100/100 [00:02<00:00, 47.16it/s]


In [39]:
np.save('X.npy', X)
np.save('y.npy', y) 

In [76]:
X_train.shape

(1063, 263)

In [92]:
model = train_and_evaluate(X_train,X_test,X_val,y_train,y_val,y_test)


Test Set Metrics:
Accuracy: 0.770
Precision: 0.885
Recall: 0.771
F1 Score: 0.824


In [57]:
#xgboost
model = train_and_evaluate(X, y)

Precision: 0.000
Recall: 0.000
F1 Score: 0.000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [30]:
#forest
model = train_and_evaluate(X, y)


Classification Report:
              precision    recall  f1-score   support

           0       0.94      0.49      0.64        35
           1       0.89      0.99      0.94       142

    accuracy                           0.89       177
   macro avg       0.92      0.74      0.79       177
weighted avg       0.90      0.89      0.88       177

