In [4]:
# !pip install kagglehub
import kagglehub

# Download the latest version of the "imbikramsaha/caltech-101" dataset via kagglehub.
# The value returned by dataset_download is printed below as the local path to the
# dataset files.
# NOTE(review): `path` is reassigned to a hard-coded relative folder in the next cell,
# so the location printed here is not the one the image listing actually reads from.
path = kagglehub.dataset_download("imbikramsaha/caltech-101")

print("Path to dataset files:", path)

Path to dataset files: C:\Users\Ducanh\.cache\kagglehub\datasets\imbikramsaha\caltech-101\versions\1


In [11]:
import glob
import os

# Root folder of the dataset; every image is expected at <path>/<label>/<file>.
path = "caltech-101/"

# glob returns entries in a filesystem-dependent order, so sort them: the random
# sampling done later in the notebook is only reproducible if its input order is stable.
# os.path.join avoids the doubled separator that f"{path}/*/*" produced.
image_paths = sorted(glob.glob(os.path.join(path, "*", "*")))

# The label of an image is the name of the directory that contains it.
labels = [os.path.basename(os.path.dirname(image_path)) for image_path in image_paths]

# Number of images, number of distinct labels.
print(len(labels), len(set(labels)))


9145 102


In [13]:
import random
from sklearn.model_selection import train_test_split


def sample_labels_and_images(
    image_paths,
    labels,
    num_labels=10,
    samples_per_label=10,
    test_size=0.2,
    random_state=42,
):
    """Pick a random subset of labels and images, then split it into train/test.

    Args:
        image_paths: Sequence of image paths.
        labels: Sequence of labels, aligned element-wise with ``image_paths``.
        num_labels: Maximum number of distinct labels to keep.
        samples_per_label: Maximum number of images kept for each selected label.
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split``. When it is an ``int`` it
            also seeds the label/image sampling, so the whole result is
            reproducible. For any other value the sampling falls back to the
            module-level ``random`` generator.

    Returns:
        Tuple ``(train_image_paths, train_labels, test_image_paths, test_labels,
        selected_unique_labels)``.
    """
    # Use a private generator so that `random_state` really controls the sampling
    # and the global `random` state is left untouched.
    rng = random.Random(random_state) if isinstance(random_state, int) else random

    # Sort before sampling: the iteration order of a set of strings changes between
    # interpreter sessions (hash randomisation), which would defeat the seed.
    unique_labels = sorted(set(labels))
    selected_unique_labels = rng.sample(
        unique_labels, min(num_labels, len(unique_labels))
    )
    # Set lookup instead of scanning the list once per image.
    selected_lookup = set(selected_unique_labels)

    # Group the image paths of the selected labels.
    label_to_paths = {}
    for path, label in zip(image_paths, labels):
        if label in selected_lookup:
            label_to_paths.setdefault(label, []).append(path)

    sampled_image_paths = []
    sampled_labels = []

    for label, paths in label_to_paths.items():
        # Labels with fewer images than requested contribute all of them.
        sample_count = min(samples_per_label, len(paths))
        sampled_image_paths.extend(rng.sample(paths, sample_count))
        sampled_labels.extend([label] * sample_count)

    # Stratified split keeps the per-label proportions in both partitions.
    train_image_paths, test_image_paths, train_labels, test_labels = train_test_split(
        sampled_image_paths,
        sampled_labels,
        test_size=test_size,
        stratify=sampled_labels,
        random_state=random_state,
    )

    return (
        train_image_paths,
        train_labels,
        test_image_paths,
        test_labels,
        selected_unique_labels,
    )



In [20]:
import time
import cv2
import numpy as np
from sklearn.cluster import KMeans
from tqdm import tqdm
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics.pairwise import cosine_similarity


def top_k_accuracy(y_true, y_top_k_predict, k=3):
    """Percentage of queries whose true label is among the first ``k`` predictions.

    Args:
        y_true: Sequence of ground-truth labels, one per query.
        y_top_k_predict: Sequence of ranked prediction sequences, aligned with
            ``y_true``.
        k: Number of leading predictions inspected for each query.

    Returns:
        Accuracy in the range 0-100. Returns 0.0 for empty input instead of
        raising ``ZeroDivisionError``.
    """
    total = len(y_true)
    if total == 0:
        return 0.0

    correct_predictions = sum(
        1 for true, top_k in zip(y_true, y_top_k_predict)
        if true in top_k[:k]
    )
    return correct_predictions / total * 100

