## Resnext Model

# Resnext & Xception Ensemble (Inference)

- This kernel outputs an ensemble of the predictions from https://www.kaggle.com/khoongweihao/frames-per-video-viz and https://www.kaggle.com/greatgamedota/xception-binary-classifier-inference (not original work; the learning rate and number of epochs were modified)
- Frames per video is set to 64 (the best value found)

In [None]:
import os, sys, time
import cv2
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F

%matplotlib inline
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Directory that holds the competition's test clips.
test_dir = "/kaggle/input/deepfake-detection-challenge/test_videos/"

# Keep only the .mp4 entries, in sorted order so every later result lines up by index.
test_videos = sorted(name for name in os.listdir(test_dir) if name.endswith(".mp4"))
frame_h = 5
frame_l = 5
len(test_videos)

In [None]:
# Report the PyTorch, CUDA and cuDNN versions of this runtime.
print("PyTorch version:", torch.__version__)
print("CUDA version:", torch.version.cuda)
print("cuDNN version:", torch.backends.cudnn.version())

PyTorch version: 1.3.0
CUDA version: 10.0.130
cuDNN version: 7603


In [None]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
gpu = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
gpu

device(type='cuda', index=0)

In [None]:
import sys
# Put the two helper repositories first on the module search path.
# NOTE(review): these are /content/... paths while the data and the later cells
# use /kaggle/input/... — confirm which environment this cell targets.
sys.path.insert(0, "/content/blazeface-pytorch")
sys.path.insert(0, "/content/deepfakes-inference-demo")

In [None]:
from blazeface import BlazeFace
# Build the BlazeFace detector on the selected device and load its weights and anchors.
facedet = BlazeFace().to(gpu)
facedet.load_weights("/content/blazeface-pytorch/blazeface.pth")
facedet.load_anchors("/content/blazeface-pytorch/anchors.npy")
# train(False) presumably switches the detector to evaluation mode (nn.Module semantics);
# binding the result to `_` keeps the cell from echoing it.
_ = facedet.train(False)

In [None]:
from helpers.read_video_1 import VideoReader
from helpers.face_extract_1 import FaceExtractor

frames_per_video = 64 #frame_h * frame_l
video_reader = VideoReader()


def video_read_fn(x):
    """Read `frames_per_video` frames from the video at path `x`."""
    return video_reader.read_frames(x, num_frames=frames_per_video)


face_extractor = FaceExtractor(video_read_fn, facedet)

In [None]:
input_size = 224

In [None]:
from torchvision.transforms import Normalize

# Per-channel mean and standard deviation handed to torchvision's Normalize.
# NOTE(review): these are the commonly used ImageNet statistics — confirm the
# checkpoints were trained with the same normalization.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
normalize_transform = Normalize(mean, std)

