<a href="https://colab.research.google.com/github/anchal2611/Cry-Compass/blob/main/Deepfake_Detection_Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Fetch each Kaggle dataset through kagglehub and keep whatever
# dataset_download returns (presumably the local directory of the downloaded
# copy — confirm against the kagglehub documentation).
# NOTE(review): none of these *_path variables is referenced by the other
# cells visible in this export; they read from hard-coded /kaggle/input/...
# paths instead.
greatgamedota_faceforensics_path = kagglehub.dataset_download('greatgamedota/faceforensics')
nanduncs_1000_videos_split_path = kagglehub.dataset_download('nanduncs/1000-videos-split')
hungle3401_faceforensics_path = kagglehub.dataset_download('hungle3401/faceforensics')
sanikatiwarekar_deep_fake_detection_dfd_entire_original_dataset_path = kagglehub.dataset_download('sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset')
chandashekar8_deepfake_testing_videos_path = kagglehub.dataset_download('chandashekar8/deepfake-testing-videos')

print('Data source import complete.')


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file in it.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# DATA PREPARATION

In [None]:
!pip install tensorflow opencv-python tqdm scikit-learn matplotlib seaborn mtcnn

In [None]:
# Data Preparation
!pip install tensorflow opencv-python tqdm scikit-learn matplotlib seaborn mtcnn lz4  # Added lz4
import os
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from mtcnn import MTCNN

# FaceForensics++ class directories inside the Kaggle input mount
REAL_PATH = "/kaggle/input/faceforensics/FF++/real"
FAKE_PATH = "/kaggle/input/faceforensics/FF++/fake"
OUTPUT_FRAME_SIZE = (128, 128)  # size every extracted frame is resized to
FRAME_COUNT = 10  # frames sampled from each video
MAX_VIDEOS = 200  # cap on directory entries taken per class (memory budget)

# Fail fast when either class directory is missing
if not all(os.path.exists(required_dir) for required_dir in (REAL_PATH, FAKE_PATH)):
    raise FileNotFoundError(f"Dataset paths not found: REAL_PATH={REAL_PATH}, FAKE_PATH={FAKE_PATH}. Please add the FaceForensics++ dataset.")

# One MTCNN instance, shared by every extract_face call
detector = MTCNN()

