In [None]:
!pip install -q sentence-transformers

In [None]:
pip install -U sentence-transformers

In [None]:
import json
import numpy as np
import pandas as pd
from typing import List, Dict
import warnings
warnings.filterwarnings('ignore')

# Install required packages if not available
try:
    from sentence_transformers import SentenceTransformer
    from sklearn.metrics.pairwise import cosine_similarity
except ImportError:
    import subprocess
    import sys
    print("Installing required packages...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "sentence-transformers", "scikit-learn"])
    from sentence_transformers import SentenceTransformer
    from sklearn.metrics.pairwise import cosine_similarity


In [None]:
import json
from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader
import os
from sentence_transformers.losses import TripletLoss


# Set the WANDB_DISABLED environment variable for this process before any
# training runs. Presumably this stops the trainer from starting a
# Weights & Biases run / login prompt — TODO confirm against the installed
# sentence-transformers / transformers versions.
os.environ["WANDB_DISABLED"] = "true"

def preprocess_for_retriever(json_path):
    """Build (question, positive passage) training pairs from a QA JSON file.

    The file is read as UTF-8 and must contain a list of objects. Each object
    needs a ``question`` key; ``context_chunks`` is optional. One
    ``InputExample`` is produced for every chunk whose ``contains_answer``
    flag is truthy, pairing the question with that chunk's ``text``.

    Only positives are emitted: the pairs are meant for a contrastive loss
    that draws its negatives from the other items in the batch.

    Args:
        json_path: Path to the JSON dataset.

    Returns:
        List of ``InputExample`` objects with ``texts=[question, passage]``.
    """
    with open(json_path, 'r', encoding='utf-8') as handle:
        records = json.load(handle)

    examples = []
    for record in records:
        # Look the question up first so a record without one fails even when
        # it has no chunks at all.
        query = record['question']
        for passage in record.get('context_chunks', []):
            if not passage.get('contains_answer', False):
                continue
            examples.append(InputExample(texts=[query, passage['text']]))

    print(f"Created {len(examples)} InputExample instances")
    return examples

def preprocess_for_triplet_loss(json_path, min_negative_score=0.2, max_negatives_per_positive=3):
    """Build (anchor, positive, negative) triplets from a QA JSON file.

    For every item the question is the anchor, each chunk flagged
    ``contains_answer`` is a positive, and the hard negatives are the
    highest-scoring chunks that are *not* flagged and whose ``score`` exceeds
    ``min_negative_score``. Each positive is paired with every selected
    negative.

    Args:
        json_path: Path to a UTF-8 JSON file holding a list of objects with a
            ``question`` key and an optional ``context_chunks`` list. Chunks
            may carry ``text``, ``contains_answer`` and ``score``.
        min_negative_score: A non-answer chunk must score strictly above this
            to count as a hard negative. Chunks without a ``score`` are
            treated as 0.
        max_negatives_per_positive: How many of the top-scoring hard negatives
            to pair with each positive.

    Returns:
        List of ``InputExample`` objects with ``texts=[anchor, positive,
        negative]`` and ``label=1.0``.
    """
    # Explicit UTF-8 so decoding does not depend on the platform's default
    # locale; this matches the other loaders in this notebook.
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    triplets = []
    for item in data:
        # Anchor: the question itself.
        anchor = item['question']
        chunks = item.get('context_chunks', [])

        # Positives: chunks that contain the answer. A missing flag is read as
        # "not an answer", the same convention the evaluation code uses.
        positives = [c['text'] for c in chunks if c.get('contains_answer', False)]

        # Hard negatives: wrong chunks that still scored above the threshold.
        # Sort by score (descending, stable) so the slice really keeps the
        # top-N hardest ones rather than the first N in file order.
        candidates = [
            c for c in chunks
            if not c.get('contains_answer', False)
            and c.get('score', 0) > min_negative_score
        ]
        candidates.sort(key=lambda c: c.get('score', 0), reverse=True)
        negatives = [c['text'] for c in candidates[:max_negatives_per_positive]]

        for pos in positives:
            for neg in negatives:
                triplets.append(InputExample(
                    texts=[anchor, pos, neg],  # Order matters for TripletLoss!
                    label=1.0  # kept from the original; presumably unused by TripletLoss — TODO confirm
                ))

    print(f"Generated {len(triplets)} triplets")
    return triplets

def fine_tune_model(model_name, examples, output_path, num_epochs=2):
    """Fine-tune a sentence-transformers model with MultipleNegativesRankingLoss.

    Args:
        model_name: Identifier or path handed to ``SentenceTransformer``.
        examples: Training examples, wrapped in a shuffled ``DataLoader`` with
            a batch size of 16.
        output_path: Directory the trained model is saved to.
        num_epochs: Number of passes over ``examples``.

    Returns:
        None. The model is written to ``output_path``.
    """
    print(f"\nStarting training for model: {model_name}")
    encoder = SentenceTransformer(model_name)

    loader = DataLoader(examples, shuffle=True, batch_size=16)
    objective = losses.MultipleNegativesRankingLoss(encoder)

    # Warm up over the first 10% of all optimizer steps (batches x epochs).
    fit_options = {
        'train_objectives': [(loader, objective)],
        'epochs': num_epochs,
        'warmup_steps': int(len(loader) * num_epochs * 0.1),
        'show_progress_bar': True,
    }
    encoder.fit(**fit_options)

    encoder.save(output_path)
    print(f"Fine-tuned model saved at {output_path}")

def train_with_triplet_loss(model_name, triplets, output_path, num_epochs=2):
    """Fine-tune a sentence-transformers model with cosine TripletLoss.

    Args:
        model_name: Identifier or path handed to ``SentenceTransformer``.
        triplets: Training examples whose ``texts`` are
            ``[anchor, positive, negative]``; wrapped in a shuffled
            ``DataLoader`` with a batch size of 16.
        output_path: Directory the trained model is saved to.
        num_epochs: Number of passes over ``triplets``.

    Returns:
        None. The model is written to ``output_path``.
    """
    model = SentenceTransformer(model_name)

    train_dataloader = DataLoader(triplets, shuffle=True, batch_size=16)
    train_loss = losses.TripletLoss(
        model=model,
        distance_metric=losses.TripletDistanceMetric.COSINE,
        triplet_margin=0.5  # How much better positives should be than negatives
    )

    # Warm-up is counted in optimizer steps (batches), not in examples. Using
    # len(triplets) here would make warm-up cover most of the run (e.g. ~80%
    # with batch size 16 and 2 epochs). Use 10% of the total step count, the
    # same rule fine_tune_model applies.
    warmup_steps = int(len(train_dataloader) * num_epochs * 0.1)

    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=num_epochs,
        warmup_steps=warmup_steps,
        show_progress_bar=True
    )

    model.save(output_path)