In [None]:
def isotropically_resize_image(img, size, resample=cv2.INTER_AREA):
    """Resize `img` so that its longer side equals `size`, keeping the aspect ratio.

    Args:
        img: image array whose first two axes are (height, width).
        size: target length in pixels of the longer side.
        resample: OpenCV interpolation flag passed to `cv2.resize`.

    Returns:
        The resized image as returned by `cv2.resize`.
    """
    h, w = img.shape[:2]
    # Integer division can round the short side of a very elongated image down
    # to 0, which cv2.resize rejects; keep the short side at least one pixel.
    if w > h:
        new_w, new_h = size, max(1, h * size // w)
    else:
        new_w, new_h = max(1, w * size // h), size

    return cv2.resize(img, (new_w, new_h), interpolation=resample)


def make_square_image(img):
    """Pad `img` with zeros on the bottom and right edges until it is square."""
    height, width = img.shape[:2]
    side = max(height, width)
    pad_bottom = side - height
    pad_right = side - width
    # Nothing is added at the top or left, so the content stays anchored top-left.
    return cv2.copyMakeBorder(img, 0, pad_bottom, 0, pad_right, cv2.BORDER_CONSTANT, value=0)

In [None]:
import torch.nn as nn
import torchvision.models as models

class MyResNeXt(models.resnet.ResNet):
    """torchvision ResNet built from Bottleneck blocks with grouped convolutions
    (32 groups, width 4 per group) and a single-output linear classifier."""

    def __init__(self, training=True):
        # `training` is accepted for call compatibility; it is not read here.
        backbone_config = dict(
            block=models.resnet.Bottleneck,
            layers=[3, 4, 6, 3],
            groups=32,
            width_per_group=4,
        )
        super().__init__(**backbone_config)
        # Swap the default classifier for a one-logit layer.
        self.fc = nn.Linear(2048, 1)

In [None]:
def predict_on_video(video_path, batch_size):
    """Score one video with the module-level `model`.

    Faces are extracted with the module-level `face_extractor`, resized and
    zero-padded to `input_size` x `input_size`, normalized with
    `normalize_transform`, and passed through `model` in a single fixed-size batch.

    Args:
        video_path: path of the video file to score.
        batch_size: number of face slots in the batch; faces beyond this count
            are dropped with a warning.

    Returns:
        float: mean sigmoid output over the faces that were found, or 0.5 when
        no face was found or any step raised an exception.
    """
    try:
        # Find the faces for N frames in the video.
        faces = face_extractor.process_video(video_path)

        # Only look at one face per frame.
        face_extractor.keep_only_best_face(faces)

        if len(faces) > 0:
            # NOTE: When running on the CPU, the batch size must be fixed
            # or else memory usage will blow up. (Bug in PyTorch?)
            x = np.zeros((batch_size, input_size, input_size, 3), dtype=np.uint8)

            # If we found any faces, prepare them for the model.
            n = 0
            for frame_data in faces:
                for face in frame_data["faces"]:
                    # Resize to the model's required input size.
                    # We keep the aspect ratio intact and add zero
                    # padding if necessary.
                    resized_face = isotropically_resize_image(face, input_size)
                    resized_face = make_square_image(resized_face)

                    if n < batch_size:
                        x[n] = resized_face
                        n += 1
                    else:
                        print("WARNING: have %d faces but batch size is %d" % (n, batch_size))

            if n > 0:
                x = torch.tensor(x, device=gpu).float()

                # Preprocess the images: NHWC -> NCHW, then scale and normalize.
                x = x.permute((0, 3, 1, 2))

                for i in range(len(x)):
                    x[i] = normalize_transform(x[i] / 255.)

                # Make a prediction, then take the average over the real faces
                # (the remaining rows of the batch are zero padding).
                with torch.no_grad():
                    y_pred = model(x)
                    # reshape(-1) rather than squeeze(): squeeze() turns a
                    # one-row batch into a 0-d tensor, and slicing that raises.
                    y_pred = torch.sigmoid(y_pred.reshape(-1))
                    return y_pred[:n].mean().item()

    except Exception as e:
        # Best effort: one bad video must not abort the whole run.
        print("Prediction error on video %s: %s" % (video_path, str(e)))

    return 0.5

In [None]:
# Load the saved ResNeXt weights, mapping their tensors onto the selected device.
checkpoint = torch.load("/content/deepfakes-inference-demo/resnext.pth", map_location=gpu)

model = MyResNeXt().to(gpu)
model.load_state_dict(checkpoint)
# Switch to evaluation mode; `_ =` keeps the returned module out of the cell output.
_ = model.eval()

# The weights have been copied into the model, so the loaded dict can be dropped.
del checkpoint

In [None]:
from concurrent.futures import ThreadPoolExecutor

def predict_on_video_set(videos, num_workers):
    """Score every file name in `videos` on a pool of `num_workers` threads.

    Returns the `predict_on_video` results as a list, in the order of `videos`.
    """
    def score(filename):
        full_path = os.path.join(test_dir, filename)
        return predict_on_video(full_path, batch_size=frames_per_video)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = pool.map(score, videos)

    # The result iterator is drained only after the pool has shut down.
    return list(results)

In [None]:
import cv2
import torch
from torchvision import transforms

def process_video_frames(video_path, max_frames=64):
    """
    Reads a video file, extracts frames, and processes them into tensors.

    Each frame is converted from BGR to RGB, resized to 299x299, converted to
    a tensor and normalized per channel.

    Args:
        video_path (str): Path to the video file.
        max_frames (int): Maximum number of frames to extract from the video.

    Returns:
        torch.Tensor: Tensor of shape (num_frames, 3, 299, 299), where
        num_frames is at most max_frames (fewer if the video is shorter).

    Raises:
        RuntimeError: If no frame could be read from the video.
    """
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((299, 299)),  # Resizing frames for Xception
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened() and len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
            frames.append(transform(frame))  # Apply transformations
    finally:
        # Release the capture even when decoding or a transform raises.
        cap.release()

    if not frames:
        # torch.stack([]) would raise a RuntimeError that does not name the file.
        raise RuntimeError("No frames could be read from video: %s" % video_path)

    # Stack the per-frame tensors along a new leading axis.
    return torch.stack(frames)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
from blazeface import BlazeFace
#from helpers.read_video_1 import process_video_frames  # Ensure this script is uploaded

# Define UADFV Directory
UADFV_dir = "/kaggle/input/videos"

# Dataset class for UADFV videos
class UADFVVideoDataset(Dataset):
    """Dataset of videos stored under ``<data_dir>/real`` and ``<data_dir>/fake``.

    Each item is ``(video_frames, label)``: ``video_frames`` is the result of
    ``process_video_frames`` for the file (passed through ``transform`` when
    one is given) and ``label`` is 0 for real, 1 for fake.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.video_paths = []
        self.labels = []

        # Load video paths and their corresponding labels
        for label in ['real', 'fake']:
            label_dir = os.path.join(data_dir, label)
            # os.listdir returns entries in arbitrary order; sort them so an
            # index always refers to the same video across runs.
            for video_file in sorted(os.listdir(label_dir)):
                self.video_paths.append(os.path.join(label_dir, video_file))
                self.labels.append(0 if label == 'real' else 1)  # Assign 0 for real, 1 for fake

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        video_frames = process_video_frames(video_path)  # Extract frames from video
        label = self.labels[idx]

        if self.transform:
            video_frames = self.transform(video_frames)

        return video_frames, label


In [None]:
!pip install ../input/deepfake-xception-trained-model/pytorchcv-0.0.55-py2.py3-none-any.whl --quiet

In [None]:
import torch

# Scale pixel values from [0, 255] to [0, 1], then normalize per channel.
def scale_and_normalize(image_tensor):
    """Scale a single image tensor to [0, 1] and normalize it per channel.

    Args:
        image_tensor: tensor of shape [C, H, W] holding pixel values in [0, 255].

    Returns:
        The normalized tensor produced by `transforms.Normalize`.

    Raises:
        TypeError: if `image_tensor` does not have exactly three dimensions.
    """
    if len(image_tensor.shape) != 3:  # Expecting [C, H, W]
        raise TypeError(f"Expected tensor of shape [C, H, W], but got {image_tensor.shape}")

    image_tensor = image_tensor / 255.0  # Scale to [0, 1]
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    return normalize(image_tensor)

#Apply directly to video frames
#

In [None]:
# Data transformations (if needed)
# NOTE: the Compose below is wrapped in a string literal, so it is never
# executed and this cell does not define `transform`.
"""transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])"""


# Load BlazeFace Model for face detection
# `device` and `facedet` are (re)bound here; the weights come from the Kaggle input directory.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
facedet = BlazeFace().to(device)
facedet.load_weights('/kaggle/input/blazeface-pytorch/blazeface.pth')  # Load BlazeFace weights
facedet.load_anchors('/kaggle/input/blazeface-pytorch/anchors.npy')    # Load BlazeFace anchors
facedet.train(False)

# Load Xception model with your custom implementation
from pytorchcv.model_provider import get_model
class FCN(nn.Module):
    """A backbone followed by a `Head` that emits one output per sample."""

    def __init__(self, base, in_f):
        super().__init__()
        self.base = base
        self.h1 = Head(in_f, out_f=1)

    def forward(self, x):
        return self.h1(self.base(x))

# Build an untrained pytorchcv Xception and keep every child module except the
# last one (the original output layer).
xception_base = get_model("xception", pretrained=False)
xception_base = nn.Sequential(*list(xception_base.children())[:-1])

# Replace the final block's pooling layer with adaptive average pooling to 1x1.
xception_base[0].final_block.pool = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)))

# Create the full Xception model with a custom head
class Head(nn.Module):
    """Classifier head: flatten, BatchNorm, dropout, Linear(in_f, 512), ReLU,
    BatchNorm, dropout, Linear(512, out_f)."""

    def __init__(self, in_f, out_f):
        super().__init__()
        hidden = 512
        # Attribute names and creation order are kept as they are: the names
        # are the state_dict keys that saved checkpoints are matched against.
        self.f = nn.Flatten()
        self.l = nn.Linear(in_f, hidden)
        self.d = nn.Dropout(0.5)
        self.o = nn.Linear(hidden, out_f)
        self.b1 = nn.BatchNorm1d(in_f)
        self.b2 = nn.BatchNorm1d(hidden)
        self.r = nn.ReLU()

    def forward(self, x):
        # Input stage: flatten, normalize, regularize.
        features = self.d(self.b1(self.f(x)))
        # Hidden stage: project, activate, normalize, regularize.
        hidden = self.d(self.b2(self.r(self.l(features))))
        return self.o(hidden)

xception_model = FCN(xception_base, 2048)
xception_model = xception_model.to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
xception_model.load_state_dict(torch.load('../input/deepfake-xception-trained-model/model.pth', map_location=device))

# Load ResNeXt model with your custom implementation
import torchvision.models as models
class MyResNeXt(models.resnet.ResNet):
    """torchvision ResNet built from Bottleneck blocks with grouped convolutions
    (32 groups, width 4 per group) and a single-output linear classifier."""

    def __init__(self, training=True):
        # `training` is accepted for call compatibility; it is not read here.
        backbone_config = dict(
            block=models.resnet.Bottleneck,
            layers=[3, 4, 6, 3],
            groups=32,
            width_per_group=4,
        )
        super().__init__(**backbone_config)
        # Swap the default classifier for a one-logit layer.
        self.fc = nn.Linear(2048, 1)

resnext_model = MyResNeXt().to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
resnext_model.load_state_dict(torch.load('/kaggle/input/deepfakes-inference-demo/resnext.pth', map_location=device))

# Load dataset
# process_video_frames already resizes, converts and normalizes every frame,
# and this cell defines no `transform`, so the dataset gets none.
test_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/test', transform=None)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Inference on test data
def predict_on_test_data():
    """Print a Real/Deepfake verdict for every video yielded by `test_loader`.

    Both module-level models are switched to eval mode. For each video the two
    models' logits are averaged, then averaged over the video's frames, and the
    video is reported as a deepfake when the sigmoid of that mean exceeds 0.5.
    """
    xception_model.eval()
    resnext_model.eval()

    for videos, labels in test_loader:
        videos = videos.to(device)
        labels = labels.to(device)

        # The models take (N, C, H, W) input, so fold the leading axes into one.
        # NOTE(review): assumes the loader yields (videos, frames, C, H, W) — confirm.
        num_videos = videos.shape[0]
        frames = videos.reshape(-1, *videos.shape[-3:])

        with torch.no_grad():
            xception_output = xception_model(frames)
            resnext_output = resnext_model(frames)

            # Average the predictions from both models (ensemble)
            final_output = (xception_output + resnext_output) / 2

            # Each model emits a single logit, so argmax over dim=1 is always 0;
            # threshold the sigmoid of the per-video mean logit instead.
            video_logits = final_output.reshape(num_videos, -1).mean(dim=1)
            is_fake = torch.sigmoid(video_logits) > 0.5

        for flag in is_fake.tolist():
            print(f"Prediction: {'Deepfake' if flag else 'Real'}")


In [None]:
# Fine-tuning on the training data
# process_video_frames already resizes and normalizes every frame, and this
# cell defines no `transform`, so the dataset gets none.
train_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/train', transform=None)
# NOTE(review): every step pushes batch_size * frames-per-video images through
# both models; lower batch_size if memory is tight.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Fine-tuning setup: freeze every parameter ...
for param in xception_model.parameters():
    param.requires_grad = False
for param in resnext_model.parameters():
    param.requires_grad = False

# ... then unfreeze only the final layers for training
for param in xception_model.h1.parameters():
    param.requires_grad = True
for param in resnext_model.fc.parameters():
    param.requires_grad = True

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer_xception = optim.Adam(xception_model.h1.parameters(), lr=0.0001)
optimizer_resnext = optim.Adam(resnext_model.fc.parameters(), lr=0.0001)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    # Frozen backbones stay in eval mode so their BatchNorm running statistics
    # are not updated; only the trainable Xception head runs in train mode.
    xception_model.eval()
    resnext_model.eval()
    xception_model.h1.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for videos, labels in train_loader:
        videos = videos.to(device)
        # BCEWithLogitsLoss needs float targets shaped like the logits.
        labels = labels.to(device).float()

        # The models take (N, C, H, W) input, so fold the leading axes into one.
        # NOTE(review): assumes the loader yields (videos, frames, C, H, W) — confirm.
        num_videos = videos.shape[0]
        frames = videos.reshape(-1, *videos.shape[-3:])

        # Zero the parameter gradients
        optimizer_xception.zero_grad()
        optimizer_resnext.zero_grad()

        # Forward pass
        xception_output = xception_model(frames)
        resnext_output = resnext_model(frames)

        # Average the two models, then each video's frames: one logit per video.
        final_output = ((xception_output + resnext_output) / 2).reshape(num_videos, -1).mean(dim=1)
        loss = criterion(final_output, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer_xception.step()
        optimizer_resnext.step()

        # Calculate accuracy (both tensors are 1-D, so the comparison is element-wise)
        running_loss += loss.item()
        predicted = (torch.sigmoid(final_output) > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

# Save the fine-tuned models
torch.save(xception_model.state_dict(), 'fine_tuned_xception.pth')
torch.save(resnext_model.state_dict(), 'fine_tuned_resnext.pth')

# Run predictions on test data
predict_on_test_data()

TypeError: tensor is not a torch image.

----------------------------------------------------------------------------------------------------

Kamalna ---------------------


In [None]:
speed_test = False  # you have to enable this manually

In [None]:
# Optional timing run: score the first five test videos and report the
# total and per-video wall-clock time.
if speed_test:
    start_time = time.time()
    speedtest_videos = test_videos[:5]
    predictions = predict_on_video_set(speedtest_videos, num_workers=4)
    elapsed = time.time() - start_time
    print("Elapsed %f sec. Average per video: %f sec." % (elapsed, elapsed / len(speedtest_videos)))

In [None]:
predictions = predict_on_video_set(test_videos[:5], num_workers=4)

In [None]:
# Store the ResNeXt scores next to their file names and write them to CSV.
# NOTE(review): only the first five videos are covered here, while the Xception
# submission later in the notebook covers all of test_videos — confirm the
# subset is intentional before the two are combined.
submission_df_resnext = pd.DataFrame({"filename": test_videos[:5], "label": predictions})
submission_df_resnext.to_csv("submission_resnext.csv", index=False)

## Xception Net

In [None]:
!pip install ../input/deepfake-xception-trained-model/pytorchcv-0.0.55-py2.py3-none-any.whl --quiet

In [None]:
# Directory that holds the competition's test clips.
test_dir = "/kaggle/input/deepfake-detection-challenge/test_videos/"

# Keep only the .mp4 entries, in sorted order so every later result lines up by index.
test_videos = sorted(name for name in os.listdir(test_dir) if name.endswith(".mp4"))
len(test_videos)

In [None]:
gpu = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
import sys
# Put the two helper datasets first on the module search path.
sys.path.insert(0, "/kaggle/input/blazeface-pytorch")
sys.path.insert(0, "/kaggle/input/deepfakes-inference-demo")

In [None]:
from blazeface import BlazeFace
# Build the BlazeFace detector on the selected device and load its weights and anchors.
facedet = BlazeFace().to(gpu)
facedet.load_weights("/kaggle/input/blazeface-pytorch/blazeface.pth")
facedet.load_anchors("/kaggle/input/blazeface-pytorch/anchors.npy")
# train(False) presumably switches the detector to evaluation mode (nn.Module semantics);
# binding the result to `_` keeps the cell from echoing it.
_ = facedet.train(False)

In [None]:
from helpers.read_video_1 import VideoReader
from helpers.face_extract_1 import FaceExtractor

frames_per_video = 64 # originally 4

video_reader = VideoReader()


def video_read_fn(x):
    """Read `frames_per_video` frames from the video at path `x`."""
    return video_reader.read_frames(x, num_frames=frames_per_video)


face_extractor = FaceExtractor(video_read_fn, facedet)

In [None]:
input_size = 150

In [None]:
from torchvision.transforms import Normalize

# Per-channel mean and standard deviation handed to torchvision's Normalize.
# NOTE(review): these are the commonly used ImageNet statistics — confirm the
# checkpoints were trained with the same normalization.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
normalize_transform = Normalize(mean, std)

In [None]:
def isotropically_resize_image(img, size, resample=cv2.INTER_AREA):
    """Resize `img` so that its longer side equals `size`, keeping the aspect ratio.

    Args:
        img: image array whose first two axes are (height, width).
        size: target length in pixels of the longer side.
        resample: OpenCV interpolation flag passed to `cv2.resize`.

    Returns:
        The resized image as returned by `cv2.resize`.
    """
    h, w = img.shape[:2]
    # Integer division can round the short side of a very elongated image down
    # to 0, which cv2.resize rejects; keep the short side at least one pixel.
    if w > h:
        new_w, new_h = size, max(1, h * size // w)
    else:
        new_w, new_h = max(1, w * size // h), size

    return cv2.resize(img, (new_w, new_h), interpolation=resample)


def make_square_image(img):
    """Pad `img` with zeros on the bottom and right edges until it is square."""
    height, width = img.shape[:2]
    side = max(height, width)
    pad_bottom = side - height
    pad_right = side - width
    # Nothing is added at the top or left, so the content stays anchored top-left.
    return cv2.copyMakeBorder(img, 0, pad_bottom, 0, pad_right, cv2.BORDER_CONSTANT, value=0)

In [None]:
!ls ../input/deepfake-xception-trained-model

In [None]:
# Load Xception model with your custom implementation
from pytorchcv.model_provider import get_model
class FCN(nn.Module):
    """A backbone followed by a `Head` that emits one output per sample."""

    def __init__(self, base, in_f):
        super().__init__()
        self.base = base
        self.h1 = Head(in_f, out_f=1)

    def forward(self, x):
        return self.h1(self.base(x))

# Build an untrained pytorchcv Xception and keep every child module except the
# last one (the original output layer).
xception_base = get_model("xception", pretrained=False)
xception_base = nn.Sequential(*list(xception_base.children())[:-1])

# Replace the final block's pooling layer with adaptive average pooling to 1x1.
xception_base[0].final_block.pool = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)))

# Create the full Xception model with a custom head
class Head(nn.Module):
    """Classifier head: flatten, BatchNorm, dropout, Linear(in_f, 512), ReLU,
    BatchNorm, dropout, Linear(512, out_f)."""

    def __init__(self, in_f, out_f):
        super().__init__()
        hidden = 512
        # Attribute names and creation order are kept as they are: the names
        # are the state_dict keys that saved checkpoints are matched against.
        self.f = nn.Flatten()
        self.l = nn.Linear(in_f, hidden)
        self.d = nn.Dropout(0.5)
        self.o = nn.Linear(hidden, out_f)
        self.b1 = nn.BatchNorm1d(in_f)
        self.b2 = nn.BatchNorm1d(hidden)
        self.r = nn.ReLU()

    def forward(self, x):
        # Input stage: flatten, normalize, regularize.
        features = self.d(self.b1(self.f(x)))
        # Hidden stage: project, activate, normalize, regularize.
        hidden = self.d(self.b2(self.r(self.l(features))))
        return self.o(hidden)

xception_model = FCN(xception_base, 2048)
# This section selects its device as `gpu`; `device` is only bound by another cell.
xception_model = xception_model.to(gpu)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
xception_model.load_state_dict(torch.load('../input/deepfake-xception-trained-model/model.pth', map_location=gpu))


In [None]:
from pytorchcv.model_provider import get_model
# Build an untrained pytorchcv Xception; `model` is rebound here, replacing
# whatever it referred to before this cell.
model = get_model("xception", pretrained=False)
model = nn.Sequential(*list(model.children())[:-1]) # Remove original output layer

class Pooling(nn.Module):
  """Mean of global average pooling and global max pooling (output is 1x1 per channel)."""

  def __init__(self):
    super().__init__()

    self.p1 = nn.AdaptiveAvgPool2d((1,1))
    self.p2 = nn.AdaptiveMaxPool2d((1,1))

  def forward(self, x):
    pooled_sum = self.p1(x) + self.p2(x)
    return pooled_sum * 0.5

# Install plain adaptive average pooling as the final block's pool; the Pooling
# class defined above is not used here.
model[0].final_block.pool = nn.Sequential(nn.AdaptiveAvgPool2d((1,1)))

class Head(torch.nn.Module):
  """Classifier head: flatten, BatchNorm, dropout, Linear(in_f, 512), ReLU,
  BatchNorm, dropout, Linear(512, out_f)."""

  def __init__(self, in_f, out_f):
    super().__init__()
    hidden = 512

    # Attribute names and creation order are kept as they are: the names
    # are the state_dict keys that saved checkpoints are matched against.
    self.f = nn.Flatten()
    self.l = nn.Linear(in_f, hidden)
    self.d = nn.Dropout(0.5)
    self.o = nn.Linear(hidden, out_f)
    self.b1 = nn.BatchNorm1d(in_f)
    self.b2 = nn.BatchNorm1d(hidden)
    self.r = nn.ReLU()

  def forward(self, x):
    # Input stage: flatten, normalize, regularize.
    features = self.d(self.b1(self.f(x)))
    # Hidden stage: project, activate, normalize, regularize.
    hidden = self.d(self.b2(self.r(self.l(features))))
    return self.o(hidden)

class FCN(torch.nn.Module):
  """A backbone followed by a `Head` that emits one output per sample."""

  def __init__(self, base, in_f):
    super().__init__()
    self.base = base
    self.h1 = Head(in_f, out_f=1)

  def forward(self, x):
    return self.h1(self.base(x))

net = []
# NOTE: `model` is wrapped in place, so re-running this cell without re-running
# the backbone lines above would wrap an FCN inside another FCN.
model = FCN(model, 2048)
# Follow the device chosen in `gpu` rather than requiring CUDA, and map the
# checkpoint onto it so a GPU-saved file also loads on a CPU-only runtime.
model = model.to(gpu)
model.load_state_dict(torch.load('../input/deepfake-xception-trained-model/model.pth', map_location=gpu)) # new, updated
net.append(model)

## Prediction loop

In [None]:
def predict_on_video(video_path, batch_size):
    """Score one video with the module-level `model`.

    Faces are extracted with the module-level `face_extractor`, resized and
    zero-padded to `input_size` x `input_size`, normalized with
    `normalize_transform`, and passed through `model` in a single fixed-size batch.

    Args:
        video_path: path of the video file to score.
        batch_size: number of face slots in the batch; faces beyond this count
            are dropped with a warning.

    Returns:
        float: mean sigmoid output over the faces that were found, or 0.5 when
        no face was found or any step raised an exception.
    """
    try:
        # Find the faces for N frames in the video.
        faces = face_extractor.process_video(video_path)

        # Only look at one face per frame.
        face_extractor.keep_only_best_face(faces)

        if len(faces) > 0:
            # NOTE: When running on the CPU, the batch size must be fixed
            # or else memory usage will blow up. (Bug in PyTorch?)
            x = np.zeros((batch_size, input_size, input_size, 3), dtype=np.uint8)

            # If we found any faces, prepare them for the model.
            n = 0
            for frame_data in faces:
                for face in frame_data["faces"]:
                    # Resize to the model's required input size.
                    # We keep the aspect ratio intact and add zero
                    # padding if necessary.
                    resized_face = isotropically_resize_image(face, input_size)
                    resized_face = make_square_image(resized_face)

                    if n < batch_size:
                        x[n] = resized_face
                        n += 1
                    else:
                        print("WARNING: have %d faces but batch size is %d" % (n, batch_size))

            if n > 0:
                x = torch.tensor(x, device=gpu).float()

                # Preprocess the images: NHWC -> NCHW, then scale and normalize.
                x = x.permute((0, 3, 1, 2))

                for i in range(len(x)):
                    x[i] = normalize_transform(x[i] / 255.)

                # Make a prediction, then take the average over the real faces
                # (the remaining rows of the batch are zero padding).
                with torch.no_grad():
                    y_pred = model(x)
                    # reshape(-1) rather than squeeze(): squeeze() turns a
                    # one-row batch into a 0-d tensor, and slicing that raises.
                    y_pred = torch.sigmoid(y_pred.reshape(-1))
                    return y_pred[:n].mean().item()

    except Exception as e:
        # Best effort: one bad video must not abort the whole run.
        print("Prediction error on video %s: %s" % (video_path, str(e)))

    return 0.5

In [None]:
from concurrent.futures import ThreadPoolExecutor

def predict_on_video_set(videos, num_workers):
    """Score every file name in `videos` on a pool of `num_workers` threads.

    Returns the `predict_on_video` results as a list, in the order of `videos`.
    """
    def score(filename):
        full_path = os.path.join(test_dir, filename)
        return predict_on_video(full_path, batch_size=frames_per_video)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = pool.map(score, videos)

    # The result iterator is drained only after the pool has shut down.
    return list(results)

In [None]:
speed_test = False

In [None]:
# Optional timing run: score the first five test videos and report the
# total and per-video wall-clock time.
if speed_test:
    start_time = time.time()
    speedtest_videos = test_videos[:5]
    predictions = predict_on_video_set(speedtest_videos, num_workers=4)
    elapsed = time.time() - start_time
    print("Elapsed %f sec. Average per video: %f sec." % (elapsed, elapsed / len(speedtest_videos)))

In [None]:
%%time
model.eval()
predictions = predict_on_video_set(test_videos, num_workers=4)

In [None]:
submission_df_xception = pd.DataFrame({"filename": test_videos, "label": predictions})
submission_df_xception.to_csv("submission_xception.csv", index=False)

In [None]:
submission_df_resnext.head()

In [None]:
submission_df_xception.head()

In [None]:
# Load ResNeXt model with your custom implementation
import torchvision.models as models
class MyResNeXt(models.resnet.ResNet):
    """torchvision ResNet built from Bottleneck blocks with grouped convolutions
    (32 groups, width 4 per group) and a single-output linear classifier."""

    def __init__(self, training=True):
        # `training` is accepted for call compatibility; it is not read here.
        backbone_config = dict(
            block=models.resnet.Bottleneck,
            layers=[3, 4, 6, 3],
            groups=32,
            width_per_group=4,
        )
        super().__init__(**backbone_config)
        # Swap the default classifier for a one-logit layer.
        self.fc = nn.Linear(2048, 1)

resnext_model = MyResNeXt().to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
# NOTE(review): 'resnext.pth' is relative to the working directory, unlike the
# absolute checkpoint paths used elsewhere in the notebook — confirm the file is there.
resnext_model.load_state_dict(torch.load('resnext.pth', map_location=device))

In [None]:


# Load dataset
# process_video_frames already resizes, converts and normalizes every frame,
# and this cell defines no `transform`, so the dataset gets none.
test_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/test', transform=None)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Inference on test data
def predict_on_test_data():
    """Print a Real/Deepfake verdict for every video yielded by `test_loader`.

    Both module-level models are switched to eval mode. For each video the two
    models' logits are averaged, then averaged over the video's frames, and the
    video is reported as a deepfake when the sigmoid of that mean exceeds 0.5.
    """
    xception_model.eval()
    resnext_model.eval()

    for videos, labels in test_loader:
        videos = videos.to(device)
        labels = labels.to(device)

        # The models take (N, C, H, W) input, so fold the leading axes into one.
        # NOTE(review): assumes the loader yields (videos, frames, C, H, W) — confirm.
        num_videos = videos.shape[0]
        frames = videos.reshape(-1, *videos.shape[-3:])

        with torch.no_grad():
            xception_output = xception_model(frames)
            resnext_output = resnext_model(frames)

            # Average the predictions from both models (ensemble)
            final_output = (xception_output + resnext_output) / 2

            # Each model emits a single logit, so argmax over dim=1 is always 0;
            # threshold the sigmoid of the per-video mean logit instead.
            video_logits = final_output.reshape(num_videos, -1).mean(dim=1)
            is_fake = torch.sigmoid(video_logits) > 0.5

        for flag in is_fake.tolist():
            print(f"Prediction: {'Deepfake' if flag else 'Real'}")


In [None]:
# Fine-tuning on the training data
# process_video_frames already resizes and normalizes every frame, and this
# cell defines no `transform`, so the dataset gets none.
train_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/train', transform=None)
# NOTE(review): every step pushes batch_size * frames-per-video images through
# both models; lower batch_size if memory is tight.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Fine-tuning setup: freeze every parameter ...
for param in xception_model.parameters():
    param.requires_grad = False
for param in resnext_model.parameters():
    param.requires_grad = False

# ... then unfreeze only the final layers for training
for param in xception_model.h1.parameters():
    param.requires_grad = True
for param in resnext_model.fc.parameters():
    param.requires_grad = True

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer_xception = optim.Adam(xception_model.h1.parameters(), lr=0.0001)
optimizer_resnext = optim.Adam(resnext_model.fc.parameters(), lr=0.0001)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    # Frozen backbones stay in eval mode so their BatchNorm running statistics
    # are not updated; only the trainable Xception head runs in train mode.
    xception_model.eval()
    resnext_model.eval()
    xception_model.h1.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for videos, labels in train_loader:
        videos = videos.to(device)
        # BCEWithLogitsLoss needs float targets shaped like the logits.
        labels = labels.to(device).float()

        # The models take (N, C, H, W) input, so fold the leading axes into one.
        # NOTE(review): assumes the loader yields (videos, frames, C, H, W) — confirm.
        num_videos = videos.shape[0]
        frames = videos.reshape(-1, *videos.shape[-3:])

        # Zero the parameter gradients
        optimizer_xception.zero_grad()
        optimizer_resnext.zero_grad()

        # Forward pass
        xception_output = xception_model(frames)
        resnext_output = resnext_model(frames)

        # Average the two models, then each video's frames: one logit per video.
        final_output = ((xception_output + resnext_output) / 2).reshape(num_videos, -1).mean(dim=1)
        loss = criterion(final_output, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer_xception.step()
        optimizer_resnext.step()

        # Calculate accuracy (both tensors are 1-D, so the comparison is element-wise)
        running_loss += loss.item()
        predicted = (torch.sigmoid(final_output) > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

# Save the fine-tuned models
torch.save(xception_model.state_dict(), 'fine_tuned_xception.pth')
torch.save(resnext_model.state_dict(), 'fine_tuned_resnext.pth')

# Run predictions on test data
predict_on_test_data()

## Ensemble of Resnext and Xception

In [None]:
submission_df = pd.DataFrame({"filename": test_videos})

In [None]:
# Turn the two raw values into weights that sum to 1.
# NOTE(review): r1 and r2 look like per-model scores, but their origin is not
# shown in the notebook — document where they come from.
r1 = 0.46441
r2 = 0.52189
total = r1 + r2
r11 = r1/total
r22 = r2/total

In [None]:
# Weighted blend of the two models' scores. The frames are aligned on filename
# rather than on row position, because the two per-model frames need not cover
# the same videos and positional alignment would then yield NaN labels.
resnext_scores = submission_df_resnext.set_index("filename")["label"].reindex(submission_df["filename"])
xception_scores = submission_df_xception.set_index("filename")["label"].reindex(submission_df["filename"])
# A video scored by only one model keeps that model's score (the weights sum to 1).
blended_scores = r22*resnext_scores.fillna(xception_scores) + r11*xception_scores.fillna(resnext_scores)
submission_df["label"] = blended_scores.values

In [None]:
submission_df.to_csv("submission.csv", index=False)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
from blazeface import BlazeFace
from read_video_1 import process_video_frames  # Ensure this script is uploaded

# Define UADFV Directory
UADFV_dir = "/kaggle/input/videos"

# Dataset class for UADFV videos
class UADFVVideoDataset(Dataset):
    """Dataset of videos stored under ``<data_dir>/real`` and ``<data_dir>/fake``.

    Each item is ``(video_frames, label)``: ``video_frames`` is the result of
    ``process_video_frames`` for the file (passed through ``transform`` when
    one is given) and ``label`` is 0 for real, 1 for fake.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.video_paths = []
        self.labels = []

        # Load video paths and their corresponding labels
        for label in ['real', 'fake']:
            label_dir = os.path.join(data_dir, label)
            # os.listdir returns entries in arbitrary order; sort them so an
            # index always refers to the same video across runs.
            for video_file in sorted(os.listdir(label_dir)):
                self.video_paths.append(os.path.join(label_dir, video_file))
                self.labels.append(0 if label == 'real' else 1)  # Assign 0 for real, 1 for fake

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        video_frames = process_video_frames(video_path)  # Extract frames from video
        label = self.labels[idx]

        if self.transform:
            video_frames = self.transform(video_frames)

        return video_frames, label

