## Load and split data

### Subtask:
Load the metadata from the JSON file and split it into training and testing sets. Save the split metadata into `train.json` and `test.json`.


**Reasoning**:
Import necessary libraries, load the metadata, split it into training and testing sets, and save the splits to separate JSON files.



In [None]:
import os
import random
import json
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from google.colab import files
import shutil

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the project folders under MyDrive are reachable
drive.mount('/content/drive')

# Install pv for progress visualization
# (the leading `!` runs the line as a shell command from the notebook)
!apt-get -qq install pv

# Root folder of the TID2013 dataset on Drive
DATASET_PATH = "/content/drive/MyDrive/Samsung_Project/tid2013"
# Subfolder holding the reference images
REF_PATH = os.path.join(DATASET_PATH, "reference_images")
# Subfolder holding the distorted images
DIST_PATH = os.path.join(DATASET_PATH, "distorted_images")
# Text file with one "<MOS value> <distorted image file name>" pair per line
MOS_FILE = os.path.join(DATASET_PATH, "mos_with_names.txt")
# Folder with the patch data: metadata.json and the `copy` subfolder of patch files are read from here,
# and the train.json / test.json split files are written here
PATCHES_PATH = "/content/drive/MyDrive/Samsung_Project/tid2013_patches"

Mounted at /content/drive
Selecting previously unselected package pv.
(Reading database ... 126380 files and directories currently installed.)
Preparing to unpack .../pv_1.6.6-1build2_amd64.deb ...
Unpacking pv (1.6.6-1build2) ...
Setting up pv (1.6.6-1build2) ...
Processing triggers for man-db (2.10.2-1) ...


# Sanity check on dataset and patches

In [None]:
def _has_entries(path):
    """Return a truthy value when `path` exists and lists at least one entry."""
    return os.path.exists(path) and os.listdir(path)


def _print_random_names(path, limit=5):
    """Print up to `limit` randomly chosen entry names from the directory `path`."""
    names = os.listdir(path)
    print(random.sample(names, min(limit, len(names))))


# Directories that must exist and be non-empty, each with its success / failure message
required_directories = [
    (
        DATASET_PATH,
        f"Dataset directory '{DATASET_PATH}' exists and contains files.",
        f"Error: Dataset directory '{DATASET_PATH}' not found or empty.",
    ),
    (
        REF_PATH,
        f"Reference images directory '{REF_PATH}' exists and contains files.",
        f"Error: Reference images directory '{REF_PATH}' not found or empty.",
    ),
    (
        DIST_PATH,
        f"Distorted images directory '{DIST_PATH}' exists and contains files.",
        f"Error: Distorted images directory '{DIST_PATH}' not found or empty.",
    ),
]
for directory, ok_message, error_message in required_directories:
    print(ok_message if _has_entries(directory) else error_message)

# The MOS file only has to exist
print(
    f"MOS file '{MOS_FILE}' found."
    if os.path.exists(MOS_FILE)
    else f"Error: MOS file '{MOS_FILE}' not found."
)

# The patches directory is only reported on, never treated as an error here
print(
    f"Patches directory '{PATCHES_PATH}' exists."
    if os.path.exists(PATCHES_PATH)
    else f"Patches directory '{PATCHES_PATH}' does not exist"
)

# Show a handful of file names from the reference and distorted image folders
image_listings = [
    (REF_PATH, "\nSample reference image files:"),
    (DIST_PATH, "\nSample distorted image files:"),
]
for directory, heading in image_listings:
    if _has_entries(directory):
        print(heading)
        _print_random_names(directory)

# Same for the patch files under tid2013_patches/copy, distinguishing "missing" from "empty"
COPY_PATCHES_PATH = os.path.join(PATCHES_PATH, "copy")
if not os.path.exists(COPY_PATCHES_PATH):
    print(f"\nDirectory '{COPY_PATCHES_PATH}' does not exist.")
elif os.listdir(COPY_PATCHES_PATH):
    print(f"\nSample patch files from {COPY_PATCHES_PATH}:")
    _print_random_names(COPY_PATCHES_PATH)
else:
    print(f"\nDirectory '{COPY_PATCHES_PATH}' is empty.")

# Peek at the first five lines of the MOS file
if os.path.exists(MOS_FILE):
    print("\nSample lines from MOS file:")
    with open(MOS_FILE, 'r') as f:
        for line_number, mos_line in enumerate(f, start=1):
            print(mos_line.strip())
            if line_number >= 5:
                break

Dataset directory '/content/drive/MyDrive/Samsung_Project/tid2013' exists and contains files.
Reference images directory '/content/drive/MyDrive/Samsung_Project/tid2013/reference_images' exists and contains files.
Distorted images directory '/content/drive/MyDrive/Samsung_Project/tid2013/distorted_images' exists and contains files.
MOS file '/content/drive/MyDrive/Samsung_Project/tid2013/mos_with_names.txt' found.
Patches directory '/content/drive/MyDrive/Samsung_Project/tid2013_patches' exists.

Sample reference image files:
['I20.BMP', 'I23.BMP', 'I22.BMP', 'I04.BMP', 'I11.BMP']

Sample distorted image files:
['i06_23_3.bmp', 'i20_06_3.bmp', 'i18_02_3.bmp', 'i12_06_1.bmp', 'i22_19_4.bmp']

Sample patch files from /content/drive/MyDrive/Samsung_Project/tid2013_patches/copy:
['copy_patch_1283.png', 'copy_patch_0939.png', 'copy_patch_0866.png', 'copy_patch_1697.png', 'copy_patch_1928.png']

Sample lines from MOS file:
5.51429 I01_01_1.bmp
5.56757 i01_01_2.bmp
4.94444 i01_01_3.bmp
4.3783

In [None]:
import json
from sklearn.model_selection import train_test_split
import os

# Read the patch metadata stored directly inside PATCHES_PATH
metadata_path = os.path.join(PATCHES_PATH, "metadata.json")
with open(metadata_path, 'r') as f:
    metadata = json.load(f)

# The loaded JSON is used as the sample list as-is (assumes the top level is a list)
data_points = metadata

# Random 80/20 split with a fixed seed so the split is repeatable.
# NOTE(review): the split is made per metadata entry; if several entries come from the
# same source image they can end up on both sides of the split — confirm this is intended.
train_data, test_data = train_test_split(
    data_points,
    test_size=0.2,
    random_state=42,
)

# Each split is stored under a top-level 'data' key
train_metadata = {'data': train_data}
test_metadata = {'data': test_data}

# The split files live next to metadata.json in the patches directory
train_json_path = os.path.join(PATCHES_PATH, "train.json")
test_json_path = os.path.join(PATCHES_PATH, "test.json")

# Write train.json first, then test.json
split_outputs = (
    (train_json_path, train_metadata),
    (test_json_path, test_metadata),
)
for split_path, split_payload in split_outputs:
    with open(split_path, 'w') as f:
        json.dump(split_payload, f, indent=4)

print(f"Train metadata saved to: {train_json_path}")
print(f"Test metadata saved to: {test_json_path}")

Train metadata saved to: /content/drive/MyDrive/Samsung_Project/tid2013_patches/train.json
Test metadata saved to: /content/drive/MyDrive/Samsung_Project/tid2013_patches/test.json


## Implement dataset loader

### Subtask:
Create a custom dataset loader that reads the image pairs and scores from the split metadata files.


**Reasoning**:
Implement the custom dataset class including the `__init__`, `__len__`, and `__getitem__` methods to load and process image pairs and scores from the metadata.



In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image
import os
import json
import torchvision.transforms as transforms

class ImagePairDataset(Dataset):
    """Dataset of (reference image, distorted image, score) samples listed in a JSON file.

    Every metadata entry is expected to provide the keys 'reference_image',
    'distorted_image' and 'score'. Both images are converted to single-channel
    grayscale tensors before being returned.
    """

    def __init__(self, metadata_file, img_dir):
        """
        Args:
            metadata_file (string): Path to the json file with annotations; the
                sample list is read from its top-level 'data' key.
            img_dir (string): Directory with all the images.
        """
        self.img_dir = img_dir
        # Grayscale first, then PIL image -> tensor
        self.transform = transforms.Compose([
            transforms.Grayscale(num_output_channels=1),
            transforms.ToTensor(),
        ])
        with open(metadata_file, 'r') as f:
            self.metadata = json.load(f)['data']

    def __len__(self):
        """Number of metadata entries."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Return `(ref_image, dist_image, score)` for the entry at `idx`.

        `idx` may be a plain index or a tensor holding one.
        """
        index = idx.tolist() if torch.is_tensor(idx) else idx
        entry = self.metadata[index]

        # Read all three fields up front so a malformed entry fails before any file access
        ref_img_name, dist_img_name, score = (
            entry[key] for key in ('reference_image', 'distorted_image', 'score')
        )

        # Open both files first, then transform both (reference first)
        opened_images = [
            Image.open(os.path.join(self.img_dir, file_name))
            for file_name in (ref_img_name, dist_img_name)
        ]
        ref_image, dist_image = (self.transform(image) for image in opened_images)

        return ref_image, dist_image, score

# Example usage (optional, for testing the dataset class)
# The image folder is derived from PATCHES_PATH instead of repeating the absolute Drive path,
# so it cannot drift from the path constants defined at the top of the notebook.
example_img_dir = os.path.join(PATCHES_PATH, "copy")
train_dataset = ImagePairDataset(metadata_file=os.path.join(PATCHES_PATH, 'train.json'), img_dir=example_img_dir)
test_dataset = ImagePairDataset(metadata_file=os.path.join(PATCHES_PATH, 'test.json'), img_dir=example_img_dir)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of testing samples: {len(test_dataset)}")

# Loading a sample is left disabled.
# NOTE(review): ImagePairDataset.__getitem__ reads the keys 'reference_image' and
# 'distorted_image' — confirm the metadata entries provide them before enabling this.
# sample_ref, sample_dist, sample_score = train_dataset[0]
# print(f"Sample reference image shape: {sample_ref.shape}")
# print(f"Sample distorted image shape: {sample_dist.shape}")
# print(f"Sample score: {sample_score}")

Number of training samples: 1600
Number of testing samples: 400


## Implement the model

### Subtask:
Define the CNN model architecture as described in the prompt.


**Reasoning**:
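Define the CNN model class with the architecture used below: three convolutional layers (the first two followed by normalization), global average pooling, dropout, and a linear regression head. The head has no sigmoid, so the model outputs an unbounded score. The same cell also defines the patch dataset and runs the training loop.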



# PatchDiff-GAP (20×20) – 6-ch CNN Regressor

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import json
import torchvision.transforms as transforms

# Training hyperparameters used by the DataLoaders, the optimizer and the training loop below
batch_size = 16  # samples per mini-batch for the train and test DataLoaders
learning_rate = 0.001  # learning rate passed to the Adam optimizer
num_epochs = 15  # upper bound on training epochs; early stopping may end training sooner
patience = 5  # consecutive epochs without sufficient improvement tolerated before early stopping
min_delta = 0.0005  # minimum drop in epoch loss (relative to the best so far) that counts as an improvement

# Updated ImagePairDatasetPatches to load distorted patches and generate reference patches on the fly
class ImagePairDatasetPatches(Dataset):
    """Dataset yielding `(stacked_patches, score)` for each metadata entry.

    The distorted patch is read from `patch_dir`; the matching reference patch is
    cropped on the fly from the original reference image using the entry's ROI.
    Both patches are converted to RGB tensors and concatenated along the channel
    dimension (reference channels first, then distorted channels).

    Decoded reference images are cached per file path, so each reference image is
    read from disk once per dataset instance instead of once per patch. The cache
    grows with the number of distinct reference images that are requested.
    """

    def __init__(self, metadata_file, patch_dir, ref_img_original_dir):
        """
        Args:
            metadata_file (string): Path to the json file with annotations; the
                sample list is read from its top-level 'data' key.
            patch_dir (string): Directory with all the distorted patches (unique_sample_id).
            ref_img_original_dir (string): Directory with original reference images (clean_image).
        """
        with open(metadata_file, 'r') as f:
            self.metadata = json.load(f)['data']
        self.patch_dir = patch_dir
        self.ref_img_original_dir = ref_img_original_dir
        self.transform = transforms.Compose([
            transforms.ToTensor(),  # Keep as RGB (3 channels)
        ])
        # Case-insensitive lookup: lower-cased file name -> actual file name on disk
        self.ref_filenames_lower = {filename.lower(): filename for filename in os.listdir(ref_img_original_dir)}
        # Decoded RGB reference images, keyed by file path
        self._ref_image_cache = {}

    def __len__(self):
        """Number of metadata entries."""
        return len(self.metadata)

    def _resolve_reference_path(self, original_ref_img_name):
        """Map a reference file name from the metadata to its path on disk, ignoring case.

        Raises:
            FileNotFoundError: no file in the reference directory matches the name.
        """
        actual_ref_img_name = self.ref_filenames_lower.get(original_ref_img_name.lower())
        if actual_ref_img_name is None:
            raise FileNotFoundError(f"Original reference image '{original_ref_img_name}' (case-insensitive) not found in directory '{self.ref_img_original_dir}'.")
        return os.path.join(self.ref_img_original_dir, actual_ref_img_name)

    def _load_reference_image(self, path):
        """Return the RGB reference image at `path`, decoding it only on first use."""
        cached = self._ref_image_cache.get(path)
        if cached is None:
            # convert() returns an independent, fully loaded image, so the file can be closed right away
            with Image.open(path) as ref_file:
                cached = ref_file.convert('RGB')
            self._ref_image_cache[path] = cached
        return cached

    def __getitem__(self, idx):
        """Return `(stacked_patches, score)` for the entry at `idx`.

        `stacked_patches` is the channel-wise concatenation of the reference patch
        and the distorted patch; `score` is returned exactly as stored in the metadata.

        Raises:
            KeyError: the entry lacks 'unique_sample_id', 'clean_image',
                'metadata' -> 'roi' or 'score'.
            FileNotFoundError: the reference image or the patch file is missing.
            IndexError: the ROI has fewer than four values.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_info = self.metadata[idx]
        try:
            patch_filename = img_info['unique_sample_id']
            original_ref_img_name = img_info['clean_image']
            roi = img_info['metadata']['roi']  # ROI is nested under 'metadata'
            score = img_info['score']
        except KeyError as e:
            # Show which keys are present so a schema mismatch is easy to diagnose
            print(f"KeyError: {e}. Available keys in metadata entry: {img_info.keys()}")
            if 'metadata' in img_info and 'roi' not in img_info['metadata']:
                print(f"KeyError: 'roi' not found in metadata['metadata']. Available keys in 'metadata': {img_info['metadata'].keys()}")
            raise
        except Exception as e:
            print(f"An unexpected error occurred while accessing metadata for index {idx}: {e}")
            raise

        dist_patch_path = os.path.join(self.patch_dir, patch_filename)
        original_ref_img_path = self._resolve_reference_path(original_ref_img_name)

        try:
            # Context manager closes the patch file as soon as its pixels are decoded
            with Image.open(dist_patch_path) as patch_file:
                dist_patch = patch_file.convert('RGB')
            original_ref_img = self._load_reference_image(original_ref_img_path)
            # ROI is used as (left, upper, width, height); PIL's crop wants (left, upper, right, lower)
            ref_patch = original_ref_img.crop((roi[0], roi[1], roi[0] + roi[2], roi[1] + roi[3]))
        except FileNotFoundError as e:
            print(f"FileNotFoundError: Could not find image file. Details: {e}")
            print(f"Attempted paths: Distorted patch: {dist_patch_path}, Original reference image: {original_ref_img_path}")
            raise
        except IndexError:
            print(f"IndexError: Invalid ROI format for entry {idx}. ROI: {roi}")
            raise
        except Exception as e:
            print(f"An unexpected error occurred while loading/processing images for index {idx}: {e}")
            print(f"Attempted paths: Distorted patch: {dist_patch_path}, Original reference image: {original_ref_img_path}")
            raise

        # PIL images -> tensors
        ref_patch = self.transform(ref_patch)
        dist_patch = self.transform(dist_patch)

        # Concatenate along the channel dimension: (3, H, W) + (3, H, W) -> (6, H, W)
        stacked_patches = torch.cat([ref_patch, dist_patch], dim=0)

        return stacked_patches, score

# Training data: distorted patches come from the 'copy' subfolder of PATCHES_PATH,
# reference patches are cropped from the original reference images in REF_PATH.
patches_dir_path = os.path.join(PATCHES_PATH, "copy")
train_dataset = ImagePairDatasetPatches(
    metadata_file=os.path.join(PATCHES_PATH, 'train.json'),
    patch_dir=patches_dir_path,
    ref_img_original_dir=REF_PATH,
)
# Reshuffle the training samples every epoch
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

# Implement the new TinyPatchRegressor model
class TinyPatchRegressor(nn.Module):
    """Small CNN that maps a stacked patch tensor to one scalar per sample.

    Layout: conv3x3 -> norm -> ReLU -> conv1x1 -> norm -> ReLU -> conv3x3 -> ReLU
    -> global average pooling -> dropout -> linear head (no sigmoid).
    The 3x3 convolutions are padded, so the spatial size is kept until pooling.

    Args:
        in_channels: number of input channels (6 for stacked RGB patches).
        use_groupnorm: use GroupNorm after the first two convolutions when True,
            BatchNorm2d otherwise.
    """

    def __init__(self, in_channels=6, use_groupnorm=True):
        super().__init__()

        def make_norm(num_groups, num_channels):
            # GroupNorm or BatchNorm2d over `num_channels`, depending on the flag
            if use_groupnorm:
                return nn.GroupNorm(num_groups, num_channels)
            return nn.BatchNorm2d(num_channels)

        # Feature extractor (padding keeps the spatial size, e.g. 20x20)
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3, padding=1)
        self.n1 = make_norm(8, 32)
        self.conv2 = nn.Conv2d(32, 16, kernel_size=1)
        self.n2 = make_norm(4, 16)
        self.conv3 = nn.Conv2d(16, 8, kernel_size=3, padding=1)  # 8 channels feed the pooling

        # Regression head
        self.gap = nn.AdaptiveAvgPool2d(1)  # -> [B,8,1,1]
        self.dropout = nn.Dropout(0.10)
        self.fc = nn.Linear(8, 1)  # plain linear output

        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal init for conv weights, Xavier-normal for the linear head, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
            elif isinstance(layer, nn.Linear):
                nn.init.xavier_normal_(layer.weight)
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Return a tensor of shape [B] with one prediction per input sample."""
        feats = F.relu(self.n1(self.conv1(x)))
        feats = F.relu(self.n2(self.conv2(feats)))
        feats = F.relu(self.conv3(feats))        # [B,8,H,W]
        pooled = self.gap(feats).flatten(1)      # [B,8]
        pooled = self.dropout(pooled)
        return self.fc(pooled).squeeze(1)        # [B]

# Seed PyTorch's global RNG so weight initialisation, DataLoader shuffling and dropout
# repeat across runs (as far as the backend itself is deterministic)
torch.manual_seed(42)

# Train on the GPU when the runtime has one, otherwise on the CPU.
# Mixed precision is only switched on for CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_amp = device.type == "cuda"

# Model for 6-channel input (stacked reference + distorted RGB patches);
# placed on the training device before the optimizer is created
model = TinyPatchRegressor(in_channels=6).to(device)

print("Model architecture:")
print(model)

# Mean squared error between predicted and target scores
criterion = nn.MSELoss()

# Adam optimizer over all model parameters
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

print(f"\nStarting training for {num_epochs} epochs...")

# Early-stopping bookkeeping
best_loss = float('inf')
epochs_no_improve = 0

# Gradient scaler for mixed precision; a pass-through when use_amp is False.
# torch.amp.* replaces the deprecated torch.cuda.amp.* entry points.
scaler = torch.amp.GradScaler("cuda", enabled=use_amp)

for epoch in range(num_epochs):
    running_loss = 0.0
    seen_samples = 0
    model.train()  # training mode (enables dropout)
    for inputs, scores in train_dataloader:
        # Batches are collated on the CPU; move them to where the model lives.
        # Targets are cast to float32 to match the model output, shape [B].
        inputs = inputs.to(device)
        scores = scores.to(device).float()

        optimizer.zero_grad()

        with torch.amp.autocast(device_type=device.type, enabled=use_amp):
            outputs = model(inputs)  # shape [B], no sigmoid
            loss = criterion(outputs, scores)

        # Backward pass and optimizer step go through the scaler
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # loss is a per-batch mean; weight it by the batch size so the epoch loss is a
        # per-sample mean, like the one computed on the test set
        running_loss += loss.item() * inputs.size(0)
        seen_samples += inputs.size(0)

    epoch_loss = running_loss / seen_samples
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}")

    # Early stopping check (using training loss - ideally use validation loss)
    if epoch_loss < best_loss - min_delta:
        best_loss = epoch_loss
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs due to no improvement in training loss.")
            break

# Later cells call `model` with CPU tensors, so hand the trained model back on the CPU.
# (The optimizer state stays on the training device; rebuild the optimizer before training further.)
model.to("cpu")

print("Finished Training")

Model architecture:
TinyPatchRegressor(
  (conv1): Conv2d(6, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (n1): GroupNorm(8, 32, eps=1e-05, affine=True)
  (conv2): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1))
  (n2): GroupNorm(4, 16, eps=1e-05, affine=True)
  (conv3): Conv2d(16, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (gap): AdaptiveAvgPool2d(output_size=1)
  (dropout): Dropout(p=0.1, inplace=False)
  (fc): Linear(in_features=8, out_features=1, bias=True)
)

Starting training for 15 epochs...


  scaler = torch.cuda.amp.GradScaler()
  with torch.cuda.amp.autocast():


Epoch 1/15, Loss: 0.0359
Epoch 2/15, Loss: 0.0253
Epoch 3/15, Loss: 0.0241
Epoch 4/15, Loss: 0.0236
Epoch 5/15, Loss: 0.0225
Epoch 6/15, Loss: 0.0215
Epoch 7/15, Loss: 0.0204
Epoch 8/15, Loss: 0.0191
Epoch 9/15, Loss: 0.0177
Epoch 10/15, Loss: 0.0182
Epoch 11/15, Loss: 0.0168
Epoch 12/15, Loss: 0.0163
Epoch 13/15, Loss: 0.0152
Epoch 14/15, Loss: 0.0156
Epoch 15/15, Loss: 0.0151
Finished Training


In [None]:
# Test data: same patch folder and reference images as for training, but the test split
patches_dir = os.path.join(PATCHES_PATH, "copy")
test_dataset = ImagePairDatasetPatches(metadata_file=os.path.join(PATCHES_PATH, 'test.json'), patch_dir=patches_dir, ref_img_original_dir=REF_PATH)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)  # order does not matter for evaluation

# Evaluation mode (disables dropout)
model.eval()

# Run the batches on whatever device the model currently lives on
eval_device = next(model.parameters()).device

running_loss = 0.0
total_samples = 0

print("\nStarting evaluation on the test set...")

# No gradients are needed for evaluation
with torch.no_grad():
    for inputs, scores in test_dataloader:
        # The dataset already returns the stacked (reference + distorted) patches
        inputs = inputs.to(eval_device)
        # Targets are cast to float32 to match the model output, shape [B]
        scores = scores.to(eval_device).float()

        outputs = model(inputs)
        loss = criterion(outputs, scores)

        # loss is a per-batch mean; weight by batch size to accumulate a sum over samples
        running_loss += loss.item() * inputs.size(0)
        total_samples += inputs.size(0)

# Per-sample mean squared error over the whole test set
average_test_loss = running_loss / total_samples
print(f"Average Test Loss: {average_test_loss:.4f}")


Starting evaluation on the test set...
Average Test Loss: 0.0120
