# Python vs. C++ - Why do we choose Python?

# Install

## Part 1

In [None]:
! pip install PyYAML 

In [None]:
! pip install opencv-python

In [None]:
! pip install pyarrow

In [None]:
# -y answers the confirmation prompt, which would otherwise block the notebook cell
! pip uninstall -y preprocess

In [None]:
! pip install tqdm

In [None]:
! pip install faiss-cpu

## Part 2

In [None]:
! pip install torch

In [None]:
! pip install transformers

# Part 1

## > Dataset Acquisition & Audit

Our goal here is to understand the data distribution: how the classes are distributed and whether all images have the same resolution. This step is important because it will guide how we handle the data in the next steps.

In [None]:
import os
import csv
import cv2

# Path to the folder containing the extracted images
root_dir = "MIR_DATASETS_B"
output_csv = "inventory.csv"

# Walk the dataset tree and record one metadata row per readable image.
rows = []
for dirpath, _, filenames in os.walk(root_dir):
    for filename in filenames:
        if not filename.lower().endswith((".png", ".jpg", ".jpeg")):
            continue
        path = os.path.join(dirpath, filename)
        label = os.path.basename(dirpath)  # Folder name is used as the label
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals an unreadable/corrupt file by returning None
            # (no exception); skip it so one bad file does not abort the audit.
            print(f"[WARN] Could not read image, skipping: {path}")
            continue
        h, w = img.shape[:2]  # Get image height and width
        size_kb = round(os.path.getsize(path) / 1024, 2)  # File size in kilobytes
        rows.append([path, label, w, h, size_kb])

# Save image metadata to a CSV file. The encoding is pinned to UTF-8 so that
# paths with non-ASCII characters round-trip through pandas.read_csv, whose
# default encoding is UTF-8 regardless of platform.
with open(output_csv, "w", newline='', encoding="utf-8") as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(["path", "class", "width", "height", "size_kb"])
    writer.writerows(rows)


In [None]:
import pandas as pd

df = pd.read_csv("inventory.csv")

# Number of images by class
class_counts = df['class'].value_counts().reset_index()
class_counts.columns = ["class", "n_imagens"]

# Average resolution ± σ per class, where resolution = width * height (pixel count)
df["resolução"] = df["width"].astype(int) * df["height"].astype(int)
resolution_by_class = df.groupby("class")["resolução"]
mean_res = resolution_by_class.mean()
std_res = resolution_by_class.std()

# Up to three example paths per class. The sample size is capped at the class
# size because Series.sample raises ValueError when asked for more rows than
# exist (sampling without replacement).
example_paths = df.groupby("class")["path"].apply(
    lambda paths: paths.sample(min(3, len(paths)), random_state=42).tolist()
)


In [None]:
# Build the per-class summary: image count plus resolution mean and spread
summary_table = class_counts.set_index("class").assign(
    mean_resolution=mean_res,
    std_resolution=std_res,
)

print(summary_table)

## > Pre image processing

It's important to normalize, resize, and convert all images to RGB so that the data is standardized. Without this step, we would run into problems when applying the algorithms later.

In [4]:
import cv2
import numpy as np
import yaml
import matplotlib.pyplot as plt

ModuleNotFoundError: No module named 'cv2'