# Data transformations (if needed)
# NOTE(review): transforms.ToTensor expects a PIL image or a numpy array;
# confirm what process_video_frames returns before passing this Compose to
# UADFVVideoDataset, which applies it to that return value.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load BlazeFace Model for face detection
# NOTE(review): the weight and anchor files are given as bare names, so they are
# resolved against the current working directory — confirm they are present there.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
facedet = BlazeFace().to(device)
facedet.load_weights('blazeface.pth')  # Load BlazeFace weights
facedet.load_anchors('anchors.npy')    # Load BlazeFace anchors
facedet.train(False)

# Load Xception model with your custom implementation
from pytorchcv.model_provider import get_model
class FCN(nn.Module):
    """A backbone followed by a `Head` that emits one output per sample."""

    def __init__(self, base, in_f):
        super().__init__()
        self.base = base
        self.h1 = Head(in_f, out_f=1)

    def forward(self, x):
        return self.h1(self.base(x))

# Build an untrained pytorchcv Xception and keep every child module except the
# last one (the original output layer).
xception_base = get_model("xception", pretrained=False)
xception_base = nn.Sequential(*list(xception_base.children())[:-1])

# Replace the final block's pooling layer with adaptive average pooling to 1x1.
xception_base[0].final_block.pool = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)))

