## Library Imports

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score
import torch.nn.functional as F
import numpy as np
import time
import os

## Data Preprocessing and Augmentation

In [8]:
# define a series of transformations for preprocessing and augmenting the training images
train_transform = transforms.Compose([
    # Resize and crop the image to 224x224 pixels
    transforms.RandomResizedCrop(224),
    
    # Horizontal flip to increase image diversity
    transforms.RandomHorizontalFlip(),
    
    # Rotation of the image with a maximum of 15 degrees
    transforms.RandomRotation(15),
    
    # Adjustment of brightness, contrast, saturation, and hue to simulate different lighting conditions
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.3),
    
    # Apply either Gaussian blur (7x7) or affine transformations (rotation, translation) with a 40% probability
    transforms.RandomApply([transforms.GaussianBlur(7), transforms.RandomAffine(degrees=15)], p=0.4),
    
    # Convert the image to a PyTorch tensor
    transforms.ToTensor(),
    
    # Normalize the pixel values using the ImageNet mean and standard deviation (commonly used for pre-trained models)
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# The transformations for validation and test images are simpler
val_test_transform = transforms.Compose([
    # Resize the image to 256 pixels along the longest side
    transforms.Resize(256),
    
    # Perform a central crop to get a 224x224 image
    transforms.CenterCrop(224),
    
    # Convert the image to a tensor
    transforms.ToTensor(),
    
    # Normalize the pixel values using the same ImageNet mean and standard deviation
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load the training dataset with the applied transformations
train_data = datasets.ImageFolder("/kaggle/input/deepfake-and-real-images/Dataset/Train", transform=train_transform)

# Load the validation dataset with the applied validation transformations
val_data = datasets.ImageFolder("/kaggle/input/deepfake-and-real-images/Dataset/Validation", transform=val_test_transform)

# Load the test dataset with the applied test transformations
test_data = datasets.ImageFolder("/kaggle/input/deepfake-and-real-images/Dataset/Test", transform=val_test_transform)


## Image Segmentation Model Setup

In [9]:
import torchvision.models.segmentation as segmentation
import torchvision.transforms.functional as TF

# Defines a model wrapper for the pre-trained DeepLabV3 segmentation model
class DeepLabV3(nn.Module):
    def __init__(self):
        super(DeepLabV3, self).__init__()
        self.model = segmentation.deeplabv3_resnet101(pretrained=True, progress=True)

    def forward(self, x):
        return self.model(x)['out']
    
# Performs segmentation on a single image, generating a mask
def segment_image(model, image, device):
    model.eval()
    image = image.unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(image)
    mask = output.argmax(1).squeeze().cpu().numpy()
    return mask

# Applies segmentation to an image and returns the masked image
class SegmentationTransform:
    def __init__(self, segmentation_model, device):
        self.segmentation_model = segmentation_model
        self.device = device

    def __call__(self, img):
        img_tensor = TF.to_tensor(img).to(self.device)
        mask = segment_image(self.segmentation_model, img_tensor, self.device)
        mask_tensor = torch.tensor(mask, dtype=torch.float32).unsqueeze(0).to(self.device)
        img = TF.to_tensor(img).to(self.device)
        return img * mask_tensor

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
segmentation_model = DeepLabV3().to(device)

Downloading: "https://download.pytorch.org/models/deeplabv3_resnet101_coco-586e9e4e.pth" to /root/.cache/torch/hub/checkpoints/deeplabv3_resnet101_coco-586e9e4e.pth
100%|██████████| 233M/233M [00:01<00:00, 191MB/s]  


## Attention Mechanism and Adaptive Weighting Module

In [10]:
# Checks if a GPU (CUDA-enabled) is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Attention mechanism
class GLCSAttention(nn.Module):
    def __init__(self, in_channels):
        super(GLCSAttention, self).__init__()
        # 1x1 convolutions that help to capture local and global channel attention information.
        self.conv1x1_local = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        self.conv1x1_global_1 = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        self.conv1x1_global_2 = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        # Capture spatial attention at multiple scales.
        self.conv3x3 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.conv5x5 = nn.Conv2d(in_channels, in_channels, kernel_size=5, padding=2)
        self.conv7x7 = nn.Conv2d(in_channels, in_channels, kernel_size=7, padding=3)
        # Activation function to normalize the attention maps between 0 and 1
        self.sigmoid = nn.Sigmoid()
# Defines the forward pass for the attention mechanism
    def forward(self, x):
        # Compute local channel attention and apply to input
        local_channel_attention = self.sigmoid(self.conv1x1_local(F.adaptive_avg_pool2d(x, (1, 1))))
        local_channel_attention = x * local_channel_attention
        # Compute global channel attention and apply to input
        global_channel_attention = F.adaptive_avg_pool2d(x, (1, 1))
        global_channel_attention = self.sigmoid(self.conv1x1_global_1(global_channel_attention) * self.conv1x1_global_2(global_channel_attention))
        global_channel_attention = x * global_channel_attention
        # Compute local spatial attention using multiple kernel sizes and apply to input
        local_spatial_attention = self.conv1x1_local(x)
        local_spatial_attention = self.conv3x3(local_spatial_attention) + self.conv5x5(local_spatial_attention) + self.conv7x7(local_spatial_attention)
        local_spatial_attention = self.sigmoid(local_spatial_attention)
        local_spatial_attention = x * local_spatial_attention
        # Compute global spatial attention and apply to input
        global_spatial_attention = self.conv1x1_local(x)
        global_spatial_attention = self.sigmoid(global_spatial_attention * self.conv1x1_local(global_spatial_attention))
        global_spatial_attention = x * global_spatial_attention
        # Combine local and global attention (both channel and spatial) to get the final attention map
        final_attention = (0.5 * local_channel_attention + 0.5 * global_channel_attention) * (0.5 * local_spatial_attention + 0.5 * global_spatial_attention)
        return final_attention


class AdaptivelyWeightedMultiScaleAttention(nn.Module):
    def __init__(self, scales):
        super(AdaptivelyWeightedMultiScaleAttention, self).__init__()
        # Define GLCSAttention modules for each scale and the learnable weights
        self.scales = scales
        self.glcs_attention = nn.ModuleList([GLCSAttention(scale) for scale in scales])
        self.weights = nn.Parameter(torch.ones(len(scales)) * 0.25)

    def forward(self, features):
        weighted_features = []
        for i, feature in enumerate(features):
            attention = self.glcs_attention[i](feature)
            weight = self.weights[i]
            weighted_feature = F.adaptive_max_pool2d(attention, (1, 1)) * weight
            weighted_features.append(weighted_feature)
        return torch.cat(weighted_features, dim=1)

## FaceNeSt Model Definition

In [11]:
class FaceNeSt(nn.Module):
    def __init__(self, num_classes=2):
        super(FaceNeSt, self).__init__()
        self.initial_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.resnet_blocks = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(64, 256, kernel_size=1, stride=1, padding=0),
                nn.BatchNorm2d(256),
                nn.ReLU(inplace=True),
            ),
            nn.Sequential(
                nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(inplace=True),
            ),
            nn.Sequential(
                nn.Conv2d(512, 1024, kernel_size=3, stride=2, padding=1),
                nn.BatchNorm2d(1024),
                nn.ReLU(inplace=True),
            ),
        ])
        self.adaptive_attention = AdaptivelyWeightedMultiScaleAttention([64, 256, 512, 1024])
        self.conv1x1 = nn.Conv2d(sum([64, 256, 512, 1024]), 512, kernel_size=1)
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.initial_layers(x)
        features = [x]
        for block in self.resnet_blocks:
            x = block(x)
            features.append(x)
        x = self.adaptive_attention(features)
        x = self.conv1x1(x)
        x = self.global_avg_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