In [None]:
# Read the preprocessing settings from the YAML configuration file
with open("config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

preprocessing_config = config["preprocessing"]
target_size = preprocessing_config["target_size"]

In [None]:
# Read the preprocessing settings from the YAML configuration file
with open("config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

preprocessing_config = config["preprocessing"]
target_size = preprocessing_config["target_size"]

# def preprocess(img):
#     """
#     Preprocesses an image:
#     - Converts BGR to RGB
#     - Resizes to target size with aspect ratio preservation and padding
#     - Normalizes pixel values to float32 range [0, 1]
#     """
#     # Convert to RGB
#     img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

#     # Get original dimensions
#     h, w = img_rgb.shape[:2]
#     scale = min(target_size / h, target_size / w)
#     new_w, new_h = int(w * scale), int(h * scale)

#     # Resize with aspect ratio
#     resized = cv2.resize(img_rgb, (new_w, new_h), interpolation=cv2.INTER_AREA)

#     # Add padding to reach target size
#     delta_w = target_size - new_w
#     delta_h = target_size - new_h
#     top, bottom = delta_h // 2, delta_h - (delta_h // 2)
#     left, right = delta_w // 2, delta_w - (delta_w // 2)
#     padded = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

#     # Normalize to [0, 1]
#     normalized = padded.astype(np.float32) / 255.0

#     return normalized

def preprocess(img, target_size=256):
    """Letterbox a BGR image into a square, normalized RGB array.

    The image is converted from BGR to RGB, resized so that it fits inside a
    ``target_size`` x ``target_size`` square with its aspect ratio preserved,
    centered on a black canvas of that size, and scaled to float32 in [0, 1].

    Parameters:
        img (np.ndarray): 3-channel BGR image (e.g. the result of cv2.imread).
        target_size (int): Side length, in pixels, of the square output.

    Returns:
        np.ndarray: float32 RGB array of shape (target_size, target_size, 3).

    Raises:
        ValueError: If ``img`` is None or contains no pixels.
    """
    if img is None or img.size == 0:
        raise ValueError("preprocess() received an empty image (was the file read correctly?)")

    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    h, w = img_rgb.shape[:2]
    scale = min(target_size / h, target_size / w)
    # Clamp to at least one pixel: for very elongated images the shorter side
    # truncates to 0, and cv2.resize rejects a zero-sized destination.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    resized = cv2.resize(img_rgb, (new_w, new_h), interpolation=cv2.INTER_AREA)

    # Split the remaining space evenly; the extra pixel (odd delta) goes to the
    # bottom/right side.
    delta_w = target_size - new_w
    delta_h = target_size - new_h
    top, bottom = delta_h // 2, delta_h - (delta_h // 2)
    left, right = delta_w // 2, delta_w - (delta_w // 2)
    padded = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

    normalized = padded.astype(np.float32) / 255.0
    return normalized


In [None]:
def test_preprocess_shape_and_range():
    """Check that preprocess() yields a 256x256x3 float32 array within [0, 1]."""
    # Random uint8 image, deliberately non-square (300 rows x 150 columns)
    dummy_img = np.random.randint(0, 256, (300, 150, 3), dtype=np.uint8)
    processed = preprocess(dummy_img)

    # Output geometry
    expected_shape = (256, 256, 3)
    assert processed.shape == expected_shape

    # Output dtype and value range
    assert processed.dtype == np.float32
    lowest, highest = processed.min(), processed.max()
    assert lowest >= 0.0
    assert highest <= 1.0


#### Testing

In [None]:
# Read the sample image (OpenCV returns it in BGR order)
original_img = cv2.imread("MIR_DATASETS_B/MIR_DATASETS_B/araignees/barn spider/0_0_araignees_barnspider_1.jpg")

# Run the preprocessing pipeline on it
processed_img = preprocess(original_img)

# Matplotlib expects RGB, so convert the original before showing it
original_rgb = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)

# Draw the original and the preprocessed image side by side
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
panels = ((original_rgb, "Original"), (processed_img, "Preprocessed (256x256)"))
for ax, (image, caption) in zip(axes, panels):
    ax.imshow(image)
    ax.set_title(caption)
    ax.axis("off")

plt.tight_layout()
plt.show()

## > Descriptors

### Extract descriptors functions

In [None]:
def extract_color_histogram(img):
    """Compute an L1-normalized 8x8x8 color histogram, flattened to 512 values.

    Accepts either a uint8 image or a float image scaled to [0, 1] (which is
    what preprocess() returns). Float input is rescaled to uint8 first: the
    histogram range is [0, 256), so values in [0, 1] would otherwise all fall
    into the first bin and every image would yield the same descriptor.

    Parameters:
        img (np.ndarray): 3-channel image, uint8 or float in [0, 1].

    Returns:
        np.ndarray: Flattened, L1-normalized histogram of length 512.
    """
    if img.dtype != np.uint8:
        img = (img * 255).clip(0, 255).astype(np.uint8)

    # The channel swap is kept so query histograms stay comparable with the
    # ones stored by extract_features_worker, which applies the same swap.
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    hist = cv2.calcHist([img_rgb], [0, 1, 2], None, [8, 8, 8], [0, 256]*3)
    hist = cv2.normalize(hist, hist, norm_type=cv2.NORM_L1)
    return hist.flatten()


def extract_orb(img):
    """
    Compute ORB descriptors (up to 500 features) for an image.

    Input that is not uint8 is multiplied by 255, clipped to [0, 255] and
    cast to uint8 before the grayscale conversion.

    Returns the descriptor array from detectAndCompute, which may be None.
    """
    needs_rescale = img.dtype != np.uint8
    if needs_rescale:
        scaled = img * 255
        img = scaled.clip(0, 255).astype(np.uint8)

    detector = cv2.ORB_create(nfeatures=500)
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    _, descriptors = detector.detectAndCompute(gray, None)
    return descriptors  # None when nothing was computed


def extract_sift(img):
    """
    Compute SIFT descriptors for an image.

    Input that is not uint8 is multiplied by 255, clipped to [0, 255] and
    cast to uint8 before the grayscale conversion.

    Returns the descriptor array from detectAndCompute, which may be None.
    """
    needs_rescale = img.dtype != np.uint8
    if needs_rescale:
        scaled = img * 255
        img = scaled.clip(0, 255).astype(np.uint8)

    detector = cv2.SIFT_create()
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    _, descriptors = detector.detectAndCompute(gray, None)
    return descriptors  # None when nothing was computed

## > Extraction and Persistence

### Diagram

           ┌────────────────────────────┐
           │     dataset_dir (root)     │
           └────────────┬───────────────┘
                        │
              ┌─────────▼─────────┐
              │ Coletar caminhos  │
              │ de todas imagens  │
              └─────────┬─────────┘
                        │
              ┌─────────▼─────────┐
              │  Paralelizar com  │
              │  multiprocessing  │
              └─────────┬─────────┘
                        │
        ┌───────────────▼────────────────┐
        │ Extrair: color_hist, ORB, SIFT │
        │ Salvar .npy (por imagem)       │
        └───────────────┬────────────────┘
                        │
        ┌───────────────▼────────────────────┐
        │  Atualizar `features_meta.parquet` │
        └────────────────────────────────────┘

### Feature extract

In [3]:
import gc
import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from pathlib import Path

# Worker function that runs in main process or subprocess
# def extract_features_worker(args):
#     import gc
#     img_path, output_dir = args
#     print(f"[INFO] Processando: {img_path}")

#     try:
#         img = cv2.imread(img_path)
#         if img is None or img.size == 0:
#             print(f"[ERRO] Imagem inválida (cv2.imread retornou None): {img_path}")
#             return None

#         orb = cv2.ORB_create()
#         sift = cv2.SIFT_create()

#         try:
#             hist = cv2.calcHist([img], [0, 1, 2], None, [8]*3, [0, 256]*3).flatten()
#         except Exception as e:
#             print(f"[ERRO] Falha ao calcular histograma: {img_path}: {e}")
#             hist = np.zeros(512, dtype=np.float32)

#         try:
#             _, orb_desc = orb.detectAndCompute(img, None)
#             if orb_desc is None:
#                 orb_desc = np.zeros((0, 32), dtype=np.uint8)
#         except Exception as e:
#             print(f"[ERRO] ORB em {img_path}: {e}")
#             orb_desc = np.zeros((0, 32), dtype=np.uint8)

#         try:
#             _, sift_desc = sift.detectAndCompute(img, None)
#             if sift_desc is None:
#                 sift_desc = np.zeros((0, 128), dtype=np.float32)
#         except Exception as e:
#             print(f"[ERRO] SIFT em {img_path}: {e}")
#             sift_desc = np.zeros((0, 128), dtype=np.float32)

#         base = Path(img_path).stem
#         np.save(f"{output_dir}/{base}_hist.npy", hist)
#         np.save(f"{output_dir}/{base}_orb.npy", orb_desc)
#         np.save(f"{output_dir}/{base}_sift.npy", sift_desc)

#         del img, hist, orb_desc, sift_desc
#         gc.collect()

#         return {
#             "file": img_path,
#             "hist_file": f"{base}_hist.npy",
#             "orb_file": f"{base}_orb.npy",
#             "sift_file": f"{base}_sift.npy"
#         }

#     except Exception as e:
#         print(f"[FATAL] Erro inesperado em {img_path}: {e}")
#         return None

def extract_features_worker(args):
    """Extract and save the histogram, ORB and SIFT descriptors of one image.

    Parameters:
        args (tuple): ``(img_path, output_dir)``.

    Returns:
        dict with the image path and the three saved file names, or None when
        the image cannot be read or any step raises.
    """
    import gc
    img_path, output_dir = args
    print(f"[INFO] Processing: {img_path}")

    try:
        img_bgr = cv2.imread(img_path)
        if img_bgr is None or img_bgr.size == 0:
            print(f"[ERROR] Invalid image: {img_path}")
            return None

        # Same preprocessing as the query side, then back to 0-255 uint8
        img_preprocessed = preprocess(img_bgr, target_size=256)
        img_for_descriptors = (img_preprocessed * 255).astype(np.uint8)

        # 8x8x8 color histogram, L1-normalized and flattened
        swapped = cv2.cvtColor(img_for_descriptors, cv2.COLOR_BGR2RGB)
        hist = cv2.calcHist([swapped], [0, 1, 2], None, [8, 8, 8], [0, 256]*3)
        hist = cv2.normalize(hist, hist, norm_type=cv2.NORM_L1).flatten()

        # Both keypoint detectors work on the same grayscale image
        gray = cv2.cvtColor(img_for_descriptors, cv2.COLOR_RGB2GRAY)

        # ORB (empty 0x32 array when nothing is detected)
        _, orb_desc = cv2.ORB_create(nfeatures=500).detectAndCompute(gray, None)
        if orb_desc is None:
            orb_desc = np.zeros((0, 32), dtype=np.uint8)

        # SIFT (empty 0x128 array when nothing is detected)
        _, sift_desc = cv2.SIFT_create().detectAndCompute(gray, None)
        if sift_desc is None:
            sift_desc = np.zeros((0, 128), dtype=np.float32)

        # Files are named after the image stem
        base = Path(img_path).stem
        np.save(f"{output_dir}/{base}_hist.npy", hist)
        np.save(f"{output_dir}/{base}_orb.npy", orb_desc)
        np.save(f"{output_dir}/{base}_sift.npy", sift_desc)

        record = {
            "file": img_path,
            "hist_file": f"{base}_hist.npy",
            "orb_file": f"{base}_orb.npy",
            "sift_file": f"{base}_sift.npy"
        }

        # Release the large arrays before returning
        del img_bgr, img_preprocessed, orb_desc, sift_desc, hist
        gc.collect()

        return record

    except Exception as e:
        print(f"[FATAL] Unexpected error with {img_path}: {e}")
        return None

# Manager class
# class FeatureExtractorManager:
#     def run_on_batch(self, image_paths, output_dir):
#         os.makedirs(output_dir, exist_ok=True)

#         args = [(img_path, output_dir) for img_path in image_paths]
#         results = []

#         for arg in tqdm(args, desc="Processando lote"):
#             result = extract_features_worker(arg)
#             if result is not None:
#                 results.append(result)
#             gc.collect()  # liberação manual de memória

#         df = pd.DataFrame(results)
#         return df
#     def run(self, dataset_dir, output_dir, n_jobs=None):
#         os.makedirs(output_dir, exist_ok=True)

#         all_imgs = [str(p) for p in Path(dataset_dir).rglob("*.jpg")]
#         n_jobs = n_jobs or cpu_count()

#         args = [(img_path, output_dir) for img_path in all_imgs]

#         results = []
#         if n_jobs == 1:
#             # Sequential loop (safe for notebooks)
#             for arg in tqdm(args, desc="Processing images (no parallel)"):
#                 result = extract_features_worker(arg)
#                 if result is not None:
#                     results.append(result)
#         else:
#             # Parallel execution
#             with Pool(n_jobs) as pool:
#                 results = list(tqdm(pool.imap(extract_features_worker, args), total=len(all_imgs)))
#                 results = [r for r in results if r is not None]

#         # Save metadata
#         df = pd.DataFrame(results)
#         df.to_parquet(os.path.join(output_dir, "features_meta.parquet"))
class FeatureExtractorManager:
    """Runs extract_features_worker over sets of images and gathers the metadata."""

    @staticmethod
    def _make_tasks(image_paths, output_dir):
        """Pair every image path with the output directory (worker argument tuples)."""
        return [(img_path, output_dir) for img_path in image_paths]

    def run_on_batch(self, image_paths, output_dir):
        """Process the given image paths one by one and return the metadata frame."""
        os.makedirs(output_dir, exist_ok=True)

        results = []
        tasks = self._make_tasks(image_paths, output_dir)
        for task in tqdm(tasks, desc="🔄 Processing images (sequential)"):
            outcome = extract_features_worker(task)
            if outcome is not None:
                results.append(outcome)
            gc.collect()

        return pd.DataFrame(results)

    def run(self, dataset_dir, output_dir, n_jobs=None):
        """Process every *.jpg found recursively under dataset_dir.

        Uses a process pool unless n_jobs is 1 (n_jobs defaults to cpu_count()).
        The metadata frame is written to features_meta.parquet in output_dir
        and also returned.
        """
        os.makedirs(output_dir, exist_ok=True)

        all_imgs = [str(p) for p in Path(dataset_dir).rglob("*.jpg")]
        if not n_jobs:
            n_jobs = cpu_count()

        tasks = self._make_tasks(all_imgs, output_dir)

        if n_jobs == 1:
            outcomes = map(
                extract_features_worker,
                tqdm(tasks, desc="🔄 Processing images (no parallel)"),
            )
            results = [outcome for outcome in outcomes if outcome is not None]
        else:
            with Pool(n_jobs) as pool:
                outcomes = tqdm(pool.imap(extract_features_worker, tasks), total=len(tasks))
                results = [outcome for outcome in outcomes if outcome is not None]

        df = pd.DataFrame(results)
        df.to_parquet(os.path.join(output_dir, "features_meta.parquet"))
        return df

ModuleNotFoundError: No module named 'cv2'

#### Testing

In [None]:
# Load image
img_path = "MIR_DATASETS_B/MIR_DATASETS_B/araignees/barn spider/0_0_araignees_barnspider_1.jpg"
img = cv2.imread(img_path)
if img is None:
    # cv2.imread returns None instead of raising when the file cannot be read
    raise FileNotFoundError(f"Failed to load image: {img_path}")
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# --- Color Histogram ---
# 8x8x8 histogram flattened to 512 values; its shape is reported at the end
hist = cv2.calcHist([img], [0, 1, 2], None, [8, 8, 8], [0, 256]*3).flatten()

# Per-channel curves are computed on the RGB copy so that channel i really is
# colors[i], and kept in their own variable so `hist` above is not overwritten.
colors = ('r', 'g', 'b')
plt.figure(figsize=(12, 4))
for i, color in enumerate(colors):
    channel_hist = cv2.calcHist([img_rgb], [i], None, [256], [0, 256])
    plt.plot(channel_hist, color=color)
plt.title("Color Histogram (RGB channels)")
plt.xlabel("Pixel value")
plt.ylabel("Frequency")
plt.grid(True)
plt.tight_layout()
plt.show()

# --- ORB Keypoints ---
orb = cv2.ORB_create()
kp_orb, desc_orb = orb.detectAndCompute(img, None)
img_kp_orb = cv2.drawKeypoints(img, kp_orb, None, flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)

plt.figure(figsize=(6, 6))
plt.imshow(cv2.cvtColor(img_kp_orb, cv2.COLOR_BGR2RGB))
plt.title("ORB Keypoints")
plt.axis("off")
plt.show()

# --- SIFT Keypoints ---
sift = cv2.SIFT_create()
kp_sift, desc_sift = sift.detectAndCompute(img, None)
img_kp_sift = cv2.drawKeypoints(img, kp_sift, None, flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)

plt.figure(figsize=(6, 6))
plt.imshow(cv2.cvtColor(img_kp_sift, cv2.COLOR_BGR2RGB))
plt.title("SIFT Keypoints")
plt.axis("off")
plt.show()


# --- Visualization & Output ---
print("Color Histogram Shape:", hist.shape)
print("ORB Descriptor Shape:", None if desc_orb is None else desc_orb.shape)
print("SIFT Descriptor Shape:", None if desc_sift is None else desc_sift.shape)

## > Construction of the index with FAISS (IVF-PQ)

In [None]:
import os
import faiss
from pathlib import Path

In [None]:
from pathlib import Path

# Folder whose sub-directories each hold the images of one class
root = Path("MIR_DATASETS_B/MIR_DATASETS_B")

# Manager that drives the per-image descriptor extraction
fem = FeatureExtractorManager()

# Extract features class by class, writing one output folder per class
class_dirs = (entry for entry in root.iterdir() if entry.is_dir())
for class_dir in class_dirs:
    input_dir = str(class_dir)
    output_dir = str(Path("features_output") / class_dir.name)

    print(f"\n🟢 Starting processing for class: {class_dir.name}")
    fem.run(input_dir, output_dir, n_jobs=1)
    print(f"✅ Completed: {class_dir.name}")


### Creating the matrix N x D

Getting the parameters and applying them to the FAISS index

In [None]:
from pathlib import Path

# List the class folders produced by the extraction step
root = Path("features_output")
subfolder_names = [entry.name for entry in root.iterdir() if entry.is_dir()]
print("Subfolders:", subfolder_names)

In [None]:
import os

# For every class folder, report the first descriptor file found (if any)
descriptor_suffixes = ("_hist.npy", "_sift.npy", "_orb.npy")
for folder in os.listdir("features_output"):
    path = os.path.join("features_output", folder)
    if not os.path.isdir(path):
        continue
    print(f"Checking: {folder}")
    first_match = next(
        (name for name in os.listdir(path) if name.endswith(descriptor_suffixes)),
        None,
    )
    if first_match is not None:
        print("  Found descriptor file:", first_match)


In [None]:
# Load one saved histogram descriptor and print its shape as a sanity check
vec = np.load("features_output/araignees/0_0_araignees_barnspider_0_hist.npy")
print(vec.shape)


In [None]:
import os
import faiss
import numpy as np
import pandas as pd
from pathlib import Path

In [None]:
# Where the per-class descriptor folders live, and where the indexes are written
features_root = "features_output"
index_output = "index_store"
os.makedirs(index_output, exist_ok=True)

# Descriptor kinds handled by the index builder
descriptor_types = ["hist", "orb", "sift"]

# Length each descriptor is forced to once flattened
target_lengths = dict(
    hist=512,          # 8x8x8 bins
    orb=500 * 32,      # 500 keypoints x 32 values
    sift=500 * 128,    # 500 keypoints x 128 values
)

def fix_vector(vec, target_len):
    """Flatten ``vec`` to exactly ``target_len`` float32 values.

    Inputs longer than ``target_len`` are truncated; shorter ones are padded
    with trailing zeros. The result is always float32, whatever the input
    dtype, so truncated and padded vectors can be stacked together.

    Parameters:
        vec (np.ndarray): Descriptor array of any shape.
        target_len (int): Required output length.

    Returns:
        np.ndarray: 1-D float32 array of length ``target_len``.
    """
    flat = np.asarray(vec, dtype=np.float32).ravel()
    fixed = np.zeros(target_len, dtype=np.float32)
    kept = min(flat.shape[0], target_len)
    fixed[:kept] = flat[:kept]
    return fixed

# Per-descriptor accumulators: the fixed-length vectors and their metadata rows
vectors = {desc: [] for desc in descriptor_types}
ids = {desc: [] for desc in descriptor_types}

# Step 1: visit every class folder and gather its .npy files by descriptor type
for class_dir in Path(features_root).glob("*"):
    if not class_dir.is_dir():
        continue
    print(f">>> Entering class folder: {class_dir.name}")

    # Folders without extraction metadata are skipped
    meta_path = class_dir / "features_meta.parquet"
    if not meta_path.exists():
        print(f"[!] Missing metadata in: {class_dir}")
        continue

    meta = pd.read_parquet(meta_path)

    for npy_file in class_dir.glob("*.npy"):
        fname = npy_file.name
        print(f"  - Found: {npy_file.name}")

        # The file-name suffix tells which descriptor the file holds
        desc = next(
            (kind for kind in descriptor_types if fname.endswith(f"_{kind}.npy")),
            None,
        )
        if desc is None:
            continue
        print(f"    -> Matched descriptor: {desc}")

        try:
            vec = np.load(npy_file)
            if vec is None or len(vec.shape) == 0:
                print("    [!] Empty vector, skipping")
                continue

            # Force the descriptor to its fixed length before storing it
            vec = fix_vector(vec, target_lengths[desc])
            vectors[desc].append(vec.astype(np.float32))

            ids[desc].append({
                "path": str(npy_file),
                "classe": class_dir.name,
                "vector_dim": vec.shape[0]
            })

        except Exception as e:
            print(f"    [X] Error loading {npy_file}: {e}")

# Step 2: build one FAISS index per descriptor type and store its id table
for desc, vec_list in vectors.items():
    if not vec_list:
        print(f"[!] No vectors found for descriptor: {desc}")
        continue

    print(f"✅ Building FAISS index for: {desc} with {len(vec_list)} vectors")

    # Stack into an N x D float32 matrix
    matrix = np.vstack(vec_list).astype(np.float32)
    dim = matrix.shape[1]

    # Flat L2 index holding all vectors
    flat_index = faiss.IndexFlatL2(dim)
    flat_index.add(matrix)

    # Write the index to disk
    index_path = f"{index_output}/index_{desc}.faiss"
    faiss.write_index(flat_index, index_path)

    # Write the matching metadata rows (same order as the index)
    ids_path = f"{index_output}/ids_{desc}.parquet"
    pd.DataFrame(ids[desc]).to_parquet(ids_path, index=False)

    print(f"📁 Saved {index_path} and {ids_path}")


## > Similarity Metrics

### Distance

In [None]:
import numpy as np

def distance(a: np.ndarray, b: np.ndarray, metric="cosine") -> float:
    """
    Computes the distance between two vectors using the specified metric.

    Supported metrics:
    - "l2": Euclidean distance.
    - "cosine": 1 - cosine similarity (1.0 when either vector has zero norm).
    - "chi2": 0.5 * sum((a - b)^2 / (a + b + eps)).
    - "bhattacharyya": -log(sum(sqrt(a * b)) + eps); assumes normalized,
      non-negative histograms.

    Parameters:
        a (np.ndarray): First vector.
        b (np.ndarray): Second vector.
        metric (str): Metric name.

    Returns:
        float: Distance value. For every metric, lower = more similar.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """
    # Computation is done in float32
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)

    if metric == "l2":
        value = np.linalg.norm(a - b)

    elif metric == "cosine":
        num = np.dot(a, b)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        # Cosine similarity is undefined for a zero vector; report distance 1
        value = 1 - (num / denom) if denom != 0 else 1.0

    elif metric == "chi2":
        eps = 1e-10  # avoids division by zero where both entries are 0
        value = 0.5 * np.sum(((a - b) ** 2) / (a + b + eps))

    elif metric == "bhattacharyya":
        # Assumes histograms are normalized; the epsilon guards log(0)
        value = -np.log(np.sum(np.sqrt(a * b)) + 1e-10)

    else:
        raise ValueError(f"Unsupported metric: {metric}")

    # Return a plain Python float, as the annotation promises
    return float(value)