def main():
    """Build triplets from the Kaggle QA dataset and fine-tune each base model.

    The triplets are prepared once and reused for every model. Each model is
    trained with ``train_with_triplet_loss`` for 2 epochs and saved under
    ``/kaggle/working`` in a directory named after the model id, with ``/``
    replaced by ``_``.
    """
    # Step 1: triplets (anchor=question, positive=answer chunk, negative=hard distractor)
    dataset_path = '/kaggle/input/simple-python-qa-rag-dataset-v2/simple_python_qa_rag_dataset.json'
    training_triplets = preprocess_for_triplet_loss(dataset_path)

    # Step 2: train every base model with TripletLoss instead of MNR
    for base_model in ('intfloat/e5-base', 'thenlper/gte-base'):
        safe_model_name = base_model.replace('/', '_')
        train_with_triplet_loss(
            model_name=base_model,
            triplets=training_triplets,
            output_path=f'/kaggle/working/triplet_loss_finetuned_{safe_model_name}',
            num_epochs=2,  # Start with 2 epochs (TripletLoss may need fewer epochs)
        )


if __name__ == '__main__':
    main()


In [None]:
def load_test_data(json_path: str) -> List[Dict]:
    """Read the evaluation questions from a UTF-8 JSON file.

    Best-effort loader: any exception while opening, parsing or reporting is
    printed and turned into an empty list instead of being raised.

    Args:
        json_path: Path to the JSON file.

    Returns:
        The parsed JSON content, or ``[]`` if loading failed.
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as handle:
            data = json.load(handle)
        print(f"✅ Loaded {len(data)} test questions from {json_path}")
    except Exception as e:
        print(f"❌ Error loading test data: {e}")
        return []
    return data

def load_model_safely(model_path: str) -> "SentenceTransformer | None":
    """Load a SentenceTransformer, returning ``None`` instead of raising.

    Args:
        model_path: Path or identifier handed to ``SentenceTransformer``.

    Returns:
        The loaded model, or ``None`` when construction raised. The failure is
        printed, so callers must check for ``None`` before using the result.
    """
    try:
        model = SentenceTransformer(model_path)
        print(f"✅ Loaded model from {model_path}")
        return model
    except Exception as e:
        # Deliberate best-effort: report and let the caller decide what to do.
        print(f"❌ Error loading model from {model_path}: {e}")
        return None


In [None]:
def calculate_retrieval_metrics(ranked_indices: np.ndarray, relevant_indices: List[int], 
                               top_k_values: List[int]) -> Dict:
    """Compute retrieval metrics for one query.

    Args:
        ranked_indices: Candidate indices ordered from best to worst match.
        relevant_indices: Indices of the candidates that are relevant.
            Duplicates are ignored.
        top_k_values: Cut-offs at which recall, precision and hit rate are
            reported.

    Returns:
        Dict with ``recall@k``, ``precision@k`` and ``hit_rate@k`` for every
        ``k`` in ``top_k_values``, plus ``mrr`` and ``ndcg@10``. Every metric
        is 0 when there are no relevant indices.
    """
    # Build the relevant set once: it does not depend on k, gives O(1)
    # membership checks below, and keeps duplicate entries from inflating the
    # recall denominator or the ideal DCG.
    relevant_set = set(relevant_indices)
    num_relevant = len(relevant_set)

    metrics = {}
    for k in top_k_values:
        relevant_in_top_k = len(set(ranked_indices[:k]) & relevant_set)

        # Recall@k: fraction of relevant items retrieved
        metrics[f'recall@{k}'] = relevant_in_top_k / num_relevant if num_relevant else 0
        # Precision@k: fraction of retrieved items that are relevant
        metrics[f'precision@{k}'] = relevant_in_top_k / k if k > 0 else 0
        # Hit Rate@k: whether at least one relevant item is retrieved
        metrics[f'hit_rate@{k}'] = 1 if relevant_in_top_k > 0 else 0

    # Mean Reciprocal Rank: reciprocal of the 1-based rank of the first
    # relevant item, or 0 if none was ranked.
    metrics['mrr'] = next(
        (1 / (rank + 1) for rank, idx in enumerate(ranked_indices) if idx in relevant_set),
        0,
    )

    # NDCG@10 with binary gains: DCG over the top 10 divided by the DCG of an
    # ideal ranking that places every relevant item first.
    ndcg_cutoff = 10
    idcg = sum(1 / np.log2(i + 2) for i in range(min(num_relevant, ndcg_cutoff)))
    dcg = sum(
        1 / np.log2(rank + 2)
        for rank, idx in enumerate(ranked_indices[:ndcg_cutoff])
        if idx in relevant_set
    )
    metrics['ndcg@10'] = dcg / idcg if idcg > 0 else 0

    return metrics

def evaluate_retriever_performance(model: SentenceTransformer, test_data: List[Dict], 
                                 model_name: str = "Model", log_examples: int = 5) -> Dict:
    """Evaluate retriever performance on test data.

    For each usable item, the question and all of its ``context_chunks`` are
    embedded with ``model.encode``, the chunks are ranked by cosine similarity
    to the question, and the ranking is scored with
    ``calculate_retrieval_metrics`` at the module-level ``TOP_K_VALUES``
    cut-offs. Items with no question, no chunks, or no chunk flagged
    ``contains_answer`` are skipped.

    Args:
        model: Encoder exposing ``encode(texts, show_progress_bar=False)``.
        test_data: List of dicts with ``question`` and ``context_chunks``;
            chunks may carry ``text`` and ``contains_answer``.
        model_name: Label used in the log output and the returned dict.
        log_examples: Items whose position in ``test_data`` is below this
            value have their top-3 ranking printed.

    Returns:
        Dict with ``model_name``, ``total_questions`` (items that passed the
        filters, including any whose scoring later raised), ``metrics`` (mean
        of each metric, 0 when nothing was scored) and ``raw_metrics``
        (per-question values).
    """

    print(f"\n🔍 Evaluating {model_name} (Logging first {log_examples} queries)...")
    print("-" * 50)

    all_metrics = {metric: [] for metric in [f'recall@{k}' for k in TOP_K_VALUES] +
                   [f'precision@{k}' for k in TOP_K_VALUES] +
                   [f'hit_rate@{k}' for k in TOP_K_VALUES] + ['mrr', 'ndcg@10']}

    valid_questions = 0

    for idx, item in enumerate(test_data):
        question = item.get('question', '')
        context_chunks = item.get('context_chunks', [])

        if not question or not context_chunks:
            continue

        # Get relevant chunks
        relevant_indices = [i for i, chunk in enumerate(context_chunks)
                          if chunk.get('contains_answer', False)]

        if not relevant_indices:
            continue

        valid_questions += 1

        # Encode question and chunks
        try:
            all_chunk_texts = [chunk.get('text', '') for chunk in context_chunks]
            question_embedding = model.encode([question], show_progress_bar=False)
            chunk_embeddings = model.encode(all_chunk_texts, show_progress_bar=False)

            # Calculate similarities and rank
            similarities = cosine_similarity(question_embedding, chunk_embeddings)[0]
            ranked_indices = np.argsort(similarities)[::-1]

            if idx < log_examples:
                # Logging only uses tolerant lookups: a missing display field
                # must not raise here, because the surrounding try/except
                # would then discard this question's metrics.
                print(f"\n📝 Question {idx+1}: {question}")
                print(f"   Gold Answer: {item.get('expected_answer', '')}...")
                for rank, chunk_idx in enumerate(ranked_indices[:3]):
                    chunk = context_chunks[chunk_idx]
                    print(f"  Rank {rank+1} (Score={similarities[chunk_idx]:.3f}):")
                    print(f"    Contains Answer: {chunk.get('contains_answer', False)}")
                    print(f"    Text: {chunk.get('text', '')}...")

            # Calculate metrics for this question
            question_metrics = calculate_retrieval_metrics(ranked_indices, relevant_indices, TOP_K_VALUES)

            # Accumulate metrics
            for metric, value in question_metrics.items():
                all_metrics[metric].append(value)

        except Exception as e:
            # Best-effort: report the failure and move on to the next question.
            print(f"⚠️  Error processing question {idx}: {e}")
            continue

        # Progress update
        if valid_questions % 20 == 0:
            print(f"   Processed {valid_questions} questions...")

    # Calculate averages
    avg_metrics = {metric: np.mean(values) if values else 0
                   for metric, values in all_metrics.items()}

    print(f"✅ Evaluation complete! Processed {valid_questions} valid questions.")

    return {
        'model_name': model_name,
        'total_questions': valid_questions,
        'metrics': avg_metrics,
        'raw_metrics': all_metrics
    }

In [None]:
# Load your fine-tuned model (example for gte-base)
# This calls the SentenceTransformer constructor directly, so a missing or
# broken checkpoint raises here (load_model_safely is not used).
model = SentenceTransformer('/kaggle/input/fine-tuned-retrievers/triplet_loss_finetuned_thenlper_gte-base')

# Load your dataset
# load_test_data returns [] on any failure, so test_data may be empty.
# NOTE(review): this is the same JSON file main() builds its training triplets
# from, so metrics computed on it describe data the model was trained on —
# confirm whether a held-out split is intended.
test_data = load_test_data('/kaggle/input/simple-python-qa-rag-dataset-v2/simple_python_qa_rag_dataset.json')


In [None]:
def compare_models(ft_results: Dict, baseline_results: Dict) -> pd.DataFrame:
    """Tabulate fine-tuned versus baseline scores, one row per metric.

    Args:
        ft_results: Result dict whose ``metrics`` mapping drives the rows.
        baseline_results: Result dict whose ``metrics`` mapping must contain
            every metric present in ``ft_results``.

    Returns:
        DataFrame with columns ``Metric``, ``Fine-tuned``, ``Baseline``,
        ``Improvement`` (absolute difference) and ``Improvement %`` (relative
        to the baseline; 0 when the baseline score is not positive).
    """

    def _comparison_row(metric, ft_score):
        baseline_score = baseline_results['metrics'][metric]
        improvement = ft_score - baseline_score
        if baseline_score > 0:
            improvement_pct = improvement / baseline_score * 100
        else:
            # Relative change is undefined without a positive baseline.
            improvement_pct = 0
        return {
            'Metric': metric,
            'Fine-tuned': round(ft_score, 4),
            'Baseline': round(baseline_score, 4),
            'Improvement': round(improvement, 4),
            'Improvement %': round(improvement_pct, 2)
        }

    rows = [
        _comparison_row(metric, ft_score)
        for metric, ft_score in ft_results['metrics'].items()
    ]
    return pd.DataFrame(rows)

In [None]:
def show_qualitative_examples(model: SentenceTransformer, test_data: List[Dict], 
                            num_examples: int = 3, model_name: str = "Model"):
    """Print ground-truth passages next to the model's top-3 picks.

    Walks ``test_data`` in order and prints the first ``num_examples`` items
    that have a question, at least one chunk, and at least one chunk flagged
    ``contains_answer``. For each one it shows up to two ground-truth passages
    and then the three chunks the model ranks highest by cosine similarity.
    Passages longer than 150 characters are cut and suffixed with ``...``.

    Args:
        model: Encoder exposing ``encode(texts)``.
        test_data: List of dicts with ``question`` and ``context_chunks``.
        num_examples: Maximum number of examples to print.
        model_name: Label shown in the section header.
    """

    def _preview(chunk):
        # Shorten long passages so each example stays readable.
        text = chunk.get('text', '')
        if len(text) > 150:
            return text[:150] + "..."
        return text

    print(f"\n🔍 QUALITATIVE ANALYSIS - {model_name}")
    print("=" * 80)

    examples_shown = 0

    for item in test_data:
        if examples_shown >= num_examples:
            break

        question = item.get('question', '')
        context_chunks = item.get('context_chunks', [])
        if not (question and context_chunks):
            continue

        relevant_chunks = [c for c in context_chunks if c.get('contains_answer', False)]
        if not relevant_chunks:
            continue

        examples_shown += 1

        print(f"\n📝 EXAMPLE {examples_shown}")
        print("-" * 40)
        print(f"QUESTION: {question}")

        print(f"\n✅ GROUND TRUTH ({len(relevant_chunks)} relevant chunks):")
        # Show at most two ground-truth passages for brevity.
        for i, gold_chunk in enumerate(relevant_chunks[:2]):
            text_preview = _preview(gold_chunk)
            print(f"   {i+1}. {text_preview}")

        # Rank every chunk for this question; failures are reported, not raised.
        try:
            passages = [c.get('text', '') for c in context_chunks]
            query_vector = model.encode([question])
            passage_vectors = model.encode(passages)
            similarities = cosine_similarity(query_vector, passage_vectors)[0]
            best_first = np.argsort(similarities)[::-1]

            print(f"\n🤖 MODEL PREDICTIONS (Top 3):")
            for rank, chunk_idx in enumerate(best_first[:3]):
                candidate = context_chunks[chunk_idx]
                if candidate.get('contains_answer', False):
                    status = "✅ RELEVANT"
                else:
                    status = "❌ NOT RELEVANT"
                score = similarities[chunk_idx]
                text_preview = _preview(candidate)

                print(f"   {rank+1}. [Score: {score:.3f}] {status}")
                print(f"      {text_preview}")

        except Exception as e:
            print(f"   ❌ Error getting predictions: {e}")


In [None]:
def print_performance_summary(results: Dict):
    """Print a grouped, human-readable view of one model's metrics.

    Metrics are shown in three groups: ``recall@*``, ``hit_rate@*`` and
    everything else except ``precision@*`` (labelled "Ranking Quality").
    ``precision@*`` values are not printed.

    Args:
        results: Dict with ``model_name``, ``total_questions`` and a
            ``metrics`` mapping from metric name to numeric score.
    """

    print(f"\n📊 PERFORMANCE SUMMARY - {results['model_name']}")
    print("=" * 50)
    print(f"Total Questions Evaluated: {results['total_questions']}")
    print("\n🎯 Key Metrics:")

    # Each section is a heading plus the rule deciding which metrics belong to it.
    sections = (
        ("\n   Recall (How much relevant content is retrieved):",
         lambda name: name.startswith('recall@')),
        ("\n   Hit Rate (Questions with at least one relevant result):",
         lambda name: name.startswith('hit_rate@')),
        ("\n   Ranking Quality:",
         lambda name: not name.startswith(('recall@', 'hit_rate@', 'precision@'))),
    )

    # Resolve every group before printing any section.
    grouped = [
        (heading, [(name, value) for name, value in results['metrics'].items() if belongs(name)])
        for heading, belongs in sections
    ]

    for heading, entries in grouped:
        print(heading)
        for metric, score in entries:
            print(f"   • {metric:12}: {score:.4f}")

In [None]:

# Fine-tuned checkpoints, keyed by short model name. The directory names follow
# the pattern main() writes: 'triplet_loss_finetuned_' plus the model id with
# '/' replaced by '_'.
# NOTE(review): these live under /kaggle/input while main() saves under
# /kaggle/working — presumably the outputs were re-uploaded as a Kaggle
# dataset; confirm.
FINE_TUNED_MODEL_PATHS = {
    'e5-base': '/kaggle/input/fine-tuned-retrievers/triplet_loss_finetuned_intfloat_e5-base',
    'gte-base': '/kaggle/input/fine-tuned-retrievers/triplet_loss_finetuned_thenlper_gte-base'  # Fix typo if needed
}

# Baseline (not fine-tuned) model ids, using the same short names as the keys
# above; these are the ids main() fine-tunes from.
BASELINE_MODELS = {
    'gte-base': 'thenlper/gte-base',
    'e5-base': 'intfloat/e5-base',
}
# Same dataset file that main() reads to build its training triplets.
TEST_DATA_PATH = '/kaggle/input/simple-python-qa-rag-dataset-v2/simple_python_qa_rag_dataset.json'
# Not referenced in the visible cells; presumably where result files are written.
OUTPUT_DIR = '/kaggle/working'

# Evaluation settings
# TOP_K_VALUES is read as a global by evaluate_retriever_performance, so this
# cell must run before that function is called.
TOP_K_VALUES = [1, 3]
# Not referenced in the visible cells; presumably passed to
# show_qualitative_examples as num_examples.
NUM_QUALITATIVE_EXAMPLES = 3