def mean_reciprocal_rank(y_true, y_top_k_predict):
    """Mean reciprocal rank of the true label within each ranked prediction list.

    Args:
        y_true: Sequence of ground-truth labels, one per query.
        y_top_k_predict: Sequence of ranked prediction sequences (lists or
            arrays), aligned with ``y_true``.

    Returns:
        The average of ``1 / rank`` over all queries, where ``rank`` is the
        1-based position of the first occurrence of the true label and a query
        whose label is absent contributes 0. Returns 0.0 for empty input.
    """
    reciprocal_ranks = []
    for true_label, top_k_preds in zip(y_true, y_top_k_predict):
        # np.asarray makes the comparison element-wise for plain lists too;
        # comparing a list with a label yields a single False and used to be
        # silently scored as "not found" by a bare except.
        hits = np.where(np.asarray(top_k_preds) == true_label)[0]
        if hits.size:
            reciprocal_ranks.append(1 / (hits[0] + 1))
        else:
            # True label not found in predictions
            reciprocal_ranks.append(0)

    if not reciprocal_ranks:
        return 0.0

    return sum(reciprocal_ranks) / len(reciprocal_ranks)

class CookBook:
    """Bag-of-visual-words image retrieval over SIFT descriptors.

    Training extracts SIFT descriptors from the training images, clusters them
    with k-means into ``n_cluster`` visual words and stores one word histogram
    per training image (plus a TF-IDF and a probability version of it). Queries
    are histograms that are ranked against the training histograms; the labels
    of the best-ranked training images are returned.

    Row ``i`` of every feature matrix always corresponds to image ``i`` of the
    path list it was built from: images without any descriptor get an all-zero
    histogram instead of being dropped, so features stay aligned with labels.
    """

    # Width of an OpenCV SIFT descriptor; used to build empty placeholders.
    _SIFT_DIM = 128
    # Method names accepted by evaluate().
    _RETRIEVAL_METHODS = (
        "common_word_retrieval",
        "tfidf_retrieval",
        "KL_divergence_retrieval",
    )

    def __init__(
        self, train_paths, train_labels, test_paths, test_labels, n_cluster=100
    ):
        """Store the data split and train the vocabulary immediately.

        Args:
            train_paths: Paths of the training images.
            train_labels: Labels aligned with ``train_paths``.
            test_paths: Paths of the test images.
            test_labels: Labels aligned with ``test_paths``.
            n_cluster: Number of visual words (k-means clusters).
        """
        self.train_paths = train_paths
        self.train_labels = np.array(train_labels)
        self.test_paths = test_paths
        self.test_labels = np.array(test_labels)
        self.n_clusters = n_cluster
        self.kmeans = None
        self.train_features = None
        self.test_features = None
        self.train()

    def extract_sift_features(self, image_paths):
        """Compute SIFT descriptors for one image path or a list of paths.

        Returns:
            ``(all_descriptors, feature_counts)`` with exactly one entry per
            input path. An image that cannot be read or that yields no
            descriptor is represented by an empty ``(0, 128)`` array and a
            count of 0.
        """
        if isinstance(image_paths, str):
            image_paths = [image_paths]

        sift = cv2.SIFT_create()

        all_descriptors = []
        feature_counts = []

        for path in tqdm(image_paths, desc="Extracting SIFT Features"):
            img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

            descriptors = None
            if img is None:
                # cv2.imread signals failure by returning None rather than raising.
                print(f"Warning: could not read image, using empty features: {path}")
            else:
                _, descriptors = sift.detectAndCompute(img, None)

            # Keep a placeholder so that descriptors stay aligned with the labels.
            if descriptors is None:
                descriptors = np.empty((0, self._SIFT_DIM), dtype=np.float32)

            all_descriptors.append(descriptors)
            feature_counts.append(len(descriptors))

        return all_descriptors, feature_counts

    def _bow_histograms(self, descriptor_sets):
        """Turn per-image descriptor arrays into a (n_images, n_clusters) count matrix."""
        histograms = np.zeros((len(descriptor_sets), self.n_clusters), dtype=np.int64)
        for row, descriptors in enumerate(descriptor_sets):
            # No descriptors: leave the all-zero row in place.
            if descriptors is None or len(descriptors) == 0:
                continue

            # Assign descriptors to nearest visual words
            visual_words = self.kmeans.predict(descriptors)

            # Compute histogram
            counts, _ = np.histogram(visual_words, bins=range(self.n_clusters + 1))
            histograms[row] = counts
        return histograms

    @staticmethod
    def _to_distribution(histograms):
        """Normalise histograms along the last axis and clip to a small positive floor.

        All-zero histograms are divided by 1 instead of 0 so no NaN is produced.
        """
        hist = np.asarray(histograms, dtype=float)
        totals = hist.sum(axis=-1, keepdims=True)
        totals = np.where(totals == 0, 1.0, totals)
        return np.clip(hist / totals, 1e-10, None)

    def train(self):
        """Build the visual vocabulary and the training-set representations.

        Sets ``descriptors``, ``kmeans``, ``train_features``,
        ``tfidf_transformer``, ``train_features_tfidf`` and ``train_probs``.

        Raises:
            ValueError: If no training image produced any SIFT descriptor.
        """
        start = time.time()
        self.descriptors, _ = self.extract_sift_features(self.train_paths)

        non_empty = [d for d in self.descriptors if d is not None and len(d) > 0]
        if not non_empty:
            raise ValueError("no SIFT descriptors found in the training images")
        descriptors_stack = np.vstack(non_empty)

        print("Kmean clustering...")
        self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
        self.kmeans.fit(descriptors_stack)

        self.train_features = self._bow_histograms(self.descriptors)

        self.tfidf_transformer = TfidfTransformer()
        self.train_features_tfidf = self.tfidf_transformer.fit_transform(self.train_features)

        self.train_probs = self._to_distribution(self.train_features)
        print("Total Training Time:", time.time()-start)

    def indexing(self, image_paths):
        """Return the visual-word histograms of ``image_paths``, one row per path.

        Raises:
            RuntimeError: If the k-means vocabulary has not been trained yet.
        """
        if getattr(self, "kmeans", None) is None:
            raise RuntimeError("model haven't train")

        all_descriptors, _ = self.extract_sift_features(image_paths)
        return self._bow_histograms(all_descriptors)

    def common_word_retrieval(self, query, k=10):
        """Rank training images by histogram intersection with ``query``.

        Returns the labels of the ``k`` training images sharing the most
        visual-word occurrences with the query, best first.
        """
        common_words = np.minimum(query, self.train_features).sum(axis=1)
        top_indices = np.argsort(common_words)[::-1][:k]
        top_labels = self.train_labels[top_indices]

        return top_labels

    def tfidf_retrieval(self, query, k=10):
        """Rank training images by cosine similarity of TF-IDF weighted histograms.

        Returns the labels of the ``k`` most similar training images, best first.
        """
        query_tfidf = self.tfidf_transformer.transform(query.reshape(1, -1))
        similarities = cosine_similarity(query_tfidf, self.train_features_tfidf).flatten()
        top_indices = np.argsort(similarities)[::-1][:k]
        top_labels = self.train_labels[top_indices]

        return top_labels

    def KL_divergence_retrieval(self, query, k=10):
        """Rank training images by KL divergence from the query distribution.

        Returns the labels of the ``k`` training images with the smallest
        divergence, best first.
        """
        query_prob = self._to_distribution(query)

        # Compute KL divergence for all training histograms simultaneously
        kl_divergences = np.sum(query_prob * np.log(query_prob / self.train_probs), axis=1)

        # Get the indices of the top-k smallest KL divergences
        top_indices = np.argsort(kl_divergences)[:k]

        # Retrieve corresponding labels
        top_labels = self.train_labels[top_indices]

        return top_labels

    def evaluate(self, test="common_word_retrieval"):
        """Run one retrieval method over the test and training sets and print metrics.

        Test features are computed on first use and cached in ``test_features``.
        Results are stored in ``test_retrival_results`` and
        ``train_retrival_results``. The training queries are themselves part of
        the searched index, so the train metrics are an optimistic upper bound.

        Args:
            test: One of ``"common_word_retrieval"``, ``"tfidf_retrieval"`` or
                ``"KL_divergence_retrieval"``.

        Raises:
            ValueError: If ``test`` is not a known retrieval method.
        """
        # Fail fast: an unknown name used to fall through and report stale results.
        if test not in self._RETRIEVAL_METHODS:
            raise ValueError(
                f"unknown retrieval method {test!r}; expected one of {self._RETRIEVAL_METHODS}"
            )
        retrieve = getattr(self, test)

        if self.test_features is None:
            self.test_features = self.indexing(self.test_paths)

        start = time.time()
        self.test_retrival_results = np.array([retrieve(query) for query in self.test_features])
        self.train_retrival_results = np.array([retrieve(query) for query in self.train_features])
        running_time = time.time() - start

        print(test)
        print("n_cluster = ", self.n_clusters)
        print("Train top 3 accuracy:", top_k_accuracy(self.train_labels, self.train_retrival_results))
        print("Test top 3 accuracy:", top_k_accuracy(self.test_labels, self.test_retrival_results))

        print("Train mean reciprocal rank:", mean_reciprocal_rank(self.train_labels, self.train_retrival_results))
        print("Test mean reciprocal rank:", mean_reciprocal_rank(self.test_labels, self.test_retrival_results))
        print("Total Runing Time: ",running_time)