#### Comparing FAISS vs. NumPy (L2)

In [None]:
import faiss

# Create FAISS L2 index holding one random reference vector
d = 4
index = faiss.IndexFlatL2(d)
x = np.random.rand(1, d).astype(np.float32)
index.add(x)

# Query with a different random vector. Querying with x itself gives 0 on both
# sides, so the check would pass even if the squared-distance handling were wrong.
q = np.random.rand(1, d).astype(np.float32)
D, I = index.search(q, 1)
# FAISS returns squared L2; clamp at 0 in case of tiny negative round-off
faiss_l2 = np.sqrt(max(D[0][0], 0.0))

# Compare with our implementation
manual_l2 = distance(q[0], x[0], metric="l2")

print(f"FAISS L2:   {faiss_l2}")
print(f"Manual L2:  {manual_l2}")
print(f"Diff:       {abs(faiss_l2 - manual_l2)}")
assert abs(faiss_l2 - manual_l2) < 1e-5


## > Query with image

In [None]:
# Per-descriptor settings: the extractor function plus the index and id-table
# files to query
DESCRIPTORS = {
    "hist": dict(
        extractor=extract_color_histogram,
        index_path="index_store/index_hist.faiss",
        ids_path="index_store/ids_hist.parquet",
        metric="l2",
    ),
    "orb": dict(
        extractor=extract_orb,
        index_path="index_store/index_orb.faiss",
        ids_path="index_store/ids_orb.parquet",
        metric="l2",
    ),
    "sift": dict(
        extractor=extract_sift,
        index_path="index_store/index_sift.faiss",
        ids_path="index_store/ids_sift.parquet",
        metric="l2",
    ),
}

