### Initial Imports and Configurations

In [2]:
import time
import psutil
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import json
import os
import gc
import random
from sentence_transformers import SentenceTransformer, util
os.environ["TRANSFORMERS_NO_ADVISORY_WARNINGS"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import warnings
warnings.filterwarnings("ignore", message=".*RobertaModel.*")
warnings.filterwarnings("ignore", message=".*pooler.dense.*")

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)
 
device='cpu'
print("Environment installation complete. running on CPU")


Environment installation complete. running on CPU


In [3]:
from datasets import load_dataset

#Load the dataset
dataset = load_dataset("trivia_qa", "rc", split='train[:10000]')

#Extract questions and Answers
questions = [item['question'] for item in dataset]
answers = [item['answer'] for item in dataset]

print(f'Loaded {len(questions)} questions and answers')

Resolving data files:   0%|          | 0/26 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/26 [00:00<?, ?it/s]

Loaded 10000 questions and answers


In [4]:
"""
print(f"Total answers: {len(answers)}")
print(f"Sample of first 5 answers:")
for i, answer in enumerate(answers[:5]):
    print(f"  Answer {i}: Type={type(answer)}, Value={answer}")
"""


'\nprint(f"Total answers: {len(answers)}")\nprint(f"Sample of first 5 answers:")\nfor i, answer in enumerate(answers[:5]):\n    print(f"  Answer {i}: Type={type(answer)}, Value={answer}")\n'

In [5]:
"""
print(f"Total Questions: {len(questions)}")
print(f"Sample of first 5 questions:")
for i, question in enumerate(questions[150:300]):
    print(f"  Question {i}: Type={type(question)}, Value={question}")

"""


'\nprint(f"Total Questions: {len(questions)}")\nprint(f"Sample of first 5 questions:")\nfor i, question in enumerate(questions[150:300]):\n    print(f"  Question {i}: Type={type(question)}, Value={question}")\n\n'

### Set up Database and Embedding function

In [6]:
import chromadb
from chromadb.utils import embedding_functions
from evaluate import load
import torch
#Set pytorch seeds
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

#initialize chromadb client
chroma_client = chromadb.PersistentClient('./chroma_db_optimized')

#embedding function
embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
             model_name="BAAI/bge-small-en-v1.5",device='cpu',
             trust_remote_code=True    
             )

#create collection
collection = chroma_client.get_or_create_collection(
    name = 'trivia-qa',
    embedding_function= embedding_function,
    metadata={"hnsw:space":"cosine",
             "hnsw:construction_ef":100,
             "hnsw:search_ef":50}
)

In [33]:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

print("Loading Language Model....")
tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0",trust_remote_code=True)
llm_model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0",trust_remote_code=True)
#set seed for transformers if available
try:
    from transformers import set_seed
    set_seed(SEED)
except ImportError:
    pass


Loading Language Model....


In [8]:
!python --version

Python 3.11.7


### Generate Embeddings and populate the vector database

In [9]:
import json

def extract_answer_value(answer_dict):
    """Extract the primary answer value"""
    if isinstance(answer_dict,dict):
        if 'normalized_value' in answer_dict and answer_dict['normalized_value']:
            return answer_dict['normalized_value']
        if 'value' in answer_dict and answer_dict['value']:
            return answer_dict['value']
        if 'aliases' in answer_dict and answer_dict['aliases']:
            return answer_dict['aliases'][0]
    return str(answer_dict)

print("Generate embeddings and populate vector database")
batch_size = 500 #process in batches to manage memory

for i in range(0,len(questions), batch_size):
    batch_questions = questions[i:i+batch_size]
    batch_answers = answers[i:i+batch_size]
    
    #convert answers to strings
    batch_answer_strings = [extract_answer_value(answer) for answer in batch_answers]
    
    #generate embeddings for this batch of questions
    embeddings = embedding_function(batch_questions)
    
    #Create IDs for each document
    ids = [f"id_{j}" for j in range(i,min(i+batch_size,len(questions)))]
    
    #Add to collection
    collection.add(
    embeddings= embeddings,
    documents= batch_answer_strings,
    metadatas= [{"questions" : q} for q in batch_questions],
    ids=ids
    )
    
    if i % 1000 == 0:
        print(f'Processed {i} items')
        
