In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
import torch.hub
from torchvision.models import efficientnet_b3,densenet161, EfficientNet_B3_Weights, DenseNet161_Weights, resnet152, ResNet152_Weights, vgg16, VGG16_Weights, vgg19, VGG19_Weights
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
from PIL import Image
import os
import numpy as np

# Dataset definition

In [None]:
class ImageAuthenticityDataset(Dataset):
    """Dataset for image quality assessment."""

    def __init__(self, csv_file, transform=None):
        """
        Args:
            csv_file (string): Path to the CSV file with annotations.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.dir_path = os.path.dirname(csv_file)  # Directory of the CSV file

    def __len__(self):
        """Returns the number of samples in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieves an image and its labels by index.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            tuple: A tuple (image, labels) where:
                image (PIL.Image): The image.
                labels (torch.Tensor): Tensor containing quality and authenticity scores.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # TODO: to be fixed, right now is folder dependent
        # Assuming your CSV has the relative path in the 4th column (index 3)
        # And you need to adjust the path to be relative to where you run the script
        img_name = self.data.iloc[idx, 3].replace("./", "../")
        image = Image.open(img_name).convert('RGB')
        
        # Assuming authenticity is in the 2nd column (index 1) and is a float score
        authenticity = self.data.iloc[idx, 1]  
        # Note: Your dataset returns a float tensor of shape [1]. 
        # This is suitable for regression tasks or specific binary setups.
        # For standard binary classification (e.g., with CrossEntropyLoss), 
        # you might need an integer tensor (0 or 1). Adjust if needed.
        labels = torch.tensor([authenticity], dtype=torch.float) 


        if self.transform:
            image = self.transform(image)

        return image, labels

# Models definitions

In [None]:
class BarlowTwinsAuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained BarlowTwins ResNet50 instead of ResNet-152
        barlow_twins_resnet = torch.hub.load('facebookresearch/barlowtwins:main', 'resnet50')
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in barlow_twins_resnet.parameters():
                param.requires_grad = False
                
        self.features = nn.Sequential(*list(barlow_twins_resnet.children())[:-2])
        self.avgpool = barlow_twins_resnet.avgpool
        
        
        self.regression_head = nn.Sequential(
                nn.Linear(2048, 512),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(512, 128),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(128, 1)
            )    
        
    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        predictions = self.regression_head(x)
        return predictions, x 
    
class EfficientNetB3AuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained VGG16
        efficent_net = efficientnet_b3(weights=EfficientNet_B3_Weights.DEFAULT)
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in efficent_net.features.parameters():
                param.requires_grad = False
                
        # Extract features up to fc2
        self.features = efficent_net.features
        self.avgpool = efficent_net.avgpool
        
        
        # New regression head for EfficientNet
        self.regression_head = nn.Sequential(
            nn.Linear(1536, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1)  # Predict authenticity
        )
        
    def forward(self, x):
        # Pass through the backbone features
        x = self.features(x)
        # Apply pooling
        x = self.avgpool(x)
        # Flatten the features
        features = torch.flatten(x, 1)
        # Pass through regression head
        predictions = self.regression_head(features)
        
        return predictions, features
    
class DenseNet161AuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained DenseNet-161
        densenet = densenet161(weights=DenseNet161_Weights.DEFAULT)
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in densenet.parameters():
                param.requires_grad = False
                
        # Store the features
        self.features = densenet.features
        
        # DenseNet already includes a ReLU and pooling after features
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        
        # DenseNet-161's output feature dimension is 2208 instead of 2048
        self.regression_head = nn.Sequential(
                nn.Linear(2208, 512),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(512, 128),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(128, 1)
            )
        
    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        predictions = self.regression_head(x)
        return predictions, x  # Return predictions and features
    
class ResNet152AuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained ResNet-152 instead of VGG16
        resnet = resnet152(weights=ResNet152_Weights.DEFAULT)
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in resnet.parameters():
                param.requires_grad = False
                
        # Store the backbone (excluding the final fc layer)
        self.features = nn.Sequential(*list(resnet.children())[:-2])
        self.avgpool = resnet.avgpool
        
        self.regression_head = nn.Sequential(
                nn.Linear(2048, 512),
                nn.ReLU(),
                nn.Dropout(0.5),  # Reduced dropout ratio
                nn.Linear(512, 128),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(128, 1)
            )    
        
    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        predictions = self.regression_head(x)
        return predictions, x 
        
class VGG16AuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained VGG16
        vgg = vgg16(weights=VGG16_Weights.DEFAULT)
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in vgg.features.parameters():
                param.requires_grad = False
                
        # Extract features up to fc2
        self.features = vgg.features
        self.avgpool = vgg.avgpool
        self.fc1 = vgg.classifier[:-1]  # Up to fc2 (4096 -> 128)
        
        # New regression head
        self.regression_head = nn.Sequential(
            nn.Linear(4096, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1),  
        )
        
    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        features = self.fc1(x)
        predictions = self.regression_head(features)
        return predictions, features
    