# Flattened vector length per descriptor; must match the lengths used when
# the FAISS indexes were created
TARGET_LENGTHS = dict(
    hist=512,
    orb=500 * 32,
    sift=500 * 128,
)

def fix_vector(vec, target_len):
    """Flatten ``vec`` to exactly ``target_len`` float32 values.

    Inputs longer than ``target_len`` are truncated; shorter ones are padded
    with trailing zeros. The result is always float32, whatever the input
    dtype, so truncated and padded vectors are handled uniformly.

    Parameters:
        vec (np.ndarray): Descriptor array of any shape.
        target_len (int): Required output length.

    Returns:
        np.ndarray: 1-D float32 array of length ``target_len``.
    """
    flat = np.asarray(vec, dtype=np.float32).ravel()
    fixed = np.zeros(target_len, dtype=np.float32)
    kept = min(flat.shape[0], target_len)
    fixed[:kept] = flat[:kept]
    return fixed

def query_image(image_path, top_k=10, ensemble_config=None):
    """
    Queries one or more FAISS indexes for the given image and fuses the results.

    Every descriptor with a positive weight is searched for ``top_k * 5``
    candidates. Candidates are merged per image (the descriptor suffix of the
    stored path is ignored) and ranked by the weighted sum of their distances.
    An image absent from one descriptor's candidate list is charged that
    list's largest distance, so it is not favoured over images found by all
    descriptors.

    NOTE: distances are combined as returned by each index, without rescaling
    between descriptors; the weights must account for differing magnitudes.

    Parameters:
        image_path (str): Path to the query image.
        top_k (int): Number of results to return.
        ensemble_config (dict): Weights for each descriptor.
                                If None, uses only 'hist'.

    Returns:
        List of dicts with: image_id (stored descriptor path), score (weighted
        distance, lower = more similar), classe. Sorted by ascending score,
        at most top_k entries.

    Raises:
        FileNotFoundError: If the query image cannot be read.
    """
    if ensemble_config is None:
        ensemble_config = {"hist": 1.0}

    # Normalize path
    image_path = str(Path(image_path).resolve())

    # Load and preprocess image
    img = cv2.imread(image_path)
    if img is None:
        raise FileNotFoundError(f"[X] Failed to load image: {image_path}")
    img = preprocess(img)

    # One entry per searched descriptor: (weight, {image_key: distance}, worst distance)
    searched = []
    # image_key -> {"image_id": first stored path seen, "classe": class label}
    candidates = {}

    for desc, weight in ensemble_config.items():
        if desc not in DESCRIPTORS:
            continue
        if weight <= 0:
            # A zero weight contributes nothing to the sum; searching anyway
            # would add candidates with score 0 that outrank every real match.
            continue

        try:
            # Extract feature vector
            vec = DESCRIPTORS[desc]["extractor"](img)

            # Skip if extraction failed
            if vec is None or vec.size == 0:
                print(f"[!] Descriptor '{desc}' returned None or empty for image {image_path}. Skipping.")
                continue

            vec = fix_vector(vec, TARGET_LENGTHS[desc])
            vec = vec.reshape(1, -1).astype(np.float32)

            # Load index and metadata
            index = faiss.read_index(DESCRIPTORS[desc]["index_path"])
            ids_df = pd.read_parquet(DESCRIPTORS[desc]["ids_path"])

            # Search
            distances, indices = index.search(vec, top_k * 5)

            suffix = f"_{desc}.npy"
            hits = {}
            found = {}
            for dist, idx in zip(distances[0], indices[0]):
                if idx == -1:
                    continue
                item = ids_df.iloc[idx]
                stored_path = item["path"]
                # Stored paths end in "_<descriptor>.npy"; dropping that suffix
                # gives a key shared by all descriptors of the same image.
                if stored_path.endswith(suffix):
                    key = stored_path[:-len(suffix)]
                else:
                    key = stored_path
                hits[key] = float(dist)
                found[key] = {"image_id": stored_path, "classe": item["classe"]}

        except Exception as e:
            print(f"[X] Error processing descriptor '{desc}' for image {image_path}: {e}")
            continue

        if hits:
            searched.append((weight, hits, max(hits.values())))
            for key, meta in found.items():
                candidates.setdefault(key, meta)

    # Aggregate and sort results
    results = []
    for key, meta in candidates.items():
        score = sum(
            weight * hits.get(key, worst)
            for weight, hits, worst in searched
        )
        results.append({"image_id": meta["image_id"], "score": score, "classe": meta["classe"]})
    results.sort(key=lambda r: r["score"])

    return results[:top_k]


In [None]:
import cv2
import pandas as pd

# Sample query fusing histogram and ORB results with equal weights
sample_query = "MIR_DATASETS_B/MIR_DATASETS_B/chiens/Chihuahua/1_3_chiens_Chihuahua_1271.jpg"
sample_weights = {"hist": 0.5, "orb": 0.5}
results = query_image(sample_query, top_k=5, ensemble_config=sample_weights)
for hit in results:
    print(hit)

#### Inspecting the differences and why the result is so wrong

In [None]:
def _locate_original_image(class_dir, subfolder_guess, image_name):
    """Return the path of ``image_name`` under ``class_dir``, or None if absent.

    The guessed sub-folder is tried first; if the file is not there, every
    entry of ``class_dir`` is checked.
    """
    if subfolder_guess is not None:
        guess = class_dir / subfolder_guess / image_name
        if guess.exists():
            return guess
    if class_dir.is_dir():
        for subfolder in class_dir.iterdir():
            candidate = subfolder / image_name
            if candidate.is_file():
                return candidate
    return None


def show_query_results(query_path, results, title="Top-5 Similar Images"):
    """
    Displays the query image and the top-k results with scores and classes.

    Each result's original image is located from its descriptor file name:
    the last "_"-separated token is dropped and ".jpg" appended. Results
    whose image cannot be found or read are reported and skipped.

    Parameters:
        query_path (str): Path to the query image.
        results (list): List of result dicts from query_image().
        title (str): Figure title.

    Raises:
        FileNotFoundError: If the query image cannot be read.
    """
    n = len(results)
    plt.figure(figsize=(15, 3))

    # Show query image
    query_img = cv2.imread(query_path)
    if query_img is None:
        raise FileNotFoundError(f"[X] Failed to load image: {query_path}")
    query_img = cv2.cvtColor(query_img, cv2.COLOR_BGR2RGB)
    plt.subplot(1, n + 1, 1)
    plt.imshow(query_img)
    plt.title("Query")
    plt.axis("off")

    # Root folder of original images
    original_root = Path("MIR_DATASETS_B/MIR_DATASETS_B")

    for i, result in enumerate(results):
        npy_path = Path(result["image_id"])
        classe = result["classe"]
        score = result["score"]

        parts = npy_path.name.split("_")
        subclasse = parts[3] if len(parts) > 3 else None
        image_name = "_".join(parts[:-1]) + ".jpg"

        # The sub-folder name is not always the token in the file name (e.g.
        # folder "barn spider" vs "barnspider"), so fall back to scanning the
        # class folder when the guessed location does not exist.
        original_img_path = _locate_original_image(original_root / classe, subclasse, image_name)

        if original_img_path is None:
            print(f"[!] Missing image file: {original_root / classe / (subclasse or '*') / image_name}")
            continue

        img = cv2.imread(str(original_img_path))
        if img is None:
            print(f"[!] Missing image file: {original_img_path}")
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        plt.subplot(1, n + 1, i + 2)
        plt.imshow(img)
        plt.title(f"{classe}\nscore: {score:.1f}")
        plt.axis("off")

    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()


