In [1]:
#imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, ConcatDataset
from torchvision import transforms, datasets
from timm import create_model
import timm.data
import timm
import time
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import _LRScheduler
from sklearn.model_selection import KFold
import torch.nn.functional as F
import numpy as np

In [None]:
#checking CUDA-torch compatibility
!nvcc --version
print(f"\ntorch CUDA version: {torch.cuda_version}")

In [None]:
#checking for GPU availability (local_runtime)
print(f'CUDA available: {torch.cuda.is_available()}')
print(f'Number of GPUs: {torch.cuda.device_count()}')
#current_device()/get_device_name() raise when no GPU is usable, so they are
#only queried once availability has been confirmed
if torch.cuda.is_available():
    print(f'Current GPU: {torch.cuda.current_device()}')
    print(f'GPU name: {torch.cuda.get_device_name(torch.cuda.current_device())}')
else:
    print('Current GPU: None')
    print('GPU name: None')

In [94]:
#creation of custom dataset
class DeepfakeDataset(Dataset):
    """Dataset wrapper around one or several ``datasets.ImageFolder`` roots.

    Args:
        root_dir: A single directory or, when ``multi`` is True, an iterable
            of directories. A lone path is also accepted with ``multi=True``.
        transform: Optional transform forwarded to every ``ImageFolder``.
        multi: When True, one ``ImageFolder`` is built per root and the
            results are chained with ``ConcatDataset``.

    Indexing and ``len()`` are delegated to the wrapped dataset.
    """

    def __init__(self, root_dir, transform=None, multi= False):
        self.multi= multi
        if self.multi:
            # A single path has to be wrapped: iterating a string would yield
            # one "root" per character.
            if isinstance(root_dir, (str, bytes, os.PathLike)):
                roots= [root_dir]
            else:
                roots= list(root_dir)
            # NOTE(review): each ImageFolder derives labels from its own
            # sub-folders; assumes all roots share the same class layout - confirm.
            self.dataset= ConcatDataset([datasets.ImageFolder(root= root, transform= transform) for root in roots])
        else:
            self.dataset= datasets.ImageFolder(root=root_dir, transform=transform)

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return whatever the wrapped dataset yields at ``idx``."""
        return self.dataset[idx]
      
#corruption function
def corrupt_data(image, alpha_t):
    """Blend a tensor with standard Gaussian noise.

    Returns ``sqrt(alpha_t) * image + sqrt(1 - alpha_t) * noise`` where
    ``noise`` is drawn from N(0, 1) with the same shape as ``image``.

    The computation stays on the tensor's own device and keeps its floating
    dtype (no NumPy round trip, no silent promotion to float64). Noise is
    drawn from torch's RNG, so seed with ``torch.manual_seed``.

    Args:
        image: Tensor to corrupt; non-floating tensors are converted with
            ``.float()`` first.
        alpha_t: Scalar in [0, 1]; 1 returns the image values unchanged,
            0 returns pure noise.

    Returns:
        A new tensor with the shape and device of ``image``.

    Raises:
        ValueError: If ``alpha_t`` lies outside [0, 1].
    """
    alpha_t= float(alpha_t)
    if not 0.0<= alpha_t<= 1.0:
        raise ValueError(f'alpha_t must be within [0, 1], got {alpha_t}')

    if not image.is_floating_point():
        image= image.float()

    noise= torch.randn_like(image)
    return (alpha_t** 0.5)* image+ ((1.0- alpha_t)** 0.5)* noise

#model training sequence
def _corrupt_half_batch(images):
    """Return a copy of ``images`` whose first ``batch // 2`` samples are noised by ``corrupt_data``."""
    corrupted= images.clone()
    for i in range(corrupted.size(0)// 2):
        alpha_t= np.random.uniform(0.1, 0.9)
        corrupted[i]= corrupt_data(images[i], alpha_t)
    return corrupted


def _plot_training_history(train_losses, val_losses, accuracies):
    """Draw the loss curves and the validation-accuracy curve side by side."""
    plt.figure(figsize=(10, 5))

    #plotting training and validation loss
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    #plotting validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


def train_model_fine(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=25, type='efficient', device=None, patience=1):
    """Train ``model``, validate after every epoch, then plot the history.

    Args:
        model: Module to optimise (modified in place).
        train_loader: Iterable of ``(images, labels)`` batches with ``len()``.
        val_loader: Same as ``train_loader`` but used for validation.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per completed epoch.
        num_epochs: Upper bound on the number of epochs.
        type: ``'diffusion'`` noises the first half of every batch (training
            and validation) before the forward pass; any other value feeds
            the images unchanged. The name shadows the builtin but is kept
            for existing callers.
        device: Device batches are moved to. Defaults to the device of the
            model's first parameter.
        patience: Number of consecutive epochs that may be worse than the
            best accuracy AND the best validation loss before stopping.

    Returns:
        None. Progress is printed and curves are shown with matplotlib.
    """
    if device is None:
        first_param= next(model.parameters(), None)
        if first_param is not None:
            device= first_param.device
        else:
            device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    start_time= time.time()

    train_losses= []
    val_losses= []
    accuracies= []
    # True running bests; the loss starts at +inf so the first epoch always
    # counts as an improvement.
    best_accuracy= 0.0
    best_val_loss= float('inf')
    bad_epochs= 0

    for epoch in range(num_epochs):
        current_time= time.time()

        #training sequence
        model.train()
        running_loss= 0.0
        print(f"Epoch {epoch+1}/{num_epochs} is Starting")

        for images, labels in train_loader:
            images, labels= images.to(device), labels.to(device)
            optimizer.zero_grad()

            inputs= _corrupt_half_batch(images) if type== 'diffusion' else images
            outputs= model(inputs)
            loss= criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            running_loss+= loss.item()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError
        epoch_loss= running_loss/ max(len(train_loader), 1)
        train_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

        #validation sequence
        model.eval()
        val_loss= 0.0
        correct= 0
        total= 0

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels= images.to(device), labels.to(device)

                # NOTE(review): validation batches are noised too in
                # 'diffusion' mode, so these metrics are stochastic - confirm
                # this is intended.
                inputs= _corrupt_half_batch(images) if type== 'diffusion' else images
                outputs= model(inputs)
                loss= criterion(outputs, labels)

                val_loss+= loss.item()
                predicted= outputs.argmax(dim= 1)
                total+= labels.size(0)
                correct+= (predicted== labels).sum().item()

        val_loss/= max(len(val_loader), 1)
        accuracy= 100 * correct/ total if total else 0.0
        val_losses.append(val_loss)
        accuracies.append(accuracy)

        print(f'Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.2f}%')
        print("--- %s Seconds ---\n" % (time.time()- current_time))

        #early stoppage: compare against the best values seen so far and only
        #ever move the bests in the improving direction
        worse_accuracy= accuracy< best_accuracy
        worse_loss= val_loss> best_val_loss
        best_accuracy= max(best_accuracy, accuracy)
        best_val_loss= min(best_val_loss, val_loss)

        if worse_accuracy and worse_loss:
            bad_epochs+= 1
            if bad_epochs>= patience:
                print(f'Early Stopping at Epoch {epoch+ 1}')
                break
        else:
            bad_epochs= 0

        scheduler.step()

    print('Training Complete')
    print("Total Time:--- %s Minutes ---" % ((time.time()- start_time)/ 60))

    #plotting loss and accuracy
    _plot_training_history(train_losses, val_losses, accuracies)

In [48]:
#dataset configurations
data_cfg= timm.data.resolve_data_config({'input_size': (3, 224, 224)})
#NOTE(review): create_transform receives mean/std through data_cfg and may already
#normalise; the extra Normalize below would then be applied a second time. It is kept
#because the evaluation cell builds the same pipeline - confirm before changing either.
data_transforms= {
    'train': transforms.Compose([timm.data.create_transform(**data_cfg, is_training= True),
                                 transforms.Normalize(mean= [0.485, 0.456, 0.406], std= [0.229, 0.224, 0.225])]),
    'val': transforms.Compose([timm.data.create_transform(**data_cfg, is_training= False),
                                 transforms.Normalize(mean= [0.485, 0.456, 0.406], std= [0.229, 0.224, 0.225])]),
}

#dataset paths
train_data_path= r'TRAINING_IMAGES_PATHS'
val_data_path= r'VALIDATION_IMAGES_PATHS'

#multi= True makes the dataset iterate over root_dir, so a single path string must be
#wrapped in a list (iterating the bare string would yield one character per "folder")
val_data_roots= [val_data_path] if isinstance(val_data_path, str) else list(val_data_path)

#initializing datasets
train_dataset_fine= DeepfakeDataset(root_dir= train_data_path, transform= data_transforms['train'])
val_dataset_fine= DeepfakeDataset(root_dir= val_data_roots, transform= data_transforms['val'], multi= True)

#initializing dataloaders
batch_size= 64
train_loader_fine= DataLoader(train_dataset_fine, batch_size= batch_size, shuffle= True, num_workers= 0)
val_loader_fine= DataLoader(val_dataset_fine, batch_size= batch_size, shuffle= False, num_workers= 0)

In [None]:
#loading pre-trained EfficientNetV2 model
#pretrained weights are fetched for the hub identifier below; the head is then
#re-created with two outputs before the model is moved to the GPU
model_pre= create_model("hf_hub:timm/tf_efficientnetv2_b0.in1k", pretrained= True)
model_pre.reset_classifier(num_classes= 2)
model_pre= model_pre.to('cuda')
print("Model Successfully Loaded")

#defining loss function, optimizer and scheduler
learning_rate= 1e-4
criterion= nn.CrossEntropyLoss()
optimizer= optim.AdamW(
    model_pre.parameters(),
    lr= learning_rate,
    betas= (0.925, 0.995),
    weight_decay= 0.01125,
    amsgrad= True,
)
#learning rate is multiplied by 0.1 every 7 scheduler steps
scheduler= optim.lr_scheduler.StepLR(optimizer, step_size= 7, gamma= 0.1)

In [None]:
#fine-tuning main
#runs the default (non-diffusion) training path for at most 15 epochs
train_model_fine(
    model= model_pre,
    train_loader= train_loader_fine,
    val_loader= val_loader_fine,
    criterion= criterion,
    optimizer= optimizer,
    scheduler= scheduler,
    num_epochs= 15,
)

In [None]:
#saving fine-tuned EfficientNetV2 model
#the whole module object is serialised here, not only its state_dict
model_save_path= r'MODEL_SAVE_PATH'
torch.save(obj= model_pre, f= model_save_path)
print(f'Model Saved to {model_save_path}')

In [95]:
#diffusion layer replacing classifier
class DiffusionLayer(nn.Module):
    """Classifier head made of a noisy refinement loop followed by an MLP.

    Args:
        input_dim: Size of the incoming feature vector.
        hidden_dim: Width of the encoder/decoder hidden layer.
        num_classes: Number of output logits.
        num_steps: Number of refinement iterations run in ``forward``.
    """

    def __init__(self, input_dim, hidden_dim, num_classes, num_steps):
        super().__init__()
        self.num_steps= num_steps

        bottleneck_dim= hidden_dim// 2
        half_dim, quarter_dim, sixth_dim= input_dim// 2, input_dim// 4, input_dim// 6

        # input_dim -> hidden_dim -> hidden_dim // 2
        self.encoder= nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.ReLU(),
        )

        # hidden_dim // 2 -> hidden_dim -> input_dim
        self.decoder= nn.Sequential(
            nn.Linear(bottleneck_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

        # three Linear/BatchNorm/activation/Dropout stages, then the logits
        self.classifier= nn.Sequential(
            nn.Linear(input_dim, half_dim),
            nn.BatchNorm1d(half_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(half_dim, quarter_dim),
            nn.BatchNorm1d(quarter_dim),
            nn.Tanh(),
            nn.Dropout(0.6),
            nn.Linear(quarter_dim, sixth_dim),
            nn.BatchNorm1d(sixth_dim),
            nn.SELU(),
            nn.Dropout(0.7),
            nn.Linear(sixth_dim, num_classes),
        )

    def forward(self, x):
        """Run ``num_steps`` refinement iterations, then classify.

        Steps are visited from the last index down to 0. At each step the
        input plus Gaussian noise scaled by ``1 / (step + 1) ** 2`` has the
        decoder(encoder(x)) output of the un-noised input subtracted from it.
        Noise is drawn on every call, in training and in eval mode alike.
        """
        for step in range(self.num_steps- 1, -1, -1):
            noise_scale= 1/ (step+ 1)** 2
            perturbed= x+ torch.randn_like(x)* noise_scale
            reconstruction= self.decoder(self.encoder(x))
            x= perturbed- reconstruction
        return self.classifier(x)

In [None]:
#dataset configurations
data_cfg= timm.data.resolve_data_config({'input_size': (3, 224, 224)})
#NOTE(review): create_transform receives mean/std through data_cfg and may already
#normalise; the extra Normalize below would then be applied a second time. It is kept
#because the evaluation cell builds the same pipeline - confirm before changing either.
data_transforms= {
    'train': transforms.Compose([timm.data.create_transform(**data_cfg, is_training= True),
                                 transforms.Normalize(mean= [0.485, 0.456, 0.406], std= [0.229, 0.224, 0.225])]),
    'val': transforms.Compose([timm.data.create_transform(**data_cfg, is_training= False),
                                 transforms.Normalize(mean= [0.485, 0.456, 0.406], std= [0.229, 0.224, 0.225])]),
}

#dataset paths
train_data_path= r'TRAINING_IMAGES_PATHS'
val_data_path= r'VALIDATION_IMAGES_PATHS'

#initializing datasets
train_dataset_diff= DeepfakeDataset(root_dir= train_data_path, transform= data_transforms['train'])
val_dataset_diff= DeepfakeDataset(root_dir= val_data_path, transform= data_transforms['val'])

#initializing dataloaders
batch_size= 128
train_loader_diff= DataLoader(train_dataset_diff, batch_size= batch_size, shuffle= True, num_workers= 0)
val_loader_diff= DataLoader(val_dataset_diff, batch_size= batch_size, shuffle= False, num_workers= 0)

#load the fine-tuned EfficientNetV2 model
#the file is expected to hold a fully pickled module (it is used as a model right away),
#which torch.load refuses under the weights_only=True default of PyTorch >= 2.6.
#SECURITY: unpickling executes arbitrary code - only load checkpoints you created yourself.
model_fine= torch.load(r'MODEL_LOAD_PATH', weights_only= False)
model_fine.train()

#freezing layers
for param in model_fine.parameters():
    param.requires_grad= False

#replacing classifier
#the new head is created after the freeze above, so its parameters are trainable by default
input_dim= model_fine.classifier.in_features
hidden_dim= input_dim* 4
num_classes= 2
num_steps= 3
model_fine.classifier= DiffusionLayer(input_dim, hidden_dim, num_classes, num_steps).to('cuda')

#unfreezing DiffusionLayer()
for name, param in model_fine.named_parameters():
  if 'classifier' in name or 'fc' in name:
    param.requires_grad= True

model_fine= model_fine.to('cuda')
print("Model Successfully Loaded")

In [None]:
#defining loss function, optimizer and scheduler
#stage 1: only the parameters that are currently trainable (the new head) are optimised
learning_rate= 0.00095
criterion= nn.CrossEntropyLoss()
optimizer= optim.AdamW(filter(lambda p: p.requires_grad, model_fine.parameters()), lr= learning_rate, betas= (0.895, 0.995), weight_decay= 0.01250, amsgrad=True)
scheduler= optim.lr_scheduler.StepLR(optimizer, step_size= 12, gamma= 0.1111)

#training sequence for DiffusionLayer()
train_model_fine(model_fine, train_loader_diff, val_loader_diff, criterion, optimizer, scheduler, num_epochs= 10, type= 'diffusion')

#unfreezing all layers
for param in model_fine.parameters():
    param.requires_grad= True
learning_rate= 0.000127

#stage 2: the stage-1 optimizer only holds the parameters that were trainable when it
#was built and still uses the old learning rate, so the optimizer and its scheduler are
#rebuilt over all parameters; otherwise the backbone would never be updated
optimizer= optim.AdamW(model_fine.parameters(), lr= learning_rate, betas= (0.895, 0.995), weight_decay= 0.01250, amsgrad=True)
scheduler= optim.lr_scheduler.StepLR(optimizer, step_size= 12, gamma= 0.1111)

#training sequence for diffusion classification model
train_model_fine(model_fine, train_loader_diff, val_loader_diff, criterion, optimizer, scheduler, num_epochs= 15, type= 'diffusion')

In [None]:
#saving diffusion classification model
#only the state_dict is written here, not the module object itself
model_save_path= r'MODEL_SAVE_PATH'
torch.save(obj= model_fine.state_dict(), f= model_save_path)
print(f'Model Saved to {model_save_path}')

In [None]:
#load the EfficientNetV2-Diffusion model
#NOTE(review): the architecture comes from the in-memory model_fine object, so the cell
#that builds model_fine must have been run in this kernel before this one
model_diff= model_fine
model_load_state= r'MODEL_LOAD_PATH'
model_diff.load_state_dict(torch.load(model_load_state))
model_diff= model_diff.to('cuda')  # Move the model to GPU
model_diff.eval()
print("Model Successfully Loaded")

#load ground truth CSV
#the 'Image' and 'Label' columns are read below
ground_truth_path= r'GROUND_TRUTH_CSV'
ground_truth_df= pd.read_csv(ground_truth_path)

#dataset configurations
data_cfg= timm.data.resolve_data_config({'input_size': (3, 224, 224)})
transform= transforms.Compose([timm.data.create_transform(**data_cfg, is_training= False),
                                transforms.Resize((224, 224)),
                                 transforms.Normalize(mean= [0.485, 0.456, 0.406], std= [0.229, 0.224, 0.225])])

#test data path
test_data_path= r'TEST_IMAGES_PATH'

#loading images for testing
def load_image(image_path):
    """Open an image, convert it to RGB, apply ``transform`` and add a batch axis."""
    # the context manager closes the file handle once the pixels are converted
    with Image.open(image_path) as raw_image:
        image= raw_image.convert('RGB')
    image= transform(image)
    return image.unsqueeze(0)

#predictions on test set
predictions = []

for index, row in ground_truth_df.iterrows():
    start_time= time.time()
    image_path= os.path.join(test_data_path, row['Image'])
    image= load_image(image_path)
    image= image.to('cuda')  # Move the image to GPU
    with torch.no_grad():
        #use the model prepared above (model_diff) rather than the model_fine alias
        output= model_diff(image)
        _, predicted= torch.max(output, 1)
        predictions.append(predicted.item())
    if index% 1000== 0:
        print(f'Processed {index} Images.')
        #the time printed is for the current image only
        print("--- %s Seconds ---\n" % (time.time()- start_time))
ground_truth_df['Prediction']= predictions


#calculating accuracy
#fraction of rows whose predicted class index equals the 'Label' column
accuracy= (ground_truth_df['Label']== ground_truth_df['Prediction']).mean()
print(f'Accuracy: {accuracy:.2f}')