In [24]:
from collections import Counter

# Example usage: keep 20 random labels with at most 100 images each and hold
# out 20% of the sampled images for testing.
(
    train_paths,
    train_labels,
    test_paths,
    test_labels,
    selected_labels,
) = sample_labels_and_images(
    image_paths=image_paths,
    labels=labels,
    num_labels=20,
    samples_per_label=100,
    test_size=0.2,
)

# Report which labels were drawn and how the images are distributed per split.
print("Selected Labels:", selected_labels)

print("\nTraining Set:")
print(f"Total train images: {len(train_paths)}")
print("Train samples per label:")
print(Counter(train_labels))

print("\nTesting Set:")
print(f"Total test images: {len(test_paths)}")
print("Test samples per label:")
print(Counter(test_labels))

Selected Labels: ['electric_guitar', 'hedgehog', 'platypus', 'lobster', 'mandolin', 'tick', 'pagoda', 'butterfly', 'panda', 'crocodile_head', 'revolver', 'flamingo', 'watch', 'cellphone', 'cannon', 'schooner', 'pigeon', 'Leopards', 'headphone', 'dolphin']

Training Set:
Total train images: 951
Train samples per label:
Counter({'Leopards': 80, 'watch': 80, 'butterfly': 73, 'revolver': 66, 'electric_guitar': 60, 'flamingo': 54, 'dolphin': 52, 'schooner': 50, 'cellphone': 47, 'hedgehog': 43, 'crocodile_head': 41, 'tick': 39, 'pagoda': 38, 'pigeon': 36, 'mandolin': 34, 'headphone': 34, 'cannon': 34, 'lobster': 33, 'panda': 30, 'platypus': 27})