model = FaceNeSt(num_classes=2).to(device)

## Data Sampling and DataLoader Setup

In [12]:
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from sklearn.metrics import accuracy_score
from torch.amp import GradScaler, autocast
import numpy as np
import os
import matplotlib.pyplot as plt

subset_proportion = 0.1

train_indices = np.random.choice(len(train_data), int(len(train_data) * subset_proportion), replace=False)
val_indices = np.random.choice(len(val_data), int(len(val_data) * subset_proportion), replace=False)
test_indices = np.random.choice(len(test_data), int(len(test_data) * subset_proportion), replace=False)

train_subset = Subset(train_data, train_indices)
val_subset = Subset(val_data, val_indices)
test_subset = Subset(test_data, test_indices)

train_loader = DataLoader(train_subset, batch_size=32, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_subset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_subset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True)

## Training Setup and Optimization Configuration

In [13]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.01)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
scaler = GradScaler('cuda')

## Training & Evaluation with Accumulation

In [14]:
def train_with_accumulation(model, loader, criterion, optimizer, device, accumulation_steps):
    model.train()
    running_loss = 0.0
    optimizer.zero_grad()
    for i, (inputs, labels) in enumerate(loader):
        inputs, labels = inputs.to(device), labels.to(device)
        with autocast('cuda'):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
        loss = loss / accumulation_steps
        scaler.scale(loss).backward()
        if (i + 1) % accumulation_steps == 0:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        running_loss += loss.item() * inputs.size(0) * accumulation_steps
    epoch_loss = running_loss / len(loader.dataset)
    return epoch_loss

