In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from sklearn.metrics import classification_report
import numpy as np
from PIL import Image
from skimage.feature import local_binary_pattern
from scipy.fftpack import dct      # importing required libraries

In [None]:
# ------------------------------------------
# Custom Dataset: includes RGB, LBP, and DCT
# ------------------------------------------
class CustomDataset(Dataset):
    """Wrap an image dataset and emit 5-channel tensors: RGB + LBP + block-wise DCT.

    Every sample is resized to 128x128. A local-binary-pattern map and an 8x8
    block-wise DCT map are computed from the grayscale version of the resized
    image, each min-max scaled to [0, 255], then normalised with
    ``(x / 255 - 0.5) / 0.225`` and stacked behind the transformed RGB tensor.

    Args:
        image_folder_dataset: Indexable dataset returning ``(image, label)`` pairs.
            The image is converted with ``np.array`` / ``Image.fromarray``, so a
            PIL image is assumed -- TODO confirm for non-ImageFolder sources.
        transform: Optional callable turning the resized image into a
            ``(3, 128, 128)`` tensor. When omitted, ``transforms.ToTensor()`` is
            used so that ``__getitem__`` still works.

    NOTE(review): ``transform`` is applied to the RGB image only, while the LBP
    and DCT maps are computed from the un-augmented image. Any random geometric
    augmentation inside ``transform`` (e.g. a horizontal flip) therefore affects
    the RGB channels but not the two feature channels.
    """

    def __init__(self, image_folder_dataset, transform=None):
        self.dataset = image_folder_dataset
        self.transform = transform
        self.resize = transforms.Resize((128, 128))  # Ensure consistent input size
        # Used when no transform is supplied; previously that case crashed in
        # __getitem__ because the RGB tensor was never created.
        self._to_tensor = transforms.ToTensor()

    def __len__(self):
        return len(self.dataset)

    @staticmethod
    def _blockwise_dct(gray_image, block_size=8):
        """Return the 2-D orthonormal DCT of every full ``block_size`` tile.

        Tiles that do not fit completely inside the image are left at zero.
        The result has the same height/width as ``gray_image`` and dtype float32.
        """
        h, w = gray_image.shape
        dct_feature = np.zeros((h, w), dtype=np.float32)

        rows, cols = h // block_size, w // block_size
        if rows == 0 or cols == 0:
            return dct_feature

        hh, ww = rows * block_size, cols * block_size
        # (rows, block, cols, block): axes 1 and 3 index pixels inside one tile.
        tiles = gray_image[:hh, :ww].astype(np.float64).reshape(rows, block_size, cols, block_size)
        # Separable 2-D DCT: first along the in-tile rows, then the in-tile columns.
        coeffs = dct(dct(tiles, axis=1, norm='ortho'), axis=3, norm='ortho')
        dct_feature[:hh, :ww] = coeffs.reshape(hh, ww)
        return dct_feature

    @staticmethod
    def _to_uint8_range(feature):
        """Min-max scale ``feature`` to [0, 255] and truncate to uint8."""
        span = feature.max() - feature.min() + 1e-8  # epsilon avoids 0/0 on flat maps
        return ((feature - feature.min()) / span * 255).astype(np.uint8)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]

        # Resize image to 128x128 before further processing
        resized_image = self.resize(image)
        image_np = np.array(resized_image)

        # Convert to grayscale if needed
        gray_image = np.array(Image.fromarray(image_np).convert('L')) if image_np.ndim == 3 else image_np

        # ----------- LBP Feature Extraction -----------
        lbp = self._to_uint8_range(local_binary_pattern(gray_image, 8, 1, method='uniform'))

        # ----------- DCT Feature Extraction -----------
        dct_feature = self._to_uint8_range(self._blockwise_dct(gray_image, 8))

        # ----------- Transform image tensor (RGB) -----------
        if self.transform is not None:
            image_tensor = self.transform(resized_image)
        else:
            image_tensor = self._to_tensor(resized_image)

        # Convert LBP and DCT features to PyTorch tensors in [0, 1], shape (1, H, W)
        lbp_tensor = torch.from_numpy(lbp).float().unsqueeze(0) / 255.0
        dct_tensor = torch.from_numpy(dct_feature).float().unsqueeze(0) / 255.0

        # Normalize LBP & DCT features
        lbp_tensor = (lbp_tensor - 0.5) / 0.225
        dct_tensor = (dct_tensor - 0.5) / 0.225

        # Concatenate RGB, LBP, and DCT (total 5 channels)
        combined_tensor = torch.cat([image_tensor, lbp_tensor, dct_tensor], dim=0)

        return combined_tensor, label

