In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import cv2
from pathlib import Path
from tqdm import tqdm
import pickle
import joblib
from scipy.stats import skew
from skimage.feature import local_binary_pattern, hog


from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, classification_report, confusion_matrix

import warnings
warnings.filterwarnings('ignore')

In [12]:
# Load the per-split video metadata tables (one row per video folder).
train, val, test = (
    pd.read_csv("/kaggle/input/deepfake-metadata/train_metadata.csv"),
    pd.read_csv("/kaggle/input/deepfake-metadata/val_metadata.csv"),
    pd.read_csv("/kaggle/input/deepfake-metadata/test_metadata.csv"),
)

In [13]:
# Report (rows, columns) for each split, one per line.
print(train.shape, val.shape, test.shape, sep="\n")

(2758, 6)
(597, 6)
(587, 6)


In [14]:
# Summarise the training metadata: column names, dtypes and non-null counts.
train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2758 entries, 0 to 2757
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   label         2758 non-null   object
 1   type          2758 non-null   object
 2   path          2758 non-null   object
 3   video_folder  2758 non-null   object
 4   frames        2758 non-null   int64 
 5   source_id     2758 non-null   int64 
dtypes: int64(2), object(4)
memory usage: 129.4+ KB


In [15]:
# Peek at a single metadata record (positional row 1000) to see typical field values.
train.iloc[1000]

label                                                        fake
type                                                    Face2Face
path            /kaggle/input/faceforencispp-extracted-frames/...
video_folder                                              605_591
frames                                                         32
source_id                                                     605
Name: 1000, dtype: object

In [16]:
# Print the full frame-folder path of the first training video
# (the Series display above truncates long paths).
print(train["path"].iloc[0])

/kaggle/input/faceforencispp-extracted-frames/real/437


