### Data Download

In [None]:
import kagglehub
import os
import shutil

# Dataset directory
dataset_dir = "data/raw"

# Download dataset
path = kagglehub.dataset_download("jiayuanchengala/aid-scene-classification-datasets")
src_dir = os.path.join(path, "AID")

# Move only if not already moved
if not os.path.exists(dataset_dir):
    print("Copying dataset to data/raw...")
    # Note that if I move it then kagglehub wont re-download so we can=t keep consistency
    shutil.copytree(src_dir, dataset_dir)
else:
    print("Dataset already exists at data/raw")

print("✅ Dataset ready at:", dataset_dir)


### Manual Data Separation

Note that this isn't necessary for sklearn since it already includes splits and cross validation.

However, for deep learning (Pytorch) you do need to the separation yourself

In [None]:
import os


image_paths =  []
labels = []
source_dir = 'data/raw'
categories = [d for d in os.listdir(source_dir)
              if os.path.isdir(os.path.join(source_dir, d))]
for category in categories:
    category_path = os.path.join(source_dir, category)
    files = [os.path.join(category_path, f) for f in os.listdir(category_path)
             if os.path.isfile(os.path.join(category_path, f))]
    image_paths.extend(files)
    labels.extend([category] * len(files))

print(f"Found {len(image_paths)} images in {len(categories)} categories")
os.makedirs("metrics", exist_ok=True)

In [None]:
import random
from pathlib import Path

# Set random seed for reproducibility
random.seed(42)

# Source directory containing all the category folders
source_dir = 'data/raw'

# Create train and test directories
train_dir = 'data/train'
test_dir = 'data/test'
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

for category in categories:
    category_path = os.path.join(source_dir, category)

    if not os.path.exists(category_path):
        print(f"Warning: {category} folder not found, skipping...")
        continue

    # Get all files in the category folder
    files = [f for f in os.listdir(category_path)
             if os.path.isfile(os.path.join(category_path, f))]

    if len(files) == 0:
        print(f"Warning: {category} folder is empty, skipping...")
        continue

    # Shuffle files randomly
    random.shuffle(files)

    # Calculate split point (90% for train)
    split_idx = int(len(files) * 0.9)
    train_files = files[:split_idx]
    test_files = files[split_idx:]

    # Create category subfolders in train and test
    train_category_dir = os.path.join(train_dir, category)
    test_category_dir = os.path.join(test_dir, category)
    os.makedirs(train_category_dir, exist_ok=True)
    os.makedirs(test_category_dir, exist_ok=True)

    # Move files to train
    for file in train_files:
        src = os.path.join(category_path, file)
        dst = os.path.join(train_category_dir, file)
        shutil.copy2(src, dst)

    # Move files to test
    for file in test_files:
        src = os.path.join(category_path, file)
        dst = os.path.join(test_category_dir, file)
        shutil.copy2(src, dst)

    print(f"{category}: {len(train_files)} files to train, {len(test_files)} files to test")

print("\nSplit complete!")

In [None]:
from collections import defaultdict

# This is for cross-validation (Pytorch and Tensorflow need manual separation)
# To scikit-learning you onl need to pass the training data and it does the division

# Configuration
k_folds = 5
output_base = 'kfolds'
os.makedirs(output_base, exist_ok=True)

print(f"Creating {k_folds}-fold cross-validation split...")

# Collect all files by category
category_files = {}
for category in categories:
    category_path = os.path.join(source_dir, category)
    files = [f for f in os.listdir(category_path)
             if os.path.isfile(os.path.join(category_path, f))]
    random.shuffle(files)  # Shuffle within each category
    category_files[category] = files

# Create fold assignments for each category
fold_assignments = defaultdict(lambda: defaultdict(list))

for category, files in category_files.items():
    n_files = len(files)
    fold_size = n_files // k_folds

    # Assign files to folds ensuring balanced distribution
    for fold_idx in range(k_folds):
        start_idx = fold_idx * fold_size
        # Last fold gets any remaining files
        end_idx = start_idx + fold_size if fold_idx < k_folds - 1 else n_files
        fold_assignments[fold_idx][category] = files[start_idx:end_idx]

# Create fold directories with class subdirectories only
for fold_idx in range(k_folds):
    fold_name = f'fold_{fold_idx + 1}'
    fold_dir = os.path.join(output_base, fold_name)

    # Create class subdirectories in each fold
    for category in categories:
        category_fold_dir = os.path.join(fold_dir, category)
        os.makedirs(category_fold_dir, exist_ok=True)

        # Copy the files assigned to this fold for this category
        files_for_this_fold = fold_assignments[fold_idx][category]

        for file in files_for_this_fold:
            src = os.path.join(source_dir, category, file)
            dst = os.path.join(category_fold_dir, file)
            shutil.copy2(src, dst)

    print(f"\n{fold_name}:")
    print("=" * 50)

    total_files = 0
    for category in categories:
        files_in_fold = fold_assignments[fold_idx][category]
        total_files += len(files_in_fold)
        print(f"  {category}: {len(files_in_fold)} files")

    print(f"  Total in fold: {total_files} files")

print(f"\n\nK-fold split complete!")
print(f"Output directory: {output_base}/")
print(f"Structure: fold_1/, fold_2/, ..., fold_{k_folds}/")
print(f"Each fold contains: {', '.join(categories)} subdirectories with their respective files")

