# Testing

In [2]:
import pandas as pd
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Dict, Union

RuntimeError: Failed to import transformers.modeling_utils because of the following error (look up to see its traceback):
module 'torch' has no attribute 'version'

### Ranking of recommendation lists

The **`MetricsCalculator`** class evaluates our recommendation system's results using the **Precision@k**, **Recall@k**, **MAP**, and **NDCG** metrics.

In [None]:
class MetricsCalculator:
    """Ranking-quality metrics for recommendation lists.

    Every method is static. ``relevant_items`` is the collection of ground-truth
    item ids for one query and ``recommended_items`` is the ranked list returned
    by the system (best first). Item ids must be hashable. All metrics are
    returned as plain Python ``float`` values in the range [0, 1] for binary
    relevance.
    """

    @staticmethod
    def _mean(values: List[float]) -> float:
        """Return the arithmetic mean of *values*, or 0.0 for an empty list."""
        # np.mean([]) would return NaN and emit a RuntimeWarning.
        return float(np.mean(values)) if values else 0.0

    @staticmethod
    def precision_at_k(relevant_items: List[int], recommended_items: List[int], k: int) -> float:
        """Return the fraction of the first *k* recommendations that are relevant.

        The denominator is always ``k`` (not the number of items actually
        recommended). Returns 0.0 when ``k <= 0``.
        """
        if k <= 0:
            return 0.0
        hits = len(set(recommended_items[:k]) & set(relevant_items))
        return hits / k

    @staticmethod
    def recall_at_k(relevant_items: List[int], recommended_items: List[int], k: int) -> float:
        """Return the fraction of relevant items found in the first *k* recommendations.

        Duplicate ids in ``relevant_items`` are counted once, so a perfect
        ranking always scores 1.0. Returns 0.0 when there are no relevant items
        or ``k <= 0``.
        """
        relevant = set(relevant_items)
        if not relevant or k <= 0:
            return 0.0
        hits = len(set(recommended_items[:k]) & relevant)
        return hits / len(relevant)

    @staticmethod
    def average_precision(relevant_items: List[int], recommended_items: List[int]) -> float:
        """Return the average precision (AP) of one ranked list.

        AP is the sum of precision values at each rank where a relevant item
        first appears, divided by the number of distinct relevant items.
        Returns 0.0 when there are no relevant items.
        """
        relevant = set(relevant_items)
        if not relevant:
            return 0.0

        already_hit = set()
        hits = 0
        precision_sum = 0.0
        for rank, item in enumerate(recommended_items, start=1):
            # A repeated recommendation still occupies a rank but must not be
            # credited twice, otherwise AP could exceed 1.
            if item in relevant and item not in already_hit:
                already_hit.add(item)
                hits += 1
                precision_sum += hits / rank

        return precision_sum / len(relevant)

    @staticmethod
    def mean_average_precision(relevant_items_list: List[List[int]], recommended_items_list: List[List[int]]) -> float:
        """Return the mean of the per-query average precision (MAP).

        The two lists are paired positionally; extra entries in the longer list
        are ignored. Returns 0.0 when there are no queries.
        """
        ap_scores = [
            MetricsCalculator.average_precision(relevant, recommended)
            for relevant, recommended in zip(relevant_items_list, recommended_items_list)
        ]
        return MetricsCalculator._mean(ap_scores)

    @staticmethod
    def ndcg_at_k(relevant_items: List[int], recommended_items: List[int], k: int, relevance_scores: Dict[int, float] = None) -> float:
        """Return the normalized discounted cumulative gain over the first *k* items.

        Args:
            relevant_items: Ground-truth item ids for the query.
            recommended_items: Ranked recommendations, best first.
            k: Cut-off rank; values ``<= 0`` yield 0.0.
            relevance_scores: Optional graded relevance per item id. When
                omitted, every relevant item gets a relevance of 1.0.

        Returns:
            DCG@k divided by the ideal DCG@k, or 0.0 when the ideal DCG is 0.
        """
        if relevance_scores is None:
            relevance_scores = {item: 1.0 for item in relevant_items}

        cutoff = max(k, 0)
        top_k = recommended_items[:cutoff]

        # The item at 0-based position i is discounted by log2(i + 2).
        dcg = sum(
            relevance_scores.get(item, 0) / np.log2(position + 2)
            for position, item in enumerate(top_k)
        )

        # The ideal ranking places the distinct relevant items in descending
        # order of relevance.
        ideal_relevance = sorted(
            (relevance_scores.get(item, 0) for item in set(relevant_items)),
            reverse=True,
        )[:cutoff]
        idcg = sum(rel / np.log2(position + 2) for position, rel in enumerate(ideal_relevance))

        return float(dcg / idcg) if idcg > 0 else 0.0

    @staticmethod
    def calculate_all_metrics(
        relevant_items_list: List[List[int]],
        recommended_items_list: List[List[int]],
        k_values: List[int] = None,
        relevance_scores_list: List[Dict[int, float]] = None
    ) -> Dict[str, Union[float, Dict[int, float]]]:
        """Compute MAP plus Precision/Recall/NDCG at every requested cut-off.

        Args:
            relevant_items_list: Ground-truth item ids, one list per query.
            recommended_items_list: Ranked recommendations, one list per query.
            k_values: Cut-off ranks to evaluate; defaults to ``[1, 3, 5, 10]``.
            relevance_scores_list: Optional graded relevance dicts, one per
                query. When omitted, NDCG uses binary relevance.

        Returns:
            A dict with key ``'MAP'`` (float) and keys ``'Precision@k'``,
            ``'Recall@k'`` and ``'NDCG@k'``, each mapping ``k`` to the metric
            averaged over all queries (0.0 when there are no queries).
        """
        if k_values is None:
            k_values = [1, 3, 5, 10]

        query_pairs = list(zip(relevant_items_list, recommended_items_list))
        if relevance_scores_list:
            ndcg_inputs = list(zip(relevant_items_list, recommended_items_list, relevance_scores_list))
        else:
            # No graded scores supplied: fall back to binary relevance so that
            # NDCG is still reported instead of being left empty.
            ndcg_inputs = [(relevant, recommended, None) for relevant, recommended in query_pairs]

        results = {
            'MAP': MetricsCalculator.mean_average_precision(relevant_items_list, recommended_items_list),
            'Precision@k': {},
            'Recall@k': {},
            'NDCG@k': {}
        }

        for k in k_values:
            results['Precision@k'][k] = MetricsCalculator._mean([
                MetricsCalculator.precision_at_k(relevant, recommended, k)
                for relevant, recommended in query_pairs
            ])

            results['Recall@k'][k] = MetricsCalculator._mean([
                MetricsCalculator.recall_at_k(relevant, recommended, k)
                for relevant, recommended in query_pairs
            ])

            results['NDCG@k'][k] = MetricsCalculator._mean([
                MetricsCalculator.ndcg_at_k(relevant, recommended, k, rel_scores)
                for relevant, recommended, rel_scores in ndcg_inputs
            ])

        return results

