In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Fetch the Kaggle dataset through kagglehub and keep whatever the call returns.
# NOTE(review): presumably this is the local download path — confirm against the
# kagglehub docs. None of the cells shown here read this variable; the training
# code uses the hard-coded /kaggle/input/final-merged-dataset/ location instead.
abdelrahmantarekm_final_merged_dataset_path = kagglehub.dataset_download('abdelrahmantarekm/final-merged-dataset')

print('Data source import complete.')


In [None]:
!pip install efficientnet-pytorch mlflow

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from pathlib import Path
import mlflow
import numpy as np
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, confusion_matrix

# Custom Dataset with Limited Debug Output
class DeepfakeDataset(Dataset):
    """Image dataset read from ``<data_dir>/final_dataset/<split>/{real,fake}``.

    Labels follow the order of ``CLASS_NAMES``: ``real`` -> 0, ``fake`` -> 1.
    A missing class directory is reported with a message and skipped, so the
    dataset may legitimately end up empty.

    Args:
        data_dir: Root directory that contains the ``final_dataset`` folder.
        split: Name of the split sub-directory (e.g. ``'train'``).
        transform: Optional callable applied to the loaded image array.
    """

    CLASS_NAMES = ('real', 'fake')
    # Explicit allow-list: the previous glob pattern '*.[jp][pn][gf]*' could not
    # match '.jpeg', was case-sensitive, and matched names such as 'x.jpg.bak'.
    IMAGE_SUFFIXES = frozenset({'.jpg', '.jpeg', '.png'})

    def __init__(self, data_dir, split='train', transform=None):
        self.data_dir = Path(data_dir) / 'final_dataset' / split
        self.transform = transform
        self.images = []
        self.labels = []

        print(f"Scanning directory for split {split}: {self.data_dir}")
        for item in self.data_dir.glob('*'):
            print(f"Found item: {item}")

        # Look for real and fake subdirectories
        for label, class_name in enumerate(self.CLASS_NAMES):
            class_dir = self.data_dir / class_name
            if not class_dir.is_dir():
                print(f"Directory {class_name} not found in {self.data_dir}")
                continue

            print(f"Scanning {class_name} directory: {class_dir}")
            # Sorted so the sample order does not depend on filesystem order.
            class_images = sorted(
                path for path in class_dir.iterdir()
                if path.suffix.lower() in self.IMAGE_SUFFIXES
            )
            self.images.extend(class_images)
            self.labels.extend([label] * len(class_images))

        # Limit image list print to first 5 for brevity
        print(f"Found {len(self.images)} images: {self.images[:5]}")
        print(f"Labels length: {len(self.labels)}")

    def __len__(self):
        """Return the number of images found for this split."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded with ``plt.imread``, coerced to an H x W x 3 uint8
        array, and then passed through ``transform`` when one was given.
        """
        img_path = self.images[idx]
        label = self.labels[idx]
        image = self._to_rgb_uint8(plt.imread(img_path))
        if self.transform:
            image = self.transform(image)
        return image, label

    @staticmethod
    def _to_rgb_uint8(image):
        """Coerce an image array to shape (H, W, 3) and dtype uint8.

        Handles 2-D grayscale arrays, 4-channel (alpha) arrays and floating
        point arrays, none of which a 3-channel normalisation pipeline accepts
        as-is. Float input is assumed to be in [0, 1] — TODO confirm for any
        loader other than ``plt.imread``.
        """
        if image.ndim == 2:
            # Grayscale: replicate the single channel three times.
            image = np.stack((image,) * 3, axis=-1)
        elif image.shape[-1] == 4:
            # Drop the alpha channel.
            image = image[..., :3]
        if np.issubdtype(image.dtype, np.floating):
            # Round rather than truncate so k/255 maps back to exactly k.
            image = np.rint(np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)
        return np.ascontiguousarray(image)

