In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
!pip install facenet-pytorch timm albumentations --quiet


In [4]:
!pip install --upgrade --force-reinstall Pillow


Collecting Pillow
  Using cached pillow-11.2.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (8.9 kB)
Using cached pillow-11.2.1-cp311-cp311-manylinux_2_28_x86_64.whl (4.6 MB)
Installing collected packages: Pillow
  Attempting uninstall: Pillow
    Found existing installation: pillow 10.2.0
    Uninstalling pillow-10.2.0:
      Successfully uninstalled pillow-10.2.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
facenet-pytorch 2.6.0 requires Pillow<10.3.0,>=10.2.0, but you have pillow 11.2.1 which is incompatible.
dopamine-rl 4.1.2 requires gymnasium>=1.0.0, but you have gymnasium 0.29.0 which is incompatible.
bigframes 1.36.0 requires rich<14,>=12.4.4, but you have rich 14.0.0 which is incompatible.
plotnine 0.14.5 requires matplotlib>=3.8.0, but you have matplotlib 3.7.5 which is incompatible.
mlxtend 0.23.4 requires scikit-learn>=1.3.1, but you have 

In [5]:
import os
import cv2
import numpy as np
import torch
import timm
import torch.nn as nn
from tqdm import tqdm
from PIL import Image
from facenet_pytorch import MTCNN
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report, roc_auc_score


In [6]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)


Using device: cpu


In [7]:
REAL_PATH = "/kaggle/input/faceforensics/FF++/real"
FAKE_PATH = "/kaggle/input/faceforensics/FF++/fake"
NUM_FRAMES_PER_VIDEO = 5


In [8]:
mtcnn = MTCNN(image_size=224, margin=0, device=device)

def extract_faces(video_dir, label, num_frames=5):
    face_list = []
    label_list = []
    
    for video_file in tqdm(os.listdir(video_dir), desc=f"Processing {label} videos"):
        video_path = os.path.join(video_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_idxs = np.linspace(0, total_frames-1, num_frames, dtype=int)

        for idx in frame_idxs:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            success, frame = cap.read()
            if not success:
                continue

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            face = mtcnn(rgb)

            if face is not None:
                face_list.append(face)
                label_list.append(label)
        cap.release()

    return face_list, label_list


In [9]:
real_faces, real_labels = extract_faces(REAL_PATH, 0, NUM_FRAMES_PER_VIDEO)
fake_faces, fake_labels = extract_faces(FAKE_PATH, 1, NUM_FRAMES_PER_VIDEO)

X_faces = real_faces + fake_faces
y_labels = real_labels + fake_labels


Processing 0 videos: 100%|██████████| 200/200 [09:50<00:00,  2.95s/it]
Processing 1 videos: 100%|██████████| 200/200 [09:43<00:00,  2.92s/it]


In [10]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from tqdm import tqdm

In [11]:
def prepare_rgb(face_tensor):
    return face_tensor  # already in [3, 224, 224], normalized to [0, 1]

def prepare_fft(image_tensor):
    image_np = image_tensor.numpy().transpose(1, 2, 0)
    gray = cv2.cvtColor((image_np * 255).astype(np.uint8), cv2.COLOR_RGB2GRAY)
    f = np.fft.fft2(gray)
    fshift = np.fft.fftshift(f)
    magnitude_spectrum = 20 * np.log(np.abs(fshift) + 1)
    magnitude_spectrum = cv2.resize(magnitude_spectrum, (image_np.shape[1], image_np.shape[0]))
    magnitude_spectrum = (magnitude_spectrum - magnitude_spectrum.min()) / (magnitude_spectrum.max() - magnitude_spectrum.min() + 1e-8)
    magnitude_tensor = torch.tensor(magnitude_spectrum, dtype=torch.float32).unsqueeze(0)
    return magnitude_tensor

def prepare_motion(current_tensor, previous_tensor):
    motion = current_tensor - previous_tensor
    motion = torch.abs(motion)
    return motion

In [12]:
from torch.utils.data import Dataset

class MultiStreamDeepfakeDataset(Dataset):
    def __init__(self, rgb_faces, fft_images, motion_images, labels, transform=None):
        self.rgb_faces = rgb_faces
        self.fft_images = fft_images
        self.motion_images = motion_images
        self.labels = labels
        self.transform = transform  # Optional

    def __len__(self):
        return len(self.rgb_faces)

    def __getitem__(self, idx):
        rgb = self.rgb_faces[idx]
        fft = self.fft_images[idx]
        motion = self.motion_images[idx]
        label = self.labels[idx]

        # Apply transforms if needed
        if self.transform:
            rgb = self.transform(rgb)
            fft = self.transform(fft)
            motion = self.transform(motion)

        return rgb, fft, motion, torch.tensor(label, dtype=torch.float32)



In [13]:
import torch.nn.functional as F

X_faces_tensor = [
    F.interpolate(face.float().unsqueeze(0), size=(224, 224), mode='bilinear', align_corners=False).squeeze(0)
    for face in X_faces
]



In [14]:
#dont use this code
class CBAM(nn.Module):
    def __init__(self, channels, reduction_ratio=16, kernel_size=7):
        super(CBAM, self).__init__()
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, channels // reduction_ratio, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(channels // reduction_ratio, channels, 1, bias=False),
            nn.Sigmoid()
        )
        self.spatial_attention = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=kernel_size, padding=kernel_size // 2, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        ca = self.channel_attention(x)
        x = x * ca
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        sa = self.spatial_attention(torch.cat([avg_out, max_out], dim=1))
        x = x * sa
        return x