print("Database population complete.")  
    

Generate embeddings and populate vector database
Processed 0 items
Processed 1000 items
Processed 2000 items
Processed 3000 items
Processed 4000 items
Processed 5000 items
Processed 6000 items
Processed 7000 items
Processed 8000 items
Processed 9000 items
Database population complete.


### RAG PIPELINE FUNCTIONS

In [29]:
import torch
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import logging
import re
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer

# Disable gradients for inference
torch.set_grad_enabled(False)


# Using a lightweight model for CPU environments
embedding_model = SentenceTransformer('BAAI/bge-small-en-v1.5')

class HybridReasoningRetriever:
    def __init__(self, documents: List[str]):
        """
        Initialize the hybrid retriever with both sparse and dense components.
        
        Args:
            documents: List of text documents to retrieve from
        """
        self.documents = documents
        
        # Initialize sparse retriever (BM25)
        self.tokenize = lambda text: re.findall(r'\w+', text.lower())
        tokenized_corpus = [self.tokenize(doc) for doc in documents]
        self.bm25 = BM25Okapi(tokenized_corpus)
        
        # Precompute document embeddings for dense retrieval
        self.doc_embeddings = embedding_model.encode(documents, show_progress_bar=False)
        
        # Initialize query expansion model (would use a lightweight LLM in practice)
        self.expansion_model = None  # Placeholder for a small LLM
    
    def sparse_retrieve(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        """
        Retrieve documents using BM25 (sparse retrieval).
        """
        tokenized_query = self.tokenize(query)
        scores = self.bm25.get_scores(tokenized_query)
        top_indices = np.argsort(scores)[::-1][:top_k]
        return [(i, scores[i]) for i in top_indices]
    
    def dense_retrieve(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        """
        Retrieve documents using dense embeddings.
        """
        query_embedding = embedding_model.encode([query])
        similarities = np.dot(self.doc_embeddings, query_embedding.T).flatten()
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [(i, similarities[i]) for i in top_indices]
    

    
    def hybrid_retrieve(self, query: str, top_k: int = 5, 
                       alpha: float = 0.7) -> List[Tuple[int, float]]:
        """
        Hybrid retrieval combining sparse and dense methods.
        
        Args:
            query: The query to retrieve documents for
            top_k: Number of documents to retrieve
            alpha: Weight for dense retrieval (1-alpha for sparse)
            
        """
        
        retrieval_query = query

        
        # Get sparse results
        sparse_results = self.sparse_retrieve(retrieval_query, top_k * 2)
        sparse_scores = np.zeros(len(self.documents))
        for idx, score in sparse_results:
            sparse_scores[idx] = score
        
        # Get dense results
        dense_results = self.dense_retrieve(retrieval_query, top_k * 2)
        dense_scores = np.zeros(len(self.documents))
        for idx, score in dense_results:
            dense_scores[idx] = score
        
        # Normalize scores
        if np.max(sparse_scores) > 0:
            sparse_scores = sparse_scores / np.max(sparse_scores)
        if np.max(dense_scores) > 0:
            dense_scores = dense_scores / np.max(dense_scores)
        
        # Combine scores
        combined_scores = alpha * dense_scores + (1 - alpha) * sparse_scores
        top_indices = np.argsort(combined_scores)[::-1][:top_k]
        
        return [(i, combined_scores[i]) for i in top_indices]

def retriever(query: str, k: int = 3, score_threshold: float = 0.4,
             max_candidates: int = 30) -> List[Dict[str, Any]]:
    """
    Enhanced hybrid retriever with reasoning capabilities.
    
    Args:
        query: Trivia question to search for
        k: Number of relevant answers to retrieve
        score_threshold: Minimum similarity score to include a result
        max_candidates: Maximum number of candidates to consider
    
    Returns:
        List of dictionaries containing document content, metadata, and similarity score
    """
    try:
        global hybrid_retriever
        
        # Preprocess query
        processed_query = _preprocess_trivia_query(query)
        
        # Use hybrid retrieval with reasoning capabilities
        results = hybrid_retriever.hybrid_retrieve(
            processed_query, 
            top_k=min(k * 5, max_candidates),
        )
        
        if not results:
            return []
        
        # Convert to scored results format
        scored_results = []
        for idx, score in results:
            scored_results.append({
                'content': hybrid_retriever.documents[idx],
                'score': float(score)  # Convert numpy float to Python float
            })
        
        # Filter by score threshold
        filtered_results = [r for r in scored_results if r['score'] >= score_threshold]
        
        # Filter low-quality context
        filtered_results = _filter_low_quality_context(filtered_results)
        
        # If no results meet threshold, return top results
        if not filtered_results and scored_results:
            filtered_results = scored_results[:min(3, len(scored_results))]
        
        # Ensure scores meet minimum threshold
        for result in filtered_results:
            result['score'] = max(score_threshold - 0.1, result['score'])
        
        # Sort by score
        filtered_results.sort(key=lambda x: x['score'], reverse=True)
        
        # Deduplicate
        unique_results = _deduplicate(filtered_results, k)
        
        return unique_results[:k]
            
    except Exception as e:
        logging.error(f"Error in retrieval: {e}")
        return []

# Keep existing helper functions
def _preprocess_trivia_query(query: str) -> str:
    """
    Preprocess the query to focus on key entities and facts
    """
    words = query.lower().split()
    filtered_words = []
    
    for word in words:
        if word in ['who', 'when', 'where', 'which', 'what']:
            filtered_words.append(word)
        elif word not in ['is', 'are', 'did', 'do', 'does', 'the', 'a', 'an']:
            filtered_words.append(word)
    
    return " ".join(filtered_words) if filtered_words else query

def _deduplicate(results: List[Dict], k: int) -> List[Dict]:
    """
    Memory-efficient deduplication for resource-constrained environments
    """
    if not results:
        return []
    
    unique_results = []
    seen_content_hashes = set()
    
    for result in results:
        if len(unique_results) >= k * 2:
            break
        
        content = result['content']
        content_hash = _simple_content_hash(content)
        
        if content_hash not in seen_content_hashes:
            seen_content_hashes.add(content_hash)
            unique_results.append(result)
    
    return unique_results

def _simple_content_hash(content: str, max_length: int = 200) -> int:
    """
    Create a simple hash for content deduplication without heavy processing
    """
    short_content = content[:max_length] if len(content) > max_length else content
    return hash(short_content)

def _filter_low_quality_context(contexts: List[Dict], min_length: int = 10) -> List[Dict]:
    """
    Filter out low-quality context documents
    """
    filtered = []
    for ctx in contexts:
        content = ctx['content']
        
        if len(content.strip()) < min_length:
            continue
            
        # Skip JSON-like content
        if content.strip().startswith('{') and '}' in content:
            continue
            
        # Skip content with too many special characters
        if sum(1 for c in content if not c.isalnum() and not c.isspace()) / len(content) > 0.5:
            continue
            
        filtered.append(ctx)
        
    return filtered

In [12]:
# Initialize the hybrid retriever with your actual documents
print("Initializing hybrid retriever with documents...")

# Extract the actual documents from your collection
document_contents = []
batch_size = 500

for i in range(0, len(questions), batch_size):
    batch_end = min(i + batch_size, len(questions))
    ids = [f"id_{j}" for j in range(i, batch_end)]
    
    try:
        results = collection.get(ids=ids)
        document_contents.extend(results['documents'])
    except Exception as e:
        print(f"Error retrieving batch {i}-{batch_end}: {e}")
        # Fallback: use the original answer strings
        document_contents.extend([json.dumps(answer) for answer in answers[i:batch_end]])

print(f"Loaded {len(document_contents)} documents for hybrid retriever")

# Initialize the hybrid retriever (GLOBAL)
hybrid_retriever = HybridReasoningRetriever(document_contents)
print("Hybrid retriever initialized successfully!")

Initializing hybrid retriever with documents...
Loaded 10000 documents for hybrid retriever
Hybrid retriever initialized successfully!


In [35]:
import torch
from transformers import GenerationConfig
import re
from collections import defaultdict
import json
import warnings

def create_rag_prompt(question, contexts, system_msg="You are a factual question-answering assistant."):
    """Improved prompt construction for trivia QA"""
    
    # Format contexts properly
    context_text = ""
    for i, ctx in enumerate(contexts[:3]):  # Use top 3 contexts max
        if isinstance(ctx, dict):
            content = ctx.get('content', '')
        else:
            content = str(ctx)
        
        # Clean and truncate context
        clean_ctx = content.strip()
        if len(clean_ctx) > 300:  # Truncate very long contexts
            clean_ctx = clean_ctx[:300] + "..."
        
        context_text += f"[Context {i+1}]: {clean_ctx}\n\n"
    
    return f"""{system_msg}

QUESTION: {question}

RELEVANT CONTEXTS:
{context_text}

INSTRUCTIONS:
- Answer based ONLY on the provided contexts
- Be concise and factual
- If the answer is not in the contexts, say "I cannot find the answer in the provided information"
- Do not make up information
- Keep your answer short (1-3 words when possible)

ANSWER:"""

def generator(question, contexts, llm_model, tokenizer):
    """
    Improved generator function with better context selection and processing
    """
    
    if not contexts:
        return "I cannot find the answer in the provided information."
    
    # Debug: Check what contexts look like
    print(f"Number of contexts received: {len(contexts)}")
    if contexts:
        print(f"First context type: {type(contexts[0])}")
        print(f"First context keys (if dict): {contexts[0].keys() if isinstance(contexts[0], dict) else 'Not a dict'}")
    
    # Extract content from contexts if they are dictionaries
    processed_contexts = []
    for ctx in contexts:
        if isinstance(ctx, dict):
            # Handle different possible content structures
            if 'content' in ctx:
                content = ctx['content']
            elif 'text' in ctx:
                content = ctx['text']
            elif 'value' in ctx:
                content = ctx['value']
            else:
                # If it's a string-like dict, use the string representation
                content = str(ctx)
        else:
            content = str(ctx)
        
        processed_contexts.append(content)
    
    # Improved context selection with simple ranking
    if len(processed_contexts) > 1:
        ranked_contexts = rank_contexts(question, processed_contexts)
    else:
        ranked_contexts = processed_contexts
    
    # Construct prompt with top contexts
    final_prompt = create_rag_prompt(question, ranked_contexts[:3])  # Use top 3 contexts
    
    # Tokenization with better parameters
    inputs = tokenizer(
        final_prompt,
        return_tensors='pt',
        truncation=True,
        max_length=512,  # Reduced for better performance
        padding=True
    )
    
    # Improved generation config for factual answers
    generation_config = GenerationConfig(
        max_new_tokens=50,  # Increased for complete answers
        num_return_sequences=1,
        temperature=0.1,  # Lower temperature for more factual responses
        do_sample=False,  # Use greedy sampling for factual answers
        top_p=0.9,
        repetition_penalty=1.1,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    
    llm_model.eval()
    
    # Generate answer
    with torch.no_grad():
        try:
            outputs = llm_model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                generation_config=generation_config,
                early_stopping=True
            )
            
            # Extract only the generated part
            generated_tokens = outputs[0][inputs['input_ids'].shape[1]:]
            answer = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
            
            # Clean up the answer
            answer = clean_answer(answer)
            
        except Exception as e:
            print(f"Generation error: {e}")
            answer = "I cannot provide an answer at the moment."
    
    return answer

def rank_contexts(question, contexts):
    """
    Simple but effective context ranking for trivia QA
    """
    if not contexts:
        return []
    
    question_lower = question.lower()
    question_words = set(re.findall(r'\w+', question_lower))
    
    scored_contexts = []
    
    for i, context in enumerate(contexts):
        score = 0.0
        
        # Basic length score (prefer medium-length contexts)
        context_length = len(context)
        if 50 <= context_length <= 500:  # Ideal length range
            score += 1.0
        elif context_length < 20:  # Too short
            score -= 2.0
        
        # Keyword overlap score
        context_lower = context.lower()
        context_words = set(re.findall(r'\w+', context_lower))
        overlap = len(question_words.intersection(context_words))
        keyword_score = overlap / max(len(question_words), 1)
        score += keyword_score * 2.0  # Weight keyword matches heavily
        
        # Exact phrase matching (important for trivia)
        if any(word in context_lower for word in question_lower.split()):
            score += 1.5
        
        # Position bonus (often first contexts are better)
        score += (1.0 / (i + 1)) * 0.5
        
        scored_contexts.append((context, score))
    
    # Sort by score descending
    scored_contexts.sort(key=lambda x: x[1], reverse=True)
    
    return [ctx for ctx, score in scored_contexts]

def clean_answer(answer):
    """
    Clean and format the answer for trivia questions
    """
    if not answer:
        return "I cannot find the answer in the provided information."
    
    # Remove any prompt fragments that might have been generated
    unwanted_phrases = [
        "ANSWER:", "Answer:", "based on the context", "according to the text",
        "the context says", "the information states"
    ]
    
    for phrase in unwanted_phrases:
        answer = answer.replace(phrase, "")
    
    # Clean up punctuation and whitespace
    answer = re.sub(r'[^\w\s\.\,\-\']', '', answer)  # Keep basic punctuation
    answer = answer.strip()
    
    # Capitalize first letter
    if answer and answer[0].islower():
        answer = answer[0].upper() + answer[1:]
    
    # Handle empty answers
    if not answer or len(answer) < 2:
        return "I cannot find the answer in the provided information."
    
    return answer

# Optional: Add a simple evaluation helper
def evaluate_answer(true_answer, generated_answer):
    """
    Simple evaluation for debugging
    """
    true_lower = true_answer.lower()
    generated_lower = generated_answer.lower()
    
    # Exact match
    if true_lower == generated_lower:
        return 1.0
    
    # Substring match
    if true_lower in generated_lower or generated_lower in true_lower:
        return 0.8
    
    # Keyword overlap
    true_words = set(re.findall(r'\w+', true_lower))
    gen_words = set(re.findall(r'\w+', generated_lower))
    overlap = len(true_words.intersection(gen_words))
    
    if overlap >= len(true_words) * 0.5:  # 50% keyword overlap
        return 0.6
    
    return 0.0

## Functions to Calculate Answer Correctness

In [14]:
from typing import List, Dict, Any, Tuple, Optional

def calculate_semantic_similarity(text1:str, text2:str)-> float:
    if not text1 or not text2:
        return 0.0
    
    if model is not None:
        try:
            embeddings1 = model.encode(text1,convert_to_tensor=True)
            embeddings2 = model.encode(text2,convert_to_tensor=True)
            cosine_scores = util.pytorch_cos_sim(embeddings1,embeddings2)
            return float(cosine_scores[0][0])
        except Exception as e:
            print(f"Semantic similarity calculation failed : {e}")

def calculate_simple_similarity(text1:str, text2:str)-> float:
    words1 = set(text1.split())
    words2 = set(text2.split())
    
    if not words1 or not words2:
        return 0.0
    return len(words1.intersection(words2)) / len(words1.union(words2))

def calculate_correctness(substring_match,semantic_similarity,bert_f1_score):
    substring_match_bool = bool(substring_match)
    semantic_sim_norm = max(0.0,min(1.0,semantic_similarity))
    bert_f1_norm = max(0.0,min(1.0,bert_f1_score))
    
    if substring_match_bool:
        base_score = 0.80
        bonus = 0.20 * (0.5 * semantic_sim_norm + 0.5 * bert_f1_norm)
        return min(1.0,base_score + bonus)
    else:
        semantic_weight = 0.6
        bert_f1_weight = 0.4
        composite_score = (semantic_weight * semantic_sim_norm + bert_f1_weight * bert_f1_norm)
        final_score = composite_score ** 0.8
        return final_score
    
def normalize_text(text: str) -> str:
    if not text:
        return ""
    text = str(text)
    text = text.lower().strip()
    
    text = re.sub(r'[^\w\s]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    
    return text


def calculate_answer_relevance(question: str, generated_answer: str, 
                             ground_truth_answer: str = None) -> Dict[str, float]:
    """
    Calculate how relevant the answer is to the question
    """
    if not question or not generated_answer:
        return {"relevance_score": 0.0, "question_similarity": 0.0}
    
    # Method 1: Direct similarity between question and answer
    question_answer_similarity = calculate_semantic_similarity(question, generated_answer)
    
    # Method 2: If ground truth is available, compare answer similarity patterns
    if ground_truth_answer:
        gt_question_similarity = calculate_semantic_similarity(question, ground_truth_answer)
        answer_gt_similarity = calculate_semantic_similarity(generated_answer, ground_truth_answer)
        
        # Relevance score combines direct similarity and alignment with ground truth pattern
        relevance_score = 0.7 * question_answer_similarity + 0.3 * answer_gt_similarity
    else:
        relevance_score = question_answer_similarity
    
    return {
        "relevance_score": min(1.0, relevance_score),
        "question_similarity": question_answer_similarity
    }

In [15]:
import os
import time
import psutil
import numpy as np
import re
import logging
from bert_score import score as bert_score
import warnings
from sentence_transformers import SentenceTransformer, util
from typing import List, Dict, Any, Tuple, Optional
import torch

# Suppress all transformers warnings
os.environ["TRANSFORMERS_NO_ADVISORY_WARNINGS"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# More comprehensive warning suppression
warnings.filterwarnings("ignore")
logging.getLogger("transformers").setLevel(logging.ERROR)
logging.getLogger("bert_score").setLevel(logging.ERROR)
os.environ['CUDA_VISIBLE_DEVICES'] = ''  # Force CPU usage

model = SentenceTransformer('all-MiniLM-L6-v2')

def evaluate_rag_system(test_questions, test_answers, llm_model, tokenizer, k=3):
    """
    test_questions (list[str]): Input questions
    test_answers (list[str]): Ground truth answers
    llm_model: Preloaded language model (CPU)
    tokenizer: Preloaded tokenizer
    k (int): Top-k retrieved documents
    """
    
    predictions = []
    latencies = []
    cpu_times = []
    memory_usages = []
    
    process = psutil.Process(os.getpid())
    
    for i, (question, true_answer) in enumerate(zip(test_questions, test_answers)):
        if i % 10 == 0:
            print(f'Processing question {i}/{len(test_questions)}')
            
        # Start time
        start_time = time.time()
        cpu_start = process.cpu_times()
        mem_start = process.memory_info().rss
        
        # Retrieve Context and generate answer
        contexts_dicts = retriever(question, k=k)
        
        # Generate Answers
        answer = generator(question, contexts_dicts,llm_model,tokenizer)
        
        # End timing and calculate metrics
        end_time = time.time()
        cpu_end = process.cpu_times()
        mem_end = process.memory_info().rss
        
        latency = end_time - start_time
        cpu_time = (cpu_end.user - cpu_start.user) + (cpu_end.system - cpu_start.system)
        memory_usage = (mem_end - mem_start) / 1024 / 1024  # Convert to MB
        
        predictions.append(answer)
        latencies.append(latency)
        cpu_times.append(cpu_time)
        memory_usages.append(max(0, memory_usage))

    all_results = []
    relevance_metrics=[]
        
    for i, (pred, true) in enumerate(zip(predictions, test_answers)):
        if not pred or not true:
            result = {
                "correctness_score": 0.0,
                "semantic_similarity": 0.0,
                "is_plausible": 0.0,
                "substring_match": 0.0,
                "bert_f1_score": 0.0
            }
            all_results.append(result)
            relevance_metrics.append({"relevance_score":0,"question_similarity":0})
            continue
            
        pred_norm = normalize_text(pred)
        
        # Get all the possible answers from value, aliases, normalized values
        all_correct_answers = set()
        all_correct_answers.add(true['value'])
        all_correct_answers.update(true['aliases'])
        all_correct_answers.update(true['normalized_aliases'])
        all_correct_answers.add(true['normalized_value'])
        
        # Remove all the empty strings and normalize the correct answers
        correct_answers_norm = [normalize_text(ans) for ans in all_correct_answers if ans and str(ans).strip()]
        # Remove duplicates and empty
        correct_answers_norm = list(set([ans for ans in correct_answers_norm if ans]))
        
        if not correct_answers_norm:
            result = {
                "correctness_score": 0.0,
                "semantic_similarity": 0.0,
                "is_plausible": 0.0,
                "substring_match": 0.0,
                "bert_f1_score": 0.0
            }
            all_results.append(result)
            relevance_metrics.append({"relevance_score":0,"question_similarity":0})
            continue
        
        # Check for substring match
        substring_match = any(correct_norm in pred_norm for correct_norm in correct_answers_norm)
        
        # For semantic metrics use main value as the reference
        ref_norm = normalize_text(true['value'])
        
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message=".*RobertaModel.*")
            P, R, F1 = bert_score([pred_norm], [ref_norm], lang='en', verbose=False)
            bert_f1_score = float(F1[0])
        
        semantic_similarity = calculate_semantic_similarity(pred_norm,correct_answers_norm)
        if semantic_similarity == 0.0 :
            semantic_similarity = calculate_simple_similarity(pred_norm,correct_answers_norm)
        
        correctness_score = calculate_correctness(substring_match,semantic_similarity,bert_f1_score)
        is_plausible = (semantic_similarity > 0.5 or substring_match)
        
       
        #Calculate Answer Relevance
        relevance_metric = calculate_answer_relevance(test_questions[i],pred,true['value'])
        relevance_metrics.append(relevance_metric)
        
        text_correctness_results = {
            "correctness_score": correctness_score,
            "semantic_similarity": semantic_similarity,
            "is_plausible": is_plausible,
            "substring_match": substring_match,
            "bert_f1_score": bert_f1_score,
        }
        all_results.append(text_correctness_results)
    
    # Calculate Average results
    aggregated = {
        "total_items": len(all_results),
        "average_correctness": sum(r["correctness_score"] for r in all_results) / len(all_results),
        "average_semantic_similarity": sum(r["semantic_similarity"] for r in all_results) / len(all_results),
        "plausible_count": sum(r["is_plausible"] for r in all_results),
        "plausible_percentage": sum(r["is_plausible"] for r in all_results) / len(all_results) * 100,
        "substring_match_count": sum(r["substring_match"] for r in all_results),
        "bert_f1_score": sum(r["bert_f1_score"] for r in all_results) / len(all_results),
    }
    
   
    # Aggregate relevance metrics
    relevance_aggregated = {
        "relevance_score_avg": sum(r["relevance_score"] for r in relevance_metrics) / len(relevance_metrics),
        "question_similarity_avg": sum(r["question_similarity"] for r in relevance_metrics) / len(relevance_metrics),
    }
    
    # Compile Results
    results = {
        "latency_avg": np.mean(latencies),
        "latency_std": np.std(latencies),
        "cpu_time_avg": np.mean(cpu_times),
        "cpu_time_std": np.std(cpu_times),
        "memory_usage_avg": np.mean(memory_usages),
        "average_correctness": aggregated["average_correctness"],
        "average_semantic_similarity": aggregated["average_semantic_similarity"],
        "plausible_percentage": aggregated["plausible_percentage"],
        "substring_match_count": aggregated["substring_match_count"],
        "bert_f1_score": aggregated["bert_f1_score"],
        
        # Relevance metrics
        "answer_relevance_score": relevance_aggregated["relevance_score_avg"],
        "question_answer_similarity": relevance_aggregated["question_similarity_avg"],
    }
    

    
    return results

In [16]:
import json
import datetime
import os
from typing import Dict, Any

def setup_logging(results: Dict[str, Any], test_items_count: int, log_dir: str = "rag_evaluation_logs"):
    """Setup logging with timestamp and test item count"""
    
    # Create logs directory if it doesn't exist
    os.makedirs(log_dir, exist_ok=True)
    main_folder="Logs"
    
    # Create filename with timestamp and test count
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"rag_eval_{log_dir}_{timestamp}_{test_items_count}items.json"
    filepath = os.path.join(main_folder, filename)
    
    # Add metadata to results
    results_with_metadata = {
        "metadata": {
            "timestamp": datetime.datetime.now().isoformat(),
            "test_items_count": test_items_count,
            "log_file": filename
        },
        "results": results
    }
    
    # Save to file
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(results_with_metadata, f, indent=2, ensure_ascii=False)
    
    print(f"Results logged to: {filepath}")
    return filepath



### Evaluate RAG system

In [17]:
import torch
import random

#Create a random test set
test_size = 100
if len(questions) >= test_size:
    indices = random.sample(range(len(questions)), test_size)
    
    test_questions = [questions[i] for i in indices]
    test_answers = [answers[i] for i in indices]

print("Running Evaluation on test set.....")
results = evaluate_rag_system(test_questions,test_answers,llm_model,tokenizer,k=3)
setup_logging(results,len(test_questions),log_dir="BGERRF-Tinyllama")

#Print Results
print("\n=== RAG SYSTEM EVALUATION RESULTS ===")
print(f"Average latency : {results['latency_avg']:.4f} ± {results['latency_std']:.4f} seconds")
print(f"Average CPU time : {results['cpu_time_avg']:.2f} ± {results['cpu_time_std']:.2f} seconds")
print(f"Average Memory Usage : {results['memory_usage_avg']:.4f} Mb")

print("\n=== RAG SYSTEM EVALUATION RESULTS FOR QUALITY OF ANSWER ===")

print(f"Average Correctness: {results['average_correctness']}")
print(f"Average Semantic Similarity:{results['average_semantic_similarity']}")

print(f"Average Answer Relevance Score: {results['answer_relevance_score']}")
print(f"Average Question Answer Similarity: {results['question_answer_similarity']}")

print(f"Average Substring Matchcount:{results['substring_match_count']}")
print(f"Bert F1 Score: {results['bert_f1_score']:.2f}")


Running Evaluation on test set.....
Processing question 0/100
Processing question 10/100
Processing question 20/100
Processing question 30/100
Processing question 40/100
Processing question 50/100
Processing question 60/100
Processing question 70/100
Processing question 80/100
Processing question 90/100
Results logged to: Logs\rag_eval_BGERRF_20250924_162854_100items.json

=== RAG SYSTEM EVALUATION RESULTS ===
Average latency : 11.7983 ± 4.2484 seconds
Average CPU time : 45.45 ± 17.06 seconds
Average Memory Usage : 34.8039 Mb

=== RAG SYSTEM EVALUATION RESULTS FOR QUALITY OF ANSWER ===
Average Correctness: 0.6362485070214792
Average Semantic Similarity:0.2169619937427342
Average Answer Relevance Score: 0.5946941991038621
Average Question Answer Similarity: 0.7124152204953134
Average Substring Matchcount:33
Bert F1 Score: 0.81


In [20]:
import torch
import random

#Create a random test set
test_size = 2
if len(questions) >= test_size:
    indices = random.sample(range(len(questions)), test_size)
    
    test_questions = [questions[i] for i in indices]
    test_answers = [answers[i] for i in indices]

print("Running Evaluation on test set.....")
results = evaluate_rag_system(test_questions,test_answers,llm_model,tokenizer,k=3)
setup_logging(results,len(test_questions),log_dir="BGERRF")

#Print Results
print("\n=== RAG SYSTEM EVALUATION RESULTS ===")
print(f"Average latency : {results['latency_avg']:.4f} ± {results['latency_std']:.4f} seconds")
print(f"Average CPU time : {results['cpu_time_avg']:.2f} ± {results['cpu_time_std']:.2f} seconds")
print(f"Average Memory Usage : {results['memory_usage_avg']:.4f} Mb")

print("\n=== RAG SYSTEM EVALUATION RESULTS FOR QUALITY OF ANSWER ===")

print(f"Average Correctness: {results['average_correctness']}")
print(f"Average Semantic Similarity:{results['average_semantic_similarity']}")

print(f"Average Answer Relevance Score: {results['answer_relevance_score']}")
print(f"Average Question Answer Similarity: {results['question_answer_similarity']}")

print(f"Average Substring Matchcount:{results['substring_match_count']}")
print(f"Bert F1 Score: {results['bert_f1_score']:.2f}")


Running Evaluation on test set.....
Processing question 0/2
Number of contexts received: 2
First context type: <class 'dict'>
First context keys (if dict): dict_keys(['content', 'score'])
Number of contexts received: 3
First context type: <class 'dict'>
First context keys (if dict): dict_keys(['content', 'score'])
Results logged to: Logs\rag_eval_BGERRF_20250924_163616_2items.json

=== RAG SYSTEM EVALUATION RESULTS ===
Average latency : 13.6652 ± 4.8506 seconds
Average CPU time : 56.30 ± 14.90 seconds
Average Memory Usage : 0.2480 Mb

=== RAG SYSTEM EVALUATION RESULTS FOR QUALITY OF ANSWER ===
Average Correctness: 0.4701780085830796
Average Semantic Similarity:0.11509966477751732
Average Answer Relevance Score: 0.6829853273928165
Average Question Answer Similarity: 0.9154719114303589
Average Substring Matchcount:0
Bert F1 Score: 0.80
