In [1]:
import pandas as pd
import numpy as np
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import timm
from PIL import Image

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
real_videos_dir="dataset/DFD_original sequences"
fake_videos_dir="dataset/DFD_manipulated_sequences/DFD_manipulated_sequences"
output_real_dir="dataset/output_real"
output_fake_dir="dataset/output_fake"
os.makedirs(output_real_dir, exist_ok=True)
os.makedirs(output_fake_dir, exist_ok=True)

In [3]:
def extract_frames_from_videos(videos_dir, output_dir, label, max_videos=50):
    video_files = [f for f in os.listdir(videos_dir) if f.endswith(('.mp4', '.avi', '.mov', '.mkv'))]
    # video_files = video_files[:max_videos]  # Limit to max_videos

    for video_file in video_files:
        video_path = os.path.join(videos_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        success, image = cap.read()

        while success:
            if frame_count % int(cap.get(cv2.CAP_PROP_FPS)) == 0:
                frame_filename = f"{label}_{video_file}_frame{frame_count // int(cap.get(cv2.CAP_PROP_FPS))}.jpg"
                frame_path = os.path.join(output_dir, frame_filename)
                cv2.imwrite(frame_path, image)
            success, image = cap.read()
            frame_count += 1

        cap.release()

In [4]:
# extract_frames_from_videos(real_videos_dir, output_real_dir, "real", max_videos=100)
# extract_frames_from_videos(fake_videos_dir, output_fake_dir, "manipulated", max_videos=100)
print("Frame extraction completed.")

Frame extraction completed.


In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
print(device)

cuda


In [6]:
dataset_dir = "dataset/frames"  
dataset = datasets.ImageFolder(root=dataset_dir, transform=transform)

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])


train_loader = DataLoader(train_dataset, shuffle=True,)
val_loader = DataLoader(val_dataset, shuffle=False)

In [15]:
print(timm.list_pretrained("resnext50*"))
model = timm.create_model('vit_tiny_patch16_224', num_classes=2)


['resnext50_32x4d.a1_in1k', 'resnext50_32x4d.a1h_in1k', 'resnext50_32x4d.a2_in1k', 'resnext50_32x4d.a3_in1k', 'resnext50_32x4d.fb_ssl_yfcc100m_ft_in1k', 'resnext50_32x4d.fb_swsl_ig1b_ft_in1k', 'resnext50_32x4d.gluon_in1k', 'resnext50_32x4d.ra_in1k', 'resnext50_32x4d.tv2_in1k', 'resnext50_32x4d.tv_in1k', 'resnext50d_32x4d.bt_in1k']


In [9]:

model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-5, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.7)

num_epochs = 5
best_val_accuracy = 0
patience = 5
patience_counter = 0

print('entered training')
print(len(train_loader))
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    i=1
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        if(i%100==0):
            print(i)
        
        i+=1

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    train_accuracy = 100 * correct_train / total_train
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Training Accuracy: {train_accuracy:.2f}%")

    # Validation
    model.eval()
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    val_accuracy = 100 * correct_val / total_val
    print(f"Validation Accuracy: {val_accuracy:.2f}%")

    # Save best model with early stopping
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), 'best_efv2_model.pth')
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping due to no improvement.")
            break
    scheduler.step()

print(f"Best Validation Accuracy: {best_val_accuracy:.2f}%")


entered training
4988
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
4100
4200
4300
4400
4500
4600
4700
4800
4900
Epoch [1/5], Loss: 0.6906, Training Accuracy: 54.05%
Validation Accuracy: 52.16%
100
200


KeyboardInterrupt: 