# Create the full Xception model with a custom head
class Head(nn.Module):
    """Classifier head: flatten, BatchNorm, dropout, Linear(in_f, 512), ReLU,
    BatchNorm, dropout, Linear(512, out_f)."""

    def __init__(self, in_f, out_f):
        super().__init__()
        hidden = 512
        # Attribute names and creation order are kept as they are: the names
        # are the state_dict keys that saved checkpoints are matched against.
        self.f = nn.Flatten()
        self.l = nn.Linear(in_f, hidden)
        self.d = nn.Dropout(0.5)
        self.o = nn.Linear(hidden, out_f)
        self.b1 = nn.BatchNorm1d(in_f)
        self.b2 = nn.BatchNorm1d(hidden)
        self.r = nn.ReLU()

    def forward(self, x):
        # Input stage: flatten, normalize, regularize.
        features = self.d(self.b1(self.f(x)))
        # Hidden stage: project, activate, normalize, regularize.
        hidden = self.d(self.b2(self.r(self.l(features))))
        return self.o(hidden)

xception_model = FCN(xception_base, 2048)
xception_model = xception_model.to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
xception_model.load_state_dict(torch.load('../input/deepfake-xception-trained-model/model.pth', map_location=device))

# Load ResNeXt model with your custom implementation
import torchvision.models as models
class MyResNeXt(models.resnet.ResNet):
    """torchvision ResNet built from Bottleneck blocks with grouped convolutions
    (32 groups, width 4 per group) and a single-output linear classifier."""

    def __init__(self, training=True):
        # `training` is accepted for call compatibility; it is not read here.
        backbone_config = dict(
            block=models.resnet.Bottleneck,
            layers=[3, 4, 6, 3],
            groups=32,
            width_per_group=4,
        )
        super().__init__(**backbone_config)
        # Swap the default classifier for a one-logit layer.
        self.fc = nn.Linear(2048, 1)