In [None]:
# ------------------------------------------
# Self-Attention Layer for each branch
# ------------------------------------------
class SelfAttention(nn.Module):
    """Spatial self-attention over a feature map with a learnable residual gate.

    Query and key are 1x1 convolutions to a reduced channel count
    (``in_channels // 8``, but never fewer than 1); value is a 1x1 convolution
    that keeps ``in_channels``. The attended features are scaled by ``gamma``
    (initialised to zero, so the layer starts as an identity) and added to the
    input.

    Args:
        in_channels: Number of channels of the incoming feature map.
    """

    def __init__(self, in_channels):
        super(SelfAttention, self).__init__()
        # in_channels // 8 evaluates to 0 when in_channels < 8, which would leave
        # query/key without any channels; clamp to at least one.
        reduced_channels = max(in_channels // 8, 1)
        self.query = nn.Conv2d(in_channels, reduced_channels, kernel_size=1)
        self.key = nn.Conv2d(in_channels, reduced_channels, kernel_size=1)
        self.value = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        self.gamma = nn.Parameter(torch.zeros(1))  # Learnable scaling
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Apply attention to ``x`` of shape (B, C, H, W); output has the same shape.

        Notation: B = batch size, C = channels, C' = reduced channels,
        N = H * W spatial positions.
        """
        batch_size, C, H, W = x.size()
        query = self.query(x).flatten(2).transpose(1, 2)   # (B, N, C')
        key = self.key(x).flatten(2)                       # (B, C', N)
        value = self.value(x).flatten(2)                   # (B, C, N)

        # Row i holds the softmax-normalised weights position i assigns to every position.
        attention = self.softmax(torch.bmm(query, key))    # (B, N, N)
        out = torch.bmm(value, attention.transpose(1, 2))  # (B, C, N)
        out = out.view(batch_size, C, H, W)                # (B, C, H, W)

        return self.gamma * out + x  # Residual connection

In [None]:
# ------------------------------------------
# Full Deepfake Detection Model with 3 branches
# ------------------------------------------
class DeepfakeDetectionModel(nn.Module):
    """Three-branch CNN (RGB, LBP, DCT) with per-branch self-attention and a fused MLP head.

    The input is a 5-channel tensor: channels 0-2 go to the image branch,
    channel 3 to the LBP branch and channel 4 to the DCT branch. Every branch
    halves the spatial size three times and ends with 128 channels; the
    flattened branch outputs are concatenated and mapped to a single logit.

    Args:
        input_size: Height/width of the (square) input. Defaults to 128, which
            yields the 128 * 16 * 16 features per branch used originally.

    Attributes:
        train_loss, train_accuracy, val_loss, val_accuracy: Per-epoch history
            lists appended to by ``train_model``.
    """

    def __init__(self, input_size=128):
        super(DeepfakeDetectionModel, self).__init__()

        if input_size < 8:
            raise ValueError("input_size must be at least 8 (each branch pools by 2 three times)")

        # Image CNN branch
        self.image_branch = nn.Sequential(
            *self._conv_stage(3, 32),
            *self._conv_stage(32, 64),
            *self._conv_stage(64, 128),
        )

        self.image_attention = SelfAttention(128)

        # LBP branch (last stage is not pooled)
        self.lbp_branch = nn.Sequential(
            *self._conv_stage(1, 16),
            *self._conv_stage(16, 32),
            *self._conv_stage(32, 64),
            *self._conv_stage(64, 128, pool=False),
        )

        self.lbp_attention = SelfAttention(128)

        # DCT branch (same layout as the LBP branch)
        self.dct_branch = nn.Sequential(
            *self._conv_stage(1, 16),
            *self._conv_stage(16, 32),
            *self._conv_stage(32, 64),
            *self._conv_stage(64, 128, pool=False),
        )

        self.dct_attention = SelfAttention(128)

        # Flatten layers
        self.image_flatten = nn.Flatten()
        self.lbp_flatten = nn.Flatten()
        self.dct_flatten = nn.Flatten()

        # Feature dimensions: three 2x2 max-pools reduce each side to input_size // 8
        spatial = input_size // 8
        self.image_feature_dim = 128 * spatial * spatial
        self.lbp_feature_dim = 128 * spatial * spatial
        self.dct_feature_dim = 128 * spatial * spatial

        # Fusion & Classification
        self.fusion = nn.Sequential(
            nn.Linear(self.image_feature_dim + self.lbp_feature_dim + self.dct_feature_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1)
        )

        self.train_loss = []
        self.train_accuracy = []
        self.val_loss = []
        self.val_accuracy = []

    @staticmethod
    def _conv_stage(in_channels, out_channels, pool=True):
        """Return [Conv3x3, BatchNorm, ReLU] plus an optional 2x2 max-pool as a list."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(2))
        return layers

    def forward(self, x):
        """Return one raw logit per sample, shape (B, 1), for a (B, 5, H, W) input."""
        # Split input into RGB, LBP, DCT
        image = x[:, :3, :, :]
        lbp = x[:, 3:4, :, :]
        dct = x[:, 4:5, :, :]

        # Apply CNN + attention for each stream
        image_features = self.image_attention(self.image_branch(image))
        lbp_features = self.lbp_attention(self.lbp_branch(lbp))
        dct_features = self.dct_attention(self.dct_branch(dct))

        # Flatten and concatenate
        combined = torch.cat(
            (
                self.image_flatten(image_features),
                self.lbp_flatten(lbp_features),
                self.dct_flatten(dct_features),
            ),
            dim=1,
        )
        return self.fusion(combined)

    # ------------------------------------------
    # Model Training Loop
    # ------------------------------------------
    def train_model(self, train_loader, valid_loader, num_epochs, device):
        """Train with Adam + BCE-with-logits and validate after every epoch.

        Appends the epoch metrics to ``train_loss``, ``train_accuracy``,
        ``val_loss`` and ``val_accuracy`` and lowers the learning rate when the
        validation loss plateaus.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            valid_loader: Iterable of ``(images, labels)`` validation batches.
            num_epochs: Number of passes over ``train_loader``.
            device: Device the batches are moved to; the model itself is not
                moved here.
        """
        criterion = nn.BCEWithLogitsLoss()
        optimizer = optim.Adam(self.parameters(), lr=0.001, weight_decay=1e-5)
        # `verbose` is deprecated in recent PyTorch releases; LR changes are
        # reported explicitly after scheduler.step() below instead.
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=3, min_lr=1e-6)

        for epoch in range(num_epochs):
            self.train()
            total_loss = 0.0
            correct_train = 0
            total_train = 0

            print(f"\nEpoch [{epoch+1}/{num_epochs}] - Training...")

            for batch_idx, (images, labels) in enumerate(train_loader):
                images, labels = images.to(device), labels.to(device)
                targets = labels.unsqueeze(1)
                optimizer.zero_grad()

                outputs = self(images)
                loss = criterion(outputs, targets.float())
                loss.backward()
                optimizer.step()

                total_loss += loss.item() * images.size(0)
                # A logit >= 0 corresponds to a sigmoid probability >= 0.5.
                predicted_labels = (outputs >= 0.0).float()
                correct_train += (predicted_labels == targets).sum().item()
                total_train += labels.size(0)

                if (batch_idx + 1) % 10 == 0:
                    print(f"Batch [{batch_idx+1}/{len(train_loader)}] - "
                          f"Loss: {loss.item():.4f} - Train Accuracy: {correct_train / total_train:.4f}")

            # Average over the samples actually processed so the value stays
            # correct when the loader drops or sub-samples part of the dataset.
            average_loss = total_loss / total_train
            train_accuracy = correct_train / total_train

            self.train_loss.append(average_loss)
            self.train_accuracy.append(train_accuracy)

            val_loss, val_accuracy = self.evaluate_model(valid_loader, device, eval_mode='Validation')
            self.val_loss.append(val_loss)
            self.val_accuracy.append(val_accuracy)

            current_lr = optimizer.param_groups[0]['lr']
            print(f"Epoch [{epoch+1}/{num_epochs}] - "
                  f"Train Loss: {average_loss:.4f} - Train Accuracy: {train_accuracy:.4f} - "
                  f"Val Loss: {val_loss:.4f} - Val Accuracy: {val_accuracy:.4f} - "
                  f"LR: {current_lr:.6f}")

            scheduler.step(val_loss)
            new_lr = optimizer.param_groups[0]['lr']
            if new_lr < current_lr:
                print(f"Reducing learning rate to {new_lr:.6f}")

    # ------------------------------------------
    # Evaluation for Validation and Test Set
    # ------------------------------------------
    def evaluate_model(self, data_loader, device, eval_mode='Test'):
        """Compute mean BCE loss and accuracy over ``data_loader`` without gradients.

        Switches the model to eval mode (it is not switched back here). When
        ``eval_mode == 'Test'`` a classification report is printed as well.

        Args:
            data_loader: Iterable of ``(images, labels)`` batches.
            device: Device the batches are moved to.
            eval_mode: Label used in the printed summary.

        Returns:
            Tuple ``(average_loss, accuracy)``.
        """
        self.eval()
        y_true = []
        y_pred = []
        correct = 0
        total = 0
        criterion = nn.BCEWithLogitsLoss()
        total_loss = 0.0

        with torch.no_grad():
            for images, labels in data_loader:
                images, labels = images.to(device), labels.to(device)
                targets = labels.unsqueeze(1)
                outputs = self(images)
                loss = criterion(outputs, targets.float())
                total_loss += loss.item() * images.size(0)

                predicted_labels = (outputs >= 0.0).float()
                correct += (predicted_labels == targets).sum().item()
                total += labels.size(0)

                # Store flat integer labels so y_true and y_pred share shape and dtype.
                y_true.extend(labels.view(-1).long().cpu().tolist())
                y_pred.extend(predicted_labels.view(-1).long().cpu().tolist())

        # Normalise by the samples actually seen (see train_model).
        average_loss = total_loss / total
        accuracy = correct / total

        print(f"\n{eval_mode} Loss: {average_loss:.4f} - {eval_mode} Accuracy: {accuracy:.4f}")

        if eval_mode == 'Test':
            print("\nClassification Report:")
            print(classification_report(y_true, y_pred))

        return average_loss, accuracy