print(f"\n\nK-fold split complete!")
print(f"Output directory: {output_base}/")
print(f"Structure: fold_1/, fold_2/, ..., fold_{k_folds}/")

### Bag of Visual Words

Since Bag of Features consist on 4 stages we decide to use classes to not hardcode each possible combination.

We separate them into descriptorExtractor class which is meant to be used in the first step of the Bag of Visual Word which is extracting the features.

For the clustering step we would usually simply keep always MiniBatchKMeans since it is the most scalable and faster than KMeans with similar accuracy; however, because we are keeping both in a class for a more theoretical approach where we can compare both 

Then the encoding part consist on how you analyze the clusters given an image. The original implementation is only with histograms, however, there are some alternatives like VLAD or Fisher vector that we also considered to take into account (to know difference between them in both accuracy and memory). Note that they are considered different algorithms and usually not considered part of BoVW but we decided to classify as encoding class because it doesn't break Bag of Features structure

Lastly is the pipeline which could be removed and do training and classifier with the encoders and independent classifiers. However, you can see that the best scalable approach in sklearn are pipelines; however, they are not meant to be used for image classification by default, so we adapt them to be able to follow Bag of Features algorithm  while keeping sklearning advantages (which are fitting, predicting and saving/loading models with joblib)

In [None]:
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.cluster import MiniBatchKMeans, KMeans
from sklearn.preprocessing import normalize
from abc import ABC, abstractmethod
from typing import Optional
from enum import Enum
import joblib
import cv2 as cv


class DescriptorType(Enum):
    """Enum for available descriptor types."""
    SIFT = "SIFT"
    ORB = "ORB"
    AKAZE = "AKAZE"


class DescriptorExtractor:
    """
    Unified interface for different feature descriptors.

    This class handles the quirks of different OpenCV detectors
    and provides a consistent interface.

    Parameters
    ----------
    descriptor_type : str or DescriptorType
        Type of descriptor to use ('SIFT', 'ORB', 'AKAZE', etc.)
    n_features : int, optional
        Maximum number of features to detect (for SIFT, ORB)
    **kwargs : dict
        Additional parameters for the specific detector
    """

    def __init__(
        self,
        descriptor_type: str = 'SIFT',
        n_features: Optional[int] = None,
        **kwargs
    ):
        self.descriptor_type = descriptor_type.upper() # Case sensitive
        self.n_features = n_features
        self.kwargs = kwargs
        self.detector = self._create_detector()

        # Store descriptor dimensionality
        self.descriptor_size = self._get_descriptor_size()

    def _create_detector(self):
        """Create the appropriate OpenCV detector."""
        dt = self.descriptor_type

        if dt == 'SIFT':
            if self.n_features is not None:
                return cv.SIFT_create(nfeatures=self.n_features, **self.kwargs)
            return cv.SIFT_create(**self.kwargs)

        elif dt == 'ORB':
            if self.n_features is not None:
                return cv.ORB_create(nfeatures=self.n_features, **self.kwargs)
            return cv.ORB_create(**self.kwargs)

        elif dt == 'AKAZE':
            # AKAZE doesn't have nfeatures parameter
            return cv.AKAZE_create(**self.kwargs)

        else:
            raise ValueError(
                f"Unknown descriptor type: {dt}. "
                f"Available: {[e.value for e in DescriptorType]}"
            )

    def _get_descriptor_size(self) -> int:
        """Get the descriptor dimensionality for each type."""
        size_map = {
            'SIFT': 128,
            'ORB': 32,
            'AKAZE': 61
        }
        return size_map.get(self.descriptor_type, 128)

    def extract(
        self,
        image: np.ndarray,
        return_keypoints: bool = False
    ) -> np.ndarray:
        """
        Extract descriptors from an image array.

        Parameters
        ----------
        image : np.ndarray
            Input image (grayscale or color)
        return_keypoints : bool, default=False
            If True, return (keypoints, descriptors) tuple

        Returns
        -------
        descriptors : np.ndarray
            Descriptor array of shape (n_keypoints, descriptor_size)
            Returns empty array if no keypoints detected
        """
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            image = cv.cvtColor(image, cv.COLOR_BGR2GRAY)

        # Detect and compute
        keypoints, descriptors = self.detector.detectAndCompute(image, None)

        # Handle no detections
        if descriptors is None or len(keypoints) == 0:
            descriptors = np.zeros((0, self.descriptor_size), dtype=np.float32)
            keypoints = []
        else:
            # Ensure float32 for consistency
            descriptors = descriptors.astype(np.float32)

        if return_keypoints:
            return keypoints, descriptors
        return descriptors

    def extract_from_file(
        self,
        filepath: str,
        return_keypoints: bool = False
    ) -> np.ndarray:
        """
        Extract descriptors from an image file.

        Parameters
        ----------
        filepath : str
            Path to image file
        return_keypoints : bool, default=False
            If True, return (keypoints, descriptors) tuple

        Returns
        -------
        descriptors : np.ndarray
            Descriptor array or empty array if file can't be read
        """
        image = cv.imread(str(filepath), cv.IMREAD_GRAYSCALE)

        if image is None:
            empty_desc = np.zeros((0, self.descriptor_size), dtype=np.float32)
            if return_keypoints:
                return [], empty_desc
            return empty_desc

        return self.extract(image, return_keypoints=return_keypoints)

    def extract_batch(
        self,
        images: list,
        from_files: bool = False
    ) -> list:
        """
        Extract descriptors from multiple images.

        Parameters
        ----------
        images : list
            List of image arrays or file paths
        from_files : bool, default=False
            If True, treat images as file paths

        Returns
        -------
        list of np.ndarray
            List of descriptor arrays
        """
        if from_files:
            return [self.extract_from_file(img) for img in images]
        else:
            return [self.extract(img) for img in images]

    def load_descriptors(self, filepath: str):
        """Load descriptors from a .npy file."""
        descriptors = np.load(filepath, allow_pickle=True)
        if descriptors is None or descriptors.size == 0:
            return None

        # guarantee correct shape
        descriptors = np.asarray(descriptors, dtype=np.float32)

        if descriptors.ndim != 2 or descriptors.shape[1] != 128:
            return None

        return descriptors

    def __repr__(self):
        return (
            f"DescriptorExtractor(type={self.descriptor_type}, "
            f"n_features={self.n_features}, "
            f"descriptor_size={self.descriptor_size})"
        )