resnext_model = MyResNeXt().to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
# NOTE(review): 'resnext.pth' is relative to the working directory, unlike the
# absolute checkpoint paths used elsewhere in the notebook — confirm the file is there.
resnext_model.load_state_dict(torch.load('resnext.pth', map_location=device))

# Load dataset: one video per batch, in dataset order.
# NOTE(review): `transform` is applied by the dataset to whatever
# process_video_frames returns — confirm the two are compatible.
test_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/test', transform=transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Inference on test data
def predict_on_test_data():
    """Print the ensemble verdict ("Deepfake" or "Real") for each batch of `test_loader`.

    Both models are switched to eval mode, their logits are averaged, and the
    average is passed through a sigmoid and thresholded at 0.5. Each batch must
    hold exactly one sample because the verdict is read with `.item()`.
    """
    xception_model.eval()
    resnext_model.eval()

    for videos, _labels in test_loader:
        # NOTE(review): assumes the loader yields image tensors both models accept;
        # a loader that yields file paths will fail on `.to(device)` — confirm.
        videos = videos.to(device)

        with torch.no_grad():
            xception_output = xception_model(videos)
            resnext_output = resnext_model(videos)

            # Average the predictions from both models (ensemble)
            final_output = (xception_output + resnext_output) / 2
            # Each model emits a single logit per sample, so argmax over dim 1
            # would always be 0 ("Real"); threshold the probability instead.
            is_fake = torch.sigmoid(final_output) > 0.5

            print(f"Prediction: {'Deepfake' if is_fake.item() else 'Real'}")

