In [2]:
import os
import random
import warnings

SEED = 42

# Environment variables first: the thread-count and cuBLAS settings are meant
# to be picked up when the native libraries initialise, so they are exported
# before numpy / torch are imported (on a fresh kernel).
# NOTE: PYTHONHASHSEED is only honoured at interpreter start-up; setting it
# here affects child processes, not the hashing of the running kernel.
os.environ["PYTHONHASHSEED"] = str(SEED)
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"   # or ":4096:8" for big convs

import numpy as np
import torch

# Seed the Python and NumPy global RNGs.
random.seed(SEED)
np.random.seed(SEED)

# Single-threaded execution.
torch.set_num_threads(1)
try:
    torch.set_num_interop_threads(1)
except RuntimeError as exc:
    # PyTorch only allows this once per process and only before parallel work
    # has started, so re-running this cell in a live kernel would otherwise
    # abort the whole cell.
    warnings.warn(f"Could not set inter-op thread count: {exc}")

# Seed PyTorch (CPU and every CUDA device) and force deterministic kernels.
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark     = False
# Strict mode: non-deterministic ops raise instead of merely warning.
# (The former torch.set_deterministic_debug_mode(True) call was dropped: True
# is interpreted as 1 == "warn", and this call overrode it immediately.)
torch.use_deterministic_algorithms(True)

import os
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# PyTorch and related imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# For splitting and metrics
from sklearn.model_selection import train_test_split
from scipy.stats import spearmanr, pearsonr
from sklearn.metrics import r2_score
import torch.nn.functional as F
from sklearn.metrics import mean_absolute_error


# Re-apply the global seeds once all libraries have been imported.
import random
import numpy as np
import torch

SEED = 42

# Python / NumPy RNGs
random.seed(SEED)
np.random.seed(SEED)

# PyTorch RNGs (CPU, then every CUDA device)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# cuDNN: deterministic kernels, no auto-tuner
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True



In [3]:
#### 1) Load the test-set ratings CSV
# Only the ready-made test split is read here; no splitting happens in this cell.

test_df = pd.read_csv("../../dataset/ratings/test.csv")
# Bare last expression so the notebook displays (n_rows, n_columns).
test_df.shape


(90, 2)

In [4]:
# Expected image size in pixels; ImageMOSDataset asserts every sample is (3, HEIGHT, WIDTH).
HEIGHT = 384
WIDTH = 512

# Evaluation pipeline: no augmentation and no resizing, just tensor conversion
# followed by per-channel normalisation with mean 0.5 and std 0.5.
eval_transforms = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)