class ClusteringAlgorithm(ABC):
    """Base class for clustering algorithms."""

    def __init__(self, n_clusters: int, random_state: int = 42):
        self.n_clusters = n_clusters
        self.random_state = random_state
        self.model = None
        self._is_fitted = False

    @abstractmethod
    def fit(self, X: np.ndarray):
        pass

    @abstractmethod
    def fit_iterative(self, data_loader, load_func: Optional[callable] = None):
        pass

    def predict(self, X: np.ndarray) -> np.ndarray:
        if not self._is_fitted:
            raise ValueError("Model must be fitted before predicting.")
        return self.model.predict(X)

    @property
    def cluster_centers_(self) -> np.ndarray:
        if not self._is_fitted:
            raise ValueError("Model must be fitted first.")
        return self.model.cluster_centers_

    def descriptor_batch_generator(self,
    data_loader, load_func=None, max_descriptors=10000):
        """
        Yields batches of descriptors with a total number of rows up to max_descriptors.

        Args:
            data_loader: iterable of items to load descriptors from.
            load_func: optional function(item) -> descriptor array
            max_descriptors: max total rows per batch

        Yields:
            np.ndarray of shape (~max_descriptors, n_features)
        """
        batch = []
        current_size = 0   # number of descriptors accumulated

        for item in data_loader:
            data = load_func(item) if load_func else item

            # Skip empty or None
            if data is None or data.size == 0:
                continue

            # Ensure 2D
            if data.ndim == 1:
                data = data.reshape(1, -1)

            batch.append(data)
            current_size += data.shape[0]

            # If batch is full, yield it
            if current_size >= max_descriptors:
                yield np.vstack(batch)
                batch = []
                current_size = 0

        # Yield remainder
        if batch:
            yield np.vstack(batch)


class MiniBatchKMeansClustering(ClusteringAlgorithm):
    """MiniBatch KMeans with support for iterative fitting."""

    def __init__(self, n_clusters: int, batch_size: int = 1024, random_state: int = 42, **kwargs):
        super().__init__(n_clusters, random_state)
        self.batch_size = batch_size
        self.kwargs = kwargs
        self.model = MiniBatchKMeans(
            n_clusters=n_clusters,
            batch_size=batch_size,
            random_state=random_state,
            **kwargs
        )

    def fit(self, X: np.ndarray):
        self.model.fit(X)
        self._is_fitted = True
        return self

    def fit_iterative(
        self,
        data_loader,
        load_func: Optional[callable] = None,
        accumulate_batch_size: Optional[int] = None
    ):
        """
        Fit iteratively using partial_fit.

        Args:
            data_loader: Iterable of file paths or data chunks
            load_func: Function to load data from paths
            accumulate_batch_size: Size to accumulate before partial_fit
                                   (defaults to self.batch_size)
        """
        if accumulate_batch_size is None:
            accumulate_batch_size = self.batch_size

        for batch in self.descriptor_batch_generator(data_loader, load_func, accumulate_batch_size):
            self.model.partial_fit(batch)

        self._is_fitted = True
        return self

class KMeansClustering(ClusteringAlgorithm):
    """Standard KMeans clustering."""

    def __init__(self, n_clusters: int, random_state: int = 0, **kwargs):
        super().__init__(n_clusters, random_state)
        self.kwargs = kwargs
        self.model = KMeans(
            n_clusters=n_clusters,
            random_state=random_state,
            **kwargs
        )

    def fit(self, X: np.ndarray):
        """Fit KMeans on all data at once."""
        self.model.fit(X)
        self._is_fitted = True
        return self

    def fit_iterative(self, data_loader, load_func: Optional[callable] = None):
        """KMeans doesn't support iterative fitting - loads all data first."""
        print("Warning: KMeans doesn't support true iterative fitting. Loading all data...")

        all_data = []
        for item in data_loader:
            data = load_func(item) if load_func else item
            if data is not None and data.size > 0:
                all_data.append(data)

        if all_data:
            X = np.vstack(all_data)
            self.fit(X)
        return self


