In [1]:
import torch
import numpy as np
import pandas as pd
import copy
from tqdm import tqdm
import time

In [2]:
def calculate_label_ids_until_hit(query_features, reference_features, labels, step_size=1000):
    """Collect, per query, the reference labels ranked ahead of its ground truth.

    Every query row is scored against every reference row with a dot product
    (``query_features @ reference_features.T``), computed in batches of
    ``step_size`` queries. For query ``i`` the ground-truth reference is the
    column at the position of ``labels[i]`` inside ``labels``; every reference
    whose similarity is strictly greater than the ground-truth similarity is
    recorded, ordered from most to least similar.

    Args:
        query_features: Tensor whose first dimension indexes the queries.
            NOTE(review): presumably shaped (Q, D) - confirm with the caller.
        reference_features: Tensor whose first dimension indexes the
            references. NOTE(review): presumably shaped (R, D) - confirm.
        labels: 1-D tensor of label ids. The code uses ``labels[i]`` as the
            label of query ``i`` AND the position of a label inside ``labels``
            as its reference row, so queries and references are assumed to
            share this ordering.
        step_size: Number of query rows multiplied against the references per
            batch. Must be >= 1.

    Returns:
        dict mapping the label of each query to the list of reference labels
        that scored strictly higher than the ground truth (empty list when the
        ground truth is ranked first). Keys are numpy scalars taken from
        ``labels``; values are plain Python lists.

    Raises:
        ValueError: If ``step_size`` is smaller than 1.
    """
    if step_size < 1:
        raise ValueError(f"step_size must be a positive integer, got {step_size}")

    Q = len(query_features)

    labels_np = labels.cpu().numpy()
    # If a label occurs more than once, the last occurrence wins here (and the
    # result dict below likewise keeps only the last query with that label).
    ref2index = {label_id: position for position, label_id in enumerate(labels_np)}

    # Evaluation only: without no_grad the similarity matrix would inherit
    # requires_grad from the features and the .numpy() conversion would raise.
    with torch.no_grad():
        similarity_batches = [
            (query_features[start:start + step_size] @ reference_features.T).cpu()
            # max(Q, 1) keeps one (empty) batch for Q == 0 so torch.cat still
            # receives a tensor; otherwise exactly ceil(Q / step_size) batches.
            for start in range(0, max(Q, 1), step_size)
        ]

    # matrix Q x R, converted to numpy once instead of once per row
    similarity = torch.cat(similarity_batches, dim=0).numpy()

    label_ids_until_hit = dict()
    bar = tqdm(range(Q), desc="Generate lists of label_ids until Hit")

    for i in bar:
        row = similarity[i]

        # similarity value of the ground-truth reference
        gt_sim = row[ref2index[labels_np[i]]]

        # references with a strictly higher similarity than the ground truth
        hit_indices = np.flatnonzero(row > gt_sim)

        # sort those references by descending similarity; the stable sort
        # makes the order of exactly tied references deterministic
        order = np.argsort(-row[hit_indices], kind="stable")

        # label_ids_until_hit[label_id] = [], [label_id1], [label_id1, label_id2, ...]
        label_ids_until_hit[labels_np[i]] = labels_np[hit_indices[order]].tolist()

    return label_ids_until_hit

