# DL-CBIR using ViT Base-16 Architecture

**Offline Phase**
***

# Libraries

In [None]:
from google.colab import drive

import os
import logging
from PIL import Image

import ast
import cv2
import numpy as np
import pandas as pd

import timm
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import ToTensor

import joblib
from joblib import load
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score

# Extras
import time
from google.colab import userdata

In [None]:
# HuggingFace Token
# Copy the value stored under 'HF_TOKEN' in Colab's `userdata` into the process
# environment. (Presumably picked up by the Hugging Face Hub client when the
# pretrained weights are downloaded -- confirm.)
os.environ['HF_TOKEN'] = userdata.get('HF_TOKEN')

In [None]:
# Reproducibility
# Seed the global NumPy and PyTorch random number generators with a fixed value
# so repeated runs draw the same random numbers.
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x78b3c1130cf0>

# Detect Device

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print(f"Using device: {DEVICE}")

Using device: cuda


# Configuration Parameters

In [None]:
# Root folder of the Corel-1k dataset on the mounted Google Drive.
DIR_PATH = '/content/drive/MyDrive/ML_Datasets/corel_1k_dataset/'
# CSV file the combined feature vectors are written to and the SVM is trained from.
FEATURES_PATH = '/content/drive/MyDrive/ML_Datasets/vit_b_16_feature_vectors.csv'
# Folder that holds the serialized models.
MODELS_PATH = '/content/drive/MyDrive/ML_Models/'
# Full path of the saved SVM bundle (already includes MODELS_PATH).
svm_model_path = MODELS_PATH + 'vit_b_16_svm_model.joblib'

# NOTE(review): TRAIN_SPLIT, BATCH_SIZE, N_WORKERS and N_RESULTS are not
# referenced anywhere else in this notebook -- confirm whether they are still needed.
TRAIN_SPLIT = 0.8
BATCH_SIZE = 32

N_WORKERS = 2

# Default number of PCA components used by compute_combined_feature_vector.
PCA_DIM = 64
N_RESULTS = 10

# Logging Setup

In [None]:
# Configure the root logger so INFO-level messages show up in the notebook.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Discard whatever handlers are already attached so messages are not duplicated
# when this cell is re-run.
del logger.handlers[:]

# Attach a single stream handler that prefixes each message with a timestamp
# and the level name.
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)

# Connecting to the dataset

In [None]:
# Mount Google Drive, then report the class sub-folders of the training set.
drive.mount('/content/drive')
# os.path.join avoids the doubled slash that DIR_PATH + '/training_set' produces
# (DIR_PATH already ends with '/').
training_dir = os.path.join(DIR_PATH, 'training_set')
# List the directory once and reuse the result for both messages.
class_names = os.listdir(training_dir)
print(f"Available classes: {class_names}")
print(f"Number of classes: {len(class_names)}")

Mounted at /content/drive
Available classes: ['flowers', 'bus', 'foods', 'monuments', 'dinosaurs', 'peolpe_and_villages_in_Africa', 'elephants', 'horses', 'beaches', 'mountains_and_snow']
Number of classes: 10


# Components

## Data Loader

In [None]:
def get_data_transforms():
    """
    Builds the preprocessing pipeline applied to every image.

    The pipeline resizes to 256x256, center-crops to 224x224, converts the image
    to a tensor and normalizes each channel with a fixed mean and std.

    Returns:
        transforms.Compose: The composed preprocessing pipeline.
    """
    # NOTE(review): confirm these per-channel statistics match the preprocessing
    # the pretrained ViT checkpoint was trained with.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize((256, 256)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)