class EncodingAlgorithm(ABC):
    """Base class for encoding algorithms."""

    def __init__(self, clustering: ClusteringAlgorithm):
        self.clustering = clustering

    @abstractmethod
    def encode(self, descriptors: np.ndarray) -> np.ndarray:
        pass


class BagOfVisualWords(EncodingAlgorithm):
    """Bag of Visual Words encoding."""

    def __init__(self, clustering: ClusteringAlgorithm,
                 normalize_hist: bool = True,
                 norm_type: str = 'l2'):  # Add this: 'l1' or 'l2'
        super().__init__(clustering)
        self.normalize_hist = normalize_hist
        self.norm_type = norm_type

    def encode(self, descriptors: np.ndarray) -> np.ndarray:
        if descriptors is None or descriptors.size == 0:
            return np.zeros(self.clustering.n_clusters)

        labels = self.clustering.predict(descriptors)
        histogram = np.bincount(labels, minlength=self.clustering.n_clusters).astype(np.float32)

        if self.normalize_hist:
            if self.norm_type == 'l2':
                norm = np.linalg.norm(histogram)
                if norm > 0:
                    histogram = histogram / norm
            elif self.norm_type == 'l1':
                if histogram.sum() > 0:
                    histogram = histogram / histogram.sum()

        return histogram


class VLAD(EncodingAlgorithm):
    """Vector of Locally Aggregated Descriptors."""

    def __init__(self, clustering: ClusteringAlgorithm, normalize_vlad: bool = True):
        super().__init__(clustering)
        self.normalize_vlad = normalize_vlad

    def encode(self, descriptors: np.ndarray) -> np.ndarray:
        if descriptors is None or descriptors.size == 0:
            n_features = self.clustering.cluster_centers_.shape[1]
            return np.zeros(self.clustering.n_clusters * n_features)

        labels = self.clustering.predict(descriptors)
        centers = self.clustering.cluster_centers_
        n_clusters, n_features = centers.shape
        vlad = np.zeros((n_clusters, n_features))

        for idx in range(n_clusters):
            mask = (labels == idx)
            if np.any(mask):
                residuals = descriptors[mask] - centers[idx]
                vlad[idx] = residuals.sum(axis=0)

        vlad = vlad.flatten()
        vlad = vlad.reshape(n_clusters, n_features)
        vlad = normalize(vlad, norm='l2', axis=1)
        vlad = vlad.flatten()
        vlad = np.sign(vlad) * np.sqrt(np.abs(vlad))

        if self.normalize_vlad:
            vlad = normalize(vlad.reshape(1, -1), norm='l2').flatten()

        return vlad

# This a wrapper meant it to be compatible with sklearn pre-processing
# Since normal sklearn doesn't support complex pipelines with custom fitting/loading and image paths
class VisualEncodingTransformer(BaseEstimator, TransformerMixin):
    """
    sklearn-compatible wrapper for visual encoding pipelines.

    This allows you to:
    - Save/load with joblib
    - Use in sklearn Pipelines
    - Use with GridSearchCV
    - Keep your flexible architecture

    Parameters
    ----------
    clustering : ClusteringAlgorithm
        Custom clustering algorithm instance
    encoding : EncodingAlgorithm
        Custom encoding algorithm instance
    descriptor_extractor : callable, optional
        Function to extract descriptors from images
        Signature: descriptor_extractor(image_path) -> np.ndarray
    iterative_fit : bool, default=False
        If True, use iterative fitting (for large datasets)
    skip_clustering_fit: bool, default=False
        If True, skip fitting the clustering model
    """

    def __init__(
        self,
        clustering: ClusteringAlgorithm,
        encoding: EncodingAlgorithm,
        descriptor_extractor: Optional[callable] = None,
        iterative_fit: bool = False,
        skip_clustering_fit: bool = False
    ):
        self.clustering = clustering
        self.encoding = encoding
        self.descriptor_extractor = descriptor_extractor
        self.iterative_fit = iterative_fit
        self.skip_clustering_fit = skip_clustering_fit

    def fit(self, X, y=None):
        """
        Fit the clustering model.

        Parameters
        ----------
        X : array-like
            Either:
            - List of image paths (if descriptor_extractor is provided)
            - List of descriptor arrays
            - Single stacked array of all descriptors
        y : array-like, optional
            Target labels (unused, for sklearn compatibility)

        Returns
        -------
        self
        """
        # Check if clustering is already fitted and we should skip
        if self.skip_clustering_fit:
            if not self.clustering._is_fitted:
                raise ValueError(
                    "skip_clustering_fit=True but clustering is not fitted! "
                    "Either fit the clustering first or set skip_clustering_fit=False"
                )
            print("Skipping clustering fit (using pre-fitted clusters)")
            return self

        if self.descriptor_extractor is not None:
            # X is list of image paths
            if self.iterative_fit:
                self.clustering.fit_iterative(X, load_func=self.descriptor_extractor)
            else:
                # Extract all descriptors first
                all_descriptors = []
                for path in X:
                    desc = self.descriptor_extractor(path)
                    if desc is not None and desc.size > 0:
                        all_descriptors.append(desc)
                if all_descriptors:
                    all_descriptors = np.vstack(all_descriptors)
                    self.clustering.fit(all_descriptors)
        else:
            # X is already descriptors
            if isinstance(X, list):
                # List of descriptor arrays - stack them
                X = np.vstack([d for d in X if d is not None and d.size > 0])
            self.clustering.fit(X)

        return self

    def transform(self, X):
        """
        Encode images/descriptors into fixed-length vectors.

        Parameters
        ----------
        X : array-like
            Either:
            - List of image paths (if descriptor_extractor is provided)
            - List of descriptor arrays (each image's descriptors)

        Returns
        -------
        np.ndarray
            Encoded vectors, shape (n_samples, encoding_dim)
        """
        if self.descriptor_extractor is not None:
            # X is list of image paths
            descriptors_list = [self.descriptor_extractor(path) for path in X]
        else:
            # X is already list of descriptor arrays
            descriptors_list = X

        # Encode each image's descriptors
        encoded = []
        for descriptors in descriptors_list:
            vector = self.encoding.encode(descriptors)
            encoded.append(vector)

        return np.array(encoded)

    def get_params(self, deep=True):
        """Get parameters for sklearn compatibility."""
        params = {
            'clustering': self.clustering,
            'encoding': self.encoding,
            'descriptor_extractor': self.descriptor_extractor,
            'iterative_fit': self.iterative_fit
        }

        if deep and hasattr(self.clustering, 'get_params'):
            # Include clustering params with prefix
            cluster_params = self.clustering.__dict__.copy()
            params.update({f'clustering__{k}': v for k, v in cluster_params.items()})

        return params

    def set_params(self, **params):
        """Set parameters for sklearn compatibility."""
        for key, value in params.items():
            if '__' in key:
                # Handle nested parameters like 'clustering__n_clusters'
                obj_name, param_name = key.split('__', 1)
                obj = getattr(self, obj_name)
                setattr(obj, param_name, value)
            else:
                setattr(self, key, value)
        return self


