### Dependencies

In [1]:
import pandas as pd
import numpy as np
from collections import Counter
import pickle
#from tqdm import tqdm as progress_bar
import cv2
from sklearn.metrics import f1_score, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split
from tqdm import tqdm as progress_bar

from keras.datasets import cifar10

import os
import cv2
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.cluster import MiniBatchKMeans
from scipy.spatial.distance import cdist
from Born_Functions import *

from itertools import combinations

# Shared feature-extraction helper. Feature_Extraction is not defined in this
# notebook, so it presumably comes from the Born_Functions star import above
# (TODO confirm). In the visible cells it is only referenced by the
# commented-out CIFAR10 grayscale conversion.
FE = Feature_Extraction()




### Classes

#### Preprocessing

In [2]:
# Preprocessing class
class Preprocessing:
    '''
    Converts raw images into per-image local descriptors and splits the
    result into train and test sets.
    '''

    def __init__(self, extractor_dict):
        # Maps an extractor name to an object exposing detectAndCompute(image, mask).
        self.extractor_dict = extractor_dict

    def extract_descriptors(self, image, extractor):
        '''
        Run a feature extractor on a single image.
        Inputs:
            image: image handed unchanged to extractor.detectAndCompute
            extractor: object exposing detectAndCompute(image, mask)
        Outputs:
            descriptors: the descriptors returned by the extractor, or None
                when the extractor produced no descriptors
            kpoints: the keypoints returned by the extractor, or None when
                the extractor produced no descriptors
        '''
        keypoints, found = extractor.detectAndCompute(image, None)
        # Both slots are None when nothing was described, so callers only
        # need to test the first element.
        return (None, None) if found is None else (found, keypoints)

    def run_preprocess(self, X, y, extractor_name, test_size):
        '''
        Describe every image, drop the ones without descriptors, and split
        what is left into train and test sets.
        Inputs:
            X: iterable of images
            y: labels aligned with X
            extractor_name: key into self.extractor_dict
            test_size: forwarded to train_test_split as test_size
        Outputs:
            descriptors_train: list of training set descriptors
            descriptors_test: list of test set descriptors
            y_train: training set labels
            y_test: test set labels
        '''
        extractor = self.extractor_dict[extractor_name]
        kept_descriptors, kept_labels = [], []
        n_missing = 0

        for image, label in zip(X, y):
            found, _ = self.extract_descriptors(image, extractor)
            if found is None:
                n_missing += 1
                continue
            kept_descriptors.append(found)
            kept_labels.append(label)

        print(f"Removed {n_missing} images due to missing descriptors.")

        # Fixed random_state keeps the split identical across runs.
        train_desc, test_desc, train_labels, test_labels = train_test_split(
            kept_descriptors,
            kept_labels,
            test_size=test_size,
            random_state=19,
        )
        return train_desc, test_desc, train_labels, test_labels



#### Extract_histograms

In [3]:
class Extract_Histograms:
    '''
    Bag-of-visual-words encoder: learns a k-means vocabulary from local
    descriptors and counts, per image, how many descriptors fall on each word.
    '''

    def __init__(self, supported_distance_metrics, k_values):
        # Maps a user-facing metric name to the value passed to cdist(metric=...).
        self.supported_distance_metrics = supported_distance_metrics
        # Stored as given; not read by any method of this class.
        self.k_values = k_values

    def build_visual_vocabulary(self, descriptors_list, k):
        '''
        Cluster all descriptors into k visual words.
        Inputs:
            descriptors_list: per-image descriptor arrays; None entries are skipped
            k: number of clusters
        Outputs:
            the cluster centres of the fitted MiniBatchKMeans
        '''
        stacked = np.vstack([d for d in descriptors_list if d is not None])
        kmeans = MiniBatchKMeans(n_clusters=k, random_state=0, batch_size=100, n_init=3)
        return kmeans.fit(stacked).cluster_centers_

    def build_histograms(self, descriptors_list, visual_vocab, distance_metric):
        '''
        Count nearest-word assignments for every image.
        Inputs:
            descriptors_list: per-image descriptor arrays; a None entry
                leaves its histogram row at zero
            visual_vocab: array whose rows are the visual words
            distance_metric: metric argument forwarded to cdist
        Outputs:
            float array of shape (len(descriptors_list), number of words)
        '''
        n_words = visual_vocab.shape[0]
        histograms = np.zeros((len(descriptors_list), n_words))
        # Iterating a 2-D array yields row views, so writing into `row`
        # updates `histograms` in place.
        for row, desc in zip(histograms, descriptors_list):
            if desc is None:
                continue
            nearest = cdist(desc, visual_vocab, metric=distance_metric).argmin(axis=1)
            row[:] = np.bincount(nearest, minlength=n_words)
        return histograms

    def run_configuration(self, descriptors_train, descriptors_test, k, distance_metric_):
        '''
        Fit the vocabulary on the training descriptors and encode both splits.
        Inputs:
            descriptors_train: per-image descriptors used to fit and encode
            descriptors_test: per-image descriptors encoded with the same vocabulary
            k: vocabulary size
            distance_metric_: key into self.supported_distance_metrics
        Outputs:
            histograms_train, histograms_test
        '''
        vocab = self.build_visual_vocabulary(descriptors_train, k)
        metric = self.supported_distance_metrics[distance_metric_]
        train_hist = self.build_histograms(descriptors_train, vocab, metric)
        test_hist = self.build_histograms(descriptors_test, vocab, metric)
        return train_hist, test_hist