def extract_face(frame):
    """Crop the first face MTCNN finds in ``frame`` and resize it.

    Args:
        frame: Image array as returned by ``cv2.VideoCapture.read`` (OpenCV
            delivers these in BGR channel order).

    Returns:
        The face crop resized to ``OUTPUT_FRAME_SIZE``. When no face is
        detected, or the detected box has no area inside the image, the whole
        frame resized to ``OUTPUT_FRAME_SIZE`` is returned instead.
    """
    # MTCNN's documented input is RGB, so detect on a converted copy. The crop
    # itself is taken from the original frame, so the channel order of the
    # returned pixels is unchanged.
    results = detector.detect_faces(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if results:
        x, y, w, h = results[0]['box']
        # Boxes may start outside the image. Clamp both corners so that a
        # negative origin shrinks the crop instead of shifting it sideways.
        x1, y1 = max(0, x), max(0, y)
        x2 = min(frame.shape[1], x + w)
        y2 = min(frame.shape[0], y + h)
        # A degenerate box would yield an empty array and crash cv2.resize.
        if x2 > x1 and y2 > y1:
            return cv2.resize(frame[y1:y2, x1:x2], OUTPUT_FRAME_SIZE)
    return cv2.resize(frame, OUTPUT_FRAME_SIZE)  # Fallback if no usable face

def extract_frames(video_path, output_size=(128, 128), frame_count=10):
    """Sample up to ``frame_count`` evenly spaced face crops from a video.

    Args:
        video_path: Path of the video file to read.
        output_size: ``(width, height)`` each returned frame must have.
        frame_count: Number of frames to sample.

    Returns:
        An array of the frames that could be read (fewer than ``frame_count``
        when the video ends early), or an empty array when the video cannot be
        opened or yields no frame. Callers that require a full clip should
        compare ``len(result)`` with ``frame_count``.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Warning: Could not open video file {video_path}, skipping.")
        return np.array([])
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(total_frames // frame_count, 1)
        for i in range(frame_count):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
            ret, frame = cap.read()
            if not ret:
                break
            frame = extract_face(frame)
            # extract_face resizes to the module-level OUTPUT_FRAME_SIZE;
            # honour a different output_size requested by the caller.
            if frame.shape[:2] != (output_size[1], output_size[0]):
                frame = cv2.resize(frame, output_size)
            frames.append(frame)
    finally:
        # Release the capture even if detection or resizing raises.
        cap.release()
    if not frames:
        # Do not fabricate an all-black clip here: it would pass the callers'
        # length check and end up in the dataset with a real/fake label.
        print(f"Warning: No frames could be read from {video_path}, skipping.")
        return np.array([])
    return np.array(frames)

data = []
labels = []

# os.listdir returns entries in arbitrary order, so sort before truncating.
# Otherwise the MAX_VIDEOS subset (and every result derived from it) can
# change between runs.
real_videos = sorted(os.listdir(REAL_PATH))[:MAX_VIDEOS]
fake_videos = sorted(os.listdir(FAKE_PATH))[:MAX_VIDEOS]

for banner, class_dir, video_files, class_label in (
    ("Processing real videos...", REAL_PATH, real_videos, 0),  # 0 for real
    ("Processing fake videos...", FAKE_PATH, fake_videos, 1),  # 1 for fake
):
    print(banner)
    for video_file in tqdm(video_files):
        video_path = os.path.join(class_dir, video_file)
        frames = extract_frames(video_path, output_size=OUTPUT_FRAME_SIZE, frame_count=FRAME_COUNT)
        # Keep only clips with a full set of frames so every sample has the
        # same shape when stacked below.
        if len(frames) == FRAME_COUNT:
            data.append(frames)
            labels.append(class_label)

if not data:
    raise ValueError("No valid frames extracted. Check dataset or reduce MAX_VIDEOS/FRAME_COUNT.")

data = np.array(data)
labels = np.array(labels)

# Split into train, validation, and test sets (70% / 15% / 15%)
X_train, X_temp, y_train, y_temp = train_test_split(data, labels, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Normalize pixel values to [0, 1]. Cast to float32 first: dividing the
# integer frames by a Python float would produce float64 arrays, which take
# twice the memory for no benefit to training.
X_train = X_train.astype(np.float32) / 255.0
X_val = X_val.astype(np.float32) / 255.0
X_test = X_test.astype(np.float32) / 255.0

# Convert labels to categorical (one-hot over the two classes)
y_train = to_categorical(y_train, num_classes=2)
y_val = to_categorical(y_val, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)

print(f"Data shapes: Train - {X_train.shape}, Val - {X_val.shape}, Test - {X_test.shape}")

# Save to avoid recomputing
for array_name, array in (
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_val", X_val),
    ("y_val", y_val),
    ("X_test", X_test),
    ("y_test", y_test),
):
    np.save(f'/kaggle/working/{array_name}.npy', array)

# Verify saved files
!ls -lh /kaggle/working/

# DATA AUGMENTATION

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Random-augmentation settings; augment_frames draws one random transform per
# frame from this generator via datagen.random_transform.
# NOTE(review): channel_shift_range=10.0 and brightness_range look sized for
# 0-255 pixel values, while the training frames are divided by 255.0 before
# augmentation — confirm the intended input scale.
datagen = ImageDataGenerator(
    horizontal_flip=True,
    rotation_range=40,
    zoom_range=0.4,
    brightness_range=[0.5, 1.5],
    shear_range=0.4,
    width_shift_range=0.3,
    height_shift_range=0.3,
    channel_shift_range=10.0,
    fill_mode='nearest'
)

def augment_frames(frames):
    """Return a randomly augmented copy of one clip.

    Args:
        frames: Sequence of frames with pixel values already normalized to
            [0, 1] (as produced by the data-preparation cell).

    Returns:
        A float32 array with the same number of frames, each independently
        transformed by ``datagen`` and clipped back to [0, 1].
    """
    augmented_frames = []
    for frame in frames:
        # datagen's brightness and channel-shift settings act on the 0-255
        # scale. Applied directly to [0, 1] data, a +/-10 channel shift
        # saturates the whole frame and the brightness step returns values on
        # a different scale. Augment in 0-255 and scale back afterwards.
        frame = datagen.random_transform(frame * 255.0)
        augmented_frames.append(frame)
    if not augmented_frames:
        return np.array(augmented_frames)
    return np.clip(np.array(augmented_frames, dtype=np.float32) / 255.0, 0.0, 1.0)

augmented_data = []
augmented_labels = []

# Produce one augmented twin per training clip; the twin keeps the clip's label.
for clip, clip_label in tqdm(zip(X_train, y_train), total=len(X_train)):
    augmented_data.append(augment_frames(clip))
    augmented_labels.append(clip_label)

# Originals first, augmented copies appended after them.
augmented_clips = np.array(augmented_data)
augmented_targets = np.array(augmented_labels)
X_train_augmented = np.concatenate((X_train, augmented_clips))
y_train_augmented = np.concatenate((y_train, augmented_targets))

print(f"Augmented Train Data: {X_train_augmented.shape}")
np.save('/kaggle/working/X_train_augmented.npy', X_train_augmented)
np.save('/kaggle/working/y_train_augmented.npy', y_train_augmented)

# MODEL ARCHITECTURE

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import Xception
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, TimeDistributed, Bidirectional, GRU, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l2

def build_best_model(input_shape=(FRAME_COUNT, 128, 128, 3)):
    """Build and compile the Xception + bidirectional-GRU video classifier.

    Args:
        input_shape: ``(frames, height, width, channels)`` of one clip. The
            per-frame part ``input_shape[1:]`` is what the Xception backbone
            is built for.

    Returns:
        A compiled ``Sequential`` model with a 2-way softmax output, trained
        with categorical cross-entropy and tracking accuracy.
    """
    # Previously the argument was ignored and the backbone was hard-wired to
    # 128x128x3. Derive the frame shape from the parameter instead.
    frame_shape = tuple(input_shape[1:])
    base_model = Xception(weights='imagenet', include_top=False, input_shape=frame_shape)
    base_model.trainable = True
    # Freeze everything except the last 30 backbone layers, which stay
    # trainable for fine-tuning.
    for layer in base_model.layers[:-30]:
        layer.trainable = False

    model = Sequential([
        # Declaring the input builds the model right away, so model.summary()
        # works before the first fit/predict call.
        tf.keras.Input(shape=tuple(input_shape)),
        TimeDistributed(base_model),
        TimeDistributed(GlobalAveragePooling2D()),
        BatchNormalization(),
        Bidirectional(GRU(128, return_sequences=False, kernel_regularizer=l2(0.005))),
        Dropout(0.5),
        Dense(64, activation='relu', kernel_regularizer=l2(0.005)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(2, activation='softmax')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001, clipnorm=1.0),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# Build the network with the default input shape and print its layer summary.
model = build_best_model()
model.summary()

# MODEL TRAINING

In [None]:
# Model Training
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from sklearn.utils import class_weight

# Balanced class weights, derived from the one-hot training labels
train_class_ids = np.argmax(y_train, axis=1)
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(train_class_ids),
    y=train_class_ids,
)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

# Keep only the weights with the best validation accuracy seen so far
checkpoint = ModelCheckpoint(
    filepath="/kaggle/working/deepfake_detection_model.keras",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)

# Halve the learning rate after 5 epochs without val_loss improvement
lr_scheduler = ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)

# Stop after 20 epochs without val_accuracy improvement and roll back
early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=20,
    restore_best_weights=True,
    verbose=1,
)

# Train on originals + augmented clips; validate on the untouched split
training_callbacks = [checkpoint, lr_scheduler, early_stopping]
history = model.fit(
    x=X_train_augmented,
    y=y_train_augmented,
    validation_data=(X_val, y_val),
    epochs=50,  # Reduced for testing, adjust to 100 if needed
    batch_size=8,
    class_weight=class_weight_dict,
    callbacks=training_callbacks,
)

# Persist the model as it stands at the end of training (separate file from
# the best-epoch checkpoint above)
model.save("/kaggle/working/deepfake_detection_model_final.keras")

# Verify saved files
!ls -lh /kaggle/working/

# MODEL TESTING

In [None]:
from tensorflow.keras.models import load_model
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Evaluate the checkpoint file that ModelCheckpoint wrote during training
model = load_model('/kaggle/working/deepfake_detection_model.keras')

y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

accuracy = accuracy_score(y_true, y_pred_classes)
print(f"Test Accuracy: {accuracy * 100:.2f}%")
print("Classification Report:")
# Pin the label set. Without it, a small test split in which only one class
# appears (in both truth and predictions) makes sklearn raise because
# target_names has two entries. zero_division=0 keeps an absent class as a
# zero row instead of emitting warnings.
print(classification_report(
    y_true,
    y_pred_classes,
    labels=[0, 1],
    target_names=['REAL', 'FAKE'],
    zero_division=0,
))

def plot_training_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` dict holds the per-epoch series
            ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss`` (the
            value returned by ``model.fit``).
    """
    # (train key, train legend, validation key, validation legend, title, y label)
    panels = (
        ('accuracy', 'Train Accuracy', 'val_accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy'),
        ('loss', 'Train Loss', 'val_loss', 'Validation Loss',
         'Model Loss', 'Loss'),
    )
    curves = history.history

    plt.figure(figsize=(12, 5))
    for position, panel in enumerate(panels, start=1):
        train_key, train_legend, val_key, val_legend, heading, y_label = panel
        plt.subplot(1, 2, position)
        plt.plot(curves[train_key], label=train_legend)
        plt.plot(curves[val_key], label=val_legend)
        plt.title(heading)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(model, X_test, y_test):
    """Plot the Real/Fake confusion matrix and print a classification report.

    Args:
        model: Trained model exposing ``predict``.
        X_test: Test clips to classify.
        y_test: One-hot encoded true labels for ``X_test``.
    """
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    class_ids = [0, 1]
    cm_labels = ['Real', 'Fake']
    # Pin the label set so the matrix is always 2x2 and lines up with the tick
    # labels, even when one class is missing from both truth and predictions.
    cm = confusion_matrix(y_true, y_pred_classes, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=cm_labels, yticklabels=cm_labels)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()
    # Same pinning here: target_names must match the number of labels.
    print(classification_report(
        y_true,
        y_pred_classes,
        labels=class_ids,
        target_names=cm_labels,
        zero_division=0,
    ))

# `history` is the object returned by model.fit in the training cell, so that
# cell must have run in this session; `model` is the checkpoint loaded above.
plot_training_history(history)
plot_confusion_matrix(model, X_test, y_test)

# REAL TIME DETECTION

In [None]:
# Real-Time Detection
import cv2
import numpy as np
import os
from tensorflow.keras.models import load_model

# Function to extract frames from a video
def extract_frames(video_path, output_size=(128, 128), frame_count=10):
    """Sample up to ``frame_count`` evenly spaced face crops for inference.

    Frames go through ``extract_face`` (defined in the data-preparation cell)
    so inference sees the same MTCNN face crops the model was trained on,
    rather than whole frames squeezed to ``output_size``.

    Args:
        video_path: Path of the video file to read.
        output_size: ``(width, height)`` each returned frame must have.
        frame_count: Number of frames to sample.

    Returns:
        A uint8 array of the frames that could be read; an empty 1-D array
        when the video cannot be opened; a zero-length 4-D array when it opens
        but yields no frame.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return np.array([])  # Return empty array if video fails to open
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(total_frames // frame_count, 1)  # Uniform sampling

        for i in range(frame_count):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
            ret, frame = cap.read()
            if not ret:
                break
            frame = extract_face(frame)
            # extract_face resizes to the module-level OUTPUT_FRAME_SIZE;
            # honour a different output_size requested by the caller.
            if frame.shape[:2] != (output_size[1], output_size[0]):
                frame = cv2.resize(frame, output_size)
            frames.append(frame)
    finally:
        # Release the capture even if detection or resizing raises.
        cap.release()

    # Convert to NumPy array and ensure 4D shape
    if len(frames) == 0:
        print(f"Warning: No frames extracted from {video_path}")
        return np.zeros((0, output_size[0], output_size[1], 3), dtype=np.uint8)
    return np.array(frames, dtype=np.uint8)

# Function to predict video
def predict_video(video_path, model, output_size=(128, 128), frame_count=10):
    """Classify one video as REAL or FAKE and print the verdict.

    Args:
        video_path: Path of the video file to classify.
        model: Trained model exposing ``predict``; its output is read as one
            row of class scores where index 1 means FAKE.
        output_size: Frame size forwarded to ``extract_frames``.
        frame_count: Number of frames per clip. Shorter clips are zero-padded
            at the end up to this length.

    Returns:
        ``(label, confidence)`` where ``label`` is ``"REAL"`` or ``"FAKE"``
        and ``confidence`` is the winning class score as a float, or ``None``
        when no usable frames could be extracted.
    """
    frames = extract_frames(video_path, output_size, frame_count)
    if len(frames) == 0:
        print(f"Error: No frames to predict for {video_path}")
        return None

    # Ensure frames is 4D: a lone (height, width, channels) frame becomes a
    # one-frame clip.
    if frames.ndim == 3:
        frames = np.expand_dims(frames, axis=0)
    if frames.ndim != 4:
        print(f"Error: Unexpected frame shape {frames.shape} for {video_path}")
        return None

    # Pad short clips (including a promoted single frame) with black frames so
    # the time dimension always equals frame_count.
    if frames.shape[0] < frame_count:
        pad_length = frame_count - frames.shape[0]
        pad_width = [(0, pad_length)] + [(0, 0)] * (frames.ndim - 1)
        frames = np.pad(frames, pad_width, mode='constant', constant_values=0)

    frames = frames / 255.0  # Normalize
    frames = np.expand_dims(frames, axis=0)  # Add batch dimension
    prediction = model.predict(frames)
    scores = prediction[0]
    class_idx = int(np.argmax(scores))
    label = "FAKE" if class_idx == 1 else "REAL"
    confidence = float(scores[class_idx])
    print(f"Prediction: {label} (Confidence: {confidence:.2f})")
    return label, confidence

# Load the trained checkpoint, refusing to continue if it is missing
model_path = '/kaggle/working/deepfake_detection_model.keras'
if not os.path.exists(model_path):
    print(f"Error: Model file not found at {model_path}. Please run the training section first.")
    raise FileNotFoundError(f"Model file not found at {model_path}")
loaded_model = load_model(model_path)
print(f"Model loaded successfully from {model_path}")

# Same dataset locations as the data-loading cell
REAL_PATH = "/kaggle/input/faceforensics/FF++/real"
FAKE_PATH = "/kaggle/input/faceforensics/FF++/fake"

# Use the first listed entry of each class directory as a smoke test
real_videos = os.listdir(REAL_PATH)
fake_videos = os.listdir(FAKE_PATH)
if real_videos and fake_videos:
    real_sample_path = os.path.join(REAL_PATH, real_videos[0])  # First real video
    fake_sample_path = os.path.join(FAKE_PATH, fake_videos[0])  # First fake video

    # Test predictions
    print("Real Video Prediction:")
    predict_video(real_sample_path, loaded_model)

    print("Fake Video Prediction:")
    predict_video(fake_sample_path, loaded_model)
else:
    print("Error: No video files found in REAL_PATH or FAKE_PATH. Check dataset input.")

# List contents of /kaggle/working/ to verify model or other files


In [None]:
!ls -lh /kaggle/working/

In [None]:
!ls -lh /kaggle/working/deepfake_detection_model.keras

# UI CODE

In [None]:
import os
import cv2
import numpy as np
import customtkinter as ctk
from tkinter import filedialog, messagebox
from tensorflow.keras.models import load_model
from mtcnn import MTCNN
from PIL import Image, ImageTk
import csv

# ---------------------------
# PARAMETERS
# ---------------------------
OUTPUT_FRAME_SIZE = (128, 128)  # size passed to cv2.resize; same value as the data-preparation cell
FRAME_COUNT = 10  # frames sampled per video; same value as the data-preparation cell
MODEL_PATH = "deepfake_detection_model.keras"  # relative path, resolved against the current working directory
FEEDBACK_FILE = "feedback_log.csv"  # CSV file that save_feedback appends to

# Single MTCNN detector instance shared by every extract_face call
detector = MTCNN()

# ---------------------------
# FACE + FRAME EXTRACTION
# ---------------------------
def extract_face(frame):
    """Crop the first face MTCNN finds in ``frame`` and resize it.

    Args:
        frame: Image array as returned by ``cv2.VideoCapture.read`` (OpenCV
            delivers these in BGR channel order).

    Returns:
        The face crop resized to ``OUTPUT_FRAME_SIZE``. When no face is
        detected, or the detected box has no area inside the image, the whole
        frame resized to ``OUTPUT_FRAME_SIZE`` is returned instead.
    """
    # MTCNN's documented input is RGB, so detect on a converted copy. The crop
    # itself is taken from the original frame, so the channel order of the
    # returned pixels is unchanged.
    results = detector.detect_faces(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if results:
        x, y, w, h = results[0]['box']
        # Boxes may start outside the image. Clamp both corners so that a
        # negative origin shrinks the crop instead of shifting it sideways.
        x1, y1 = max(0, x), max(0, y)
        x2 = min(frame.shape[1], x + w)
        y2 = min(frame.shape[0], y + h)
        # A degenerate box would yield an empty array and crash cv2.resize.
        if x2 > x1 and y2 > y1:
            return cv2.resize(frame[y1:y2, x1:x2], OUTPUT_FRAME_SIZE)
    return cv2.resize(frame, OUTPUT_FRAME_SIZE)

def extract_frames(video_path, frame_count=FRAME_COUNT, output_size=OUTPUT_FRAME_SIZE):
    """Sample up to ``frame_count`` evenly spaced face crops from a video.

    Args:
        video_path: Path of the video file to read.
        frame_count: Number of frames to sample.
        output_size: ``(width, height)`` each returned frame must have.

    Returns:
        An array of the frames that could be read (fewer than ``frame_count``
        when the video ends early), or an empty array when the video cannot be
        opened or yields no frame.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return np.array([])
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(total_frames // frame_count, 1)
        for i in range(frame_count):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
            ret, frame = cap.read()
            if not ret:
                break
            frame = extract_face(frame)
            # extract_face resizes to the module-level OUTPUT_FRAME_SIZE;
            # honour a different output_size requested by the caller.
            if frame.shape[:2] != (output_size[1], output_size[0]):
                frame = cv2.resize(frame, output_size)
            frames.append(frame)
    finally:
        # Release the capture even if detection or resizing raises.
        cap.release()
    if not frames:
        # An all-black placeholder clip would have exactly frame_count frames
        # and be classified as if it were real footage. Return an empty array
        # so the caller's frame-count check reports the failure.
        return np.array([])
    return np.array(frames)

# ---------------------------
# LOAD TRAINED MODEL
# ---------------------------
# Fail at startup when the trained model file is missing; the GUI cannot
# classify anything without it. The status strings had their emoji corrupted
# by a UTF-8 / Mac Roman mix-up; the intended characters are restored here.
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"❌ Model not found: {MODEL_PATH}")
model = load_model(MODEL_PATH)
print("✅ Model loaded successfully!")

# ---------------------------
# PREDICTION FUNCTION
# ---------------------------
def predict_video(video_path):
    """Classify a video with the module-level ``model``.

    Returns:
        ``"Real"`` when class index 0 scores highest, ``"Deepfake"``
        otherwise, or ``"Error"`` when ``extract_frames`` did not deliver
        exactly ``FRAME_COUNT`` frames.
    """
    clip = extract_frames(video_path)
    if clip.shape[0] != FRAME_COUNT:
        return "Error"
    # Scale to [0, 1] and add the batch axis the model expects.
    batch = np.expand_dims(clip / 255.0, axis=0)
    scores = model.predict(batch, verbose=0)
    winning_class = np.argmax(scores, axis=1)[0]
    if winning_class == 0:
        return "Real"
    return "Deepfake"

# ---------------------------
# FEEDBACK SAVE FUNCTION
# ---------------------------
def save_feedback(video_path, model_prediction, user_feedback, feedback_file=None):
    """Append one feedback row to the CSV log, writing the header if needed.

    Args:
        video_path: Path of the analysed video; only its base name is stored.
        model_prediction: Label the model produced for the video.
        user_feedback: The user's verdict on that prediction.
        feedback_file: CSV path to append to. Defaults to the module-level
            ``FEEDBACK_FILE``.
    """
    if feedback_file is None:
        feedback_file = FEEDBACK_FILE
    # Write the header for a new log and also for an existing but empty file,
    # which would otherwise end up with data rows and no header.
    needs_header = (
        not os.path.isfile(feedback_file) or os.path.getsize(feedback_file) == 0
    )
    # Explicit UTF-8 so file names with non-ASCII characters are written the
    # same way regardless of the platform's default encoding.
    with open(feedback_file, mode="a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(["Video", "Prediction", "UserFeedback"])
        writer.writerow([os.path.basename(video_path), model_prediction, user_feedback])
    print(f"💾 Feedback saved: {user_feedback}")

# ---------------------------
# MODERN GUI (CTk)
# ---------------------------
class DeepfakeDetectorApp(ctk.CTk):
    """Desktop window for classifying a video and collecting user feedback.

    The window offers an upload button, a progress bar, a first-frame preview,
    a verdict label and a pair of feedback buttons that appear only after a
    successful prediction.
    """

    def __init__(self):
        super().__init__()

        self.title("🎭 Deepfake Detector")
        self.geometry("750x600")
        ctk.set_appearance_mode("dark")
        ctk.set_default_color_theme("blue")

        # Path and verdict of the most recent upload; save_user_feedback
        # needs both to log a row.
        self.video_path = None
        self.prediction_result = None

        # Title
        self.title_label = ctk.CTkLabel(
            self, text="AI Deepfake Detector",
            font=("Poppins Bold", 28), text_color="white"
        )
        self.title_label.pack(pady=20)

        # Upload Button
        self.upload_btn = ctk.CTkButton(
            self, text="📂 Upload Video",
            command=self.upload_video,
            font=("Poppins", 18),
            width=220, height=50, corner_radius=15
        )
        self.upload_btn.pack(pady=20)

        # Progress Bar
        self.progress = ctk.CTkProgressBar(self, width=400)
        self.progress.pack(pady=10)
        self.progress.set(0)

        # Preview Frame
        self.preview_label = ctk.CTkLabel(self, text="", width=400, height=200)
        self.preview_label.pack(pady=15)

        # Result Stamp
        self.result_label = ctk.CTkLabel(
            self, text="", font=("Poppins Bold", 26),
            text_color="white"
        )
        self.result_label.pack(pady=25)

        # Feedback Buttons (initially hidden: the frame is packed only after
        # a successful prediction)
        self.feedback_frame = ctk.CTkFrame(self)
        self.correct_btn = ctk.CTkButton(
            self.feedback_frame, text="👍 Correct",
            fg_color="#4CAF50", hover_color="#388E3C",
            command=lambda: self.save_user_feedback("Correct")
        )
        self.wrong_btn = ctk.CTkButton(
            self.feedback_frame, text="👎 Wrong",
            fg_color="#FF5252", hover_color="#D32F2F",
            command=lambda: self.save_user_feedback("Wrong")
        )
        self.correct_btn.pack(side="left", padx=20, pady=10)
        self.wrong_btn.pack(side="left", padx=20, pady=10)

        # Status Bar
        self.status_label = ctk.CTkLabel(
            self, text="Ready", font=("Poppins", 14),
            text_color="gray"
        )
        self.status_label.pack(side="bottom", pady=10)

    def _show_preview(self, file_path):
        """Display the first frame of ``file_path`` in the preview label."""
        cap = cv2.VideoCapture(file_path)
        try:
            ret, frame = cap.read()
        finally:
            cap.release()
        if not ret:
            return
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame_rgb)
        img.thumbnail((400, 200))
        imgtk = ImageTk.PhotoImage(img)
        self.preview_label.configure(image=imgtk)
        # Keep a reference on the widget so the image is not garbage
        # collected while it is displayed.
        self.preview_label.image = imgtk

    def upload_video(self):
        """Ask for a video file, classify it and show the verdict."""
        file_path = filedialog.askopenfilename(
            title="Select a video",
            filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv")]
        )
        if not file_path:
            return

        self.video_path = file_path
        # Clear what the previous upload left behind, so stale feedback
        # buttons cannot log a verdict against the wrong video.
        self.prediction_result = None
        self.feedback_frame.pack_forget()
        self.result_label.configure(text="")
        try:
            self.progress.set(0.3)
            self.status_label.configure(text="Processing video...")
            # Prediction blocks the event loop; repaint first so the user
            # sees the progress and status change.
            self.update_idletasks()

            # Show first frame as preview
            self._show_preview(file_path)

            # Prediction
            self.progress.set(0.7)
            self.update_idletasks()
            result = predict_video(file_path)

            if result == "Error":
                # predict_video could not get a full clip. This is a failure,
                # not a "deepfake" verdict.
                self.progress.set(0)
                self.status_label.configure(text="Error")
                messagebox.showerror(
                    "Error",
                    "Failed:\nCould not read enough frames from the selected video."
                )
                return

            self.prediction_result = result
            if result == "Real":
                self.result_label.configure(
                    text="✅ REAL VIDEO",
                    text_color="#4CAF50"
                )
            else:
                self.result_label.configure(
                    text="❌ DEEPFAKE DETECTED",
                    text_color="#FF5252"
                )

            # Show feedback options
            self.feedback_frame.pack(pady=10)
            self.progress.set(1.0)
            self.status_label.configure(text="Done")

        except Exception as e:
            # Top-level UI boundary: report any failure to the user instead
            # of letting the callback die silently.
            self.progress.set(0)
            messagebox.showerror("Error", f"Failed:\n{e}")
            self.status_label.configure(text="Error")

    def save_user_feedback(self, feedback):
        """Log the user's verdict on the current prediction, then hide the buttons."""
        if self.video_path and self.prediction_result:
            save_feedback(self.video_path, self.prediction_result, feedback)
            messagebox.showinfo("Feedback", "✅ Thanks for your feedback!")
            self.status_label.configure(text="Feedback submitted")
            # Hide feedback buttons after submission
            self.feedback_frame.pack_forget()

# ---------------------------
# ENTRY POINT
# ---------------------------
# Start the GUI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    app = DeepfakeDetectorApp()
    app.mainloop()


# MADE BY : ANCHAL GUPTA