In [15]:
class MultiStreamEfficientNet(nn.Module):
    def __init__(self):
        super(MultiStreamEfficientNet, self).__init__()
        self.rgb_model = models.efficientnet_b0(pretrained=True).features
        self.fft_model = models.efficientnet_b0(pretrained=True).features
        self.motion_model = models.efficientnet_b0(pretrained=True).features

        self.rgb_attention = CBAM(1280)
        self.fft_attention = CBAM(1280)
        self.motion_attention = CBAM(1280)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=1280, nhead=8, batch_first=True),
            num_layers=2
        )

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(1280 * 3, 1),
            nn.Sigmoid()
        )

    def forward(self, rgb, fft, motion):
        rgb_feat = self.rgb_model(rgb)
        fft_feat = self.fft_model(fft)
        motion_feat = self.motion_model(motion)

        rgb_feat = self.rgb_attention(rgb_feat)
        fft_feat = self.fft_attention(fft_feat)
        motion_feat = self.motion_attention(motion_feat)

        b, c, h, w = rgb_feat.size()
        rgb_feat = rgb_feat.view(b, c, -1).permute(0, 2, 1)
        fft_feat = fft_feat.view(b, c, -1).permute(0, 2, 1)
        motion_feat = motion_feat.view(b, c, -1).permute(0, 2, 1)

        combined = torch.cat([rgb_feat, fft_feat, motion_feat], dim=1)
        fused = self.transformer(combined)
        fused = fused.permute(0, 2, 1).view(b, 1280 * 3, h, w)

        output = self.classifier(fused)
        return output.squeeze(1)

