<a href="https://colab.research.google.com/github/RosalynYeldo/ADG/blob/main/deepfake_analysis_me.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import cv2
import kagglehub

# Download latest version of the Kaggle dataset and remember where it was placed;
# later cells walk this directory through the `path` variable.
dataset_handle = "xdxd003/ff-c23"
path = kagglehub.dataset_download(dataset_handle)

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/ff-c23


In [None]:
import os

# Print the full path of every file found anywhere below the dataset directory.
for dirpath, _subdirs, filenames in os.walk(path):
    for full_path in (os.path.join(dirpath, name) for name in filenames):
        print(full_path)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/02_09__kitchen_pan__9TDCEK1Q.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/02_13__exit_phone_room__CP5HFV3K.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/24_10__walking_down_street_outside_angry__356HPS7R.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/09_14__talking_angry_couch__6TEK3ZX0.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/20_03__hugging_happy__Z6V05FXO.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/03_14__talking_against_wall__ZC2KYASW.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/03_15__outside_talking_pan_laughing__Y11NT1YX.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/26_12__walking_down_street_outside_angry__OVOQACG8.mp4
/kaggle/input/ff-c23/FaceForensics++_C23/DeepFakeDetection/15_06__exit_phone_room__E98QYYXO.mp4
/kaggle/input/ff-c2