def evaluate(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            with autocast('cuda'):
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    epoch_loss = running_loss / len(loader.dataset)
    accuracy = accuracy_score(all_labels, all_preds)
    return epoch_loss, accuracy

## Training Parameters and Metrics Initialization

In [15]:
num_epochs = 15
best_val_acc = 0.0
accumulation_steps = 8
patience = 5
early_stopping_counter = 0

train_losses = []
val_losses = []
val_accuracies = []

##  Training

In [16]:
from tqdm import tqdm
import time

best_val_acc = 0
early_stopping_counter = 0

for epoch in range(num_epochs):
    
    start_time = time.time()
    
    # Training loop with progress bar
    model.train()  # Set model to training mode
    train_loss = 0
    correct_predictions = 0
    total_samples = 0
    for batch_idx, (inputs, targets) in enumerate(tqdm(train_loader, desc=f"Training Epoch {epoch+1}", leave=False)):
        inputs, targets = inputs.to(device), targets.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        
        if (batch_idx + 1) % accumulation_steps == 0:  # Gradient accumulation
            optimizer.step()
        
        train_loss += loss.item()
        
        # Calculate accuracy for training set (if classification)
        _, predicted = torch.max(outputs, 1)
        total_samples += targets.size(0)
        correct_predictions += (predicted == targets).sum().item()

    # Calculate average training loss and accuracy
    train_loss /= len(train_loader)
    train_accuracy = correct_predictions / total_samples

    # Validation loop with progress bar
    model.eval()  # Set model to evaluation mode
    val_loss = 0
    correct_predictions = 0
    total_samples = 0
    with torch.no_grad():  
        for inputs, targets in tqdm(val_loader, desc=f"Validating Epoch {epoch+1}", leave=False):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            val_loss += loss.item()
            
            # Calculate accuracy for validation set (if classification)
            _, predicted = torch.max(outputs, 1)
            total_samples += targets.size(0)
            correct_predictions += (predicted == targets).sum().item()

    # Calculate average validation loss and accuracy
    val_loss /= len(val_loader)
    val_accuracy = correct_predictions / total_samples

    # Scheduler step based on validation loss
    scheduler.step(val_loss)

    end_time = time.time()

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)
    
    total_time = int(end_time - start_time)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}\n'
          f'Time taken in Epoch {epoch+1}: {total_time//60:.0f} minutes and {total_time%60} seconds', end="\n\n")
    
    if val_accuracy > best_val_acc:
        best_val_acc = val_accuracy
        torch.save(model.state_dict(), 'best_model.pth')
#         early_stopping_counter = 0
#     else:
#         early_stopping_counter += 1

#     if early_stopping_counter >= patience:
#         print("Early stopping")
#         break


                                                                     

Epoch 1/15, Train Loss: 0.8054, Train Accuracy: 0.5179, Val Loss: 0.6926, Val Accuracy: 0.5335
Time taken in Epoch 1: 13 minutes and 15 seconds



                                                                     

Epoch 2/15, Train Loss: 0.6922, Train Accuracy: 0.5388, Val Loss: 0.6866, Val Accuracy: 0.5596
Time taken in Epoch 2: 13 minutes and 23 seconds



                                                                     

Epoch 3/15, Train Loss: 0.6900, Train Accuracy: 0.5397, Val Loss: 0.6823, Val Accuracy: 0.5540
Time taken in Epoch 3: 13 minutes and 23 seconds



                                                                     

Epoch 4/15, Train Loss: 0.6820, Train Accuracy: 0.5591, Val Loss: 0.6660, Val Accuracy: 0.5994
Time taken in Epoch 4: 13 minutes and 23 seconds



                                                                     

Epoch 5/15, Train Loss: 0.6923, Train Accuracy: 0.5464, Val Loss: 0.6739, Val Accuracy: 0.5619
Time taken in Epoch 5: 13 minutes and 23 seconds



                                                                     

Epoch 6/15, Train Loss: 0.6752, Train Accuracy: 0.5813, Val Loss: 0.6649, Val Accuracy: 0.5987
Time taken in Epoch 6: 13 minutes and 23 seconds



                                                                     