In [None]:
# Query with equal histogram/ORB weights and display the matches
query = "MIR_DATASETS_B/MIR_DATASETS_B/chiens/Chihuahua/1_3_chiens_Chihuahua_1271.jpg"
fusion_weights = {"hist": 0.5, "orb": 0.5}
results = query_image(query, top_k=5, ensemble_config=fusion_weights)
show_query_results(query, results)

## > Evaluating model

#### Generate test queries

In [None]:
import pandas as pd
from pathlib import Path

# Folder that holds the original images
base_img_dir = Path("MIR_DATASETS_B/MIR_DATASETS_B")

# One hand-picked query image per class: (class, sub-folder, file name)
examples = [
    ("araignees", "barn spider", "0_0_araignees_barnspider_1.jpg"),
    ("chiens", "Chihuahua", "1_3_chiens_Chihuahua_1271.jpg"),
    ("oiseaux", "bulbul", "2_5_oiseaux_bulbul_2512.jpg"),
    ("poissons", "hammerhead", "3_5_poissons_hammerhead_3414.jpg"),
    ("singes", "gorilla", "4_2_singes_gorilla_3890.jpg")
]

# One CSV row per example: full image path and the class expected back
rows = [
    {
        "query_image_path": str(base_img_dir / classe / subclasse / filename),
        "expected_class": classe
    }
    for classe, subclasse, filename in examples
]

# Write the query list to disk
df = pd.DataFrame(rows)
df.to_csv("test_queries.csv", index=False)

print("✅ Saved test_queries.csv with 5 query samples.")


In [None]:
from pathlib import Path
import pandas as pd

# Verify that every query image listed in the CSV exists on disk
df = pd.read_csv("test_queries.csv")

query_paths = [Path(raw.strip()) for raw in df["query_image_path"]]
for path in query_paths:
    print(f"{path}: Exists={path.exists()}")


#### Evaluating model

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

def compute_average_precision(results, expected_class):
    """
    Average precision (AP) of one ranked result list.

    Precision is taken at every rank whose result has class
    ``expected_class``; the mean of those values is returned, or 0.0 when
    no result matches.
    """
    relevance = [r["classe"] == expected_class for r in results]

    precisions = []
    hits = 0
    for rank, is_hit in enumerate(relevance, start=1):
        if not is_hit:
            continue
        hits += 1
        precisions.append(hits / rank)

    if not precisions:
        return 0.0
    return np.mean(precisions)

def compute_r_precision(results, expected_class, R):
    """
    R-Precision = precision over the first R results.

    Parameters:
        results (list): Ranked result dicts, each with a "classe" key.
        expected_class: Class counted as relevant.
        R (int or None): Number of relevant items. When None, R is
            approximated by the number of relevant results present in
            ``results``.

    Returns:
        float: Relevant results among the first R, divided by R. R is treated
        as at least 1 to avoid dividing by zero.
    """
    relevance = [r["classe"] == expected_class for r in results]
    if R is None:
        R = sum(relevance)
    cutoff = max(int(R), 1)
    return sum(relevance[:cutoff]) / cutoff

def _indexed_class_counts(ensemble_config):
    """Return {class: number of indexed items} from the first readable ids table.

    The descriptors named in ``ensemble_config`` are tried in order ('hist'
    when the config is None, matching query_image's default). An empty dict
    is returned when no ids table can be read.
    """
    names = list(ensemble_config) if ensemble_config else ["hist"]
    for name in names:
        if name not in DESCRIPTORS:
            continue
        try:
            ids_df = pd.read_parquet(DESCRIPTORS[name]["ids_path"])
        except OSError:
            continue
        return ids_df["classe"].value_counts().to_dict()
    return {}


def evaluate(test_csv_path, top_k=10, ensemble_config=None, save_csv=True, relevant_counts=None):
    """
    Evaluate the retrieval system based on a test CSV.

    Parameters:
        test_csv_path (str): CSV with columns query_image_path, expected_class.
        top_k (int): Number of results considered per query.
        ensemble_config (dict): Descriptor weights passed to query_image().
        save_csv (bool): Whether to write retrieval_evaluation_results.csv.
        relevant_counts (dict): Optional {class: number of relevant items},
            used as the recall denominator. When None, the counts are read
            from the index id table.

    Returns a DataFrame with one row per query plus a final "MEAN" row.
    recall@k is hits / number of relevant items; if that number is unknown
    for a class, it falls back to hits / top_k.
    """
    df = pd.read_csv(test_csv_path)
    if relevant_counts is None:
        relevant_counts = _indexed_class_counts(ensemble_config)

    metric_columns = ["precision@k", "recall@k", "AP", "R-Precision"]
    rows = []

    for _, row in df.iterrows():
        query_path = str(Path(row["query_image_path"]))
        expected_class = row["expected_class"]

        results = query_image(query_path, top_k=top_k, ensemble_config=ensemble_config)[:top_k]

        topk_hits = sum(1 for r in results if r["classe"] == expected_class)
        precision_at_k = topk_hits / top_k
        total_relevant = relevant_counts.get(expected_class)
        if total_relevant:
            recall_at_k = topk_hits / total_relevant
        else:
            recall_at_k = topk_hits / top_k
        ap = compute_average_precision(results, expected_class)
        rp = compute_r_precision(results, expected_class, R=None)

        rows.append({
            "query_image": query_path,
            "expected_class": expected_class,
            "precision@k": precision_at_k,
            "recall@k": recall_at_k,
            "AP": ap,
            "R-Precision": rp
        })

    # Explicit columns keep the frame well-formed even when the CSV has no rows
    results_df = pd.DataFrame(rows, columns=["query_image", "expected_class"] + metric_columns)

    # Add global metrics as a final row labelled "MEAN"
    mean_row = {"query_image": "MEAN", "expected_class": "-"}
    mean_row.update(results_df[metric_columns].mean().to_dict())
    results_df = pd.concat([results_df, pd.DataFrame([mean_row])], ignore_index=True)

    if save_csv:
        results_df.to_csv("retrieval_evaluation_results.csv", index=False)
        print("✅ Saved: retrieval_evaluation_results.csv")

    return results_df


In [None]:
# Evaluate the equal-weight histogram/ORB ensemble on the test queries
eval_weights = {"hist": 0.5, "orb": 0.5}
df_results = evaluate("test_queries.csv", top_k=5, ensemble_config=eval_weights)
print(df_results)


## > Trying to maximize the results by tuning the descriptor weights

In [None]:
import itertools
import time
import pandas as pd

def generate_weight_combinations(step=0.1):
    """
    List every (hist, orb, sift) weight triple on a regular grid that sums to 1.

    The grid holds the multiples of ``step`` from 0 up to
    ``int(1 / step) * step``, each rounded to two decimals.

    Args:
        step: Spacing between two consecutive candidate weights.

    Returns:
        List of dicts with the keys ``"hist"``, ``"orb"`` and ``"sift"``,
        ordered by increasing hist weight, then increasing orb weight.
    """
    n_points = int(1 / step) + 1
    grid = [round(idx * step, 2) for idx in range(n_points)]
    return [
        {"hist": w_hist, "orb": w_orb, "sift": w_sift}
        for w_hist, w_orb, w_sift in itertools.product(grid, grid, grid)
        # Tolerance absorbs floating-point noise in the sum
        if abs(w_hist + w_orb + w_sift - 1.0) < 1e-6
    ]

def run_grid_search(test_csv="test_queries.csv", top_k=5):
    """
    Evaluate every weight combination of the 0.1-step grid and collect the averages.

    For each combination the full evaluation is run (without saving its
    per-query CSV) and timed; the averaged metrics are read from the row
    labelled ``"MEAN"``.

    Args:
        test_csv: Path of the CSV file describing the test queries.
        top_k: Number of leading results scored per query.

    Returns:
        DataFrame with one row per weight combination. The same table is
        written to ``ensemble_results.csv``.
    """
    summary_rows = []

    for combo in generate_weight_combinations(step=0.1):
        print(f">>> Testing ensemble: {combo}")
        started = time.time()
        evaluation = evaluate(test_csv, top_k=top_k, ensemble_config=combo, save_csv=False)
        elapsed = time.time() - started

        # Row holding the global metrics (tagged "MEAN" by evaluate)
        is_mean = evaluation["query_image"] == "MEAN"
        mean_row = evaluation[is_mean].iloc[0]

        summary_rows.append({
            "hist_weight": combo["hist"],
            "orb_weight": combo["orb"],
            "sift_weight": combo["sift"],
            "mAP": mean_row["AP"],
            "precision@k": mean_row["precision@k"],
            "recall@k": mean_row["recall@k"],
            "R-Precision": mean_row["R-Precision"],
            "time_sec": round(elapsed, 2)
        })

    df_results = pd.DataFrame(summary_rows)
    df_results.to_csv("ensemble_results.csv", index=False)
    print("✅ Saved: ensemble_results.csv")
    return df_results


In [None]:
# Run the whole weight sweep, then show the five ensembles with the highest mAP.
df = run_grid_search(test_csv="test_queries.csv", top_k=5)
ranked_by_map = df.sort_values(by="mAP", ascending=False)
print(ranked_by_map.head(5))

# Part 2

## > Vision Transformer (ViT) - Vector extraction