In [5]:
class ImageMOSDataset(Dataset):
    """Dataset of ``(image, MOS)`` pairs backed by a DataFrame and an image folder.

    Every sample is checked to be a ``(3, HEIGHT, WIDTH)`` tensor, where
    ``HEIGHT`` and ``WIDTH`` are the module-level constants, so a transform
    that produces a tensor must be supplied.
    """

    def __init__(self, dataframe, img_dir, transform=None):
        """
        Args:
            dataframe (pd.DataFrame): DataFrame with columns 'filename' and 'MOS'
            img_dir (str): Directory with all the images.
            transform: torchvision transforms to apply to images.

        Raises:
            AssertionError: if any file listed in 'filename' is not found in img_dir.
        """
        # Fail fast at construction time rather than midway through an epoch.
        missing = [f for f in dataframe['filename'] if not os.path.isfile(os.path.join(img_dir, f))]
        assert not missing, f"Missing image files: {missing[:5]}…"

        # A fresh 0..n-1 index lets __getitem__ address rows by position.
        self.dataframe = dataframe.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, mos)``.

        Returns:
            tuple: the transformed RGB image tensor and the MOS value as a
            float32 scalar tensor.

        Raises:
            AssertionError: if the image is not a tensor of shape
                (3, HEIGHT, WIDTH) or the MOS value is not finite.
        """
        # Get the image filename and full path
        filename = self.dataframe.loc[idx, 'filename']
        img_path = os.path.join(self.img_dir, filename)

        # Open inside a context manager so the file handle is released
        # deterministically; convert() returns an independent RGB copy.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")

        # Explicit None check: a container-style transform that defines
        # __len__ must not be skipped just because it evaluates as falsy.
        if self.transform is not None:
            image = self.transform(image)

        # MOS value as a float32 scalar tensor
        mos = torch.tensor(self.dataframe.loc[idx, 'MOS'], dtype=torch.float32)

        assert isinstance(image, torch.Tensor)
        assert image.shape == (3, HEIGHT, WIDTH), f"Got shape {image.shape}"
        assert  torch.isfinite(mos), "MOS is not finite"

        return image, mos


In [6]:
# Wrap the test ratings and their image folder in the dataset class.
test_dataset = ImageMOSDataset(
    dataframe=test_df,
    img_dir="../../dataset/images/test_images/",
    transform=eval_transforms,
)

print(f"Number of images in test dataset: {len(test_dataset)}")



Number of images in test dataset: 90


In [8]:
# Number of samples per batch served by the DataLoader
BATCH_SIZE = 32

# Dedicated RNG handed to the DataLoader, seeded with the notebook-wide SEED
g = torch.Generator()
g.manual_seed(SEED)

def worker_init_fn(worker_id):
    """Seed the NumPy, Python and PyTorch RNGs of one DataLoader worker.

    Each worker gets its own reproducible stream derived from
    ``SEED + worker_id``.
    """
    per_worker_seed = worker_id + SEED
    for seed_rng in (np.random.seed, random.seed, torch.manual_seed):
        seed_rng(per_worker_seed)

# Test loader: fixed order, loaded in the main process.
# worker_init_fn only takes effect if num_workers is raised above 0.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    worker_init_fn=worker_init_fn,
    generator=g,
)


In [9]:
class SimpleCNN(nn.Module):
    """Five-stage CNN that regresses one scalar per input image.

    Each stage is Conv3x3 -> BatchNorm -> ReLU -> MaxPool(2), so the spatial
    size halves five times. The result is globally average-pooled to a
    256-vector and passed through a 256 -> 128 -> 64 -> 1 MLP with dropout
    after the first two layers.
    """

    def __init__(self, dropout_rate=0.3):
        """
        Args:
            dropout_rate (float): drop probability used after fc1 and fc2.
        """
        super().__init__()

        # Convolutional stages; comments give the feature-map size for a 384x512 input.
        self.conv_block1 = self._make_stage(3, 16)      # 384x512 -> 192x256
        self.conv_block2 = self._make_stage(16, 32)     # -> 96x128
        self.conv_block3 = self._make_stage(32, 64)     # -> 48x64
        self.conv_block4 = self._make_stage(64, 128)    # -> 24x32
        self.conv_block5 = self._make_stage(128, 256)   # -> 12x16

        # Collapse each of the 256 feature maps to a single value (256x1x1).
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        # Regression head
        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

        # One dropout module shared by both hidden layers
        self.dropout = nn.Dropout(dropout_rate)

    @staticmethod
    def _make_stage(in_channels, out_channels):
        """Build one Conv3x3 -> BatchNorm -> ReLU -> MaxPool(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

    def forward(self, x):
        """Map a (batch, 3, H, W) tensor to a (batch, 1) prediction."""
        stages = (
            self.conv_block1,
            self.conv_block2,
            self.conv_block3,
            self.conv_block4,
            self.conv_block5,
        )
        for stage in stages:
            x = stage(x)

        # Global average pooling, then flatten to (batch_size, 256)
        pooled = self.global_pool(x)
        features = pooled.reshape(pooled.size(0), -1)

        # Two hidden layers, each followed by dropout, then the linear output
        hidden = self.dropout(F.relu(self.fc1(features)))
        hidden = self.dropout(F.relu(self.fc2(hidden)))
        return self.fc3(hidden)


# Seed the MPS RNG when Apple's Metal backend is present.
if torch.backends.mps.is_available():
    torch.mps.manual_seed(SEED)

# Instantiate the model on MPS when available, otherwise on the CPU.
# The same torch.backends.mps.is_available() check is used for seeding and for
# device selection, so the two can never disagree; it also exists in PyTorch
# releases that do not provide torch.mps.is_available().
# To force CPU execution, replace the expression below with torch.device("cpu").
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

print(device)
model = SimpleCNN().to(device)
print(model)


mps
SimpleCNN(
  (conv_block1): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block3): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block4): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), paddin

In [10]:
checkpoint_path = "./weights/baseCNN_2025-05-25_time_00-51-37_SROCC_62_Pearson_69.pth"    # your .pth file
# map_location remaps the stored tensors onto the device chosen above, so the
# checkpoint also loads on machines that lack the device it was saved from.
# weights_only=True restricts unpickling to tensors and plain containers,
# which avoids executing arbitrary pickled code and silences the torch.load
# FutureWarning.
state_dict = torch.load(checkpoint_path, map_location=device, weights_only=True)
# Bare last expression so the notebook shows the key-matching report.
model.load_state_dict(state_dict)


  state_dict = torch.load(checkpoint_path)  # or "cuda:0"


<All keys matched successfully>

In [11]:
def count_parameters(model):
    """Return the total number of elements across the model's trainable parameters.

    Only parameters with ``requires_grad`` set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Count the trainable parameters of the loaded model and report the total
# with thousands separators.
num_params = count_parameters(model)
print(f"Total number of trainable parameters: {num_params:,}")


Total number of trainable parameters: 434,817


In [12]:
# Inference mode: dropout disabled, BatchNorm uses its running statistics.
model.eval()

pred_batches = []
label_batches = []
with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images.to(device))
        # Bring predictions back to the CPU and flatten (batch, 1) -> (batch,)
        pred_batches.append(outputs.cpu().numpy().reshape(-1))
        label_batches.append(labels.numpy())

# One flat array each for predictions and ground-truth MOS values
all_preds = np.concatenate(pred_batches)
all_labels = np.concatenate(label_batches)

# Rank correlation (SROCC) and linear correlation (Pearson); p-values are discarded
spearman_corr = spearmanr(all_labels, all_preds)[0]
pearson_corr = pearsonr(all_labels, all_preds)[0]

# Coefficient of determination and mean absolute error
r2_val = r2_score(all_labels, all_preds)
mae_val = mean_absolute_error(all_labels, all_preds)

print(f"SROCC:    {spearman_corr:.4f}")
print(f"Pearson:  {pearson_corr:.4f}")
print(f"R^2:      {r2_val:.4f}")
print(f"MAE: {mae_val:.4f}")


SROCC:    0.6216
Pearson:  0.6944
R^2:      0.4593
MAE: 0.5105