#### all_config_function

In [16]:
def run_all_configurations(Dataset, pairs, X, y, extractor_dict, supported_distance_metrics, k_values, repo, test_size=0.2,
                           extractor_name='SIFT', k=2500, distance_metric='euclidean'):
    """
    Build and pickle one bag-of-visual-words dataset per class pair.

    For every pair of labels, the images of those two classes are selected,
    described with the chosen extractor, split into train/test, encoded as
    visual-word histograms, and written to ``<repo>/<Dataset>_<pair>.pkl``.

    Parameters
    ----------
    Dataset : str
        Prefix used in the output file names.
    pairs : iterable
        Label pairs; each pair is passed to ``np.isin`` as the set of labels to keep.
    X : array-like
        Images, indexable by a boolean mask along the first axis.
    y : array-like
        Labels aligned with ``X``.
    extractor_dict : dict
        Forwarded to ``Preprocessing``.
    supported_distance_metrics : dict
        Forwarded to ``Extract_Histograms``.
    k_values : list
        Forwarded to ``Extract_Histograms``; the vocabulary size actually used
        is ``k``.
    repo : str
        Output directory; created if it does not exist.
    test_size : float, default 0.2
        Forwarded to ``Preprocessing.run_preprocess``.
    extractor_name : str, default 'SIFT'
        Key into ``extractor_dict``.
    k : int, default 2500
        Vocabulary size.
    distance_metric : str, default 'euclidean'
        Key into ``supported_distance_metrics``.

    Returns
    -------
    None
        Results are written to disk and progress is printed.
    """
    preprocessing = Preprocessing(extractor_dict)
    extract_histograms = Extract_Histograms(supported_distance_metrics, k_values)

    def get_class_pairs(X_, y_, pair):
        # Mask on the helper's own labels (the original read the outer `y`).
        # Boolean-mask indexing already returns new arrays, so the full
        # per-pair copies of X and y the original made are unnecessary.
        mask = np.isin(y_, pair)
        return X_[mask], y_[mask]

    # No-op for ndarrays; lets equally shaped image lists be mask-indexed too.
    X = np.asarray(X)
    y = np.asarray(y)

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(repo, exist_ok=True)

    for pair in pairs:
        X_sub, y_sub = get_class_pairs(X, y, pair)
        descriptors_train, descriptors_test, y_train, y_test = preprocessing.run_preprocess(
            X_sub, y_sub, extractor_name, test_size
        )
        histograms_train, histograms_test = extract_histograms.run_configuration(
            descriptors_train, descriptors_test, k, distance_metric
        )

        file_name = f'{Dataset}_{pair}.pkl'
        file_path = os.path.join(repo, file_name)

        with open(file_path, 'wb') as f:
            pickle.dump({
                'histograms_train': histograms_train,
                'y_train': y_train,
                'histograms_test': histograms_test,
                'y_test': y_test
            }, f)

        print(f'Saved configuration: {file_name}')


### Run

In [9]:
## CIFAR10 (alternative data source, currently disabled)
# (x_train, y_train), (x_test, y_test) = cifar10.load_data()


# X = np.concatenate((x_train, x_test), axis=0)
# y = np.concatenate((y_train, y_test), axis=0).flatten()

# X =  np.array([FE.convert_to_grayscale(img) for img in X])

# ## IMAGENET (active data source)
# NOTE(review): read_pickle unpickles the file, which can execute arbitrary
# code; only load "imagenet.pkl" from a trusted source.
df = pd.read_pickle("imagenet.pkl")
# Assumes the pickled frame has exactly two columns, in (image, label)
# order -- TODO confirm against whatever produced the file.
df.columns = ["images", "labels"]

# Target (width, height) passed to cv2.resize
image_size = (256, 256)

# Resize every image and collect the results into a single array
X = np.array([cv2.resize(img, image_size, interpolation=cv2.INTER_AREA) for img in df["images"]])
y = df["labels"].to_numpy()

In [14]:
# Feature extractors by name; run_all_configurations looks up 'SIFT'.
extractor_dict = {
    'SIFT': cv2.SIFT_create()
}

# User-facing metric name -> value forwarded to scipy's cdist(metric=...).
supported_distance_metrics = {
    'euclidean': 'euclidean'
}

# Passed through to Extract_Histograms, which only stores it; the vocabulary
# size run_all_configurations actually uses is 2500, not read from this list.
k_values = [2500]


# All 45 unordered pairs of the label ids 0..9
# (assumes the dataset labels are the integers 0-9 -- TODO confirm).
class_pairs = list(combinations(range(10), 2))

In [17]:
# Build and pickle one train/test histogram dataset per class pair into
# 'Pair_imagenet_repository' (test_size is left at the function default).
run_all_configurations('IMAGENET', class_pairs, X, y, extractor_dict, supported_distance_metrics, k_values, 'Pair_imagenet_repository')

Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 1).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 2).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 3).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 4).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 5).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 6).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 7).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 8).pkl
Removed 1 images due to missing descriptors.
Saved configuration: IMAGENET_(0, 9).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(1, 2).pkl
Removed 0 images due to missing descriptors.
Saved configuration: IMAGENET_(1, 3).pkl
Removed 0 images due to missing descriptors.
Saved con