Let's extract deep visual feature vectors using a Vision Transformer (ViT).

We cannot reuse the existing .npy files: ViT produces different embeddings, which have to be generated from scratch. In the previous results, SIFT was the best-performing descriptor, so we will compare ViT against it later.

### Structuring the data

#### Creating the descriptors (feature_output_vit)

In [None]:
import os
from PIL import Image
import torch
import numpy as np
from tqdm import tqdm
from glob import glob
from transformers import ViTFeatureExtractor, ViTModel

# Load the ViT model and feature extractor
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224')
vit_model = ViTModel.from_pretrained('google/vit-base-patch16-224')
vit_model.eval()

# Define input and output folders.
# input_root is only used to enumerate the images that already have a SIFT
# descriptor; the pixels themselves are read from image_root.
input_root = "features_output"
image_root = "MIR_DATASETS_B/MIR_DATASETS_B"
output_root = "feature_output_vit"
os.makedirs(output_root, exist_ok=True)

sift_suffix = "_sift.npy"

# Loop over class folders (sorted so that reruns process them in the same order)
for class_name in sorted(os.listdir(input_root)):
    class_input_path = os.path.join(input_root, class_name)

    # Anything that is not a class folder (e.g. a stray file at the root)
    # would make os.listdir below fail, so skip it.
    if not os.path.isdir(class_input_path):
        continue

    class_output_path = os.path.join(output_root, class_name)
    os.makedirs(class_output_path, exist_ok=True)

    print(f"Processing class: {class_name}")

    npy_files = [f for f in os.listdir(class_input_path) if f.endswith(sift_suffix)]

    for npy_file in tqdm(npy_files):
        # Strip only the trailing suffix to recover the image ID
        base_name = npy_file[:-len(sift_suffix)]
        # Use glob to find the image recursively within the class folder
        search_pattern = os.path.join(image_root, class_name, "**", base_name + ".jpg")
        matching_files = glob(search_pattern, recursive=True)

        if not matching_files:
            print(f"[!] Image not found for {base_name}")
            continue

        image_path = matching_files[0]

        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        inputs = feature_extractor(images=image, return_tensors="pt")

        with torch.no_grad():
            outputs = vit_model(**inputs)
            # Use the [CLS] token of the last layer as the image embedding.
            # NOTE(review): this checkpoint is published as an image
            # classifier; its pooler layer is reportedly absent from the saved
            # weights and re-initialised randomly on load, which would make
            # pooler_output non-reproducible. Confirm with the load warning.
            embedding = outputs.last_hidden_state[:, 0].squeeze(0).numpy()

        output_file = os.path.join(class_output_path, base_name + ".npy")
        np.save(output_file, embedding)


#### Creating the vectors by class (.parquet)

In [None]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm

# Root directory where ViT features were saved
features_root = "feature_output_vit"

# Loop over each class directory (sorted for a reproducible order)
for class_name in sorted(os.listdir(features_root)):
    class_path = os.path.join(features_root, class_name)

    # Skip if not a directory (e.g. parquet files written by a previous run)
    if not os.path.isdir(class_path):
        continue

    print(f"Creating parquet for class: {class_name}")

    embeddings = []
    image_ids = []

    # Read all .npy files in the class folder
    for fname in tqdm(sorted(os.listdir(class_path))):
        if not fname.endswith(".npy"):
            continue

        vector = np.load(os.path.join(class_path, fname))

        embeddings.append(vector)
        # The image ID is the file name without its trailing ".npy"
        image_ids.append(fname[:-len(".npy")])

    # An empty table has zero columns and would later break the stacking of
    # all class tables into a single matrix, so do not write one.
    if not embeddings:
        print(f"[!] No .npy vectors found for class {class_name}; parquet not written")
        continue

    # Create a DataFrame with image_id as index
    df = pd.DataFrame(embeddings, index=image_ids)
    df.index.name = "image_id"
    # Some pandas versions refuse to write non-string column labels to
    # parquet; consumers only read .values and .index, so string labels
    # are safe.
    df.columns = df.columns.astype(str)

    # Save as .parquet, next to the class folders
    parquet_path = os.path.join(features_root, f"{class_name}.parquet")
    df.to_parquet(parquet_path)


#### Creating vector of text description

In [None]:
import json
import os
import pandas as pd
import torch
from tqdm import tqdm
from transformers import CLIPProcessor, CLIPModel

# Load CLIP model and processor
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model.eval()

# Load the captions JSON (mapping image path -> caption).
# The encoding is explicit so the result does not depend on the platform default.
with open("captions_MIR_DATASETS_B.json", "r", encoding="utf-8") as f:
    captions = json.load(f)

# Store results
text_embeddings = []
image_ids = []

# Extract embeddings
for image_path, caption in tqdm(captions.items()):
    # Format image ID (remove extension and keep only filename)
    image_id = os.path.basename(image_path).replace(".jpg", "")

    # Tokenize and extract text embedding. truncation=True cuts captions that
    # exceed the text encoder's maximum length instead of raising.
    inputs = clip_processor(
        text=[caption], return_tensors="pt", padding=True, truncation=True
    )
    with torch.no_grad():
        outputs = clip_model.get_text_features(**inputs)
        embedding = outputs.squeeze().numpy()  # shape: (512,)

    image_ids.append(image_id)
    text_embeddings.append(embedding)

# Save all text embeddings into a single parquet
df = pd.DataFrame(text_embeddings, index=image_ids)
df.index.name = "image_id"
# Some pandas versions refuse to write non-string column labels to parquet
df.columns = df.columns.astype(str)
df.to_parquet("clip_text_embeddings.parquet")


#### Vector indexation

In [None]:
import os
import faiss
import numpy as np
import pandas as pd

# Folder locations
features_root = "feature_output_vit"
index_output_path = "index_store"

# Create the output folder if it does not exist
os.makedirs(index_output_path, exist_ok=True)

all_vectors = []
all_ids = []

# Read every per-class .parquet table, keeping vectors and IDs aligned
parquet_names = [name for name in os.listdir(features_root) if name.endswith(".parquet")]
for fname in parquet_names:
    df = pd.read_parquet(os.path.join(features_root, fname))
    all_vectors.append(df.values)
    all_ids.extend(df.index.tolist())

# Stack everything into one matrix (FAISS requires float32)
stacked = np.vstack(all_vectors)
vectors = stacked.astype("float32")

# Build the FAISS index (exact L2 search)
dimension = vectors.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(vectors)

# Save the index
index_file = os.path.join(index_output_path, "index_vit.faiss")
faiss.write_index(index, index_file)

# Save the image names, in the same order as the indexed vectors
ids_file = os.path.join(index_output_path, "ids_vit.parquet")
df_ids = pd.DataFrame({"image_id": all_ids})
df_ids.to_parquet(ids_file)


#### Creating faiss index with clip textual vectors

In [None]:
import faiss
import pandas as pd
import numpy as np
import os

# Paths
embedding_file = "clip_text_embeddings.parquet"
index_output_path = "index_store"
os.makedirs(index_output_path, exist_ok=True)

# Load the text embeddings
df = pd.read_parquet(embedding_file)
# FAISS expects a C-contiguous float32 matrix. DataFrame.values followed by
# astype may keep a column-major layout, so force the layout explicitly.
vectors = np.ascontiguousarray(df.values, dtype="float32")
ids = df.index.tolist()

# Build the FAISS index (512 dimensions expected)
dimension = vectors.shape[1]
if dimension != 512:
    # Explicit raise instead of assert: asserts are removed under `python -O`
    raise ValueError(f"Expected dimension 512, got {dimension}")
index = faiss.IndexFlatL2(dimension)
index.add(vectors)

# Save the index
faiss.write_index(index, os.path.join(index_output_path, "index_clip_text.faiss"))

# Save the image_ids, in the same order as the indexed vectors
df_ids = pd.DataFrame({"image_id": ids})
df_ids.to_parquet(os.path.join(index_output_path, "ids_clip_text.parquet"))


## Searching by text

In [None]:
import faiss
import torch
import numpy as np
import pandas as pd
from transformers import CLIPProcessor, CLIPModel

# CLIP text encoder: model and processor come from the same checkpoint
clip_checkpoint = "openai/clip-vit-base-patch32"
clip_processor = CLIPProcessor.from_pretrained(clip_checkpoint)
clip_model = CLIPModel.from_pretrained(clip_checkpoint)
clip_model.eval()

# Caption index and the table mapping index positions to image IDs
df_ids = pd.read_parquet("index_store/ids_clip_text.parquet")
index = faiss.read_index("index_store/index_clip_text.faiss")

# --- Function to search by text in the textual embedding space ---
def search_by_text(text_query, top_k=5):
    """
    Given a text query, return the top-k most semantically similar image captions.

    The query is embedded with the module-level CLIP model and looked up in
    the module-level FAISS ``index``; positions are translated to image IDs
    through ``df_ids``.

    Args:
        text_query (str): The natural language search string.
        top_k (int): Number of closest results to return.

    Returns:
        results (list): List of image_ids ranked by similarity. It can be
            shorter than ``top_k`` when the index holds fewer vectors.
        distances (numpy.ndarray): Corresponding distances in vector space,
            aligned with ``results``.
    """
    # Convert the query into a CLIP embedding. truncation=True cuts queries
    # that exceed the text encoder's maximum length instead of raising.
    inputs = clip_processor(
        text=[text_query], return_tensors="pt", padding=True, truncation=True
    )
    with torch.no_grad():
        text_embedding = clip_model.get_text_features(**inputs).numpy().astype("float32")

    # Search for nearest neighbors in FAISS index
    distances, indices = index.search(text_embedding, top_k)

    # FAISS pads the result with -1 when fewer than top_k neighbors exist;
    # iloc[-1] would silently map that padding to the last row, so drop it.
    found = indices[0] >= 0

    # Retrieve image IDs from index positions
    results = df_ids["image_id"].iloc[indices[0][found]].tolist()
    return results, distances[0][found]