In [None]:
import json
import time
from annoy import AnnoyIndex
import faiss
from rank_bm25 import BM25Okapi


class SearchEvaluator:
    """Evaluate BM25 and embedding-based search approaches on a validation set.

    The validation file is a JSON list of objects with a ``'query'`` string and
    a ``'relevant_movies'`` list of titles. Pre-computed embedding matrices are
    loaded from ``embeddings/embeddings_<model>.npy``.

    Sentence encoders, normalized matrices and ANN indexes are built lazily and
    cached per model, so they are created once instead of once per query. The
    cache is not refreshed if ``self.embeddings`` is modified afterwards.

    NOTE(review): the embedding searches return row indices of the loaded
    embedding matrices, while the relevant items are indices into
    ``self.movie_titles``. The metrics are only meaningful if both orderings
    refer to the same movies -- confirm against the script that produced the
    ``.npy`` files.
    """

    def __init__(self, data_path: str):
        """Load the validation data, the embedding matrices and the BM25 index.

        Args:
            data_path: Path to the validation JSON file.
        """
        # Load validation data
        with open(data_path, 'r', encoding='utf-8') as f:
            self.validation_data = json.load(f)

        # Create movie title to index mapping. De-duplicate in first-seen order:
        # building the list from a set would give a different index assignment
        # on every interpreter run (string hashing is randomized).
        self.movie_titles = list(dict.fromkeys(
            movie for item in self.validation_data for movie in item['relevant_movies']
        ))
        self.title_to_idx = {title: idx for idx, title in enumerate(self.movie_titles)}

        # Prepare queries and relevant items
        self.queries = [item['query'] for item in self.validation_data]
        self.relevant_items_list = [
            [self.title_to_idx[title] for title in item['relevant_movies'] if title in self.title_to_idx]
            for item in self.validation_data
        ]

        # Load all embeddings
        self.embeddings = {}
        self.models = {
            'all_mpnet_base_v2',
            'all_MiniLM_L12_v2',
            'multi_qa_distilbert_cos_v1'
        }

        for model in self.models:
            emb_path = f"embeddings/embeddings_{model.replace('-', '_')}.npy"
            self.embeddings[model] = np.load(emb_path)

        # Initialize BM25 (separate from embeddings)
        tokenized_titles = [title.lower().split() for title in self.movie_titles]
        self.bm25 = BM25Okapi(tokenized_titles)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _cached(self, kind: str, model_name: str, builder):
        """Return the cached object for ``(kind, model_name)``, building it on first use."""
        # Created lazily so the helper also works on instances whose __init__
        # was bypassed.
        cache = self.__dict__.setdefault('_cache', {})
        key = (kind, model_name)
        if key not in cache:
            cache[key] = builder(model_name)
        return cache[key]

    @staticmethod
    def _top_k_indices(scores, top_k: int) -> List[int]:
        """Return the indices of the *top_k* highest scores, best first."""
        if top_k <= 0:
            # scores[-0:] would select the whole array instead of nothing.
            return []
        return np.argsort(scores)[::-1][:top_k].tolist()

    def _load_encoder(self, model_name: str):
        """Instantiate the SentenceTransformer matching an underscored model key."""
        return SentenceTransformer(model_name.replace("_", "-"))

    def _encode_query(self, query: str, model_name: str):
        """Encode *query* with the (cached) encoder of *model_name*; one row per query."""
        encoder = self._cached('encoder', model_name, self._load_encoder)
        return encoder.encode([query])

    def _normalized_embeddings(self, model_name: str):
        """Return the embedding matrix of *model_name* with unit-length rows."""
        emb_matrix = self.embeddings[model_name]
        norms = np.linalg.norm(emb_matrix, axis=1, keepdims=True)
        # An all-zero row would divide by zero and score NaN, which argsort
        # places last, i.e. exactly where the top hits are read from.
        norms[norms == 0] = 1.0
        return emb_matrix / norms

    def _build_faiss_flat(self, model_name: str):
        """Build an exact inner-product FAISS index over the model's embeddings."""
        emb_matrix = self.embeddings[model_name]
        index = faiss.IndexFlatIP(emb_matrix.shape[1])
        index.add(emb_matrix.astype('float32'))  # FAISS indexes operate on float32 vectors
        return index

    def _build_faiss_hnsw(self, model_name: str):
        """Build an approximate HNSW FAISS index over the model's embeddings."""
        emb_matrix = self.embeddings[model_name]
        index = faiss.IndexHNSWFlat(emb_matrix.shape[1], 32)  # 32 is HNSW parameter
        index.add(emb_matrix.astype('float32'))  # FAISS indexes operate on float32 vectors
        return index

    def _build_annoy(self, model_name: str):
        """Build an angular-distance Annoy index over the model's embeddings."""
        emb_matrix = self.embeddings[model_name]
        annoy_index = AnnoyIndex(emb_matrix.shape[1], 'angular')
        for i, emb in enumerate(emb_matrix):
            annoy_index.add_item(i, emb)
        annoy_index.build(10)  # 10 trees
        return annoy_index

    def _evaluate_search(self, search, top_k: int) -> Dict:
        """Run *search* on every query and return its metrics plus timing.

        ``search`` is a callable taking a query string and returning a ranked
        list of item indices. ``'time_per_query'`` covers the search calls only
        (including any one-time model or index construction on the first query).
        """
        start_time = time.time()
        recommended_items_list = [search(query) for query in self.queries]
        elapsed = time.time() - start_time

        metrics = MetricsCalculator.calculate_all_metrics(
            self.relevant_items_list,
            recommended_items_list,
            k_values=[top_k]
        )
        metrics['time_per_query'] = elapsed / len(self.queries) if self.queries else 0.0
        return metrics

    # ------------------------------------------------------------------
    # Search approaches
    # ------------------------------------------------------------------
    def bm25_search(self, query: str, top_k: int = 5) -> List[int]:
        """Rank movie titles against *query* with BM25 and return the top indices."""
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        return self._top_k_indices(scores, top_k)

    def cosine_search(self, query: str, model_name: str, top_k: int = 5) -> List[int]:
        """Return the *top_k* embedding rows most cosine-similar to *query*."""
        if top_k <= 0:
            return []
        query_emb = self._encode_query(query, model_name)
        emb_matrix = self._cached('normalized', model_name, self._normalized_embeddings)

        # Normalize the query so the dot product below is a cosine similarity.
        query_norm = np.linalg.norm(query_emb)
        if query_norm > 0:
            query_emb = query_emb / query_norm

        scores = np.dot(emb_matrix, query_emb.T).flatten()
        return self._top_k_indices(scores, top_k)

    def faiss_flat_search(self, query: str, model_name: str, top_k: int = 5) -> List[int]:
        """Return the *top_k* rows with the largest inner product to *query* (exact search).

        NOTE(review): vectors are not normalized here, so this equals cosine
        ranking only if the stored and query embeddings are already unit-length.
        """
        if top_k <= 0:
            return []
        query_emb = self._encode_query(query, model_name)
        index = self._cached('faiss_flat', model_name, self._build_faiss_flat)

        _, indices = index.search(query_emb.astype('float32'), top_k)
        # FAISS pads with -1 when fewer than top_k vectors are available.
        return [int(i) for i in indices[0] if i >= 0]

    def faiss_hnsw_search(self, query: str, model_name: str, top_k: int = 5) -> List[int]:
        """Return the approximate *top_k* nearest rows to *query* using HNSW.

        NOTE(review): the index is created without an explicit metric, so FAISS
        uses its default (L2) rather than inner product -- confirm this is intended.
        """
        if top_k <= 0:
            return []
        query_emb = self._encode_query(query, model_name)
        index = self._cached('faiss_hnsw', model_name, self._build_faiss_hnsw)

        _, indices = index.search(query_emb.astype('float32'), top_k)
        # FAISS pads with -1 when fewer than top_k neighbours are found.
        return [int(i) for i in indices[0] if i >= 0]

    def annoy_search(self, query: str, model_name: str, top_k: int = 5) -> List[int]:
        """Return the approximate *top_k* nearest rows to *query* using Annoy."""
        if top_k <= 0:
            return []
        query_emb = self._encode_query(query, model_name)
        annoy_index = self._cached('annoy', model_name, self._build_annoy)
        return annoy_index.get_nns_by_vector(query_emb[0], top_k)

    # ------------------------------------------------------------------
    # Evaluation
    # ------------------------------------------------------------------
    def evaluate_all_approaches(self, output_file: str = "results.json", top_k: int = 5):
        """Evaluate every search approach and write the metrics to *output_file*.

        Args:
            output_file: Path of the JSON file to write.
            top_k: Number of results requested from each search and the
                cut-off used for the @k metrics.

        Returns:
            ``results[model][approach]`` -> metrics dict for every embedding
            model and approach, plus ``results['bm25']['bm25']`` for the
            model-independent BM25 baseline.
        """
        embedding_approaches = {
            'cosine': self.cosine_search,
            'faiss_flat': self.faiss_flat_search,
            'faiss_hnsw': self.faiss_hnsw_search,
            'annoy': self.annoy_search
        }
        results = {}

        # BM25 does not depend on an embedding model, so it is evaluated once
        # under its own key. (Previously it was skipped for every model and
        # therefore never evaluated at all.)
        print("Evaluating bm25 with bm25...")
        results['bm25'] = {
            'bm25': self._evaluate_search(lambda q: self.bm25_search(q, top_k), top_k)
        }

        # Sorted for a reproducible order; self.models is a set.
        for model in sorted(self.models):
            results[model] = {}

            for approach_name, search_func in embedding_approaches.items():
                print(f"Evaluating {model} with {approach_name}...")
                # Bind the loop variables as defaults to avoid late binding.
                results[model][approach_name] = self._evaluate_search(
                    lambda q, search=search_func, m=model: search(q, m, top_k),
                    top_k
                )

        # Save results
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        return results

In [None]:
if __name__ == "__main__":
    # Script entry point: build the evaluator from the validation set, then run
    # every search approach (the metrics are also written to the default output file).
    evaluator = SearchEvaluator(data_path="validation_set.json")
    results = evaluator.evaluate_all_approaches()