def load_images_and_labels(data_dir, target_class=None):
    """
    Loads images (optionally only from specific classes), preprocesses them and returns:
      - features (torch.Tensor) of shape (N, C, H, W) on DEVICE
      - labels   (torch.Tensor) of shape (N,) on DEVICE
      - file_names (list) of file paths for each image

    All selected images are held in memory at once, so this is only suitable
    for datasets that fit on DEVICE.

    Args:
        data_dir (str): Directory path containing one sub-directory per class.
        target_class (str or list[str], optional): Only load images from this class (or list of classes).
                                                   If None, loads all classes.

    Raises:
        FileNotFoundError: If data_dir does not exist.
        KeyError: If a requested class name is not a sub-directory of data_dir.
        ValueError: If the selection contains no images.
    """
    if not os.path.exists(data_dir):
        raise FileNotFoundError(f"Data directory {data_dir} not found.")

    transform_pipeline = get_data_transforms()
    # ImageFolder is used only to discover (path, class index) pairs; the
    # transform is applied manually below.
    dataset = datasets.ImageFolder(root=data_dir)

    # Determine which class indices to keep
    if target_class is None:
        valid_class_idxs = set(dataset.class_to_idx.values())
    else:
        target_list = [target_class] if isinstance(target_class, str) else list(target_class)
        unknown = [c for c in target_list if c not in dataset.class_to_idx]
        if unknown:
            # Still a KeyError (as before), but with an actionable message.
            raise KeyError(
                f"Unknown class name(s) {unknown}; available classes: {sorted(dataset.class_to_idx)}"
            )
        valid_class_idxs = {dataset.class_to_idx[c] for c in target_list}

    # Filter samples
    filtered_samples = [
        (path, label)
        for (path, label) in dataset.samples
        if label in valid_class_idxs
    ]
    if not filtered_samples:
        # torch.stack on an empty list fails with an unhelpful message.
        raise ValueError(f"No images selected from {data_dir} for target_class={target_class!r}.")

    feature_list = []
    label_list = []
    file_names = []

    for img_path, label in filtered_samples:
        # The context manager closes the underlying file as soon as the pixels
        # have been decoded, instead of leaving it to garbage collection.
        with Image.open(img_path) as img:
            img_t = transform_pipeline(img.convert('RGB'))
        feature_list.append(img_t)
        label_list.append(label)
        file_names.append(img_path)

    # Stack and move to DEVICE
    features = torch.stack(feature_list).to(DEVICE)      # (N, C, H, W)
    labels = torch.tensor(label_list, device=DEVICE)     # (N,)

    return features, labels, file_names

## Pre-trained ViT Base-16 Model for Feature Extraction