# ============================================================================
# Convenience Factory Functions
# ============================================================================

def create_bovw_pipeline(
    n_clusters: int = 512,
    batch_size: int = 1024,
    normalize: bool = True,
    descriptor_extractor: Optional[DescriptorExtractor] = None,  # Changed type hint
    iterative_fit: bool = False,
    clustering: Optional[ClusteringAlgorithm] = None  # NEW: optional pre-fitted
) -> VisualEncodingTransformer:
    """
    Create a BoVW encoding pipeline.

    Example:
        >>> extractor = DescriptorExtractor.create_sift(n_features=500)
        >>> bovw = create_bovw_pipeline(
        ...     n_clusters=1024,
        ...     descriptor_extractor=extractor.extract_from_file
        ... )
        >>> bovw.fit(train_image_paths)
    """

    if clustering is None:
        clustering = MiniBatchKMeansClustering(
            n_clusters=n_clusters,
            batch_size=batch_size,
            random_state=42
        )
        skip_fit = False
    else:
        # Check if already fitted
        skip_fit = clustering._is_fitted
        if skip_fit:
            print(f"Using pre-fitted clustering with {clustering.n_clusters} clusters")

    encoding = BagOfVisualWords(clustering, normalize_hist=normalize)

    # Convert DescriptorExtractor to callable if provided
    extract_func = None
    if descriptor_extractor is not None:
        if isinstance(descriptor_extractor, DescriptorExtractor):
            extract_func = descriptor_extractor.extract_from_file
        else:
            extract_func = descriptor_extractor

    return VisualEncodingTransformer(
        clustering=clustering,
        encoding=encoding,
        descriptor_extractor=extract_func,
        iterative_fit=iterative_fit,
        skip_clustering_fit=skip_fit
    )


def create_vlad_pipeline(
    n_clusters: int = 256,
    batch_size: int = 1024,
    normalize: bool = True,
    descriptor_extractor: Optional[DescriptorExtractor] = None,  # Changed type hint
    iterative_fit: bool = False,
    clustering: Optional[ClusteringAlgorithm] = None  # NEW: optional pre-fitted
) -> VisualEncodingTransformer:
    """Create a VLAD encoding pipeline."""
    if clustering is None:
        clustering = MiniBatchKMeansClustering(
            n_clusters=n_clusters,
            batch_size=batch_size,
            random_state=42
        )
        skip_fit = False
    else:
        # Check if already fitted
        skip_fit = clustering._is_fitted
        if skip_fit:
            print(f"Using pre-fitted clustering with {clustering.n_clusters} clusters")
    encoding = VLAD(clustering, normalize_vlad=normalize)

    # Convert DescriptorExtractor to callable if provided
    extract_func = None
    if descriptor_extractor is not None:
        if isinstance(descriptor_extractor, DescriptorExtractor):
            extract_func = descriptor_extractor.extract_from_file
        else:
            extract_func = descriptor_extractor
            print("dx")

    return VisualEncodingTransformer(
        clustering=clustering,
        encoding=encoding,
        descriptor_extractor=extract_func,
        iterative_fit=iterative_fit,
        skip_clustering_fit=skip_fit
    )


# **Training**

### Basic structure/Standard

In [None]:
from pathlib import Path
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split


le = LabelEncoder()
# It is easier for metrics and evaluation to have encoded labels (numeric instead of strings)
labels_encoded = le.fit_transform(labels)