In [17]:
class ClassicalFeatureExtractor:
    """Hand-crafted per-frame features for real/fake classification.

    ``extract_all_features`` is the entry point used by the video pipeline.
    It returns a 24-element vector: 10 LBP histogram bins, 9 colour
    statistics (mean/std/skewness per channel) and 5 face-box geometry
    values. ``extract_hog_features`` is available separately and is not part
    of that vector.
    """

    def __init__(self):
        self.lbp_radius = 1
        self.lbp_points = 8
        # NOTE(review): these HOG settings are not read by extract_hog_features,
        # which uses its own fixed parameters (8 orientations, 16x16 cells,
        # 1x1 blocks). Kept unchanged so existing attribute readers still work.
        self.hog_orientations = 9
        self.hog_pixels_per_cell = (8, 8)
        self.hog_cells_per_block = (2, 2)
        # The Haar cascade is loaded lazily on first use and then reused, so the
        # XML model is not re-parsed for every frame.
        self._face_detector = None

    def _get_face_detector(self):
        """Return the shared frontal-face Haar cascade, loading it on first use."""
        detector = getattr(self, '_face_detector', None)
        if detector is None:
            detector = cv2.CascadeClassifier(
                cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            )
            self._face_detector = detector
        return detector

    def extract_lbp_features(self, image):
        """Return the normalised uniform-LBP histogram of a BGR image.

        The histogram has ``lbp_points + 2`` bins and sums to (almost exactly)
        1; the small epsilon guards against division by zero.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        lbp = local_binary_pattern(gray, self.lbp_points, self.lbp_radius, method='uniform')
        hist, _ = np.histogram(lbp.ravel(), bins=self.lbp_points + 2, range=(0, self.lbp_points + 2))
        hist = hist.astype(float)
        hist /= (hist.sum() + 1e-7)
        return hist

    def extract_hog_features(self, image):
        """Return a compact HOG descriptor of a BGR image.

        The image is converted to grayscale and resized to 64x64, then
        described with 8 orientations over 16x16-pixel cells and 1x1 blocks
        (4 x 4 cells x 8 orientations = 128 values).
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        gray_small = cv2.resize(gray, (64, 64))
        features = hog(gray_small, orientations=8,
                      pixels_per_cell=(16, 16),
                      cells_per_block=(1, 1),
                      block_norm='L2-Hys', visualize=False, feature_vector=True)
        return features

    def extract_color_stats(self, image):
        """Return mean, standard deviation and skewness for each of the 3 channels.

        The result is a 9-element array ordered channel by channel. A channel
        with no variation has an undefined skewness (recent SciPy returns NaN
        for it); 0.0 is reported instead so the feature vector stays finite
        for the downstream scaler and classifiers.
        """
        stats = []
        for channel in range(3):
            ch = image[:, :, channel].astype(np.float64).ravel()
            spread = np.std(ch)
            skewness = float(skew(ch)) if spread > 0 else 0.0
            if not np.isfinite(skewness):
                skewness = 0.0
            stats.extend([np.mean(ch), spread, skewness])
        return np.array(stats)

    def extract_landmark_features(self, image):
        """Return geometry of the first detected face box, or zeros if none is found.

        Detection runs on a 128x128 grayscale copy, so all values are in that
        coordinate system: ``[width, height, width/height, centre_x, centre_y]``.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        gray_small = cv2.resize(gray, (128, 128))

        faces = self._get_face_detector().detectMultiScale(gray_small, 1.3, 3)

        features = np.zeros(5)

        if len(faces) > 0:
            x, y, w, h = faces[0]
            features[0] = w
            features[1] = h
            features[2] = w / (h + 1e-7)  # epsilon avoids division by zero
            features[3] = x + w/2
            features[4] = y + h/2

        return features

    def extract_all_features(self, image_path):
        """Read one frame from disk and return its concatenated feature vector.

        Returns ``None`` when the image cannot be read. Otherwise the frame is
        resized to 128x128 and the LBP, colour and face-box features are
        concatenated in that order.
        """
        image = cv2.imread(str(image_path))
        if image is None:
            return None

        image = cv2.resize(image, (128, 128))

        lbp_feats = self.extract_lbp_features(image)
        color_feats = self.extract_color_stats(image)
        landmark_feats = self.extract_landmark_features(image)

        all_features = np.concatenate([lbp_feats, color_feats, landmark_feats])
        return all_features

def process_video_folder(video_path, extractor, max_frames=16):
    """Aggregate per-frame features of one video folder into a single vector.

    Args:
        video_path: Folder containing the video's frames as ``*.png`` / ``*.jpg``.
        extractor: Object exposing ``extract_all_features(frame_path)`` that
            returns a 1-D feature array, or ``None`` for an unreadable frame.
        max_frames: Maximum number of frames to sample (positive integer).

    Returns:
        The concatenation of the per-feature mean, standard deviation and
        maximum over the sampled frames, or ``None`` when the folder has no
        frames or no frame could be processed.

    Raises:
        ValueError: If ``max_frames`` is smaller than 1.
    """
    if max_frames < 1:
        raise ValueError(f"max_frames must be a positive integer, got {max_frames}")

    video_path = Path(video_path)
    frame_files = sorted(list(video_path.glob('*.png')) + list(video_path.glob('*.jpg')))

    total = len(frame_files)
    if total == 0:
        return None

    if total <= max_frames:
        sampled_frames = frame_files
    else:
        # Spread the samples over the whole clip. A fixed integer stride followed
        # by truncation would only cover the leading frames whenever the frame
        # count is not a multiple of max_frames (e.g. 31 frames -> frames 0..15).
        # For exact multiples this picks the same frames as a plain stride.
        sampled_frames = [frame_files[i * total // max_frames] for i in range(max_frames)]

    features_list = []
    for frame_file in sampled_frames:
        feats = extractor.extract_all_features(frame_file)
        if feats is not None:
            features_list.append(feats)

    if len(features_list) == 0:
        return None

    features_array = np.array(features_list)
    aggregated = np.concatenate([
        np.mean(features_array, axis=0),
        np.std(features_array, axis=0),
        np.max(features_array, axis=0)
    ])

    return aggregated

def extract_features_for_split(metadata_df, extractor, split_name):
    """Build the feature matrix and label vector for one metadata split.

    Each row's ``path`` is processed with ``process_video_folder`` (up to 16
    frames per video). Rows whose folder yields no features are skipped and
    their index is recorded.

    Returns:
        ``(features_array, labels_array, failed_indices)`` where labels are
        1 for ``label == 'fake'`` and 0 otherwise.
    """
    kept_features, kept_labels, failed_indices = [], [], []

    progress = tqdm(metadata_df.iterrows(), total=len(metadata_df), desc=f"Processing {split_name}")
    for row_idx, record in progress:
        aggregated = process_video_folder(record['path'], extractor, max_frames=16)

        # Skip videos that produced nothing, but remember which ones they were.
        if aggregated is None:
            failed_indices.append(row_idx)
            continue

        kept_features.append(aggregated)
        kept_labels.append(int(record['label'] == 'fake'))

    features_array = np.array(kept_features)
    labels_array = np.array(kept_labels)

    print(f"{split_name} - Extracted features shape: {features_array.shape}")
    print(f"{split_name} - Labels shape: {labels_array.shape}")
    print(f"{split_name} - Failed samples: {len(failed_indices)}")

    return features_array, labels_array, failed_indices

In [None]:
if __name__ == "__main__":
    # Re-read the metadata so this cell can run on its own.
    train = pd.read_csv("/kaggle/input/deepfake-metadata/train_metadata.csv")
    val = pd.read_csv("/kaggle/input/deepfake-metadata/val_metadata.csv")
    test = pd.read_csv("/kaggle/input/deepfake-metadata/test_metadata.csv")

    extractor = ClassicalFeatureExtractor()

    # One pass per split; each pass is slow (every sampled frame is read from disk).
    print("Extracting train features...")
    train_features, train_labels, train_failed = extract_features_for_split(
        train, extractor, "train"
    )

    print("\nExtracting validation features...")
    val_features, val_labels, val_failed = extract_features_for_split(
        val, extractor, "val"
    )

    print("\nExtracting test features...")
    test_features, test_labels, test_failed = extract_features_for_split(
        test, extractor, "test"
    )

    # Everything needed by the modelling cells, bundled into one pickle.
    feature_bundle = {
        'train_features': train_features,
        'train_labels': train_labels,
        'val_features': val_features,
        'val_labels': val_labels,
        'test_features': test_features,
        'test_labels': test_labels,
        'train_failed': train_failed,
        'val_failed': val_failed,
        'test_failed': test_failed
    }
    with open('classical_features.pkl', 'wb') as f:
        pickle.dump(feature_bundle, f)

    print("\nFeatures saved to classical_features.pkl")
    print(f"Total feature dimension: {train_features.shape[1]}")

Extracting train features...


Processing train:  27%|██▋       | 745/2758 [07:53<21:21,  1.57it/s]

In [None]:
def load_features(path=None):
    """Load the pickled feature bundle produced by the extraction cell.

    Args:
        path: Optional explicit location of the pickle. When omitted, the
            Kaggle dataset copy is used if it exists; otherwise the
            ``classical_features.pkl`` written to the working directory by the
            extraction cell is used.

    Returns:
        The unpickled object (the extraction cell stores a dict of feature
        arrays, label arrays and failed indices per split).

    Raises:
        FileNotFoundError: If none of the candidate files exists.
    """
    if path is not None:
        candidates = [Path(path)]
    else:
        candidates = [
            Path('/kaggle/input/deepfake-metadata/classical_features.pkl'),
            Path('classical_features.pkl'),
        ]

    for candidate in candidates:
        if candidate.is_file():
            # NOTE: unpickling can execute arbitrary code; only load files you trust.
            with open(candidate, 'rb') as f:
                return pickle.load(f)

    searched = ", ".join(str(candidate) for candidate in candidates)
    raise FileNotFoundError(f"No feature file found (looked in: {searched})")

In [None]:
def normalize_features(train_feats, val_feats, test_feats):
    """Standardise all three splits using statistics from the training split only.

    Returns:
        ``(train_norm, val_norm, test_norm, scaler)`` where ``scaler`` is the
        fitted ``StandardScaler``.
    """
    # Fit on train only so no validation/test statistics leak into the scaling.
    scaler = StandardScaler().fit(train_feats)
    train_norm, val_norm, test_norm = (
        scaler.transform(split) for split in (train_feats, val_feats, test_feats)
    )
    return train_norm, val_norm, test_norm, scaler

In [None]:
def evaluate_model(model, X, y, split_name):
    """Print and return binary classification metrics for a fitted model.

    Args:
        model: Fitted estimator with ``predict``; ``predict_proba`` or
            ``decision_function`` is used for ranking scores when available.
        X: Feature matrix to evaluate on.
        y: True binary labels (1 = positive class).
        split_name: Label used in the printed report (e.g. "Train").

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and ``auc``.
        ``auc`` is NaN when it cannot be computed (for example when ``y``
        contains a single class), so an undefined score is not mistaken for
        a genuine 0.0.
    """
    y_pred = model.predict(X)

    # Prefer continuous scores for ROC AUC; hard labels are the last resort.
    if hasattr(model, 'predict_proba'):
        y_score = model.predict_proba(X)[:, 1]
    elif hasattr(model, 'decision_function'):
        y_score = model.decision_function(X)
    else:
        y_score = y_pred

    acc = accuracy_score(y, y_pred)
    # zero_division=0 keeps the 0.0 result for empty predictions without a warning.
    prec = precision_score(y, y_pred, zero_division=0)
    rec = recall_score(y, y_pred, zero_division=0)
    f1 = f1_score(y, y_pred, zero_division=0)

    try:
        auc = roc_auc_score(y, y_score)
    except ValueError as exc:
        # Raised e.g. when only one class is present in y.
        print(f"\nROC AUC is undefined for {split_name}: {exc}")
        auc = float('nan')

    print(f"\n{split_name} Results:")
    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall: {rec:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"ROC AUC: {auc:.4f}")

    return {
        'accuracy': acc,
        'precision': prec,
        'recall': rec,
        'f1': f1,
        'auc': auc
    }

In [None]:
def train_random_forest(X_train, y_train, X_val, y_val):
    """Fit a random forest and evaluate it on the train and validation splits.

    Returns:
        ``(model, train_metrics, val_metrics)``.
    """
    print("\nTraining Random Forest...")
    forest_params = dict(
        n_estimators=200,
        max_depth=20,
        min_samples_split=5,
        min_samples_leaf=2,
        random_state=42,
        n_jobs=-1,
    )
    forest = RandomForestClassifier(**forest_params)
    forest.fit(X_train, y_train)

    train_metrics = evaluate_model(forest, X_train, y_train, "Train")
    val_metrics = evaluate_model(forest, X_val, y_val, "Validation")
    return forest, train_metrics, val_metrics

In [None]:
def train_gradient_boosting(X_train, y_train, X_val, y_val):
    """Fit a gradient-boosting classifier and evaluate it on train and validation.

    Returns:
        ``(model, train_metrics, val_metrics)``.
    """
    print("\nTraining Gradient Boosting...")
    booster = GradientBoostingClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        min_samples_split=5,
        min_samples_leaf=2,
        random_state=42,
    )
    booster.fit(X_train, y_train)

    # Evaluate in a fixed order: train first, then validation.
    train_metrics, val_metrics = [
        evaluate_model(booster, features, targets, split_label)
        for features, targets, split_label in (
            (X_train, y_train, "Train"),
            (X_val, y_val, "Validation"),
        )
    ]
    return booster, train_metrics, val_metrics

In [None]:
def train_svm(X_train, y_train, X_val, y_val):
    """Fit an RBF-kernel SVM (with probability estimates) and evaluate it.

    Returns:
        ``(model, train_metrics, val_metrics)``.
    """
    print("\nTraining SVM...")
    # probability=True enables predict_proba, which the evaluation uses for ROC AUC.
    classifier = SVC(
        kernel='rbf',
        C=1.0,
        gamma='scale',
        probability=True,
        random_state=42,
    ).fit(X_train, y_train)

    train_metrics = evaluate_model(classifier, X_train, y_train, "Train")
    val_metrics = evaluate_model(classifier, X_val, y_val, "Validation")
    return classifier, train_metrics, val_metrics

In [None]:
def train_logistic_regression(X_train, y_train, X_val, y_val):
    """Fit a logistic-regression baseline and evaluate it on train and validation.

    Returns:
        ``(model, train_metrics, val_metrics)``.
    """
    print("\nTraining Logistic Regression...")
    settings = {'C': 1.0, 'max_iter': 1000, 'random_state': 42, 'n_jobs': -1}
    logreg = LogisticRegression(**settings)
    logreg.fit(X_train, y_train)

    metrics_on_train = evaluate_model(logreg, X_train, y_train, "Train")
    metrics_on_val = evaluate_model(logreg, X_val, y_val, "Validation")
    return logreg, metrics_on_train, metrics_on_val

In [None]:
if __name__ == "__main__":
    print("Loading features...")
    data = load_features()

    # Unpack the per-split feature matrices and label vectors.
    X_train, y_train = data['train_features'], data['train_labels']
    X_val, y_val = data['val_features'], data['val_labels']
    X_test, y_test = data['test_features'], data['test_labels']

    print(f"Train samples: {X_train.shape[0]}, Features: {X_train.shape[1]}")
    print(f"Val samples: {X_val.shape[0]}")
    print(f"Test samples: {X_test.shape[0]}")

    print("\nNormalizing features...")
    X_train_norm, X_val_norm, X_test_norm, scaler = normalize_features(
        X_train, X_val, X_test
    )

    # Train each classifier on the normalised features and record its metrics.
    results = {}

    rf_model, rf_train, rf_val = train_random_forest(
        X_train_norm, y_train, X_val_norm, y_val
    )
    results['random_forest'] = {'train': rf_train, 'val': rf_val}

    gb_model, gb_train, gb_val = train_gradient_boosting(
        X_train_norm, y_train, X_val_norm, y_val
    )
    results['gradient_boosting'] = {'train': gb_train, 'val': gb_val}

    svm_model, svm_train, svm_val = train_svm(
        X_train_norm, y_train, X_val_norm, y_val
    )
    results['svm'] = {'train': svm_train, 'val': svm_val}

    lr_model, lr_train, lr_val = train_logistic_regression(
        X_train_norm, y_train, X_val_norm, y_val
    )
    results['logistic_regression'] = {'train': lr_train, 'val': lr_val}

    # Side-by-side validation summary.
    banner = "=" * 50
    print("\n" + banner)
    print("Model Comparison on Validation Set")
    print(banner)
    for model_name, metrics in results.items():
        print(f"\n{model_name.upper()}:")
        print(f"  Accuracy: {metrics['val']['accuracy']:.4f}")
        print(f"  F1 Score: {metrics['val']['f1']:.4f}")
        print(f"  ROC AUC: {metrics['val']['auc']:.4f}")

    # Model selection uses validation F1; ties go to the first model trained.
    best_model_name = max(results, key=lambda name: results[name]['val']['f1'])
    print(f"\nBest model by F1: {best_model_name}")

    # Persist every fitted model together with the scaler they were trained with.
    models = {
        'random_forest': rf_model,
        'gradient_boosting': gb_model,
        'svm': svm_model,
        'logistic_regression': lr_model,
        'scaler': scaler
    }

    joblib.dump(models, 'classical_ml_models.pkl')
    joblib.dump(results, 'classical_ml_results.pkl')
    print("\nModels saved to classical_ml_models.pkl")