Testing Set:
Total test images: 238
Test samples per label:
Counter({'Leopards': 20, 'watch': 20, 'butterfly': 18, 'revolver': 16, 'electric_guitar': 15, 'flamingo': 13, 'schooner': 13, 'dolphin': 13, 'cellphone': 12, 'hedgehog': 11, 'crocodile_head': 10, 'tick': 10, 'mandolin': 9, 'pigeon': 9, 'pagoda': 9, 'cannon': 9, 'lobster': 8, 'panda': 8, 'he

In [25]:

# Train a 50-word vocabulary once, then score every retrieval strategy on it.
cookbook = CookBook(
    train_paths=train_paths,
    train_labels=train_labels,
    test_paths=test_paths,
    test_labels=test_labels,
    n_cluster=50,
)
for retrieval_method in (
    "common_word_retrieval",
    "tfidf_retrieval",
    "KL_divergence_retrieval",
):
    cookbook.evaluate(retrieval_method)


Extracting SIFT Features: 100%|██████████| 951/951 [01:09<00:00, 13.61it/s]


Kmean clustering...
Total Training Time: 641.5259337425232


Extracting SIFT Features: 100%|██████████| 238/238 [00:15<00:00, 15.45it/s]


common_word_retrieval
n_cluster =  50
Train top 3 accuracy: 77.60252365930599
Test top 3 accuracy: 25.630252100840334
Train mean reciprocal rank: 0.710340410261546
Test mean reciprocal rank: 0.19452781112444986
Total Runing Time:  1.5982460975646973
tfidf_retrieval
n_cluster =  50
Train top 3 accuracy: 100.0
Test top 3 accuracy: 50.84033613445378
Train mean reciprocal rank: 1.0
Test mean reciprocal rank: 0.42880985727624377
Total Runing Time:  6.836222171783447
KL_divergence_retrieval
n_cluster =  50
Train top 3 accuracy: 100.0
Test top 3 accuracy: 46.21848739495798
Train mean reciprocal rank: 1.0
Test mean reciprocal rank: 0.39039949313058575
Total Runing Time:  2.6312737464904785