sift = DescriptorExtractor(descriptor_type='SIFT')
output_dir = "descriptors"
os.makedirs(output_dir, exist_ok=True)
for x in (image_paths):
  p = Path(x)
  descriptors = sift.extract_from_file(x)

  np.save(f"{output_dir}/{p.stem}.npy", descriptors)

# To test different algorithms we saved the descriptors to only extract them once
image_paths = [f"{output_dir}/{Path(x).stem}.npy" for x in image_paths]

# Note that stratify makes sure that the class distribution is maintained in train and test splits (important in classification tasks)
X_train, X_test, y_train, y_test = train_test_split(image_paths, labels_encoded, test_size=0.2, random_state=0, stratify=labels)


# Save and load
# joblib.dump(full_pipeline, 'image_classifier.pkl')
# loaded_pipeline = joblib.load('image_classifier.pkl')
# predictions = loaded_pipeline.predict(test_paths)

## Cross-Validation / Tuning Parameters

This part is only meant to be run for calibrating and tuning the hyperparameters of models (like number of clusters, generalization (C), max-depth...)

In [None]:
# Create sklearn-compatible transformer

# The one for cross-validation we need to fit in every fold (else it would leak data)
bovw_cv = create_bovw_pipeline(
    n_clusters=2048,
    descriptor_extractor=sift.load_descriptors,
    iterative_fit=True
)

# For single validation we could proceed to fit directly and re-separating the training data
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=0, stratify=y_train)

# For this case we can simply fit one the whole training data and compare with the single validation fold
# clustering = MiniBatchKMeansClustering(
#     n_clusters=2048,
#     batch_size=10000
# )

# clustering.fit_iterative(
#     data_loader=X_train,
#     load_func=sift.load_descriptors
# )

# With this approach we highly reduce training time since we don't need to constantly fit the clustering in every fold
# bovw = create_bovw_pipeline(
#     clustering=clustering,
#     descriptor_extractor=sift.load_descriptors
# )

In [None]:
from sklearn.model_selection import cross_validation, StratifiedKFold
from sklearn.svm import SVC

pipeline = Pipeline([
    ('encoding', bovw_cv),
    ('classifier',  # You can change the classifier here
     # For example SVM
     SVC(kernel='linear', C=1.0, random_state=42)
    )
])

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_results = cross_validation(
    pipeline, 
    X_train, 
    y_train, 
    cv=cv,
    scoring='accuracy',
    verbose=2
)

for fold in range(5):
    print(f"Fold {fold+1}: Train={cv_results['train_score'][fold]:.4f}, "
          f"Val={cv_results['test_score'][fold]:.4f}")

print(f"\nMean Train: {cv_results['train_score'].mean():.4f} ± {cv_results['train_score'].std():.4f}")
print(f"Mean Val:   {cv_results['test_score'].mean():.4f} ± {cv_results['test_score'].std():.4f}")
print(f"Overfit:    {cv_results['train_score'].mean() - cv_results['test_score'].mean():.4f}")

In [None]:
# Cross validation can also be done manually where you see each fold explicitly
from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=5, shuffle=True)
accuracies = []

for train_idx, val_idx in skf.split(X_train, y_train):
    X_tr, X_val = [X_train[i] for i in train_idx], [X_train[i] for i in val_idx]
    y_tr, y_val = y_train[train_idx], y_train[val_idx]

    # Fit the pipeline
    pipeline.fit(X_tr, y_tr)

    # Evaluate
    test_accuracy = pipeline.score(X_val, y_val)
    train_accuracy = pipeline.score(X_tr, y_tr)
    accuracies.append([train_accuracy, test_accuracy])
    print(f"Fold Train accuracy: {train_accuracy:.4f}")
    print(f"Fold Validation Accuracy: {test_accuracy:.4f}")

print("\nCross-validation results:")
accuracies = np.array(accuracies)
print(f"Mean Train: {accuracies[:,0].mean():.4f} ± {accuracies[:,0].std():.4f}")
print(f"Mean Val: {accuracies[:,1].mean():.4f} ± {accuracies[:,1].std():.4f}")
print(f"Overfit: {accuracies[:,0].mean() - accuracies[:,1].mean():.4f}")

## Production Ready

This part is only meant to be run once after you have already defined the best hyperparameter for the models

It generates metrics data and safe models pipeline to be used to predict other dataset 

### Metrics

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, roc_auc_score, roc_curve
)
from sklearn.preprocessing import label_binarize
from sklearn.pipeline import Pipeline
import time
import joblib