class VGG19AuthenticityPredictor(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        # Load pre-trained VGG16
        vgg = vgg19(weights=VGG19_Weights.DEFAULT)
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in vgg.features.parameters():
                param.requires_grad = False
                
        # Extract features up to fc2
        self.features = vgg.features
        self.avgpool = vgg.avgpool
        self.fc1 = vgg.classifier[:-1]  # Up to fc2 (4096 -> 128)
        
        # New regression head
        self.regression_head = nn.Sequential(
            nn.Linear(4096, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1),  
        )
        
    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        features = self.fc1(x)
        predictions = self.regression_head(features)
        return predictions, features

# Test function

In [None]:
def test_model(model, dataloader, criterion, device='cuda'):

    """
    Tests the model on the test dataset.

    Args:
        model (nn.Module): The trained model.
        dataloader (DataLoader): The test data loader.
        criterion (nn.Module): The loss function.
        device (str): Device to use for testing ('cuda' or 'cpu'). Defaults to 'cuda'.

    Returns:
        float: The average loss on the test dataset.
    """
    model.eval()  # Set the model to evaluation mode
    model.to(device)
    running_loss = 0.0

    with torch.no_grad():  # Disable gradient calculation
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs, _ = model(inputs)
            
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)

    test_loss = running_loss / len(dataloader.dataset)
    print(f'Test Loss: {test_loss:.4f}')
    return test_loss

# Setup

In [None]:
IMAGENET_TRANSFORM = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

DENSENET_TRANSFORM = transforms.Compose([
    transforms.Resize((320, 320)),
    transforms.CenterCrop(300),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

ANNOTATION_FILE =  '../Dataset/AIGCIQA2023/real_images_annotations.csv'
BATCH_SIZE = 64
NUM_WORKERS = 10


# Create the dataset
imageNet_dataset = ImageAuthenticityDataset(csv_file=ANNOTATION_FILE, transform=IMAGENET_TRANSFORM)
denseNet_dataset = ImageAuthenticityDataset(csv_file=ANNOTATION_FILE, transform=DENSENET_TRANSFORM)

# Set random seeds for reproducibility
torch.manual_seed(42)
torch.cuda.manual_seed(42)
np.random.seed(42)

# Split the dataset into training, validations for imageNet and DenseNet
imagenet_train_size = int(0.7 * len(imageNet_dataset))
imagenet_val_size = int(0.2 * len(imageNet_dataset))
imagenet_test_size = len(imageNet_dataset) - imagenet_train_size - imagenet_val_size
train_dataset, val_dataset, test_dataset = random_split(imageNet_dataset, [imagenet_train_size, imagenet_val_size, imagenet_test_size])

# Split the dataset into training, validations for DenseNet
denseNet_train_size = int(0.7 * len(denseNet_dataset))
denseNet_val_size = int(0.2 * len(denseNet_dataset))
denseNet_test_size = len(denseNet_dataset) - denseNet_train_size - denseNet_val_size
train_dataset, val_dataset, test_dataset = random_split(denseNet_dataset, [denseNet_train_size, denseNet_val_size, denseNet_test_size])



imagenet_train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
imagenet_val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
imagenet_test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

denseNet_train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
denseNet_val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
denseNet_test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)


# Create a dictionary containing the data loaders
imageNet_dataloaders = {
    'train': imagenet_train_dataloader,
    'val': imagenet_val_dataloader,
    'test': imagenet_test_dataloader
}

denseNet_dataloaders = {
    'train': denseNet_train_dataloader,
    'val': denseNet_val_dataloader,
    'test': denseNet_test_dataloader
}


# Models load

In [None]:
barlow_twins_base = BarlowTwinsAuthenticityPredictor(freeze_backbone=True)
barlow_twins_base.load_state_dict(torch.load('../Models/BarlowTwins/Weights/BarlowTwins_real_authenticity_finetuned.pth'))
barlow_twins_base.eval()
barlow_twins_base.to('cuda')

barlow_twins_negative_impact = BarlowTwinsAuthenticityPredictor(freeze_backbone=True)
barlow_twins_negative_impact.load_state_dict(torch.load('../Models/BarlowTwins/Weights/real_authenticity_negative_impact_pruned_model.pth'))
barlow_twins_negative_impact.eval()
barlow_twins_negative_impact.to('cuda')

barlow_twins_noise_out = BarlowTwinsAuthenticityPredictor(freeze_backbone=True)
barlow_twins_noise_out.load_state_dict(torch.load('../Models/BarlowTwins/Weights/real_authenticity_noise_out_pruned_model.pth'))
barlow_twins_noise_out.eval()
barlow_twins_noise_out.to('cuda')

# Test the models
print("Testing Barlow Twins Base Model")
test_model(barlow_twins_base, imagenet_test_dataloader, nn.MSELoss(), device='cuda')
print("Testing Barlow Twins Negative Impact Model")
test_model(barlow_twins_negative_impact, imagenet_test_dataloader, nn.MSELoss(), device='cuda')
print("Testing Barlow Twins Noise Out Model")
test_model(barlow_twins_noise_out, imagenet_test_dataloader, nn.MSELoss(), device='cuda')