#### Use example

In [None]:
# Sample query: print the ten nearest captions, one line per hit.
results, distances = search_by_text(text_query="a monkey eating a banana", top_k=10)
print("Top results:")
report = [f"{img}  (distance={dist:.4f})" for img, dist in zip(results, distances)]
for line in report:
    print(line)

##### Showing the images

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import os

# Base directory where the original images are stored
image_base_path = "MIR_DATASETS_B/MIR_DATASETS_B"

# --- Function to locate full path of a given image_id ---
def find_image_path(image_id):
    """
    Search for the image path in the base directory using the image ID.
    It recursively searches through subfolders.

    The file name must be exactly ``<image_id>.jpg``. A prefix test is not
    enough: the ID ``..._377`` would otherwise also match ``..._3772.jpg``.

    Args:
        image_id (str): The image filename without extension.

    Returns:
        str or None: Full path to the image if found, else None.
    """
    target_name = f"{image_id}.jpg"
    for root, _, files in os.walk(image_base_path):
        if target_name in files:
            return os.path.join(root, target_name)
    return None

# --- Function to show search results as images ---
def show_results(image_ids, title="Top Results"):
    """
    Plot the images behind a list of image IDs side by side on one row.

    Each ID keeps the grid slot given by its position in the list; an ID whose
    file cannot be located is reported on stdout and its slot stays empty.

    Args:
        image_ids (list): Image IDs to display, in display order.
        title (str): Caption drawn above the whole row.
    """
    plt.figure(figsize=(15, 3))

    for position, img_id in enumerate(image_ids):
        img_path = find_image_path(img_id)
        if img_path is not None:
            picture = Image.open(img_path).convert("RGB")

            # One row, as many columns as requested IDs
            plt.subplot(1, len(image_ids), position + 1)
            plt.imshow(picture)
            plt.title(img_id, fontsize=8)
            plt.axis("off")
        else:
            print(f"[!] Not found: {img_id}")

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()


In [None]:
results, distances = search_by_text("a monkey eating a banana", top_k=5)
show_results(results, title="Top 5: monkey eating a banana")

## Evaluating the text model

In [None]:
def extract_class_from_image_id(image_id):
    """
    Return the top-level class encoded in an image ID.

    The ID is split on underscores and the third field is the class, e.g.
    ``'0_2_singes_monkey_123'`` gives ``'singes'``.

    Returns:
        str: The class name, or ``"unknown"`` when the ID has fewer than
        three underscore-separated fields.
    """
    fields = image_id.split('_')
    return fields[2] if len(fields) >= 3 else "unknown"


In [None]:
def get_total_relevant_images(class_name):
    """
    Count the indexed images that belong to a class.

    The count runs over the ``image_id`` column of the module-level ``df_ids``
    table; each ID is mapped to its class with ``extract_class_from_image_id``.

    Returns:
        int: Number of IDs whose class equals ``class_name``.
    """
    same_class_ids = [
        img_id
        for img_id in df_ids["image_id"]
        if extract_class_from_image_id(img_id) == class_name
    ]
    return len(same_class_ids)


In [None]:
def compute_average_precision(relevance_list):
    """
    Average precision of a ranked list of binary relevance flags.

    The precision is sampled at the rank of every relevant item and the
    samples are averaged over the number of relevant items found in the list.

    Args:
        relevance_list (list of 0 or 1): Flags in ranking order.

    Returns:
        float: The AP, or 0.0 when the list holds no relevant item.
    """
    hits = 0
    precision_sum = 0.0
    for rank, is_relevant in enumerate(relevance_list, start=1):
        if not is_relevant:
            continue
        hits += 1
        precision_sum += hits / rank

    if hits == 0:
        return 0.0
    return precision_sum / hits


In [None]:
def evaluate_single_query(query_text, top_k=10):
    """
    Evaluate retrieval performance for a single text query.

    Steps:
    - Find the first image whose caption equals the query (module-level
      ``captions`` mapping) and derive the ground-truth class from its ID
    - Use the text to query the FAISS index
    - Compare the classes of the retrieved image_ids with the ground truth

    Args:
        query_text (str): Caption used as the query; it must exist in ``captions``.
        top_k (int): Number of results requested from the index.

    Returns:
        dict with query, true_class, precision@k, recall@k, AP and
        R-Precision, or None when the caption is not found. A metric whose
        denominator would be zero is reported as 0.0.
    """
    # Get the ground truth image from the caption list
    # (assumes `captions` is still loaded); stop at the first match.
    matched_path = next(
        (img_path for img_path, cap in captions.items() if cap == query_text),
        None,
    )
    if matched_path is None:
        print("[!] Caption not found in JSON.")
        return None

    query_image_id = os.path.basename(matched_path).replace(".jpg", "")
    true_class = extract_class_from_image_id(query_image_id)

    # Run the search
    retrieved_ids, _ = search_by_text(query_text, top_k=top_k)

    # Evaluate
    relevant = [1 if extract_class_from_image_id(rid) == true_class else 0 for rid in retrieved_ids]
    hits = sum(relevant)
    # Counting scans the whole ID table, so do it once per query
    total_relevant = get_total_relevant_images(true_class)

    precision_at_k = hits / len(relevant) if relevant else 0.0
    if total_relevant > 0:
        recall = hits / total_relevant
        # Only the retrieved results are available, so at most top_k of the
        # first R positions can count as hits.
        r_precision = sum(relevant[:total_relevant]) / total_relevant
    else:
        recall = 0.0
        r_precision = 0.0
    ap = compute_average_precision(relevant)

    return {
        "query": query_text,
        "true_class": true_class,
        "precision@k": precision_at_k,
        "recall@k": recall,
        "AP": ap,
        "R-Precision": r_precision,
    }


#### Just one case evaluation

In [None]:
# Kept as the bare last expression of the cell so the notebook displays the returned value.
evaluate_single_query("a monkey eating a banana", top_k=10)

#### Evaluating several queries at once

In [None]:
def evaluate_multiple_queries(captions_dict, top_k=10, max_queries=100):
    """
    Evaluate retrieval quality over multiple text queries.

    Every distinct caption is used once as a query, in dictionary order,
    until ``max_queries`` queries have produced a result.

    Args:
        captions_dict (dict): Mapping of image_path → caption
        top_k (int): Number of top results to consider for each query
        max_queries (int): Maximum number of queries to evaluate

    Returns:
        tuple: (DataFrame with the metrics of each query, Series with the
        averaged metrics and the number of evaluated queries). When no query
        could be evaluated the DataFrame is empty and the averages are NaN.
    """
    from tqdm import tqdm

    metric_columns = ["precision@k", "recall@k", "AP", "R-Precision"]
    all_results = []
    used_captions = set()

    for caption in tqdm(captions_dict.values()):
        # Checked before evaluating so that max_queries=0 runs no query at all
        if len(all_results) >= max_queries:
            break

        # Avoid duplicates
        if caption in used_captions:
            continue
        used_captions.add(caption)

        result = evaluate_single_query(caption, top_k=top_k)
        if result is not None:
            all_results.append(result)

    if all_results:
        df = pd.DataFrame(all_results)
        means = df[metric_columns].mean()
    else:
        # No rows: keep the expected columns instead of failing on the lookup
        df = pd.DataFrame(columns=["query", "true_class", *metric_columns])
        means = pd.Series(float("nan"), index=metric_columns)

    # Compute aggregated metrics
    summary = {
        "mean precision@k": means["precision@k"],
        "mean recall@k": means["recall@k"],
        "mean AP (mAP)": means["AP"],
        "mean R-Precision": means["R-Precision"],
        "evaluated_queries": len(df),
    }

    return df, pd.Series(summary)


In [None]:
# Evaluate up to 100 distinct captions, scoring the 10 nearest results of each
df_metrics, summary = evaluate_multiple_queries(captions_dict=captions, max_queries=100, top_k=10)

# Show the aggregated metrics
print(summary)

### Creating the tables

#### Descriptor comparison

In [None]:
import os
import time
import faiss
import numpy as np
import pandas as pd

# Every descriptor to compare, with its FAISS index file and its ID-mapping file
descriptors = {
    "SIFT": dict(
        index_file="index_store/index_sift.faiss",
        ids_file="index_store/ids_sift.parquet",
    ),
    "ORB": dict(
        index_file="index_store/index_orb.faiss",
        ids_file="index_store/ids_orb.parquet",
    ),
    "HIST": dict(
        index_file="index_store/index_hist.faiss",
        ids_file="index_store/ids_hist.parquet",
    ),
    "ViT": dict(
        index_file="index_store/index_vit.faiss",
        ids_file="index_store/ids_vit.parquet",
    ),
    "CLIP-text": dict(
        index_file="index_store/index_clip_text.faiss",
        ids_file="index_store/ids_clip_text.parquet",
    ),
}