def evaluate_model(
    pipeline, model_name,
    X_train, y_train,
    X_test, y_test,
    le,
    verbose=True,
    save_model_path=None
):
    """
    Train and evaluate a single model.
    Returns: dict of metrics, predictions, pipeline, and timing info.
    """

    # Train
    if verbose:
        print(f"\n{'='*60}")
        print(f"Training: {model_name}")
        print(f"{'='*60}")

    start_time = time.time()
    pipeline.fit(X_train, y_train)
    train_time = time.time() - start_time

    if verbose:
        print(f"✓ Trained in {train_time:.2f}s")

    # Predict
    y_train_pred = pipeline.predict(X_train)
    y_test_pred = pipeline.predict(X_test)
    inference_time = (time.time() - start_time) / len(X_test)

    # Metrics
    train_acc = accuracy_score(y_train, y_train_pred)
    test_acc  = accuracy_score(y_test, y_test_pred)
    precision = precision_score(y_test, y_test_pred, average='macro', zero_division=0)
    recall    = recall_score(y_test, y_test_pred, average='macro', zero_division=0)
    f1        = f1_score(y_test, y_test_pred, average='macro', zero_division=0)
    overfit   = train_acc - test_acc

    # Try probabilities & ROC-AUC
    try:
        y_proba = pipeline.predict_proba(X_test)
        roc_auc = roc_auc_score(y_test, y_proba, multi_class='ovr', average='macro')
    except Exception:
        y_proba = None
        roc_auc = np.nan

    # Confusion matrix
    cm = confusion_matrix(y_test, y_test_pred)

    results = {
        'model_name': model_name,
        'pipeline': pipeline,
        'train_time': train_time,
        'inference_time_ms': inference_time * 1000,
        'train_accuracy': train_acc,
        'test_accuracy': test_acc,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'roc_auc': roc_auc,
        'overfit': overfit,
        'y_test': y_test,
        'y_pred': y_test_pred,
        'y_proba': y_proba,
        'confusion_matrix': cm,
        'X_test': X_test,
        'label_encoder': le
    }

    if verbose:
        print(f"\n{'─'*60}")
        print(f"  {model_name} - Summary")
        print(f"{'─'*60}")
        print(f"  Train Acc:   {train_acc:.4f}")
        print(f"  Test Acc:    {test_acc:.4f}")
        print(f"  Precision:   {precision:.4f}")
        print(f"  Recall:      {recall:.4f}")
        print(f"  F1:          {f1:.4f}")
        if not np.isnan(roc_auc):
            print(f"  ROC-AUC:     {roc_auc:.4f}")
        print(f"  Overfit:     {overfit:.4f}")
        print(f"  Train Time:  {train_time:.2f}s")
        print(f"  Inference:   {inference_time * 1000:.3f}ms/sample")
        print(f"{'─'*60}\n")

    # Save model if requested
    if save_model_path:
        joblib.dump(pipeline, save_model_path)
        if verbose:
            print(f"✓ Model saved to {save_model_path}")

    return results

def plot_confusion_matrix(results, normalize=True, figsize=(10, 8), save_path=None):
    cm = results['confusion_matrix']
    le = results['label_encoder']

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        fmt = '.2f'
        title = f"{results['model_name']}\nNormalized Confusion Matrix"
    else:
        fmt = 'd'
        title = f"{results['model_name']}\nConfusion Matrix"

    plt.figure(figsize=figsize)
    sns.heatmap(cm, annot=True, fmt=fmt, cmap='Blues',
                xticklabels=le.classes_,
                yticklabels=le.classes_,
                cbar_kws={'label': 'Proportion' if normalize else 'Count'})
    plt.title(title, fontsize=14, fontweight='bold', pad=20)
    plt.xlabel('Predicted Label', fontweight='bold')
    plt.ylabel('True Label', fontweight='bold')
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"✓ Saved to {save_path}")

    plt.show()


def plot_roc_curve(results, figsize=(12, 5), save_path=None):
    y_proba = results['y_proba']
    if y_proba is None:
        print("⚠ Skipping ROC: model doesn’t support predict_proba()")
        return

    y_test = results['y_test']
    le = results['label_encoder']
    n_classes = len(le.classes_)
    y_test_bin = label_binarize(y_test, classes=range(n_classes))

    fig, axes = plt.subplots(1, 2, figsize=figsize)
    fpr = dict()
    tpr = dict()

    # Compute per-class ROC
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_proba[:, i])

    # Macro-average ROC
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= n_classes
    macro_auc = np.trapz(mean_tpr, all_fpr)

    # Plot
    ax = axes[0]
    ax.plot(all_fpr, mean_tpr, linewidth=3, label=f'Macro-avg (AUC={macro_auc:.3f})')
    ax.plot([0, 1], [0, 1], 'k--', linewidth=1.5, label='Random')
    ax.set_xlabel('FPR', fontweight='bold')
    ax.set_ylabel('TPR', fontweight='bold')
    ax.set_title(f"{results['model_name']}\nMacro-Average ROC", fontweight='bold')
    ax.legend(loc='lower right')
    ax.grid(alpha=0.3)

    # Per-class (top 10)
    ax = axes[1]
    for i in range(min(n_classes, 10)):
        auc_i = np.trapz(tpr[i], fpr[i])
        ax.plot(fpr[i], tpr[i], label=f'{le.classes_[i]} ({auc_i:.2f})', linewidth=1.5)
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.5)
    ax.set_xlabel('FPR', fontweight='bold')
    ax.set_ylabel('TPR', fontweight='bold')
    ax.set_title(f"{results['model_name']}\nPer-Class ROC", fontweight='bold')
    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=8)
    ax.grid(alpha=0.3)

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"✓ Saved to {save_path}")
    plt.show()

def print_classification_report(results):
    y_test, y_pred = results['y_test'], results['y_pred']
    le = results['label_encoder']
    print(f"\n{'='*70}")
    print(f"Classification Report: {results['model_name']}")
    print(f"{'='*70}\n")
    print(classification_report(y_test, y_pred, target_names=le.classes_, digits=4))