Epoch 7/15, Train Loss: 0.6862, Train Accuracy: 0.5625, Val Loss: 0.7020, Val Accuracy: 0.5096
Time taken in Epoch 7: 13 minutes and 23 seconds



                                                                     

Epoch 8/15, Train Loss: 0.6772, Train Accuracy: 0.5736, Val Loss: 0.6565, Val Accuracy: 0.6175
Time taken in Epoch 8: 13 minutes and 23 seconds



                                                                     

Epoch 9/15, Train Loss: 0.6775, Train Accuracy: 0.5881, Val Loss: 0.6524, Val Accuracy: 0.6284
Time taken in Epoch 9: 13 minutes and 23 seconds



                                                                      

Epoch 10/15, Train Loss: 0.6705, Train Accuracy: 0.5851, Val Loss: 0.6494, Val Accuracy: 0.6225
Time taken in Epoch 10: 13 minutes and 24 seconds



                                                                      

Epoch 11/15, Train Loss: 0.6685, Train Accuracy: 0.5926, Val Loss: 0.6301, Val Accuracy: 0.6547
Time taken in Epoch 11: 13 minutes and 24 seconds



                                                                      

Epoch 12/15, Train Loss: 0.6610, Train Accuracy: 0.6069, Val Loss: 0.6411, Val Accuracy: 0.6385
Time taken in Epoch 12: 13 minutes and 23 seconds



                                                                      

Epoch 13/15, Train Loss: 0.6514, Train Accuracy: 0.6139, Val Loss: 0.6175, Val Accuracy: 0.6555
Time taken in Epoch 13: 13 minutes and 23 seconds



                                                                      

Epoch 14/15, Train Loss: 0.6496, Train Accuracy: 0.6170, Val Loss: 0.6164, Val Accuracy: 0.6626
Time taken in Epoch 14: 13 minutes and 23 seconds



                                                                      

Epoch 15/15, Train Loss: 0.6457, Train Accuracy: 0.6238, Val Loss: 0.6202, Val Accuracy: 0.6550
Time taken in Epoch 15: 13 minutes and 24 seconds





In [17]:
# for epoch in range(num_epochs):
    
#     start_time = time.time()
#     train_loss = train_with_accumulation(model, train_loader, criterion, optimizer, device, accumulation_steps)
#     val_loss, val_acc = evaluate(model, val_loader, criterion, device)
#     scheduler.step(val_loss)
#     end_time = time.time()

#     train_losses.append(train_loss)
#     val_losses.append(val_loss)
#     val_accuracies.append(val_acc)
    
#     total_time = int(end_time - start_time)

#     print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}\nTime taken in Epoch {epoch + 1}: {total_time/60:.0f} minutes and {total_time%60} seconds', end="\n\n")
    
#     if val_acc > best_val_acc:
#         best_val_acc = val_acc
#         torch.save(model.state_dict(), 'best_model.pth')
#         early_stopping_counter = 0
#     else:
#         early_stopping_counter += 1

#     if early_stopping_counter >= patience:
#         print("Early stopping")
#         break

## Visualization of Training and Validation Loss/Accuracy Over Epochs

In [28]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Create a subplot with 1 row and 2 columns
fig = make_subplots(rows=1, cols=2, subplot_titles=("Loss over Epochs", "Accuracy over Epochs"))

# First subplot - Train and Validation Loss
fig.add_trace(go.Scatter(x=list(range(len(train_losses))), y=train_losses, mode='lines', name='Train Loss'), row=1, col=1)
fig.add_trace(go.Scatter(x=list(range(len(val_losses))), y=val_losses, mode='lines', name='Validation Loss'), row=1, col=1)

# Second subplot - Validation Accuracy
fig.add_trace(go.Scatter(x=list(range(len(val_accuracies))), y=val_accuracies, mode='lines', name='Validation Accuracy'), row=1, col=2)

# Update layout and axis titles
fig.update_layout(title_text="Training and Validation Metrics Over Epochs")
fig.update_xaxes(title_text="Epochs", row=1, col=1)
fig.update_yaxes(title_text="Loss", row=1, col=1)
fig.update_xaxes(title_text="Epochs", row=1, col=2)
fig.update_yaxes(title_text="Accuracy", row=1, col=2)

fig.show()

## Model Evaluation on Test Dataset

In [22]:
model.load_state_dict(torch.load('best_model.pth'))
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')


You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.



Test Loss: 0.6209, Test Accuracy: 0.6716