def average_search_time(index, num_queries=100, k=5):
    """
    Measure the average time of one nearest-neighbour query on an index.

    The queries are uniform random vectors of the index dimension, sent as a
    single batch, so the figure reflects raw search speed rather than
    behaviour on real descriptors.

    Args:
        index: Object exposing the dimension as ``d`` and a
            ``search(queries, k=...)`` method (a FAISS index).
        num_queries (int): Number of random queries in the batch.
        k (int): Number of neighbours requested per query.

    Returns:
        float: Elapsed seconds divided by ``num_queries``.

    Raises:
        ValueError: If ``num_queries`` is smaller than 1.
    """
    if num_queries < 1:
        raise ValueError("num_queries must be at least 1")

    dim = index.d
    queries = np.random.rand(num_queries, dim).astype("float32")

    # perf_counter is a monotonic, high-resolution clock meant for timing
    start = time.perf_counter()
    index.search(queries, k=k)
    elapsed = time.perf_counter() - start
    return elapsed / num_queries

# Build the comparison table, one row per descriptor
rows = []

for i, desc_name in enumerate(descriptors, start=1):
    index_path = descriptors[desc_name]["index_file"]
    ids_path = descriptors[desc_name]["ids_file"]

    # Time needed to load the index from disk
    # (this is the value reported in the "Temps d’indexation (s)" column)
    load_started = time.time()
    index = faiss.read_index(index_path)
    index_load_time = time.time() - load_started

    # Average search time per query
    avg_search = average_search_time(index)

    # Combined size on disk of the index file and the ID-mapping file
    size_bytes = os.path.getsize(index_path) + os.path.getsize(ids_path)
    total_size_mb = size_bytes / 1024 / 1024

    rows.append({
        "Vos meilleurs descripteurs": f"Descripteur N° {i:02}",
        "Nom de(s) descripteur(s)": desc_name,
        "Temps d’indexation (s)": round(index_load_time, 3),
        "Taille du descripteur (MB)": round(total_size_mb, 2),
        "Temps de recherche moyen par image (s)": round(avg_search, 5)
    })

# Final DataFrame
df_table1 = pd.DataFrame(rows)

In [None]:
# Show the descriptor comparison table (one row per descriptor).
print(df_table1)

#### Image searching table

In [None]:
# Query images used for the evaluation table: request label -> image ID.
# Three images each for the classes poissons, chiens and singes.
query_requests = dict(
    R1="3_4_poissons_eagleray_3310",
    R2="3_5_poissons_hammerhead_3495",
    R3="3_3_poissons_tigershark_3244",
    R4="1_2_chiens_boxer_1146",
    R5="1_4_chiens_goldenretriever_1423",
    R6="1_5_chiens_Rottweiler_1578",
    R7="4_3_singes_squirrelmonkey_4082",
    R8="4_2_singes_gorilla_4004",
    R9="4_1_singes_chimpanzee_3772",
)

In [None]:
import faiss
import numpy as np
import pandas as pd
from tqdm import tqdm

# Utility functions
def extract_class_from_image_id(image_id):
    """
    Return the top-level class encoded in an image ID.

    The class is the third underscore-separated field, e.g.
    ``'3_4_poissons_eagleray_3310'`` gives ``'poissons'``.

    Returns:
        str: The class name, or ``"unknown"`` when the ID has fewer than three
        fields (same fallback as the earlier definition of this helper, so a
        malformed ID no longer raises IndexError).
    """
    parts = image_id.split('_')
    if len(parts) < 3:
        return "unknown"
    return parts[2]  # ex: 'poissons'

def get_total_relevant_images(class_name, df_ids):
    """
    Count the rows of ``df_ids`` whose ``image_id`` belongs to ``class_name``.

    Each ID is mapped to its class with ``extract_class_from_image_id``.
    """
    total = 0
    for img_id in df_ids["image_id"]:
        if extract_class_from_image_id(img_id) == class_name:
            total += 1
    return total

def compute_average_precision(relevance_list, total_relevant):
    """
    Average precision over a ranked list of binary relevance flags.

    The precision is sampled at the rank of each relevant item; the sum of
    these samples is divided by ``total_relevant`` (the number of relevant
    items in the whole dataset), not by the number of hits in the list.

    Returns:
        float: The AP, or 0.0 when ``total_relevant`` is not positive.
    """
    hits = 0
    precision_sum = 0.0

    for rank, is_relevant in enumerate(relevance_list, start=1):
        if not is_relevant:
            continue
        hits += 1
        precision_sum += hits / rank

    if total_relevant > 0:
        return precision_sum / total_relevant
    return 0.0



def evaluate_vit_query(image_id, index, df_ids, feature_dir, top_ks=(50, 100)):
    """
    Evaluate retrieval metrics for a given query image using ViT features.

    The query vector is read from ``<image_id>.npy`` somewhere under
    ``feature_dir``. Every retrieved image that shares the query's first four
    ID fields (its subclass, which includes the query itself) is discarded
    before scoring; ``TopMax`` and the recall denominator still count the
    whole class.

    Args:
        image_id (str): The ID of the query image (without .jpg)
        index (faiss.Index): FAISS index loaded for the descriptor
        df_ids (DataFrame): Mapping from FAISS positions to image_ids
        feature_dir (str): Directory where .npy features are stored
        top_ks (sequence of int): K values to evaluate (e.g., (50, 100))

    Returns:
        dict: "Indice requête", "TopMax" and, for each K, "P (TopK)",
        "R (TopK)", "AP (TopK)" and "MaP (TopK)"; None when the feature file
        of the query is not found.
    """
    import os
    import numpy as np

    class_name = extract_class_from_image_id(image_id)

    # Load the ViT feature vector of the query image. The file name must match
    # exactly: a prefix test would let ID "..._377" pick up "..._3772.npy".
    feature_name = f"{image_id}.npy"
    query_vector = None
    for root, _, files in os.walk(feature_dir):
        if feature_name in files:
            query_vector = np.load(os.path.join(root, feature_name)).astype("float32").reshape(1, -1)
            break

    if query_vector is None:
        print(f"[!] Feature not found for {image_id}")
        return None

    result = {"Indice requête": image_id}
    total_relevant = get_total_relevant_images(class_name, df_ids)
    result["TopMax"] = total_relevant

    # Retrieve more than max(top_ks) to allow for exclusion
    max_k = max(top_ks)
    _, indices = index.search(query_vector, max_k + 10)

    # FAISS pads with -1 when the index holds fewer vectors than requested;
    # drop the padding instead of letting iloc[-1] map it to the last row.
    positions = indices[0]
    positions = positions[positions >= 0]

    # Get image IDs of retrieved items
    retrieved_ids = df_ids["image_id"].iloc[positions].tolist()

    # Remove all images with the same exact subclass (prefix match)
    prefix = "_".join(image_id.split("_")[:4])  # Ex: '3_4_poissons_eagleray'
    retrieved_ids = [rid for rid in retrieved_ids if not rid.startswith(prefix)]

    # Compute the metrics on the cleaned list
    for k in top_ks:
        relevance_k = [
            1 if extract_class_from_image_id(rid) == class_name else 0
            for rid in retrieved_ids[:k]
        ]
        hits = sum(relevance_k)

        precision = hits / k
        recall = hits / total_relevant if total_relevant > 0 else 0.0
        ap = compute_average_precision(relevance_k, total_relevant)

        result[f"P (Top{k})"] = round(precision, 3)
        result[f"R (Top{k})"] = round(recall, 3)
        result[f"AP (Top{k})"] = round(ap, 3)

    # For a single query the MaP column repeats the AP. Derived from top_ks so
    # that values other than 50/100 no longer raise KeyError.
    for k in top_ks:
        result[f"MaP (Top{k})"] = result[f"AP (Top{k})"]

    return result


In [None]:
# Load the ViT index and its position -> image_id table.
# NOTE: this rebinds the notebook-level `index` and `df_ids`, which
# search_by_text also reads; reload the CLIP-text ones before searching by text again.
index = faiss.read_index("index_store/index_vit.faiss")
df_ids = pd.read_parquet("index_store/ids_vit.parquet")
feature_dir = "feature_output_vit"

# Evaluate the 9 requests
tabela2_rows = []
for req_name, image_id in tqdm(query_requests.items()):
    row = evaluate_vit_query(image_id, index, df_ids, feature_dir)
    if row is None:
        # The missing feature was already reported by evaluate_vit_query;
        # skip the request instead of failing on the assignment below.
        continue
    # Identify the row by its request label rather than by the image ID
    row["Indice requête"] = req_name
    tabela2_rows.append(row)

# Build the DataFrame with a fixed column order (also valid when no row was produced)
table2_columns = [
    "Indice requête",
    "R (Top50)", "R (Top100)",
    "P (Top50)", "P (Top100)",
    "AP (Top50)", "AP (Top100)",
    "MaP (Top50)", "MaP (Top100)",
    "TopMax"
]
df_table2 = pd.DataFrame(tabela2_rows, columns=table2_columns)

In [None]:
# Show the per-request retrieval metrics table (one row per request R1..R9).
print(df_table2)