In [None]:
def extract_frames_per_second(video_path, output_folder,fps_skip=2):
    """Save one resized JPEG frame from a video roughly every ``fps_skip`` seconds.

    Every ``int(fps * fps_skip)``-th decoded frame is resized to 640x360 and
    written to ``output_folder`` as ``frame_0000.jpg``, ``frame_0001.jpg``, ...
    with JPEG quality 85.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the frames; created if missing.
        fps_skip: Seconds of video between two saved frames.

    Returns:
        None. A summary line (or an error line when the video cannot be
        opened) is printed instead.
    """
    os.makedirs(output_folder, exist_ok=True)
    cap = cv2.VideoCapture(video_path)  # video capture object
    if not cap.isOpened():  # the file could not be opened/decoded
        print(f"Error: Cannot open video file: {video_path}")
        return

    saved_count = 0
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)  # frames per second as a float
        raw_interval = fps * fps_skip
        # Some containers report 0 (or NaN) FPS, and a small fps_skip can push
        # the product below 1. Either case would make the modulo below divide
        # by zero, so fall back to sampling every frame.
        frame_interval = int(raw_interval) if raw_interval >= 1 else 1

        frame_idx = 0
        while True:
            ret, frame = cap.read()  # frame is the decoded image (NumPy array)
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                resized_frame = cv2.resize(frame, (640, 360))
                frame_filename = os.path.join(output_folder, f"frame_{saved_count:04d}.jpg")
                # Saves the frame to the (temporary) Colab disk.
                cv2.imwrite(frame_filename, resized_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
                saved_count += 1
            frame_idx += 1
    finally:
        # Release the decoder even if resizing/writing raised.
        cap.release()

    print(f"Done: Saved {saved_count} frames to '{output_folder}'.")



def process_all_videos(base_path, output_root="/content/frames"):
    """Extract frames from every video found below ``base_path``.

    Each ``.mp4``/``.avi``/``.mov`` file (extension matched case-insensitively)
    is passed to ``extract_frames_per_second`` together with its own output
    folder directly inside ``output_root``.

    The folder name is built from the video's directory relative to
    ``base_path`` (path components joined with ``__``) plus the file name
    without extension. Using the file name alone made same-named videos in
    different sub-folders write into one folder and overwrite each other.
    Videos located directly in ``base_path`` keep the plain file-name folder.

    Args:
        base_path: Directory tree to search for videos.
        output_root: Directory that receives one frame folder per video.
    """
    video_extensions = ('.mp4', '.avi', '.mov')
    for root, dirs, files in os.walk(base_path):
        dirs.sort()  # visit sub-folders in a stable order
        rel_dir = os.path.relpath(root, base_path)
        prefix_parts = [] if rel_dir == os.curdir else rel_dir.split(os.sep)

        for file in sorted(files):
            if not file.lower().endswith(video_extensions):
                continue
            video_path = os.path.join(root, file)

            # Use the relative path to create a unique, single-level output folder
            video_name = os.path.splitext(file)[0]
            folder_name = "__".join(prefix_parts + [video_name])
            output_folder = os.path.join(output_root, folder_name)
            extract_frames_per_second(video_path, output_folder)

# Walk the directory returned by kagglehub in the download cell instead of a
# hard-coded cache location, which differs between environments.
process_all_videos(path)

Done: Saved 8 frames to '/content/frames/788_710'.
Done: Saved 15 frames to '/content/frames/815_730'.
Done: Saved 10 frames to '/content/frames/274_412'.
Done: Saved 7 frames to '/content/frames/081_087'.
Done: Saved 14 frames to '/content/frames/808_829'.
Done: Saved 8 frames to '/content/frames/448_361'.
Done: Saved 12 frames to '/content/frames/405_393'.
Done: Saved 8 frames to '/content/frames/835_651'.
Done: Saved 6 frames to '/content/frames/759_755'.
Done: Saved 7 frames to '/content/frames/028_068'.
Done: Saved 7 frames to '/content/frames/424_408'.
Done: Saved 8 frames to '/content/frames/353_383'.
Done: Saved 7 frames to '/content/frames/647_622'.
Done: Saved 17 frames to '/content/frames/983_113'.
Done: Saved 6 frames to '/content/frames/016_209'.
Done: Saved 11 frames to '/content/frames/775_742'.
Done: Saved 5 frames to '/content/frames/193_030'.
Done: Saved 11 frames to '/content/frames/977_075'.
Done: Saved 7 frames to '/content/frames/168_222'.
Done: Saved 12 frames to

KeyboardInterrupt: 

In [None]:
!pip install mtcnn
!pip install facenet-pytorch



In [None]:
import os
import cv2
from tqdm import tqdm
import torch
from facenet_pytorch import MTCNN
from concurrent.futures import ThreadPoolExecutor

# Frames are read from `input_folder`; detected face crops are written below `output_folder`.
input_folder = "/content/frames"
output_folder = "/content/faces"
os.makedirs(output_folder, exist_ok=True)

# Run the detector on the GPU when torch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# keep_all=False = only one face per image
detector = MTCNN(device=device, keep_all=False)

# Helper function to process one image
def process_image(frame_path, rel_root):
    """Detect a face in one frame and save the crop as ``<frame>_face.jpg``.

    The frame is read with OpenCV, converted to RGB and passed to the
    module-level ``detector``. When a face is returned, the crop is written to
    ``output_folder/rel_root``. Unreadable frames and frames without a face
    are skipped silently; any exception is reported with ``print``.

    Args:
        frame_path: Path of the frame image to process.
        rel_root: Sub-directory (relative to ``output_folder``) for the crop.
    """
    try:
        image = cv2.imread(frame_path)
        if image is None:
            return
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        face = detector(image_rgb)

        if face is not None:
            face = face.detach().cpu().float()
            # With post_process=True (the facenet-pytorch default) the crop is
            # standardised as (pixel - 127.5) / 128. Undo that before saving,
            # otherwise truncating to int leaves an almost black image.
            if getattr(detector, "post_process", True):
                face = face * 128.0 + 127.5
            # (C, H, W) float tensor -> (H, W, C) uint8 array
            face_rgb = face.round().clamp(0, 255).byte().permute(1, 2, 0).contiguous().numpy()
            # The detector was fed RGB; cv2.imwrite expects BGR channel order.
            face_bgr = cv2.cvtColor(face_rgb, cv2.COLOR_RGB2BGR)

            save_dir = os.path.join(output_folder, rel_root)
            os.makedirs(save_dir, exist_ok=True)
            base_name = os.path.splitext(os.path.basename(frame_path))[0]
            save_path = os.path.join(save_dir, f"{base_name}_face.jpg")
            cv2.imwrite(save_path, face_bgr)

    except Exception as e:
        print(f"Error processing {frame_path}: {e}")

# Gather (frame path, folder relative to input_folder) for every .jpg/.png frame on disk.
all_images = [
    (os.path.join(dirpath, fname), os.path.relpath(dirpath, input_folder))
    for dirpath, _, fnames in os.walk(input_folder)
    for fname in fnames
    if fname.endswith(('.jpg', '.png'))
]

# Run face extraction on four worker threads; tqdm shows progress as results arrive.
with ThreadPoolExecutor(max_workers=4) as pool:
    pending = pool.map(lambda pair: process_image(pair[0], pair[1]), all_images)
    for _ in tqdm(pending, total=len(all_images)):
        pass


100%|██████████| 12461/12461 [32:37<00:00,  6.37it/s]


In [None]:
import cv2
import os
import concurrent.futures

# CLAHE enhancer
def apply_clahe(face_crop):
    """Return a contrast-enhanced copy of a BGR face crop.

    The crop is converted to grayscale, equalised with CLAHE (clip limit 2.0,
    8x8 tile grid) and converted back to a 3-channel BGR image.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    grayscale = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
    return cv2.cvtColor(equalizer.apply(grayscale), cv2.COLOR_GRAY2BGR)

def enhance_and_save(img_file, input_path, output_path):
    """Apply ``apply_clahe`` to one image and store it under the same file name.

    Args:
        img_file: File name of the image inside ``input_path``.
        input_path: Directory the image is read from.
        output_path: Directory the enhanced image is written to.

    Files that OpenCV cannot read are skipped without writing anything.
    """
    source = cv2.imread(os.path.join(input_path, img_file))
    if source is None:
        return
    destination = os.path.join(output_path, img_file)
    cv2.imwrite(destination, apply_clahe(source))

# Paths
input_root = "/content/faces"
output_root = "/content/enhanced_faces"

os.makedirs(output_root, exist_ok=True)

# Loop through each video folder and mirror it under output_root
for video_folder in sorted(os.listdir(input_root)):
    input_path = os.path.join(input_root, video_folder)
    # Stray files directly inside input_root would make os.listdir() below fail.
    if not os.path.isdir(input_path):
        continue
    output_path = os.path.join(output_root, video_folder)
    os.makedirs(output_path, exist_ok=True)

    img_files = os.listdir(input_path)
    # Enhance the images of this folder in parallel; leaving the `with` block
    # waits for all submitted work to finish.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(enhance_and_save, img_files, [input_path] * len(img_files), [output_path] * len(img_files))
print("CLAHE applied to all face crops.")


CLAHE applied to all face crops.


In [None]:
#checking the output of preprocessing on some examples
import os
import random
import cv2
import matplotlib.pyplot as plt

video_frame_root = "/content/frames"
enhanced_face_root = "/content/enhanced_faces"

# Get all video files (sorted so the seeded sample below is reproducible)
all_videos = []
for root, dirs, files in os.walk(path):
    for file in files:
        if file.endswith('.mp4'):
            all_videos.append(os.path.join(root, file))
all_videos.sort()

# Select up to 3 sample videos. min() avoids the ValueError random.sample raises
# when fewer videos are available; a private seeded generator keeps the choice
# stable across runs without touching the global random state.
num_samples = min(3, len(all_videos))
sample_videos = random.Random(42).sample(all_videos, num_samples)

print("Sample videos:", sample_videos)




Sample videos: ['/kaggle/input/ff-c23/FaceForensics++_C23/original/964.mp4', '/kaggle/input/ff-c23/FaceForensics++_C23/FaceSwap/323_302.mp4', '/kaggle/input/ff-c23/FaceForensics++_C23/FaceShifter/106_198.mp4']


In [None]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from torchvision import models
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
class DeepfakeCNN(nn.Module):
    """ResNet-18 image classifier with its final layer replaced.

    The backbone is created with torchvision's default pretrained weights and
    its fully connected head is swapped for a new linear layer with
    ``num_classes`` outputs (2 by default: real & fake).

    Args:
        num_classes: Number of output logits produced by the new head.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        base_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        num_ftrs = base_model.fc.in_features
        base_model.fc = nn.Linear(num_ftrs, num_classes)
        self.model = base_model

    def forward(self, x):
        """Return the backbone's logits for the batch ``x``."""
        return self.model(x)

# Evaluation pipeline: resize to 224x224, convert to a tensor, normalise each channel.
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training pipeline: the same steps plus a random horizontal flip before tensor conversion.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# ImageFolder reads <split>/<class_name>/<image>, so each split directory must
# exist and contain one sub-folder per class. Check up front and explain what
# is missing instead of failing with a bare FileNotFoundError.
train_dir = "/content/enhanced_faces/train"
test_dir = "/content/enhanced_faces/test"
for split_dir in (train_dir, test_dir):
    if not os.path.isdir(split_dir):
        raise FileNotFoundError(
            f"Dataset split directory not found: '{split_dir}'. "
            "Create it with one sub-folder per class (e.g. real/ and fake/) before training."
        )

train_dataset = datasets.ImageFolder(train_dir, transform=train_transform)
test_dataset = datasets.ImageFolder(test_dir, transform=test_transform)

# Shuffle only the training data; keep the evaluation order fixed.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DeepfakeCNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
def train(model, loader):
    """Run one optimisation pass over ``loader`` and return the average loss.

    Relies on the module-level ``device``, ``optimizer`` and ``criterion``.
    Each batch loss is weighted by its batch size and the total is divided by
    ``len(loader.dataset)``.
    """
    model.train()
    total_loss = 0.0
    for batch_images, batch_labels in loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_images), batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the average.
        total_loss += batch_loss.item() * batch_images.size(0)
    return total_loss / len(loader.dataset)
def evaluate(model, loader):
    """Return the accuracy of ``model`` on every batch yielded by ``loader``.

    The model is switched to eval mode and run without gradient tracking on
    the module-level ``device``. The predicted class is the index of the
    largest output per sample; the score comes from ``accuracy_score``.
    """
    model.eval()
    predicted_labels = []
    true_labels = []
    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            best_class = model(batch_images).argmax(dim=1)
            predicted_labels.extend(best_class.cpu().numpy())
            true_labels.extend(batch_labels.cpu().numpy())
    return accuracy_score(true_labels, predicted_labels)
num_epochs = 3
# Train for num_epochs passes, reporting the training loss and the accuracy on
# the test loader after each one.
for epoch in range(num_epochs):
    train_loss = train(model, train_loader)
    val_acc = evaluate(model, test_loader)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Test Acc = {val_acc:.4f}")

# Persist only the learned weights (state dict), not the whole module object.
torch.save(model.state_dict(), "deepfake_cnn.pth")









FileNotFoundError: [Errno 2] No such file or directory: '/content/enhanced_faces/train'