In [3]:
def calculate_scores(label_ids_until_hit, metadata_df, recall_ranks, topk_recall=True):
    """Print recall and median-rank metrics and return the first recall value.

    The "count until hit" of a query is the length of its list in
    ``label_ids_until_hit``, i.e. how many references were ranked ahead of the
    ground truth. ``Recall@k`` is the percentage of queries whose count is
    strictly smaller than ``k``.

    Args:
        label_ids_until_hit: Mapping from a query's label id to the list of
            label ids ranked ahead of its ground truth. Only the list lengths
            are used.
        metadata_df: Not used by the active code; only the disabled
            mean-error-distance section below reads it.
        recall_ranks: Iterable of ranks ``k`` for which ``Recall@k`` is
            reported, in output order.
        topk_recall: When true, additionally report the recall at
            ``len(label_ids_until_hit) // 100`` (1% of the ids), clamped to at
            least rank 1 so that fewer than 100 ids do not degenerate into a
            meaningless ``Recall@0``.

    Returns:
        The first reported recall in percent (the first entry of
        ``recall_ranks``, or the top-1% recall when ``recall_ranks`` is
        empty). NaN when ``label_ids_until_hit`` is empty; ``None`` when no
        recall was requested at all.
    """
    #### Calculating Recalls
    count_until_hit = np.asarray(
        [len(wrong_ids) for wrong_ids in label_ids_until_hit.values()],
        dtype=np.int64,
    )
    id_count = count_until_hit.size

    def recall_at(rank):
        # Percentage of queries with fewer than `rank` references ahead of the
        # ground truth. NaN (without numpy's empty-mean warning) when there is
        # nothing to score.
        if id_count == 0:
            return float("nan")
        return np.mean(count_until_hit < rank) * 100

    recall_results = [(recall_at(rank), f"Recall@{rank}") for rank in recall_ranks]

    if topk_recall:
        # 1% of the ids, but never rank 0: "count < 0" can never be true.
        topk = max(1, id_count // 100)
        # Guard the percentage label against division by zero on empty input.
        topk_percent = topk / id_count * 100 if id_count else 0.0
        info = f"Recall@{topk}/Recall@{topk_percent:0.2f}"
        recall_results.append((recall_at(topk), info))

    print(' - '.join([f"{info}: {recall:.4f}" for recall, info in recall_results]))

    #### Calculating Median Rank
    median_rank = np.median(count_until_hit) if id_count else float("nan")
    print('Median Rank: {:.4f}'.format(median_rank))

#    ##### Calculating Mean Error Distance (disabled)
#    # NOTE(review): DistanceMetric is not imported in the visible cells;
#    # it must be brought into scope before this section is re-enabled.
#    coordinates = metadata_df.loc[:, ["latitude", "longitude"]]
#    error_distances = []
#    
#    # Initialize Haversine Distance Metric
#    dist = DistanceMetric.get_metric("haversine")
#    
#    for true_label_id, wrong_label_ids in label_ids_until_hit.items():
#        true_coords = coordinates.loc[true_label_id].to_numpy()
#        if len(wrong_label_ids) > 0:
#            wrong_coords = coordinates.loc[wrong_label_ids].to_numpy()
#            # Calculate Haversine distances
#            distances = dist.pairwise(np.radians([true_coords]), np.radians(wrong_coords)).flatten() * 6371  
#            error_distances.append(np.mean(distances))
#        else:
#            error_distances.append(0)
#    
#    mean_distance_error = np.mean(error_distances)
#    print(f"Mean Distance: {mean_distance_error:.3f} km")

    # Nothing requested (no ranks, no top-k) -> nothing to return.
    return recall_results[0][0] if recall_results else None

In [223]:
# Smoke test of the metric pipeline on synthetic random features.
NUM_SAMPLES = 100
FEATURE_DIM = 512

# Seed the generator so that re-running the notebook reproduces the metrics
# printed below this cell.
torch.manual_seed(0)

query_features = torch.randn(NUM_SAMPLES, FEATURE_DIM)
reference_features = torch.randn(NUM_SAMPLES, FEATURE_DIM)
# Label ids 1..NUM_SAMPLES; position i labels both query i and reference i.
labels = torch.arange(1, NUM_SAMPLES + 1)

label_ids_until_hit = calculate_label_ids_until_hit(query_features, reference_features, labels, step_size=1000)
recall_1 = calculate_scores(label_ids_until_hit, None, recall_ranks=[1,5,10,50,100], topk_recall=True)

Generate lists of label_ids until Hit: 100%|██████████| 100/100 [00:00<00:00, 13069.22it/s]

Recall@1: 2.0000 - Recall@5: 5.0000 - Recall@10: 12.0000 - Recall@50: 50.0000 - Recall@100: 100.0000 - Recall@1/Recall@1.00: 2.0000
Median Rank: 48.0000