In [None]:
class ViTFeatureExtractor(nn.Module):
    """
    Pre-trained ViT Base-16 backbone with its classification head replaced by an
    identity layer, so calling it yields feature embeddings instead of class scores.
    The backbone lives on DEVICE and is put in eval mode at construction time.
    """

    def __init__(self):
        super().__init__()
        # Load the pre-trained ViT and strip its classification head.
        backbone = timm.create_model('vit_base_patch16_224', pretrained=True)
        backbone.head = nn.Identity()
        # Keep the backbone on DEVICE, in inference mode.
        self.model = backbone.to(DEVICE)
        self.model.eval()

    @torch.no_grad()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Extracts ViT features for a batch of images without tracking gradients.

        Args:
            x (torch.Tensor): Batch of images as a tensor of shape (N, C, H, W),
                              already resized/cropped/normalized to 224x224.

        Returns:
            torch.Tensor: Feature tensor of shape (N, embed_dim) on DEVICE.
        """
        # The input is moved to DEVICE before the forward pass.
        return self.model(x.to(DEVICE))

## PCA for Feature Extraction

In [None]:
def compute_normalized_pca(features: torch.Tensor, reduced_dim: int) -> torch.Tensor:
    """
    Performs PCA-based dimensionality reduction on a matrix of deep features (N, D)
    followed by L2 normalization of each row. In this notebook the input is the
    (N, 768) output of the ViT feature extractor.

    The principal directions are fitted on the batch that is passed in. The output
    always has shape (N, reduced_dim): when fewer than reduced_dim components are
    available (min(N, D) < reduced_dim) the projection matrix is padded with zero
    columns, so the trailing output dimensions are zero. All computation happens on DEVICE.

    Args:
        features (torch.Tensor): Input tensor of shape (N, D), where N is the number of images.
        reduced_dim (int): Target dimensionality after PCA.

    Returns:
        torch.Tensor: L2-normalized reduced features of shape (N, reduced_dim), on DEVICE.

    Raises:
        ValueError: If features is not a 2-D tensor.
    """
    if features.dim() != 2:
        raise ValueError(f"Expected a 2-D (N, D) feature tensor, got shape {tuple(features.shape)}.")

    # Move features to DEVICE
    features = features.to(DEVICE)

    # Step 1: Center the data
    mean = features.mean(dim=0, keepdim=True)               # (1, D)
    features_centered = features - mean                     # (N, D)

    # Step 2: SVD of the centered data; the rows of Vh are the principal directions
    _, _, Vh = torch.linalg.svd(features_centered, full_matrices=False)
    # Vh has shape (min(N, D), D)
    num_components = Vh.shape[0]

    # Step 3: Select top principal components and form the projection matrix
    if num_components >= reduced_dim:
        principal_components = Vh[:reduced_dim].T           # (D, reduced_dim)
    else:
        # Pad with zero columns. The padding takes its dtype and device from Vh
        # so the concatenated matrix can be multiplied with features_centered
        # without a dtype mismatch for non-float32 inputs.
        padding = torch.zeros(
            (features.shape[1], reduced_dim - num_components),
            dtype=Vh.dtype,
            device=Vh.device,
        )
        principal_components = torch.cat([Vh.T, padding], dim=1)  # (D, reduced_dim)

    # Step 4: Project onto the reduced subspace
    reduced_features = features_centered @ principal_components  # (N, reduced_dim)

    # Step 5: L2-normalize each row
    return F.normalize(reduced_features, p=2, dim=1)             # (N, reduced_dim)


## DCT for Feature Extraction

In [None]:
def compute_normalized_dct(images: torch.Tensor) -> torch.Tensor:
    """
    Computes the L2-normalized DCT feature vector for each image in a batch,
    returning a tensor on DEVICE.

    Each image is converted to grayscale, transformed with a 2-D DCT, flattened
    and scaled to unit L2 norm, so feature_length equals H * W.

    Args:
        images (torch.Tensor): Batch of input images with shape (N, C, H, W),
                               where N is the number of images and C is the number of channels (3).
                               Channels are expected in RGB order, which is what
                               load_images_and_labels produces (PIL, convert('RGB')).

    Returns:
        torch.Tensor: Tensor of L2-normalized DCT feature vectors with shape (N, feature_length),
                      on DEVICE.
    """
    # Detach from any autograd graph, cast to float32 for OpenCV, reorder to
    # channels-last and copy to host memory.
    images_np = (
        images.detach()
        .to(dtype=torch.float32)
        .permute(0, 2, 3, 1)
        .contiguous()
        .cpu()
        .numpy()
    )  # shape: (N, H, W, C)

    dct_features = []
    for img in images_np:
        # The tensors are RGB, so the conversion code must be RGB2GRAY;
        # BGR2GRAY would apply the red and blue luminance weights to the wrong channels.
        # NOTE(review): any query-time pipeline must use the same conversion, and
        # feature databases built with the previous code need to be regenerated.
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        # 2-D DCT of the grayscale image (float32 input)
        dct_res = cv2.dct(np.float32(gray))
        # Flatten to a 1-D feature vector
        vec = dct_res.flatten()
        # Scale to unit L2 norm; an all-zero vector is left unchanged
        norm = np.linalg.norm(vec)
        if norm > 0:
            vec = vec / norm
        dct_features.append(vec)

    # Stack into numpy, convert to torch tensor, move to DEVICE
    dct_np = np.stack(dct_features, axis=0)           # shape: (N, feature_length)
    return torch.from_numpy(dct_np).to(DEVICE)

## Feature Vector Combination

In [None]:
def compute_combined_feature_vector(
    images_tensor: torch.Tensor,
    alexnet_features: torch.Tensor,
    pca_dim: int = PCA_DIM
) -> torch.Tensor:
    """
    Builds the combined descriptor of every image in a batch: the normalized DCT
    vector of the image followed by the normalized PCA reduction of its deep features.

    Args:
        images_tensor (torch.Tensor): Input image tensor with shape (N, C, H, W).
        alexnet_features (torch.Tensor): Deep feature tensor with shape (N, D). The parameter
                                         name is historical; in this notebook it receives the
                                         output of the ViT feature extractor.
        pca_dim (int): Target dimensionality for PCA reduction.

    Returns:
        torch.Tensor: Combined feature vectors of shape (N, dct_length + pca_dim), on DEVICE.
    """
    # Both inputs are placed on DEVICE before any computation.
    images_on_device = images_tensor.to(DEVICE)
    deep_on_device = alexnet_features.to(DEVICE)

    # DCT part first (N, dct_length), then the PCA part (N, pca_dim).
    parts = (
        compute_normalized_dct(images_on_device),
        compute_normalized_pca(deep_on_device, pca_dim),
    )

    # Join the two descriptors along the feature axis.
    return torch.cat(parts, dim=1)

## Features Database

In [None]:
def generate_feature_vectors_csv(feature_vectors: torch.Tensor, labels: torch.Tensor, file_names: list, output_csv_path: str):
    """
    Generates a CSV file with columns ordered as: file_path, label, f1, f2, ..., fN.
    Each row represents one image and each feature value is stored in its own column.

    Args:
        feature_vectors (torch.Tensor): Tensor of combined feature vectors with shape (N, feature_length).
        labels (torch.Tensor): Tensor of labels with shape (N,).
        file_names (list): List of file paths corresponding to each image.
        output_csv_path (str): Full path (including filename) where the CSV file will be saved.

    Raises:
        ValueError: If feature_vectors is empty, or if feature_vectors, labels and
                    file_names do not all describe the same number of images.
    """
    if feature_vectors.size(0) == 0:
        raise ValueError("Feature vectors tensor is empty!")

    # Reject mismatched inputs instead of silently truncating to the shortest one.
    n_rows = feature_vectors.size(0)
    if labels.size(0) != n_rows or len(file_names) != n_rows:
        raise ValueError(
            f"Mismatched inputs: {n_rows} feature vectors, "
            f"{labels.size(0)} labels, {len(file_names)} file names."
        )

    # Convert tensors to numpy arrays. Floating-point features are widened to
    # float64 so the values written to the CSV are the same as those obtained
    # by converting each element to a Python float.
    features_cpu = feature_vectors.detach().cpu()
    if features_cpu.is_floating_point():
        features_cpu = features_cpu.double()
    feature_vectors_np = features_cpu.numpy()
    labels_np = labels.detach().cpu().numpy()

    # Create the column names: file_path, label, f1, f2, ..., fN.
    feature_columns = [f'f{i+1}' for i in range(feature_vectors_np.shape[1])]
    columns = ['file_path', 'label'] + feature_columns

    # Build the frame directly from the 2-D array (no per-row Python lists),
    # then add the metadata columns and put them first.
    df = pd.DataFrame(feature_vectors_np, columns=feature_columns)
    df['file_path'] = list(file_names)
    df['label'] = labels_np
    df = df[columns]

    # Ensure the directory exists.
    output_dir = os.path.dirname(output_csv_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Write the DataFrame to a CSV file.
    df.to_csv(output_csv_path, index=False)
    print(f"CSV file '{output_csv_path}' generated successfully.")

## SVM Model for prediction of Image Class

### Feature Vectors Dataset

In [None]:
def load_feature_database(csv_file: str):
    """
    Loads the image feature database from a CSV file.

    Two layouts are accepted:
      - wide: columns file_path, label, f1, f2, ..., fN (one column per feature value),
        which is the layout written by generate_feature_vectors_csv;
      - legacy: columns file_path, label and a single 'feature_vector' column holding
        the vector as a string such as "[0.1, 0.2]".

    Args:
        csv_file (str): Path to the CSV file.

    Returns:
        tuple: A tuple (features, labels, file_paths) where:
            - features is a numpy array of shape (N, d)
            - labels is a list of labels
            - file_paths is a list of file paths for each image

    Raises:
        ValueError: If the file has neither a 'feature_vector' column nor any feature columns.
    """
    data = pd.read_csv(csv_file)

    if 'feature_vector' in data.columns:
        # Legacy layout: parse each string into a list of floats.
        # ast.literal_eval only evaluates Python literals, never arbitrary code.
        parsed = data['feature_vector'].apply(ast.literal_eval)
        features = np.array(parsed.tolist())
    else:
        # Wide layout: every column other than the two metadata columns is a feature.
        feature_columns = [c for c in data.columns if c not in ('file_path', 'label')]
        if not feature_columns:
            raise ValueError(f"No feature columns found in {csv_file}.")
        features = data[feature_columns].values.astype(float)

    labels = data['label'].tolist()
    file_paths = data['file_path'].tolist()
    return features, labels, file_paths

### Model

In [None]:
class SVMClassifier:
    """
    RBF-kernel SVM trained on a CSV of feature vectors.

    The CSV is expected to hold a label column plus one column per feature value;
    an optional 'file_path' column is ignored for training. Typical use is
    train() -> evaluate() -> save_model(path).
    """

    def __init__(self, csv_file: str, label_column: str = 'label', test_size: float = 0.2, random_state: int = 42):
        """
        Initializes the SVM classifier with data from a CSV file.
        The file is not read here; that happens in load_data()/train().

        Args:
            csv_file (str): Path to the input CSV file.
            label_column (str): Name of the column containing class labels.
            test_size (float): Fraction of data to be used for testing (default: 0.2).
            random_state (int): Random seed for reproducibility (default: 42).
        """
        self.csv_file = csv_file
        self.label_column = label_column
        self.test_size = test_size
        self.random_state = random_state
        # Populated by train() (model, scaler, X_test, y_test) or load_model() (model, scaler).
        self.model = None
        self.scaler = None
        self.X_test = None
        self.y_test = None
        logging.info(f"SVMClassifier initialized with csv_file: {csv_file}, label_column: {label_column}")

    def load_data(self):
        """
        Loads data from the CSV file and splits it into training and testing sets.
        Expects the CSV file to have columns: file_path, label, f1, f2, ..., fN.

        Returns:
            tuple: (X_train, X_test, y_train, y_test)
        """
        logging.info("Loading data...")
        # Load the dataset
        data = pd.read_csv(self.csv_file)

        # Drop the file_path column if it exists (we don't use it for training)
        if 'file_path' in data.columns:
            data = data.drop(columns=['file_path'])

        # Features are assumed to be all columns except the label column.
        features = data.drop(columns=[self.label_column])
        labels = data[self.label_column]

        # Split into training and testing sets.
        X_train, X_test, y_train, y_test = train_test_split(
            features, labels, test_size=self.test_size, random_state=self.random_state
        )
        logging.info("Data loaded and split into training and test sets.")
        return X_train, X_test, y_train, y_test

    def train(self):
        """
        Trains the SVM model on the training set.
        Standardizes the features and stores the trained model, the scaler and the
        scaled test split as instance attributes.

        Returns:
            model: The trained SVM model.
        """
        logging.info("Starting training...")
        # Load and split the data.
        X_train, X_test, y_train, y_test = self.load_data()

        # Standardize features; the scaler is fitted on the training split only
        # and then applied to the test split.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        self.scaler = scaler
        self.X_test = X_test_scaled
        self.y_test = y_test

        # Initialize the SVM classifier with RBF kernel.
        model = SVC(kernel='rbf', gamma='scale', C=1.0)
        logging.info("SVM model initialized with RBF kernel.")

        # Train the model.
        model.fit(X_train_scaled, y_train)
        self.model = model
        logging.info("SVM model training completed.")
        return model

    def evaluate(self):
        """
        Evaluates the trained SVM model on the test split stored by train().
        Prints the classification report and accuracy score.

        Returns:
            tuple: (report, accuracy), or (None, None) when there is no model or
                   no test split to evaluate on.
        """
        if self.model is None:
            logging.error("Model not trained. Please train the model before evaluation.")
            return None, None

        # A model restored with load_model() has no test split attached;
        # predicting on None would crash.
        if self.X_test is None or self.y_test is None:
            logging.error("No test data available. Please call train() before evaluation.")
            return None, None

        logging.info("Starting evaluation...")
        # Make predictions on the test set.
        y_pred = self.model.predict(self.X_test)

        # Evaluate the model.
        report = classification_report(self.y_test, y_pred)
        accuracy = accuracy_score(self.y_test, y_pred)
        logging.info("Evaluation completed.")

        print("Classification Report:")
        print(report)
        print("Accuracy Score:", accuracy)
        return report, accuracy

    def save_model(self, filename: str):
        """
        Saves the trained model and scaler to a file using joblib.
        The two objects are stored together as a (model, scaler) tuple.

        Args:
            filename (str): The file path where the model will be saved.
        """
        if self.model is None or self.scaler is None:
            logging.error("No trained model to save. Please train the model first.")
            return

        joblib.dump((self.model, self.scaler), filename)
        logging.info(f"Model saved to {filename}")

    def load_model(self, filename: str):
        """
        Loads a (model, scaler) tuple from a file using joblib.
        No test split is restored, so evaluate() needs train() to be called first.

        Args:
            filename (str): The file path from where the model will be loaded.
        """
        # SECURITY: joblib files are pickle-based and can execute arbitrary code
        # when loaded; only load files from a trusted source.
        self.model, self.scaler = joblib.load(filename)
        logging.info(f"Model loaded from {filename}")


# Result

In [None]:
# Start time of execution.
# perf_counter() measures elapsed wall-clock time; process_time() only counts CPU
# time of this process and therefore leaves out time spent waiting on Drive I/O.
start_time = time.perf_counter()

# Load ALL classes:
# (os.path.join avoids the doubled slash, since DIR_PATH already ends with '/')
images_tensor, labels_tensor, file_names = load_images_and_labels(
    os.path.join(DIR_PATH, 'training_set')
)

# Load only the BUS class:
# images_tensor, labels_tensor, file_names = load_images_and_labels(
#     DIR_PATH + '/test_set', target_class='bus'
# )

# Load classes BUS and FLOWERS:
# images_tensor, labels_tensor, file_names = load_images_and_labels(
#     DIR_PATH + '/test_set', target_class=['bus','flowers']
# )

# Total Time for execution
logging.info(f"Time taken to execute = {(time.perf_counter() - start_time):.4f}s")

2025-04-17 05:43:40,455 - INFO - Time taken to execute = 5.5826s


In [None]:
# Start time of execution (perf_counter = elapsed wall-clock time;
# process_time would only count CPU time of this process).
start_time = time.perf_counter()

# ViT Model for Feature Extraction
model = ViTFeatureExtractor()
# The model expects a tensor of shape (N, C, H, W) and returns features of
# shape (N, embed_dim); the run logged below shows (900, 768).
model_features = model(images_tensor)
if DEVICE.type == "cuda":
    # CUDA kernels run asynchronously; wait for them so the measured time
    # covers the forward pass itself.
    torch.cuda.synchronize()
logging.info(f"Extracted features shape: {model_features.shape}")

# Total Time for execution
logging.info(f"Time taken to execute = {(time.perf_counter() - start_time):.4f}s")

2025-04-17 05:43:42,583 - INFO - Loading pretrained weights from Hugging Face hub (timm/vit_base_patch16_224.augreg2_in21k_ft_in1k)


model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

2025-04-17 05:43:45,103 - INFO - [timm/vit_base_patch16_224.augreg2_in21k_ft_in1k] Safe alternative available for 'pytorch_model.bin' (as 'model.safetensors'). Loading weights using safetensors.
2025-04-17 05:43:46,638 - INFO - Extracted features shape: torch.Size([900, 768])
2025-04-17 05:43:46,639 - INFO - Time taken to execute = 4.0050s


In [None]:
# Start time of execution (perf_counter = elapsed wall-clock time;
# process_time would only count CPU time and leave out the CSV write to Drive).
start_time = time.perf_counter()

# Combined Feature Vector Generation
combined_vectors = compute_combined_feature_vector(images_tensor, model_features)
generate_feature_vectors_csv(combined_vectors, labels_tensor, file_names, FEATURES_PATH)

# Total Time for execution
logging.info(f"Time taken to execute = {(time.perf_counter() - start_time):.4f}s")

CSV file '/content/drive/MyDrive/ML_Datasets/vit_b_16_feature_vectors.csv' generated successfully.


2025-04-17 05:46:46,313 - INFO - Time taken to execute = 171.7949s


In [None]:
# Start time of execution.
# perf_counter() measures elapsed wall-clock time. process_time() sums the CPU
# time of all threads, which can exceed the real duration of a multi-threaded step.
start_time = time.perf_counter()

# Training the SVM Classifier
label_column = 'label'

svm_classifier = SVMClassifier(csv_file=FEATURES_PATH, label_column=label_column, test_size=0.2, random_state=42)
model = svm_classifier.train()

report, accuracy = svm_classifier.evaluate()

# Total Time for execution
logging.info(f"Time taken to execute = {(time.perf_counter() - start_time):.4f}s")

2025-04-17 05:46:46,319 - INFO - SVMClassifier initialized with csv_file: /content/drive/MyDrive/ML_Datasets/vit_b_16_feature_vectors.csv, label_column: label
2025-04-17 05:46:46,322 - INFO - Starting training...
2025-04-17 05:46:46,323 - INFO - Loading data...
2025-04-17 05:47:26,131 - INFO - Data loaded and split into training and test sets.
2025-04-17 05:47:27,719 - INFO - SVM model initialized with RBF kernel.
2025-04-17 05:47:59,482 - INFO - SVM model training completed.
2025-04-17 05:47:59,488 - INFO - Starting evaluation...
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
2025-04-17 05:48:10,438 - INFO - Evaluation completed.
2025-04-17 05:48:10,447 - INFO - Time taken to execute = 109.1706s


Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        23
           1       0.11      0.58      0.19        12
           2       0.33      0.79      0.47        19
           3       0.00      0.00      0.00        19
           4       0.68      0.81      0.74        21
           5       0.00      0.00      0.00        20
           6       0.11      0.06      0.08        17
           7       0.00      0.00      0.00        14
           8       0.00      0.00      0.00        20
           9       0.16      0.40      0.23        15

    accuracy                           0.26       180
   macro avg       0.14      0.26      0.17       180
weighted avg       0.15      0.26      0.17       180

Accuracy Score: 0.25555555555555554


In [None]:
# Saving the SVM Classifier
# svm_model_path is already the full path inside MODELS_PATH. Joining it onto
# MODELS_PATH again only worked because os.path.join discards everything before
# an absolute component, so use the path directly.
model_filename = svm_model_path
output_dir = os.path.dirname(model_filename)

# Create the target folder if needed; exist_ok avoids a separate existence check.
os.makedirs(output_dir, exist_ok=True)

svm_classifier.save_model(model_filename)

2025-04-17 05:48:11,481 - INFO - Model saved to /content/drive/MyDrive/ML_Models/vit_b_16_svm_model.joblib


**End**