In [16]:
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for batch in tqdm(dataloader, desc="Training"):
        rgb = batch['rgb'].to(device)
        fft = batch['fft'].to(device)
        motion = batch['motion'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        outputs = model(rgb, fft, motion)
        loss = criterion(outputs.unsqueeze(1), labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(dataloader)

def validate(model, dataloader, device):
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Validation"):
            rgb = batch['rgb'].to(device)
            fft = batch['fft'].to(device)
            motion = batch['motion'].to(device)
            labels = batch['label'].to(device)

            outputs = model(rgb, fft, motion)
            preds.extend(outputs.cpu().numpy().squeeze().tolist())
            targets.extend(labels.cpu().numpy().squeeze().tolist())

    preds = np.array(preds) > 0.5
    accuracy = accuracy_score(targets, preds)
    precision = precision_score(targets, preds)
    recall = recall_score(targets, preds)
    f1 = f1_score(targets, preds)
    auc = roc_auc_score(targets, preds)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1, "auc": auc}

In [17]:
# Utility functions
def prepare_fft(tensor):
    gray = tensor.mean(dim=0, keepdim=True)
    fft = torch.fft.fft2(gray)
    fft_mag = torch.abs(fft)
    fft_norm = (fft_mag - fft_mag.min()) / (fft_mag.max() - fft_mag.min() + 1e-8)
    fft_img = fft_norm.expand(3, -1, -1)
    return fft_img

def prepare_motion(current_tensor, prev_tensor):
    return torch.abs(current_tensor - prev_tensor)

# Input: X_faces_tensor (list of torch tensors) and y_labels (list of 0/1)
fft_tensors = [prepare_fft(face) for face in X_faces_tensor]

motion_tensors = [
    prepare_motion(X_faces_tensor[i], X_faces_tensor[i-1]) if i > 0 else torch.zeros_like(X_faces_tensor[i])
    for i in range(len(X_faces_tensor))
]

# Create dataset
dataset = MultiStreamDeepfakeDataset(X_faces_tensor, fft_tensors, motion_tensors, y_labels)


In [18]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

train_set, val_set = train_test_split(dataset, test_size=0.2, random_state=42)

train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, shuffle=False)


In [19]:
import torch
import torch.nn as nn
from torchvision import models

# CBAM Attention Block (simplified)
class CBAM(nn.Module):
    def __init__(self, channels, reduction=16):
        super(CBAM, self).__init__()
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, channels // reduction, 1),
            nn.ReLU(),
            nn.Conv2d(channels // reduction, channels, 1),
            nn.Sigmoid()
        )
        self.spatial_attention = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=7, padding=3),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Channel attention
        ca = self.channel_attention(x)
        x = x * ca

        # Spatial attention
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        sa = torch.cat([avg_out, max_out], dim=1)
        sa = self.spatial_attention(sa)
        x = x * sa
        return x

# Stream module (EfficientNet + CBAM)
class StreamNet(nn.Module):
    def __init__(self):
        super(StreamNet, self).__init__()
        base_model = models.efficientnet_b0(pretrained=True)
        self.features = base_model.features
        self.cbam = CBAM(1280)
        self.pool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        x = self.features(x)
        x = self.cbam(x)
        x = self.pool(x).view(x.size(0), -1)  # Flatten
        return x

# MultiStream Model
class MultiStreamModel(nn.Module):
    def __init__(self):
        super(MultiStreamModel, self).__init__()
        self.rgb_stream = StreamNet()
        self.fft_stream = StreamNet()
        self.motion_stream = StreamNet()

        self.classifier = nn.Sequential(
            nn.Linear(1280 * 3, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 1),
            nn.Sigmoid()
        )

    def forward(self, rgb, fft, motion):
        rgb_feat = self.rgb_stream(rgb)
        fft_feat = self.fft_stream(fft)
        motion_feat = self.motion_stream(motion)

        combined = torch.cat([rgb_feat, fft_feat, motion_feat], dim=1)
        return self.classifier(combined)


In [20]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultiStreamModel().to(device)

criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)




In [None]:
!pip install tqdm


In [None]:
from tqdm import tqdm