# Data Transforms
# One preprocessing pipeline, passed to every DeepfakeDataset split in main():
#   1. ToPILImage - the dataset loads images with plt.imread, so the transform
#      receives an array and converts it to a PIL image first.
#   2. Resize     - fixed 224x224 input size.
#   3. ToTensor   - PIL image -> tensor.
#   4. Normalize  - per-channel mean/std for three channels.
# NOTE(review): the mean/std values look like the usual ImageNet statistics,
# which would match the IMAGENET1K_V1 weights loaded in main() — confirm.
data_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Model and Trainer
class DeepfakeTrainer:
    """Train and evaluate a two-class classifier.

    Uses cross-entropy loss, Adam (lr=0.001) and a ``ReduceLROnPlateau``
    scheduler. Per-epoch training loss and validation accuracy are kept in
    ``train_losses`` and ``val_accuracies``.

    Args:
        model: Module producing ``(batch, 2)`` logits.
        device: Device the model and every batch are moved to.
    """

    def __init__(self, model, device):
        self.model = model
        self.device = device
        self.model.to(device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(model.parameters(), lr=0.001)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode='min', factor=0.1, patience=5)
        self.train_losses = []
        self.val_accuracies = []

    def train(self, train_loader, val_loader, epochs=5):
        """Run ``epochs`` rounds of training followed by validation.

        Args:
            train_loader: Sized iterable of ``(images, labels)`` batches.
            val_loader: Iterable of ``(images, labels)`` batches. If it yields
                no samples the validation accuracy is recorded as 0.0.
            epochs: Number of passes over ``train_loader``.

        Raises:
            ValueError: If ``train_loader`` has no batches.
        """
        num_batches = len(train_loader)
        if num_batches == 0:
            raise ValueError("train_loader is empty; nothing to train on")

        for epoch in range(epochs):
            self.model.train()
            running_loss = 0.0
            for images, labels in train_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
            epoch_loss = running_loss / num_batches
            self.train_losses.append(epoch_loss)

            self.model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in val_loader:
                    images, labels = images.to(self.device), labels.to(self.device)
                    predicted = self.model(images).argmax(dim=1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()
            # An empty validation split must not abort training.
            val_accuracy = 100 * correct / total if total else 0.0
            self.val_accuracies.append(val_accuracy)
            # The plateau scheduler is driven by the training loss.
            self.scheduler.step(epoch_loss)
            print(f'Epoch {epoch+1}, Loss: {epoch_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%')

    def test(self, test_loader):
        """Evaluate on ``test_loader``.

        Returns:
            ``(test_accuracy, auc, cm)``: accuracy in percent, ROC AUC computed
            from the softmax probability of class 1, and a 2x2 confusion
            matrix with rows/columns ordered as labels ``[0, 1]``.

        Raises:
            ValueError: If ``test_loader`` yields no samples.
        """
        self.model.eval()
        correct = 0
        total = 0
        all_preds = []
        all_scores = []
        all_labels = []
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                predicted = outputs.argmax(dim=1)
                # ROC AUC needs a continuous score; hard 0/1 predictions reduce
                # the curve to a single operating point.
                positive_prob = torch.softmax(outputs, dim=1)[:, 1]
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                all_preds.extend(predicted.cpu().numpy())
                all_scores.extend(positive_prob.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        if total == 0:
            raise ValueError("test_loader produced no samples")
        test_accuracy = 100 * correct / total
        auc = roc_auc_score(all_labels, all_scores)
        # Fixing the label set keeps the matrix 2x2 even if one class is never
        # seen or predicted.
        cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
        print(f'Final Test Accuracy: {test_accuracy:.2f}%, AUC: {auc:.4f}')
        return test_accuracy, auc, cm

# Main Execution
def main():
    """Fine-tune EfficientNet-B3 on the deepfake dataset and log the results.

    Builds the model with a two-output head, trains and tests it inside one
    MLflow run, writes the weights (state dict) and the pickled model under
    /kaggle/working, and logs test accuracy, AUC and both files to MLflow.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Pretrained backbone; swap the final linear layer for a 2-class head.
    model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)
    head_in_features = model.classifier[1].in_features
    model.classifier[1] = nn.Linear(head_in_features, 2)  # Binary classification

    mlflow.set_tracking_uri("file:///kaggle/working/mlruns1")
    mlflow.set_experiment("Deepfake_Detection")

    with mlflow.start_run():
        DATA_DIR = Path("/kaggle/input/final-merged-dataset/")

        # One dataset per split, built in train / validation / test order.
        datasets = {
            split_name: DeepfakeDataset(DATA_DIR, split=split_name, transform=data_transforms)
            for split_name in ('train', 'validation', 'test')
        }

        # Only the training loader shuffles.
        train_loader = DataLoader(datasets['train'], batch_size=32, shuffle=True)
        val_loader = DataLoader(datasets['validation'], batch_size=32)
        test_loader = DataLoader(datasets['test'], batch_size=32)

        trainer = DeepfakeTrainer(model, device)
        trainer.train(train_loader, val_loader)
        test_acc, auc, cm = trainer.test(test_loader)

        # Save model: weights only, then the whole pickled module.
        model_path_pth = Path("/kaggle/working/deepfake_model.pth")
        model_path_pickle = Path("/kaggle/working/deepfake_model.pkl")
        torch.save(model.state_dict(), model_path_pth)
        with open(model_path_pickle, 'wb') as pickle_file:
            pickle.dump(trainer.model, pickle_file)

        # Log metrics, then the saved files as artifacts.
        mlflow.log_metric("test_accuracy", test_acc)
        mlflow.log_metric("auc", auc)
        for artifact_path in (model_path_pth, model_path_pickle):
            mlflow.log_artifact(artifact_path)

# Add directory listing command (limited output)
# `!ls` is an IPython/Jupyter shell escape, so this cell only runs inside a
# notebook kernel, not as a plain Python script.
print("Directory structure (top level):")
!ls /kaggle/input/final-merged-dataset/final_dataset

# Start the training pipeline when this cell/script is the entry point.
if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import os
import mlflow
from data_handler import get_transforms
from train_efficientnet import DeepfakeEfficientNet
import logging
import numpy as np
from PIL import Image
from pathlib import Path

# Configure logging: INFO-level basic configuration plus the module-level
# logger that the functions in this cell use for progress messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Custom Dataset
class DeepfakeDataset(Dataset):
    """Dataset over parallel sequences of image paths and labels.

    Args:
        image_paths: Indexable collection of image file paths.
        labels: Indexable collection of labels, aligned with ``image_paths``.
        transform: Optional callable applied to each loaded RGB image.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.transform = transform
        self.labels = labels
        self.image_paths = image_paths

    def __len__(self):
        """Return the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Open image ``idx`` as RGB and return ``(image, label)``."""
        target = self.labels[idx]
        rgb_image = Image.open(self.image_paths[idx]).convert('RGB')
        if not self.transform:
            return rgb_image, target
        return self.transform(rgb_image), target

# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Each epoch logs training loss, training accuracy and validation accuracy
    to the logger and to MLflow. Whenever validation accuracy beats the best
    seen so far, the state dict is written to ``best_model.pth``.

    Returns:
        The same ``model`` object in its state after the last epoch (not
        reloaded from the best checkpoint).
    """
    best_acc = 0.0

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        loss_sum = 0.0
        train_hits = 0
        train_seen = 0

        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()
            logits = model(batch_inputs)
            batch_loss = criterion(logits, batch_labels)
            batch_loss.backward()
            optimizer.step()
            loss_sum += batch_loss.item()

            batch_pred = torch.max(logits, 1)[1]
            train_seen += batch_labels.size(0)
            train_hits += (batch_pred == batch_labels).sum().item()

        epoch_loss = loss_sum / len(train_loader)
        epoch_acc = 100 * train_hits / train_seen
        # The scheduler advances once per epoch, before validation.
        scheduler.step()

        # ---- validation pass ----
        model.eval()
        val_hits = 0
        val_seen = 0
        with torch.no_grad():
            for batch_inputs, batch_labels in val_loader:
                batch_inputs = batch_inputs.to(device)
                batch_labels = batch_labels.to(device)
                batch_pred = torch.max(model(batch_inputs), 1)[1]
                val_seen += batch_labels.size(0)
                val_hits += (batch_pred == batch_labels).sum().item()

        val_acc = 100 * val_hits / val_seen

        logger.info(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.2f}%, Val Acc: {val_acc:.2f}%')
        epoch_metrics = (
            ("train_loss", epoch_loss),
            ("train_accuracy", epoch_acc),
            ("val_accuracy", val_acc),
        )
        for metric_name, metric_value in epoch_metrics:
            mlflow.log_metric(metric_name, metric_value, step=epoch)

        # Checkpoint only on a strict improvement.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')

    return model

def load_data(data_dir, split, transform):
    """Collect image paths and labels from ``<data_dir>/<split>/{fake,real}``.

    Files are selected by extension (``.jpg``, ``.jpeg``, ``.png``), compared
    case-insensitively so that names such as ``IMG_001.JPG`` are not skipped.
    Each class listing is sorted so the result does not depend on directory
    enumeration order.

    Args:
        data_dir: Directory containing the split folders.
        split: Split folder name, e.g. ``"train"``.
        transform: Unused; kept so existing callers keep working.

    Returns:
        ``(image_paths, labels)`` where ``image_paths`` is a list of ``Path``
        objects (all fake images first, then all real images) and ``labels``
        holds 0 for fake and 1 for real.

    Raises:
        FileNotFoundError: If either class directory does not exist.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png'}
    split_dir = Path(data_dir) / split

    def list_images(class_name):
        # All image files directly inside one class folder, in sorted order.
        return sorted(
            path for path in (split_dir / class_name).iterdir()
            if path.suffix.lower() in image_suffixes
        )

    fake_images = list_images("fake")
    real_images = list_images("real")

    image_paths = fake_images + real_images
    labels = [0] * len(fake_images) + [1] * len(real_images)
    logger.info(f"Found {len(image_paths)} images in {split}")

    return image_paths, labels

def main():
    """Train ``DeepfakeEfficientNet`` on the train/validation splits under MLflow.

    Loads image paths for both splits, builds augmented (train) and plain
    (validation) pipelines at 300x300, weights the loss by inverse class
    frequency, trains with Adam and cosine annealing, and saves/logs the final
    weights as ``final_model.pth``.

    Raises:
        ValueError: If the training split contains no images.
    """
    # Hyperparameters are defined once so the values used and the values
    # logged to MLflow cannot drift apart.
    batch_size = 64
    learning_rate = 0.0005
    weight_decay = 1e-4
    num_epochs = 15

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Training on: {device}")

    # Data directory
    data_dir = "/kaggle/input/final-merged-dataset/final_dataset/"

    # Load datasets
    train_paths, train_labels = load_data(data_dir, "train", None)
    val_paths, val_labels = load_data(data_dir, "validation", None)

    # Data augmentation and transforms
    train_transform = transforms.Compose([
        transforms.Resize((300, 300)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    val_transform = transforms.Compose([
        transforms.Resize((300, 300)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = DeepfakeDataset(train_paths, train_labels, transform=train_transform)
    val_dataset = DeepfakeDataset(val_paths, val_labels, transform=val_transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

    # Model
    model = DeepfakeEfficientNet().to(device)
    logger.info("Model initialized")

    # Loss and optimizer
    # Count classes from the labels (0 = fake, 1 = real, as assigned by
    # load_data) rather than searching for "fake" in the path text, which
    # miscounts whenever a directory or file name happens to contain it.
    num_fake = train_labels.count(0)
    num_real = train_labels.count(1)
    total_samples = num_fake + num_real
    if total_samples == 0:
        raise ValueError(f"No training images found under {data_dir}")
    # Each class is weighted by the other class's share of the data, so the
    # rarer class gets the larger weight.
    class_weights = torch.tensor([num_real / total_samples, num_fake / total_samples]).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    # T_max matches the epoch count: with a shorter T_max the cosine schedule
    # passes its minimum and the learning rate rises again before training ends.
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    # Start MLflow run
    with mlflow.start_run():
        mlflow.log_params({
            "batch_size": batch_size,
            "learning_rate": learning_rate,
            "epochs": num_epochs,
            "class_weights": class_weights.tolist(),
            "weight_decay": weight_decay
        })
        model = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=num_epochs, device=device)

        # Save the final model
        torch.save(model.state_dict(), 'final_model.pth')
        mlflow.log_artifact('final_model.pth')


if __name__ == "__main__":
    main()