## Models

In [None]:
import os
models_dir = "models"
os.makedirs(models_dir, exist_ok=True)

clustering = MiniBatchKMeansClustering(
    n_clusters=2048,
    batch_size=10000
)

clustering.fit_iterative(
    data_loader=X_train,
    load_func=sift.load_descriptors
)

# With this approach we highly reduce training time since we don't need to constantly fit the clustering in every fold
bovw = create_bovw_pipeline(
    clustering=clustering,
    descriptor_extractor=sift.load_descriptors
)

vlad = create_vlad_pipeline(
    clustering=clustering,
    descriptor_extractor=sift.load_descriptors
)

### Naive Gaussian Bayes

In [None]:
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import Normalizer
bovw_gnb_pipeline = Pipeline([
        ('encoding', bovw),
        ('classifier', GaussianNB())
])

results_bovw_gnb = evaluate_model(
        bovw_gnb_pipeline,
        model_name="BoVW + GaussianNB",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        save_model_path=f"{models_dir}/bovw_gnb_model.pkl"
        )
plot_confusion_matrix(results_bovw_gnb, normalize=True, save_path="metrics/bovw_gnb_confusion_matrix.png")
plot_roc_curve(results_bovw_gnb, save_path="metrics/bovw_gnb_roc_curve.png")
print_classification_report(results_bovw_gnb)

### Softmax

In [None]:
from sklearn.linear_model import LogisticRegression

bovw_softmax_pipeline = Pipeline([
        ('encoding', bovw),
        ('classifier', LogisticRegression( multi_class='multinomial',  # for multiclass classification (Softmax)
            solver='lbfgs',
            max_iter=500
))
])

results_bovw_softmax = evaluate_model(
        bovw_softmax_pipeline,
        model_name="BoVW + Softmax",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        save_model_path=models_dir + "/bovw_softmax_model.pkl"
        )
plot_confusion_matrix(results_bovw_softmax, normalize=True, save_path="metrics/bovw_softmax_confusion_matrix.png")
plot_roc_curve(results_bovw_softmax, save_path="metrics/bovw_softmax_roc_curve.png")
print_classification_report(results_bovw_softmax)

### Decisition Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier

bovw_dt_pipeline = Pipeline([
        ('encoding', bovw),
        ('normalizer', Normalizer(norm='l2'))
        ('classifier', DecisionTreeClassifier(
            max_depth=15,
            random_state=42
        ))
])

results_bovw_dt = evaluate_model(
        bovw_dt_pipeline,
        model_name="BoVW + Decision Tree",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        )
plot_confusion_matrix(results_bovw_dt, normalize=True, save_path="metrics/bovw_dt_confusion_matrix.png")
plot_roc_curve(results_bovw_dt, save_path="metrics/bovw_dt_roc_curve.png")
print_classification_report(results_bovw_dt)

### Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

bovw_rf_pipeline = Pipeline([
        ('encoding', bovw),
        ('classifier', RandomForestClassifier(n_estimators=350, random_state=42))
])

results_bovw_rf = evaluate_model(
        bovw_rf_pipeline,
        model_name="BoVW + Random Forest",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        save_model_path=f"{models_dir}/bovw_rf_model.pkl"
        )
plot_confusion_matrix(results_bovw_rf, normalize=True, save_path="metrics/bovw_rf_confusion_matrix.png")
plot_roc_curve(results_bovw_rf, save_path="metrics/bovw_rf_roc_curve.png")
print_classification_report(results_bovw_rf)

### Support Vector Machine

In [None]:
from sklearn.svm import SVC
# Use in sklearn Pipeline
bovw_svc_lin_pipeline = Pipeline([
        ('encoding', bovw),
        ('classifier', SVC(kernel='linear', C=10.0, probability=True))
])
    

results_bovw_svc_lin = evaluate_model(
        bovw_svc_lin_pipeline,
        model_name="BoVW + Support Vector Classifier (Linear)",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        save_model_path=f"{models_dir}/bovw_svc_lin_model.pkl"
        )
plot_confusion_matrix(results_bovw_svc_lin, normalize=True, save_path="metrics/bovw_svc_lin_confusion_matrix.png")
plot_roc_curve(results_bovw_svc_lin, save_path="metrics/bovw_svc_lin_roc_curve.png")
print_classification_report(results_bovw_svc_lin)

In [None]:
from sklearn.svm import SVC
# Use in sklearn Pipeline
bovw_svc_rbf_pipeline = Pipeline([
        ('encoding', bovw),
        ('classifier', SVC(kernel='rbf', C=10.0, gamma='scale', probability=True))
])
    

results_bovw_svc_rbf = evaluate_model(
        bovw_svc_rbf_pipeline,
        model_name="BoVW + Support Vector Classifier (RBF)",
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        le=le,
        verbose=True,
        save_model_path=f"{models_dir}/bovw_svc_rbf_model.pkl"
        )
plot_confusion_matrix(results_bovw_svc_rbf, normalize=True, save_path="metrics/bovw_svc_rbf_confusion_matrix.png")
plot_roc_curve(results_bovw_svc_rbf, save_path="metrics/bovw_svc_rbf_roc_curve.png")
print_classification_report(results_bovw_svc_rbf)