# Fine-tuning on the training data: training split lives under UADFV_dir + '/train'
# NOTE(review): `transform` is not defined anywhere in the visible part of this
# notebook — confirm it exists before this cell runs.
train_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/train', transform=transform)
# Shuffled mini-batches of 16 samples
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Fine-tuning setup: switch off gradients for every parameter of both models
for param in list(xception_model.parameters()) + list(resnext_model.parameters()):
    param.requires_grad = False

# Then switch them back on for the final layers only (Xception head, ResNeXt fc)
for param in [*xception_model.h1.parameters(), *resnext_model.fc.parameters()]:
    param.requires_grad = True

# Define loss function and optimizer
# BCEWithLogitsLoss applies the sigmoid itself, so the models' raw outputs are fed to it.
# NOTE(review): `optim` (torch.optim) is only imported in a later cell of this
# notebook — make sure that import has run before this one.
criterion = nn.BCEWithLogitsLoss()
# One Adam optimizer per model, each covering only the unfrozen final layers
optimizer_xception = optim.Adam(xception_model.h1.parameters(), lr=0.0001)
optimizer_resnext = optim.Adam(resnext_model.fc.parameters(), lr=0.0001)

# Training loop: jointly fine-tune both heads on the averaged ensemble logit
num_epochs = 5
for epoch in range(num_epochs):
    # NOTE(review): train() also puts the frozen backbones' BatchNorm layers in
    # training mode, so their running statistics keep changing — confirm intended.
    xception_model.train()
    resnext_model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for videos, labels in train_loader:
        videos = videos.to(device)
        # BCEWithLogitsLoss needs float targets with the same (N, 1) shape as the
        # logits; the same shape also keeps the accuracy comparison element-wise
        # instead of broadcasting (N, 1) against (N,) into an (N, N) matrix.
        labels = labels.to(device).float().view(-1, 1)

        # Zero the parameter gradients
        optimizer_xception.zero_grad()
        optimizer_resnext.zero_grad()

        # Forward pass
        xception_output = xception_model(videos)
        resnext_output = resnext_model(videos)

        # Average predictions
        final_output = (xception_output + resnext_output) / 2
        loss = criterion(final_output, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer_xception.step()
        optimizer_resnext.step()

        # Calculate accuracy
        running_loss += loss.item()
        predicted = (torch.sigmoid(final_output) > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

# Save the fine-tuned models (state_dict only) into the current working directory
torch.save(xception_model.state_dict(), 'fine_tuned_xception.pth')
torch.save(resnext_model.state_dict(), 'fine_tuned_resnext.pth')

# Run predictions on test data; this switches both models back to eval mode
predict_on_test_data()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import cv2
from PIL import Image
import os
from blazeface import BlazeFace
from helpers.read_video_1 import VideoReader
from helpers.face_extract_1 import FaceExtractor

# Root folder of the UADFV videos ('/train' and '/test' are appended where it is used)
UADFV_dir = "/kaggle/input/videos"

# Face detector: BlazeFace on the GPU when available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
facedet = BlazeFace().to(device)
facedet.load_weights("/kaggle/input/blazeface-pytorch/blazeface.pth")
facedet.load_anchors("/kaggle/input/blazeface-pytorch/anchors.npy")
facedet.train(False)

# Frame reader + face extractor working on `frames_per_video` frames per video
frames_per_video = 64
video_reader = VideoReader()


def video_read_fn(x):
    """Read `frames_per_video` frames from the video at path `x`."""
    return video_reader.read_frames(x, num_frames=frames_per_video)


face_extractor = FaceExtractor(video_read_fn, facedet)

# Per-channel normalization statistics applied to face crops
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
normalize_transform = transforms.Normalize(mean=mean, std=std)

# Helper functions to resize images
def isotropically_resize_image(img, size, resample=cv2.INTER_AREA):
    """Resize `img` so its longer side equals `size`, keeping the aspect ratio.

    The shorter side is scaled by the same factor using integer (floor) division.

    Args:
        img: Image array whose first two dimensions are height and width.
        size: Target length of the longer side, in pixels.
        resample: OpenCV interpolation flag passed to `cv2.resize`.

    Returns:
        The resized image as returned by `cv2.resize`.
    """
    height, width = img.shape[:2]
    if width > height:
        target_wh = (size, height * size // width)
    else:
        target_wh = (width * size // height, size)
    # cv2.resize expects the target as (width, height)
    return cv2.resize(img, target_wh, interpolation=resample)

def make_square_image(img):
    """Pad `img` with zeros on the bottom and right until it is square.

    The side of the result equals the larger of the input's height and width;
    the original pixels stay anchored at the top-left corner.
    """
    height, width = img.shape[:2]
    side = max(height, width)
    pad_bottom, pad_right = side - height, side - width
    return cv2.copyMakeBorder(img, 0, pad_bottom, 0, pad_right, cv2.BORDER_CONSTANT, value=0)

# Dataset class for UADFV videos
class UADFVVideoDataset(Dataset):
    """Index of UADFV video files with binary labels (0 = real, 1 = fake).

    `data_dir` must contain a `real` and a `fake` subfolder. Items are
    `(video_path, label)` pairs; decoding the video is left to the caller.

    Args:
        data_dir: Folder holding the `real` and `fake` subfolders.
        transform: Stored on the instance but not applied by this class.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.video_paths = []
        self.labels = []

        # Load video paths and their corresponding labels
        for label in ['real', 'fake']:
            label_dir = os.path.join(data_dir, label)
            # os.listdir returns entries in arbitrary order; sort so that indices
            # (and therefore prediction order) are reproducible across runs.
            for video_file in sorted(os.listdir(label_dir)):
                self.video_paths.append(os.path.join(label_dir, video_file))
                self.labels.append(0 if label == 'real' else 1)  # Assign 0 for real, 1 for fake

    def __len__(self):
        """Return the number of indexed videos."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Return `(video_path, label)` for position `idx`, echoing it to stdout."""
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        print (video_path,label)
        return video_path, label


In [None]:
# Install pytorchcv 0.0.55 from the wheel bundled with the Xception model dataset;
# it supplies the `get_model` factory imported from pytorchcv.model_provider below.
!pip install ../input/deepfake-xception-trained-model/pytorchcv-0.0.55-py2.py3-none-any.whl --quiet

In [None]:
def preprocess_video(video_frames):
    """
    Preprocess the frames of one video into a single tensor.

    Each frame is resized to 224x224, converted to a tensor and normalized
    with mean [0.485, 0.456, 0.406] and std [0.229, 0.224, 0.225].

    Args:
        video_frames (list or ndarray): Frames of the video. Each frame may be
            a PIL image or an array/tensor that `transforms.ToPILImage` accepts.

    Returns:
        torch.Tensor: The preprocessed frames stacked along a new first dimension.
    """
    preprocess_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    to_pil = transforms.ToPILImage()

    def _as_pil(frame):
        # Resize does not take raw ndarrays, so anything that is not already
        # a PIL image is converted first.
        return frame if isinstance(frame, Image.Image) else to_pil(frame)

    # Apply the transform to each frame in the video
    preprocessed_frames = [preprocess_transform(_as_pil(frame)) for frame in video_frames]

    # Stack the frames along the batch dimension to create a single tensor
    return torch.stack(preprocessed_frames)

In [None]:
# Define input sizes for each model: side length, in pixels, of the square
# face crops that predict_on_video builds for the respective network.
input_size_resnext = 224  # ResNeXt input size
input_size_xception = 150  # XceptionNet input size

# Helper function to process faces for both models
def predict_on_video(video_path, batch_size):
    """Return the ensemble fake-probability for one video.

    Faces are extracted with `face_extractor`, at most `batch_size` of them are
    resized for each model, and the per-face sigmoid outputs of ResNeXt and
    Xception are averaged, first per model and then across the two models.

    Args:
        video_path: Path of the video, passed to `face_extractor.process_video`.
        batch_size: Maximum number of face crops to score.

    Returns:
        float: Mean of the two models' average probabilities, or 0.5 when no
        face was found or any step raised (the error is printed).
    """
    try:
        # Find the faces for N frames in the video.
        faces = face_extractor.process_video(video_path)

        # Only look at one face per frame.
        face_extractor.keep_only_best_face(faces)

        if len(faces) > 0:
            # Prepare inputs for ResNeXt and Xception
            x_resnext = np.zeros((batch_size, input_size_resnext, input_size_resnext, 3), dtype=np.uint8)
            x_xception = np.zeros((batch_size, input_size_xception, input_size_xception, 3), dtype=np.uint8)

            # If we found any faces, resize them for each model.
            n = 0
            for frame_data in faces:
                for face in frame_data["faces"]:
                    if n >= batch_size:
                        # Buffers are full; further crops would be discarded anyway.
                        break
                    # Resize for ResNeXt
                    resized_face_resnext = isotropically_resize_image(face, input_size_resnext)
                    x_resnext[n] = make_square_image(resized_face_resnext)
                    # Resize for Xception
                    resized_face_xception = isotropically_resize_image(face, input_size_xception)
                    x_xception[n] = make_square_image(resized_face_xception)
                    n += 1

            if n > 0:
                # Forward only the n filled rows instead of the zero padding.
                x_resnext = torch.tensor(x_resnext[:n], device=device).float().permute((0, 3, 1, 2))
                x_xception = torch.tensor(x_xception[:n], device=device).float().permute((0, 3, 1, 2))

                # Preprocess the images (normalize them)
                for i in range(n):
                    x_resnext[i] = normalize_transform(x_resnext[i] / 255.)
                    x_xception[i] = normalize_transform(x_xception[i] / 255.)

                # Make predictions for both models and return the average
                with torch.no_grad():
                    # reshape(-1) keeps a 1-D tensor even for a single face;
                    # squeeze() would collapse it to 0-dim and break indexing.
                    y_pred_resnext = torch.sigmoid(resnext_model(x_resnext).reshape(-1))
                    y_pred_xception = torch.sigmoid(xception_model(x_xception).reshape(-1))

                    return (y_pred_resnext.mean().item() + y_pred_xception.mean().item()) / 2

    except Exception as e:
        # Deliberate best-effort: report the failure and fall back to the neutral score.
        print(f"Prediction error on video {video_path}: {str(e)}")

    return 0.5

In [None]:


# Load models
from pytorchcv.model_provider import get_model

# Xception model
class FCN(nn.Module):
    """Backbone followed by a one-output `Head`.

    Args:
        base: Module applied first in `forward`.
        in_f: Feature count handed to `Head` as its input size.
    """

    def __init__(self, base, in_f):
        super().__init__()
        self.base = base
        self.h1 = Head(in_f, 1)

    def forward(self, x):
        """Run `x` through the backbone, then through the head `h1`."""
        return self.h1(self.base(x))

# Custom head for Xception
class Head(nn.Module):
    """Classifier head: flatten -> BN -> dropout -> linear(512) -> ReLU -> BN -> dropout -> linear.

    Args:
        in_f: Number of features per sample after flattening.
        out_f: Number of output units.
    """

    def __init__(self, in_f, out_f):
        super().__init__()
        hidden = 512
        self.f = nn.Flatten()
        self.l = nn.Linear(in_f, hidden)
        self.d = nn.Dropout(0.5)
        self.o = nn.Linear(hidden, out_f)
        self.b1 = nn.BatchNorm1d(in_f)
        self.b2 = nn.BatchNorm1d(hidden)
        self.r = nn.ReLU()

    def forward(self, x):
        """Return the raw outputs of the final linear layer for batch `x`."""
        # The single dropout module `d` is reused at both dropout points.
        for stage in (self.f, self.b1, self.d, self.l, self.r, self.b2, self.d):
            x = stage(x)
        return self.o(x)

# Untrained pytorchcv Xception with its last child module (the output layer) removed
xception_base = nn.Sequential(
    *list(get_model("xception", pretrained=False).children())[:-1]
)
# Final block pools adaptively down to a 1x1 spatial output
xception_base[0].final_block.pool = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)))

# Full Xception classifier: backbone plus a head fed with 2048 features
xception_model = FCN(xception_base, 2048).to(device)
# map_location keeps the load working when the checkpoint was saved on a GPU
# but `device` has fallen back to the CPU.
xception_model.load_state_dict(
    torch.load('../input/deepfake-xception-trained-model/model.pth', map_location=device)
)

# ResNeXt model
import torchvision.models as models
class MyResNeXt(models.resnet.ResNet):
    """torchvision ResNet configured with Bottleneck blocks [3, 4, 6, 3],
    32 groups of width 4, and a single-output `fc` layer.

    Args:
        training: Accepted for compatibility; not used by this constructor.
    """

    def __init__(self, training=True):
        super().__init__(
            models.resnet.Bottleneck,
            [3, 4, 6, 3],
            groups=32,
            width_per_group=4,
        )
        # Replace the default classifier with a one-logit output layer.
        self.fc = nn.Linear(2048, 1)

resnext_model = MyResNeXt().to(device)
# map_location keeps the load working when the checkpoint was saved on a GPU
# but `device` has fallen back to the CPU.
resnext_model.load_state_dict(
    torch.load('/kaggle/input/deepfakes-inference-demo/resnext.pth', map_location=device)
)

# Load dataset: test split under UADFV_dir + '/test'. UADFVVideoDataset returns
# (video_path, label) pairs, so the loader yields file paths rather than frames.
test_dataset = UADFVVideoDataset(data_dir=UADFV_dir + '/test', transform=None)
# One video per batch, in dataset order
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Prediction on test data
def predict_on_test_data():
    """Print the ensemble verdict ("Deepfake" or "Real") for each batch of `test_loader`.

    Both models are switched to eval mode, their logits are averaged, and the
    average is passed through a sigmoid and thresholded at 0.5. Each batch must
    hold exactly one sample because the verdict is read with `.item()`.
    """
    xception_model.eval()
    resnext_model.eval()

    for videos, _labels in test_loader:
        # NOTE(review): this needs a loader that yields image tensors. The
        # UADFVVideoDataset in this notebook yields file paths, for which
        # `.to(device)` fails — use predict_on_data_loader for that loader.
        videos = videos.to(device)

        with torch.no_grad():
            xception_output = xception_model(videos)
            resnext_output = resnext_model(videos)

            # Ensemble predictions
            final_output = (xception_output + resnext_output) / 2
            # Each model emits a single logit per sample, so argmax over dim 1
            # would always be 0 ("Real"); threshold the probability instead.
            is_fake = torch.sigmoid(final_output) > 0.5

            print(f"Prediction: {'Deepfake' if is_fake.item() else 'Real'}")

# Speed test on videos
from concurrent.futures import ThreadPoolExecutor
import time

def predict_on_video_paths(video_paths, num_workers):
    """Score every video in `video_paths` using a pool of `num_workers` threads.

    Each video is scored by `predict_on_video` with `frames_per_video` as its
    batch size. Results are returned as a list in the same order as the input.
    """
    def _score_one(path):
        return predict_on_video(path, batch_size=frames_per_video)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        scores = pool.map(_score_one, video_paths)

    return list(scores)
def predict_on_data_loader(dataloader, batch_size):
    """Score every video delivered by `dataloader` with `predict_on_video`.

    Args:
        dataloader: Iterable of `(video_path, label)` batches, where
            `video_path` is a single path or a tuple/list of paths.
        batch_size: Forwarded to `predict_on_video` (maximum faces per video).

    Returns:
        tuple: `(all_predictions, all_labels)`, two lists with one entry per video.
    """
    all_predictions = []
    all_labels = []

    for video_path, label in dataloader:
        print(f"Video path type: {type(video_path)}")
        print(f"Video label: {type(label)}")
        # Collated batches wrap the paths in a tuple or list; score every path
        # in the batch rather than silently keeping only the first one.
        if isinstance(video_path, (tuple, list)):
            paths = list(video_path)
        else:
            paths = [video_path]

        for position, path in enumerate(paths):
            # Call the predict_on_video function for each video path
            all_predictions.append(predict_on_video(path, batch_size))
            # A single-video batch keeps its label exactly as delivered; larger
            # batches contribute the one-element slice matching each path.
            all_labels.append(label if len(paths) == 1 else label[position:position + 1])

    return all_predictions, all_labels
# Freshly constructed models start in training mode (dropout active, BatchNorm
# using batch statistics). Switch to eval mode before ANY prediction, including
# the speed test, so its scores are comparable with the full run below.
xception_model.eval()
resnext_model.eval()

#Run predictions on a small batch (using file paths)
speed_test = True
if speed_test:
    start_time = time.time()
    speedtest_videos = ['/kaggle/input/videos/test/fake/0046_fake.mp4', '/kaggle/input/videos/test/real/0046.mp4']  # Adjust your video paths
    predictions = predict_on_video_paths(speedtest_videos, num_workers=4)
    elapsed = time.time() - start_time
    print(f"Elapsed {elapsed:.4f} sec. Average per video: {elapsed / len(speedtest_videos):.4f} sec.")
    print(predictions)

#Inference on full test set (using DataLoader)
# NOTE(review): the second argument caps the faces scored per video at 4, while
# the speed test above scores up to frames_per_video — confirm this is intended.
predictions = predict_on_data_loader(test_loader,4)


Elapsed 1.4304 sec. Average per video: 0.7152 sec.
[0.48261022567749023, 0.45802871882915497]
/kaggle/input/videos/test/real/0046.mp4 0
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>
/kaggle/input/videos/test/real/0047.mp4 0
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>
/kaggle/input/videos/test/real/0048.mp4 0
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>
/kaggle/input/videos/test/fake/0046_fake.mp4 1
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>
/kaggle/input/videos/test/fake/0048_fake.mp4 1
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>
/kaggle/input/videos/test/fake/0047_fake.mp4 1
Video path type: <class 'tuple'>
Video label: <class 'torch.Tensor'>


In [None]:
# `predictions` is the (scores, labels) pair returned by predict_on_data_loader
print (predictions)

([0.4993215650320053, 0.0613549305126071, 0.7446506321430206, 0.6337763518095016, 0.9603795409202576, 0.5451347678899765], [tensor([0]), tensor([0]), tensor([0]), tensor([1]), tensor([1]), tensor([1])])


In [None]:
# Save the current ResNeXt weights (state_dict only) to the working directory
torch.save(resnext_model.state_dict(), 'resnext_model.pth')
# Save the current Xception weights (state_dict only) to the working directory
torch.save(xception_model.state_dict(), 'xception_model.pth')

In [None]:
# Folder holding the competition's test videos
test_dir = "/kaggle/input/deepfake-detection-challenge/test_videos/"

# Sorted file names (not full paths) of every .mp4 directly inside test_dir
test_videos = sorted(name for name in os.listdir(test_dir) if name.endswith(".mp4"))
frame_h = 5
frame_l = 5
len(test_videos)

400

In [None]:
# test_videos holds bare file names from os.listdir(test_dir); the video reader
# needs a path it can open, so prefix each name with test_dir. Passing the bare
# names makes every video fall back to the 0.5 error score.
predictions = predict_on_video_paths(
    [os.path.join(test_dir, video) for video in test_videos],
    num_workers=4,
)

Prediction error on video aassnaulhq.mp4: need at least one array to concatenate
Prediction error on video aayfryxljh.mp4: need at least one array to concatenate
Prediction error on video acazlolrpz.mp4: need at least one array to concatenate
Prediction error on video adohdulfwb.mp4: need at least one array to concatenate
Prediction error on video aktnlyqpah.mp4: need at least one array to concatenate
Prediction error on video ahjnxtiamx.mp4: need at least one array to concatenate
Prediction error on video alrtntfxtd.mp4: need at least one array to concatenate
Prediction error on video apedduehoy.mp4: need at least one array to concatenate
Prediction error on video ajiyrjfyzp.mp4: need at least one array to concatenatePrediction error on video apvzjkvnwn.mp4: need at least one array to concatenate
Prediction error on video aomqqjipcp.mp4: need at least one array to concatenate
Prediction error on video aqrsylrzgi.mp4: need at least one array to concatenate
Prediction error on video ayi

In [None]:
# Pair each test file name with its score from predict_on_video_paths.
# NOTE(review): despite the `_resnext` suffix, `predictions` here comes from the
# ResNeXt + Xception ensemble — consider renaming.
submission_df_resnext = pd.DataFrame({"filename": test_videos, "label": predictions})
submission_df_resnext.head()