In [21]:
from tqdm import tqdm

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    print(f"\nEpoch [{epoch+1}/{num_epochs}]")
    train_loader_tqdm = tqdm(train_loader, desc="Training", leave=False)

    for rgb, fft, motion, labels in train_loader_tqdm:
        rgb, fft, motion, labels = rgb.to(device), fft.to(device), motion.to(device), labels.to(device).float()

        optimizer.zero_grad()
        outputs = model(rgb, fft, motion).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        train_loader_tqdm.set_postfix(loss=loss.item())

    model.eval()
    val_loss = 0.0
    correct, total = 0, 0

    val_loader_tqdm = tqdm(val_loader, desc="Validating", leave=False)

    with torch.no_grad():
        for rgb, fft, motion, labels in val_loader_tqdm:
            rgb, fft, motion, labels = rgb.to(device), fft.to(device), motion.to(device), labels.to(device).float()

            outputs = model(rgb, fft, motion).squeeze()
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            preds = (outputs > 0.5).long()
            correct += (preds == labels.long()).sum().item()
            total += labels.size(0)

    acc = correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {running_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {acc:.4f}")



Epoch [1/10]


                                                                     

Epoch [1/10], Train Loss: 53.7125, Val Loss: 9.2838, Val Acc: 0.8141

Epoch [2/10]


                                                                      

Epoch [2/10], Train Loss: 24.8465, Val Loss: 7.5348, Val Acc: 0.8639

Epoch [3/10]


                                                                      

Epoch [3/10], Train Loss: 14.2963, Val Loss: 6.6434, Val Acc: 0.8796

Epoch [4/10]


                                                                       

Epoch [4/10], Train Loss: 7.1388, Val Loss: 6.5018, Val Acc: 0.9005

Epoch [5/10]


                                                                       

Epoch [5/10], Train Loss: 6.4973, Val Loss: 7.4225, Val Acc: 0.9058

Epoch [6/10]


                                                                       

Epoch [6/10], Train Loss: 5.0614, Val Loss: 9.2778, Val Acc: 0.8796

Epoch [7/10]


                                                                       

Epoch [7/10], Train Loss: 4.7427, Val Loss: 9.0656, Val Acc: 0.8743

Epoch [8/10]


                                                                       

Epoch [8/10], Train Loss: 3.5539, Val Loss: 9.1444, Val Acc: 0.8822

Epoch [9/10]


                                                                        

Epoch [9/10], Train Loss: 3.1153, Val Loss: 9.7535, Val Acc: 0.8822

Epoch [10/10]


                                                                        

Epoch [10/10], Train Loss: 3.3978, Val Loss: 9.6538, Val Acc: 0.8953




In [None]:
from sklearn.metrics import classification_report

all_preds, all_labels = [], []

model.eval()
with torch.no_grad():
    for rgb, fft, motion, labels in val_loader:
        rgb, fft, motion = rgb.to(device), fft.to(device), motion.to(device)
        outputs = model(rgb, fft, motion).squeeze().cpu()
        preds = (outputs > 0.5).long()

        all_preds.extend(preds.tolist())
        all_labels.extend(labels.tolist())

print(classification_report(all_labels, all_preds, target_names=["Real", "Fake"]))


In [None]:
print(type(X_faces), len(X_faces))
print(type(y_labels), len(y_labels))
print(type(X_faces[0]), X_faces[0].shape if hasattr(X_faces[0], "shape") else type(X_faces[0]))
print(type(y_labels[0]), y_labels[0])


In [22]:
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score

all_preds, all_labels = [], []

model.eval()
with torch.no_grad():
    for rgb, fft, motion, labels in val_loader:
        rgb, fft, motion = rgb.to(device), fft.to(device), motion.to(device)
        labels = labels.to(device).long()

        outputs = model(rgb, fft, motion).squeeze().cpu()
        preds = (outputs > 0.5).long()

        all_preds.extend(preds.tolist())
        all_labels.extend(labels.cpu().tolist())

# Classification report
print("\nClassification Report:")
print(classification_report(all_labels, all_preds, target_names=["Real", "Fake"]))

# Confusion matrix
cm = confusion_matrix(all_labels, all_preds)
print("Confusion Matrix:\n", cm)

# AUC-ROC Score
prob_outputs = [float(o) for o in outputs]  # Last batch only, optional improvement below
roc_auc = roc_auc_score(all_labels, all_preds)
print("ROC AUC Score:", roc_auc)



Classification Report:
              precision    recall  f1-score   support

        Real       0.93      0.85      0.89       188
        Fake       0.87      0.94      0.90       194

    accuracy                           0.90       382
   macro avg       0.90      0.89      0.89       382
weighted avg       0.90      0.90      0.90       382

Confusion Matrix:
 [[160  28]
 [ 12 182]]
ROC AUC Score: 0.8946040798420706


In [23]:
torch.save(model, '/kaggle/working/deepfake_detector1.pth')


In [None]:
from IPython.display import FileLink

FileLink('/kaggle/working/deepfake_detector_multistream_full.pth')


In [None]:
!zip -j /kaggle/working/deepfake_model.zip /kaggle/working/deepfake_detector_multistream_full.pth


In [None]:
import os

print(os.path.exists('/kaggle/working/deepfake_detector_multistream_full.pth'))


In [None]:
!zip -j /kaggle/working/deepfake_model.zip /kaggle/working/deepfake_detector_multistream_full.pth


In [None]:
print(os.path.exists('/kaggle/working/deepfake_model.zip'))


In [None]:
from IPython.display import FileLink
FileLink(r'/kaggle/working/deepfake_model.zip')


In [None]:
import cv2
import torch
import numpy as np
from facenet_pytorch import MTCNN
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt

# Define the video prediction function
def predict_video(video_path, model_path, device='cpu'):
    # Load the saved model
    model = torch.load(model_path, map_location=device)  # Ensure model is loaded on CPU
    model.eval()
    model.to(device)

    # Initialize MTCNN for face detection
    mtcnn = MTCNN(keep_all=True, device=device)

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define codec and create VideoWriter object to save output video
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    output_video_path = '/kaggle/working/predicted_video.avi'
    out = cv2.VideoWriter(output_video_path, fourcc, 30, (frame_width, frame_height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert to RGB for MTCNN
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb_frame)

        # Detect faces in the frame
        boxes, probs = mtcnn.detect(pil_image)

        if boxes is not None:
            for box in boxes:
                # Crop the face from the frame
                x1, y1, x2, y2 = [int(i) for i in box]
                face = frame[y1:y2, x1:x2]

                # Preprocess the face for model input
                preprocess = transforms.Compose([
                    transforms.ToPILImage(),
                    transforms.Resize((224, 224)),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
                ])
                face_tensor = preprocess(face).unsqueeze(0).to(device)

                # Create dummy inputs for fft and motion (you can refine this later if needed)
                fft_tensor = torch.zeros_like(face_tensor).to(device)  # Placeholder tensor
                motion_tensor = torch.zeros_like(face_tensor).to(device)  # Placeholder tensor

                # Make prediction
                with torch.no_grad():
                    output = model(face_tensor, fft_tensor, motion_tensor)
                    prediction = torch.sigmoid(output).item()

                # Determine if it's real or fake
                label = "Real" if prediction < 0.5 else "Fake"
                confidence = prediction * 100 if label == "Fake" else (1 - prediction) * 100

                # Draw bounding box and label on the frame
                color = (0, 255, 0) if label == "Real" else (0, 0, 255)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(frame, f'{label} ({confidence:.2f}%)', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        # Write the frame with predictions to the output video
        out.write(frame)

        # Display the frame
        cv2.imshow('Video Prediction', frame)

        # Press 'q' to quit the display window
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Release resources
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    print(f"Prediction video saved at {output_video_path}")
    return output_video_path

# Example usage
video_path = '/kaggle/input/real-videos/videos/video3.mp4'  # replace with your video path
model_path = '/kaggle/working/deepfake_detector_multistream_full.pth'

predicted_video_path = predict_video(video_path, model_path, device='cpu')
