


### **Project Overview**
This project focuses on detecting deepfake videos by analyzing **spatial artifacts (within frames) and temporal inconsistencies (across frames)**. Since deepfake generation struggles to accurately replicate **natural motion**, our approach leverages **optical flow analysis, CNN-based feature extraction, and LSTM for temporal modeling**.

# **Deepfake Detection Dataset Processing & Preparation**
### **What I Did**
Given time and compute constraints, we:
- **Used a subset of videos** instead of the full dataset.
- **Extracted frames** to analyze sequences instead of raw video files.
- **Standardized sequences** so all videos have the same frame count.
- **Used a CNN + LSTM pipeline** instead of a full 3D convolutional model.

### **What I Would Have Done with More Compute**
- **Used 3D CNNs (e.g., I3D, C3D, or SlowFast)** to model spatio-temporal patterns without needing separate feature extraction.
- **Applied self-supervised learning** for unsupervised deepfake feature discovery.
- **Implemented transformer-based architectures (e.g., ViViT, TimeSformer)** for direct sequence modeling.

---

# **Deepfake Detection Dataset Handling Notes**

## **1. Downloading the Dataset**
The dataset consists of **high-resolution video files**, making direct processing difficult. Instead of working with the entire dataset, we:
- **Downloaded a subset** from the original dataset source.
- **Stored it on an external hard drive** to manage storage constraints.
- **Uploaded a smaller working subset to Google Drive** for processing in Google Colab.

---

## **2. Inspecting the Data**
Before using the dataset, we performed an initial inspection to:
- **Verify file formats** (ensure all files are valid `.mp4` videos).
- **Detect corrupted files** (videos that fail to open or have broken frames).
- **Remove mislabeled or unnecessary metadata** to avoid bias.
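
As a rough illustration of the format check, the sketch below flags files that do not look like MP4 containers. It assumes that the first box in the file is an `ftyp` box, which holds for most MP4 files but is not guaranteed. The helper names `looks_like_mp4` and `find_suspect_files` are hypothetical and are not part of the notebook. A file that passes can still fail to decode, so the open/read check in the frame-extraction cell remains necessary.

```python
import os

def looks_like_mp4(path):
    """Cheap sanity check: a non-empty .mp4 file whose header contains an 'ftyp' box."""
    if not path.lower().endswith(".mp4"):
        return False
    if not os.path.isfile(path) or os.path.getsize(path) < 12:
        return False
    with open(path, "rb") as f:
        header = f.read(12)
    # Assumption: bytes 4-8 of the first box spell 'ftyp' (true for most MP4 files).
    return header[4:8] == b"ftyp"


def find_suspect_files(folder):
    """Return the sorted names of files in `folder` that fail the check above."""
    return sorted(
        name for name in os.listdir(folder)
        if not looks_like_mp4(os.path.join(folder, name))
    )
```

Calling `find_suspect_files` on each of the `DFD_REAL` and `DFD_FAKE` folders would give a short list of files to inspect by hand before extraction.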

---

## **3. Cleaning the Dataset**
To **prevent data leakage** and ensure a fair evaluation, we:
- **Removed corrupted videos** that could not be processed.
- **Anonymized filenames** to prevent the model from learning patterns from names instead of content.
- **Shuffled the dataset** to avoid structured order bias in training.
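
The shuffle-then-rename idea can be sketched without touching any files by first building a mapping from original to anonymized names. This is only an illustration: `build_anonymized_names`, the `seed` parameter, and the `start_index` parameter are assumptions introduced here, and the two filenames are taken from the sample output further down.

```python
import random

def build_anonymized_names(filenames, seed=0, start_index=0):
    """Map each original filename to a shuffled, content-free name."""
    shuffled = sorted(filenames)            # fixed starting order
    random.Random(seed).shuffle(shuffled)   # seeded, so the mapping is reproducible
    return {
        original: f"video_{start_index + i:03d}.mp4"
        for i, original in enumerate(shuffled)
    }

mapping = build_anonymized_names(
    ["01__hugging_happy.mp4", "01__exit_phone_room.mp4"], seed=42
)
for original, anonymized in mapping.items():
    print(original, "->", anonymized)
```

Keeping such a mapping (for example, written to a file outside the training folders) makes it possible to trace a prediction back to its source video later, while the model only ever sees the neutral names.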

---

## **4. Extracting Frames from Videos**
Since most models consume decoded image frames rather than encoded video files, we:
- **Converted each video into a sequence of image frames**.
- **Extracted every frame in its original order**, so the spacing between frames stays uniform (to preserve temporal consistency).
- **Organized extracted frames into structured folders**, maintaining a distinction between real and fake videos.
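
The extraction cell in this notebook writes every frame. If storage or compute becomes a constraint, sampling a fixed number of frames at equal intervals is one alternative; the sketch below shows how such indices could be chosen. The function name `evenly_spaced_indices` is hypothetical and this is not what the notebook currently does.

```python
def evenly_spaced_indices(total_frames, num_samples):
    """Return `num_samples` frame indices spread uniformly over a video."""
    if total_frames <= 0 or num_samples <= 0:
        return []
    # Never ask for more frames than the video has.
    num_samples = min(num_samples, total_frames)
    step = total_frames / num_samples
    return [int(i * step) for i in range(num_samples)]

print(evenly_spaced_indices(10, 5))  # [0, 2, 4, 6, 8]
```

Inside a frame-reading loop, a frame would then be saved only when its index is in the returned list.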

---

## **5. Standardizing Frame Sequences**
Videos in the dataset varied in length, which would cause inconsistencies in training. To address this:
- We **identified the video with the fewest frames** in our dataset.
- We **trimmed all other videos** to match this sequence length.
- This step ensured **all input sequences were of equal length**, preventing bias in model training.

---

## **6. Uploading to Colab**
To efficiently handle dataset processing, we:
- **Uploaded the selected subset to Google Drive** to enable access from Colab.
- **Mounted Google Drive in Colab** so the notebook can read the videos directly from Drive.
- **Structured the dataset in a clean format** (`DFD_REAL` and `DFD_FAKE` folders).

---

## **Next Steps**
With the dataset now fully processed, we are ready to:
1. **Extract Optical Flow features** to detect motion inconsistencies.
2. **Train a CNN (ResNet/EfficientNet) to analyze spatial artifacts in frames**.
3. **Feed CNN features into an LSTM** to learn temporal inconsistencies.
4. **Optimize the model using Bayesian Hyperparameter Tuning**.

This structured pipeline ensures that our deepfake detection approach **focuses on motion inconsistencies**, an area where deepfake models struggle the most.



In [1]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import os
from glob import glob

# Define paths to real and fake video folders in Google Drive
real_videos_path = "/content/drive/My Drive/DFD_REAL"
fake_videos_path = "/content/drive/My Drive/DFD_FAKE"

# Get list of video files in each folder
real_videos = sorted(glob(os.path.join(real_videos_path, "*.mp4")))
fake_videos = sorted(glob(os.path.join(fake_videos_path, "*.mp4")))

# Print counts and sample files
print(f" Found {len(real_videos)} real videos")
print(f" Found {len(fake_videos)} fake videos")
print("Sample real video:", real_videos[:2])
print("Sample fake video:", fake_videos[:2])


 Found 16 real videos
 Found 16 fake videos
Sample real video: ['/content/drive/My Drive/DFD_REAL/01__exit_phone_room.mp4', '/content/drive/My Drive/DFD_REAL/01__hugging_happy.mp4']
Sample fake video: ['/content/drive/My Drive/DFD_FAKE/07_02__outside_talking_pan_laughing__1ZE4HC06.mp4', '/content/drive/My Drive/DFD_FAKE/07_02__outside_talking_pan_laughing__O4SXNLRL.mp4']


In [3]:
import os
import random

# Define paths to real and fake video folders in Google Drive
real_videos_path = "/content/drive/My Drive/DFD_REAL"
fake_videos_path = "/content/drive/My Drive/DFD_FAKE"

# Get list of video files in each folder
real_videos = sorted([f for f in os.listdir(real_videos_path) if f.endswith(".mp4")])
fake_videos = sorted([f for f in os.listdir(fake_videos_path) if f.endswith(".mp4")])

# Shuffle files to remove any order bias
random.shuffle(real_videos)
random.shuffle(fake_videos)

def anonymize(folder, filenames, start_index):
    """Rename files to video_NNN.mp4 via temporary names.

    On Unix, os.rename silently overwrites an existing target, so renaming
    directly could destroy videos if this cell is re-run on anonymized files.
    """
    temp_names = []
    for i, filename in enumerate(filenames):
        temp_name = f"anon_{i:03d}.tmp"
        os.rename(os.path.join(folder, filename), os.path.join(folder, temp_name))
        temp_names.append(temp_name)
    for i, temp_name in enumerate(temp_names):
        new_filename = f"video_{i + start_index:03d}.mp4"
        os.rename(os.path.join(folder, temp_name), os.path.join(folder, new_filename))

# Real videos get indices 0..N-1; fake videos continue from N
anonymize(real_videos_path, real_videos, 0)
anonymize(fake_videos_path, fake_videos, len(real_videos))

print("All filenames have been anonymized.")


All filenames have been anonymized.


In [4]:
import cv2
import os

# Define input and output paths
real_videos_path = "/content/drive/My Drive/DFD_REAL"
fake_videos_path = "/content/drive/My Drive/DFD_FAKE"
output_path = "/content/extracted_frames"

# Ensure the base output path exists
os.makedirs(output_path, exist_ok=True)

# Process both real and fake videos
for category, video_path in [("real", real_videos_path), ("fake", fake_videos_path)]:
    videos = sorted([f for f in os.listdir(video_path) if f.endswith(".mp4")])

    for video_file in videos:
        video_full_path = os.path.join(video_path, video_file)
        video_name = os.path.splitext(video_file)[0]
        frame_output_folder = os.path.join(output_path, category, video_name)

        os.makedirs(frame_output_folder, exist_ok=True)

        cap = cv2.VideoCapture(video_full_path)
        frame_count = 0

        if not cap.isOpened():
            print(f"ERROR: Could not open {video_file}")
            continue

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_filename = os.path.join(frame_output_folder, f"frame_{frame_count:04d}.jpg")
            success = cv2.imwrite(frame_filename, frame)

            if not success:
                print(f"ERROR: Failed to write frame {frame_count} for {video_file}")
                break  # Stop processing if frame saving fails

            frame_count += 1

        cap.release()

        if frame_count == 0:
            print(f"WARNING: No frames extracted from {video_file}")
        else:
            print(f"Extracted {frame_count} frames from {video_file}.")

print("Frame extraction complete.")


Extracted 851 frames from video_000.mp4.
Extracted 1045 frames from video_001.mp4.
Extracted 860 frames from video_002.mp4.
Extracted 1085 frames from video_003.mp4.
Extracted 306 frames from video_004.mp4.
Extracted 800 frames from video_005.mp4.
Extracted 834 frames from video_006.mp4.
Extracted 305 frames from video_007.mp4.
Extracted 560 frames from video_008.mp4.
Extracted 610 frames from video_009.mp4.
Extracted 787 frames from video_010.mp4.
Extracted 626 frames from video_011.mp4.
Extracted 902 frames from video_012.mp4.
Extracted 371 frames from video_013.mp4.
Extracted 965 frames from video_014.mp4.
Extracted 1489 frames from video_015.mp4.
Extracted 679 frames from video_016.mp4.
Extracted 1567 frames from video_017.mp4.
Extracted 560 frames from video_018.mp4.
Extracted 701 frames from video_019.mp4.
Extracted 625 frames from video_020.mp4.
Extracted 552 frames from video_021.mp4.
Extracted 579 frames from video_022.mp4.
Extracted 736 frames from video_023.mp4.
Extracted 57

In [5]:
import os

output_path = "/content/extracted_frames"

for category in ["real", "fake"]:
    category_path = os.path.join(output_path, category)

    if not os.path.exists(category_path):
        print(f"ERROR: Path does not exist: {category_path}")
        continue

    video_folders = sorted(os.listdir(category_path))
    print(f"Checking {category}: Found {len(video_folders)} subfolders.")

    for video in video_folders[:5]:  # Check first 5 video folders
        video_path = os.path.join(category_path, video)
        frames = [f for f in os.listdir(video_path) if f.endswith(".jpg")]

        print(f"  {video}: {len(frames)} frames")

    print("-" * 40)


Checking real: Found 16 subfolders.
  video_000: 851 frames
  video_001: 1045 frames
  video_002: 860 frames
  video_003: 1085 frames
  video_004: 306 frames
----------------------------------------
Checking fake: Found 16 subfolders.
  video_016: 679 frames
  video_017: 1567 frames
  video_018: 560 frames
  video_019: 701 frames
  video_020: 625 frames
----------------------------------------


In [6]:
import os

# Path to extracted frames
output_path = "/content/extracted_frames"

# Find the minimum number of frames across all sequences
min_frames = float("inf")

for category in ["real", "fake"]:
    category_path = os.path.join(output_path, category)
    video_folders = sorted(os.listdir(category_path))

    for video in video_folders:
        video_path = os.path.join(category_path, video)
        frame_count = len([f for f in os.listdir(video_path) if f.endswith(".jpg")])

        if frame_count > 0:
            min_frames = min(min_frames, frame_count)

print(f"Calculated minimum sequence length: {min_frames} frames.")

# Handle case where no valid frames were found
if min_frames == float("inf"):
    print("ERROR: No frames found in dataset. Trimming aborted.")
else:
    print("Proceeding with trimming.")

    # Trim all sequences to match the shortest sequence
    for category in ["real", "fake"]:
        category_path = os.path.join(output_path, category)
        video_folders = sorted(os.listdir(category_path))

        for video in video_folders:
            video_path = os.path.join(category_path, video)
            frames = sorted([f for f in os.listdir(video_path) if f.endswith(".jpg")])

            # Remove excess frames
            for frame in frames[min_frames:]:
                os.remove(os.path.join(video_path, frame))

    print("Frame sequences have been standardized.")


Calculated minimum sequence length: 305 frames.
Proceeding with trimming.
Frame sequences have been standardized.


# ML Pipeline for Deepfake Detection

## Why This Architecture?
Deepfake detection is challenging because synthetic videos often contain subtle artifacts that are difficult to identify in individual frames. However, a key weakness of deepfake generation is its struggle to accurately replicate natural motion over time.

To exploit this weakness, our model will focus on both **spatial inconsistencies (within frames)** and **temporal inconsistencies (across frames)**. This approach ensures we capture unnatural frame transitions, smoothing effects, and interpolation artifacts commonly found in deepfakes.

We will achieve this by leveraging **Optical Flow**, **CNNs for spatial analysis**, **LSTMs for temporal modeling**, and **Bayesian Optimization for hyperparameter tuning**.

## Final ML Pipeline

### Step 1: Extract Optical Flow Features → Capture Motion Anomalies
- Optical Flow measures how pixels move between consecutive frames.
- Real videos exhibit natural, non-uniform motion, while deepfake videos often show overly smooth, algorithmic motion patterns due to interpolation artifacts.
- We compute optical flow vectors and extract motion features to highlight unnatural consistencies between frames.
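
To make "motion features" concrete, the sketch below reduces a dense flow field to a few summary statistics of the per-pixel motion magnitude. It uses a synthetic NumPy array in place of real flow; in the notebook the field would come from `cv2.calcOpticalFlowFarneback`, as in the `OpticalFlowDataset` class later on. The choice of mean, standard deviation, and maximum is an assumption made for illustration, and `flow_magnitude_stats` is a hypothetical helper.

```python
import numpy as np

def flow_magnitude_stats(flow):
    """Summarize a dense flow field of shape (H, W, 2) holding per-pixel (dx, dy)."""
    magnitude = np.hypot(flow[..., 0], flow[..., 1])  # per-pixel displacement length
    return {
        "mean": float(magnitude.mean()),
        "std": float(magnitude.std()),
        "max": float(magnitude.max()),
    }

# Synthetic example: every pixel moves 3 px horizontally and 4 px vertically.
flow = np.zeros((4, 4, 2), dtype=np.float32)
flow[..., 0] = 3.0
flow[..., 1] = 4.0
stats = flow_magnitude_stats(flow)
print(stats)
```

A perfectly uniform field like this one has zero spread in magnitude, which is the kind of overly regular motion the pipeline hopes to pick up on.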

### Step 2: Feed Optical Flow into CNN → Learn Spatial Deepfake Artifacts
- We use a pretrained **CNN (ResNet or EfficientNet)** to extract spatial features from the optical flow images.
- CNNs can detect face warping, blurring, and inconsistencies in deepfake video frames.
- These spatial features act as input to the LSTM, which will analyze their behavior over time.

### Step 3: Pass CNN Outputs to LSTM → Learn Temporal Motion Patterns
- The **LSTM (Long Short-Term Memory network)** is crucial because it learns how features change across frames.
- Real videos exhibit natural, chaotic motion, whereas deepfakes tend to have more consistent and predictable transitions due to synthetic smoothing.
- The LSTM helps model these temporal dependencies, allowing the network to recognize unnatural motion transitions.
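
A minimal PyTorch sketch of how Steps 2 and 3 could fit together is shown below. It is not the notebook's trained model: the tiny convolutional encoder is a stand-in for a pretrained ResNet or EfficientNet, and the class name `FlowCNNLSTM`, the layer sizes, and the single-logit output are all assumptions chosen to keep the example small.

```python
import torch
import torch.nn as nn

class FlowCNNLSTM(nn.Module):
    """Per-frame CNN features fed to an LSTM, ending in one real/fake logit."""

    def __init__(self, feature_dim=64, hidden_dim=32):
        super().__init__()
        # Small stand-in encoder; a pretrained ResNet/EfficientNet would replace it.
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, feature_dim, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),  # -> [N, feature_dim, 1, 1]
        )
        self.lstm = nn.LSTM(feature_dim, hidden_dim, batch_first=True)
        self.head = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        # x: [batch, time, 1, H, W] optical-flow magnitude maps
        b, t, c, h, w = x.shape
        feats = self.cnn(x.reshape(b * t, c, h, w)).flatten(1)  # [b*t, feature_dim]
        feats = feats.reshape(b, t, -1)                          # [b, t, feature_dim]
        _, (h_n, _) = self.lstm(feats)                           # h_n: [1, b, hidden_dim]
        return self.head(h_n[-1])                                # [b, 1] logits

model = FlowCNNLSTM()
logits = model(torch.zeros(2, 5, 1, 32, 32))
print(logits.shape)
```

The input layout matches the `[batch_size, sequence_length-1, 1, H, W]` shape that the `OpticalFlowDataset` loader is expected to produce, so frames are folded into the batch dimension for the CNN and unfolded again before the LSTM.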

### Step 4: Train Model + Optimize with Bayesian Tuning
- **Bayesian Optimization** is used to tune the CNN & LSTM hyperparameters, ensuring:
  - **Optimal learning rates** (to improve convergence).
  - **Best batch size & sequence length** (to maximize feature extraction).
  - **Regularization parameters** (to prevent overfitting).
- Bayesian methods are preferred over grid search because they use the results of earlier trials to choose the next configuration, so they typically reach good hyperparameters in fewer training runs.

### Step 5: Evaluate on Real vs. Fake Video Sequences
- We test the model using unseen deepfake and real video sequences.
- **Performance metrics** will include:
  - **Accuracy & AUC-ROC** (overall classification performance).
  - **Motion Anomaly Scores** (to measure how unnatural motion patterns contribute to classification).
  - **Feature Visualizations** (heatmaps on optical flow to show deepfake artifacts).
- If the model performs well, we can experiment with other optimizations, such as **Neural Architecture Search (NAS)** or **Transformer-based temporal modeling**.
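
For reference, accuracy and AUC-ROC can be computed from per-video scores with a few lines of plain Python. The sketch below assumes label `1` means fake and that the model outputs a score where higher means "more likely fake"; the helper names and the toy labels and scores are illustrative only, not results from this project.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of samples whose thresholded score matches the label (1 = fake)."""
    predictions = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)


def auc_roc(labels, scores):
    """AUC as the probability that a random fake outscores a random real video."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one sample of each class")
    # Count pairwise wins; ties count as half a win.
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives for n in negatives
    )
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(accuracy(labels, scores))  # 0.75
print(auc_roc(labels, scores))   # 0.75
```

With a test set as small as the one in this notebook, both numbers move in large steps, so they are best read as a sanity check rather than a precise estimate.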


In [9]:
import os
import shutil
import random

# Paths
base_path = "/content/extracted_frames"
train_path = "/content/train"
test_path = "/content/test"
holdout_path = "/content/holdout"

# Ensure directories exist
os.makedirs(train_path, exist_ok=True)
os.makedirs(test_path, exist_ok=True)
os.makedirs(holdout_path, exist_ok=True)

# Categories
categories = ["real", "fake"]

# Step 1: Move one sequence from each category to the holdout set
for category in categories:
    category_path = os.path.join(base_path, category)
    holdout_dest = os.path.join(holdout_path, category)
    os.makedirs(holdout_dest, exist_ok=True)

    # Check if holdout already contains a file
    if os.listdir(holdout_dest):
        print(f"Skipping {category}: Holdout set already contains a video.")
        continue

    video_folders = sorted(os.listdir(category_path))



In [8]:
import os

train_path = "/content/train"
test_path = "/content/test"
holdout_path = "/content/holdout"

for category in ["real", "fake"]:
    train_category_path = os.path.join(train_path, category)
    test_category_path = os.path.join(test_path, category)
    holdout_category_path = os.path.join(holdout_path, category)

    train_videos = os.listdir(train_category_path) if os.path.exists(train_category_path) else []
    test_videos = os.listdir(test_category_path) if os.path.exists(test_category_path) else []
    holdout_videos = os.listdir(holdout_category_path) if os.path.exists(holdout_category_path) else []

    print(f"{category.upper()} - Train: {len(train_videos)} videos, Test: {len(test_videos)} videos, Holdout: {len(holdout_videos)} video(s)")


REAL - Train: 0 videos, Test: 0 videos, Holdout: 0 video(s)
FAKE - Train: 0 videos, Test: 0 videos, Holdout: 0 video(s)


In [10]:
import os
import shutil
import random

# Paths
base_path = "/content/extracted_frames"
train_path = "/content/train"
test_path = "/content/test"
holdout_path = "/content/holdout"

# Ensure directories exist
os.makedirs(train_path, exist_ok=True)
os.makedirs(test_path, exist_ok=True)
os.makedirs(holdout_path, exist_ok=True)

# Categories
categories = ["real", "fake"]

# Step 1: Move one sequence from each category to the holdout set
for category in categories:
    category_path = os.path.join(base_path, category)
    holdout_dest = os.path.join(holdout_path, category)
    os.makedirs(holdout_dest, exist_ok=True)

    video_folders = sorted(os.listdir(category_path))

    if not video_folders:
        print(f"WARNING: No videos found in {category_path}. Skipping holdout selection.")
        continue

    holdout_sample = random.choice(video_folders)  # Select one video randomly
    shutil.move(os.path.join(category_path, holdout_sample), holdout_dest)

    print(f"Moved {holdout_sample} to holdout set.")

# Step 2: Split the remaining data into train (80%) and test (20%)
for category in categories:
    category_path = os.path.join(base_path, category)
    train_dest = os.path.join(train_path, category)
    test_dest = os.path.join(test_path, category)

    os.makedirs(train_dest, exist_ok=True)
    os.makedirs(test_dest, exist_ok=True)




Moved video_003 to holdout set.
Moved video_018 to holdout set.
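
The cell above creates the train and test folders but, as captured here, stops before moving any sequences into them. One way the 80/20 split described in the Step 2 comment could be completed is sketched below. The function name `split_category` and the `train_fraction` and `seed` parameters are assumptions; this is not the notebook's original code.

```python
import os
import random
import shutil

def split_category(category_path, train_dest, test_dest, train_fraction=0.8, seed=0):
    """Move sequence folders from `category_path` into train and test folders."""
    os.makedirs(train_dest, exist_ok=True)
    os.makedirs(test_dest, exist_ok=True)

    video_folders = sorted(os.listdir(category_path))
    random.Random(seed).shuffle(video_folders)  # seeded for a reproducible split

    n_train = int(len(video_folders) * train_fraction)
    for i, video in enumerate(video_folders):
        dest = train_dest if i < n_train else test_dest
        shutil.move(os.path.join(category_path, video), dest)
    return n_train, len(video_folders) - n_train
```

In the notebook this would be called once per category, for example with `os.path.join(base_path, category)` as the source and `os.path.join(train_path, category)` and `os.path.join(test_path, category)` as the destinations. Splitting by whole video folder, rather than by frame, keeps frames from the same video out of both sets.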


#  Checkpoint: Saving Dataset to Google Drive  

Since Colab runtimes **reset after inactivity** or if the session crashes, I am saving a **checkpoint** to Google Drive. This ensures I don’t lose:  

- **Extracted and standardized frame sequences** (`/content/train`, `/content/test`, `/content/holdout`)  
- **Preprocessed data for Optical Flow computation**  

This saves time and prevents me from having to redo all preprocessing if the runtime disconnects.  

###  Restore Later  
If the session resets, **restore the dataset** by running:  

```python
import shutil

# Restore from Google Drive
restore_path = "/content/drive/My Drive/DFD_checkpoint.zip"
shutil.unpack_archive(restore_path, "/content")

print("Checkpoint restored successfully.")
```



In [14]:
import shutil
import os
import zipfile

# Define paths
dataset_folders = ["/content/train", "/content/test", "/content/holdout"]
zip_path = "/content/DFD_checkpoint.zip"
save_path = "/content/drive/My Drive/DFD_checkpoint.zip"

# Write all three dataset folders into ONE archive. Calling shutil.make_archive
# once per folder with the same archive name overwrites the zip each time,
# leaving only the last folder ("holdout") in it.
with zipfile.ZipFile(zip_path, "w") as zf:
    for folder in dataset_folders:
        for root, _, files in os.walk(folder):
            for name in files:
                full_path = os.path.join(root, name)
                # Store paths relative to /content so unpacking there restores the layout
                zf.write(full_path, os.path.relpath(full_path, "/content"))

# Move zip file to Google Drive
shutil.move(zip_path, save_path)

print(f"Checkpoint saved to Google Drive at: {save_path}")


Checkpoint saved to Google Drive at: /content/drive/My Drive/DFD_checkpoint.zip


In [None]:
import os
import cv2
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class OpticalFlowDataset(Dataset):
    def __init__(self, data_path, sequence_length=305, transform=None):
        """
        PyTorch dataset for loading video sequences and computing Optical Flow dynamically.

        Args:
        - data_path (str): Path to the dataset (train/test folder)
        - sequence_length (int): Number of frames per sequence
        - transform: Torchvision transformations (optional)
        """
        self.data_path = data_path
        self.sequence_length = sequence_length
        self.transform = transform
        self.video_folders = sorted(os.listdir(data_path))  # Ensure correct order

    def compute_optical_flow(self, prev_frame, next_frame):
        """
        Compute Optical Flow using Farneback method.
        Returns a grayscale magnitude map highlighting motion inconsistencies.
        """
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        next_gray = cv2.cvtColor(next_frame, cv2.COLOR_BGR2GRAY)

        flow = cv2.calcOpticalFlowFarneback(prev_gray, next_gray, None,
                                            0.5, 3, 15, 3, 5, 1.2, 0)

        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        mag = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        return mag.astype(np.uint8)

    def __len__(self):
        return len(self.video_folders)

    def __getitem__(self, idx):
        """
        Load a sequence of frames, compute Optical Flow maps, and return a processed tensor.
        """
        video_folder = self.video_folders[idx]
        video_path = os.path.join(self.data_path, video_folder)

        # Get frame file names in correct order
        frame_files = sorted([f for f in os.listdir(video_path) if f.endswith(".jpg")])

        # Ensure the sequence length is correct
        if len(frame_files) < self.sequence_length:
            raise ValueError(f"Video {video_folder} has fewer frames ({len(frame_files)}) than expected {self.sequence_length}")

        # Load and process Optical Flow between consecutive frames,
        # reading each frame from disk only once
        optical_flow_sequence = []
        prev_frame = cv2.imread(os.path.join(video_path, frame_files[0]))
        if prev_frame is None:
            raise IOError(f"Could not read {frame_files[0]} in {video_folder}")

        for i in range(1, self.sequence_length):  # sequence_length frames -> sequence_length-1 flow maps
            next_frame = cv2.imread(os.path.join(video_path, frame_files[i]))
            if next_frame is None:  # cv2.imread returns None instead of raising
                raise IOError(f"Could not read {frame_files[i]} in {video_folder}")

            flow_map = self.compute_optical_flow(prev_frame, next_frame)
            prev_frame = next_frame

            if self.transform:
                flow_map = self.transform(flow_map)

            optical_flow_sequence.append(flow_map)

        # Convert to Torch tensor and add a channel dimension
        # (final shape: [sequence_length-1, 1, H, W])
        optical_flow_sequence = np.array(optical_flow_sequence)
        optical_flow_sequence = torch.tensor(optical_flow_sequence, dtype=torch.float32).unsqueeze(1)

        return optical_flow_sequence

# Example Usage:
train_dataset = OpticalFlowDataset(data_path="/content/train/real", sequence_length=305)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

# Verify batch shape
for batch in train_loader:
    print("Batch shape:", batch.shape)  # Expected shape: [batch_size, sequence_length-1, 1, H, W]
    break


I ran out of time at this point, so the remaining pipeline steps are not implemented in this notebook.