In [None]:
# Dataset locations (placeholders: point these at the real folders before running)
train_dir = "path to train dataset"
val_dir = "path to valid dataset"
test_dir = "path to test dataset"

# Run configuration
SEED = 42
TRAIN_BATCH_SIZE = 64
EVAL_BATCH_SIZE = 32
NUM_EPOCHS = 20
RGB_MEAN = [0.485, 0.456, 0.406]
RGB_STD = [0.229, 0.224, 0.225]

# Seed the RNGs that are in scope so weight initialisation, shuffling and the
# random flips start from the same state on every "Restart & Run All".
torch.manual_seed(SEED)
np.random.seed(SEED)

# Transforms for better learning
# NOTE(review): the flip below is applied inside CustomDataset to the RGB image
# only; the LBP/DCT channels are computed from the un-flipped image.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=RGB_MEAN, std=RGB_STD)
])

val_test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=RGB_MEAN, std=RGB_STD)
])

# Create datasets
train_dataset = CustomDataset(ImageFolder(train_dir), train_transform)
val_dataset = CustomDataset(ImageFolder(val_dir), val_test_transform)
test_dataset = CustomDataset(ImageFolder(test_dir), val_test_transform)

# Create data loaders (only the training set is shuffled)
train_loader = DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=EVAL_BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=EVAL_BATCH_SIZE)

# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DeepfakeDetectionModel().to(device)

# Train the model
model.train_model(train_loader, val_loader, num_epochs=NUM_EPOCHS, device=device)

# Final evaluation; the returned (loss, accuracy) tuple is the cell's displayed output
print("\nFinal Test Evaluation:")
model.evaluate_model(test_loader, device)


Epoch [1/20] - Training...
Batch [10/391] - Loss: 2.8962 - Train Accuracy: 0.4969
Batch [20/391] - Loss: 1.1342 - Train Accuracy: 0.5062
Batch [30/391] - Loss: 0.8650 - Train Accuracy: 0.5073
Batch [40/391] - Loss: 0.7600 - Train Accuracy: 0.5027
Batch [50/391] - Loss: 0.7060 - Train Accuracy: 0.5028
Batch [60/391] - Loss: 0.6785 - Train Accuracy: 0.5086
Batch [70/391] - Loss: 0.6326 - Train Accuracy: 0.5196
Batch [80/391] - Loss: 0.6984 - Train Accuracy: 0.5240
Batch [90/391] - Loss: 0.6799 - Train Accuracy: 0.5297
Batch [100/391] - Loss: 0.6627 - Train Accuracy: 0.5364
Batch [110/391] - Loss: 0.5917 - Train Accuracy: 0.5435
Batch [120/391] - Loss: 0.6634 - Train Accuracy: 0.5520
Batch [130/391] - Loss: 0.6216 - Train Accuracy: 0.5573
Batch [140/391] - Loss: 0.6616 - Train Accuracy: 0.5590
Batch [150/391] - Loss: 0.5683 - Train Accuracy: 0.5640
Batch [160/391] - Loss: 0.7065 - Train Accuracy: 0.5676
Batch [170/391] - Loss: 0.6313 - Train Accuracy: 0.5712
Batch [180/391] - Loss: 0.624

(0.43363695738613606, 0.9284)