2. Data Loading & Preprocessing


In [2]:
import json
import os
from typing import Dict, List, Tuple, Union

import torch

# Assume you have your datasets (TAQA, TempQuestions, TimeQA, CRAG) downloaded
# and in a structured format (e.g., JSON files).

def load_json_dataset(filepath: str) -> List[Dict]:
    """Read the UTF-8 JSON file at ``filepath`` and return its parsed content."""
    with open(filepath, mode='r', encoding='utf-8') as handle:
        raw = handle.read()
    return json.loads(raw)

def preprocess_document(doc: Dict) -> Dict:
    """
    Normalize a raw document record into the shape used downstream.

    Args:
        doc: Raw record. The keys 'id', 'text' and 'timestamp' are read with
            ``dict.get``, so a missing key never raises.

    Returns:
        A new dict with:
          - 'id': the value of ``doc['id']`` (None when absent),
          - 'text': the document text with surrounding whitespace removed
            ('' when the text is absent or None),
          - 'timestamp': the raw timestamp value, passed through unchanged
            (kept as-is so it can be parsed later if needed).
    """
    # `doc.get('text', '')` only covers a *missing* key; an explicit null in
    # the JSON still yields None, which has no .strip(). `or ''` covers both.
    raw_text = doc.get('text') or ''

    # Add more cleaning/tokenization here as needed.

    return {
        'id': doc.get('id'),
        'text': raw_text.strip(),
        'timestamp': doc.get('timestamp'),
    }

def preprocess_question(question: Dict) -> Dict:
    """
    Normalize a raw question record into the shape used downstream.

    The question text is read from 'question_text', falling back to 'text';
    the temporal label is read from 'is_temporal', falling back to
    'ground_truth_temporal'. Accepting both spellings lets already-normalized
    records pass through unchanged instead of silently losing their text/label.

    Args:
        question: Raw question record.

    Returns:
        A new dict with:
          - 'id': the value of ``question['id']`` (None when absent),
          - 'text': the question text, whitespace-stripped ('' when absent or None),
          - 'ground_truth_temporal': the temporal label, or None when the record
            carries none (used when evaluating the Time-Aware Module).
    """
    raw_text = question.get('question_text')
    if raw_text is None:
        raw_text = question.get('text')

    # Compare against None explicitly: a label of False is a valid value and
    # must not trigger the fallback.
    label = question.get('is_temporal')
    if label is None:
        label = question.get('ground_truth_temporal')

    return {
        'id': question.get('id'),
        'text': (raw_text or '').strip(),
        'ground_truth_temporal': label,
    }


# Example usage (assuming dummy files exist)
# documents_data = load_json_dataset('path/to/your/documents.json')
# questions_data = load_json_dataset('path/to/your/questions.json')

# processed_documents = [preprocess_document(d) for d in documents_data]
# processed_questions = [preprocess_question(q) for q in questions_data]

ModuleNotFoundError: No module named 'torch'

3. Modular Component: Document Encoder & Vector Store


In [None]:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

class DocumentEncoder:
    """Wraps a SentenceTransformer model used to embed document texts."""

    def __init__(self, model_name: str = 'sentence-transformers/all-MiniLM-L6-v2'):
        """
        Load the SentenceTransformer ``model_name`` and move it to 'cuda' when
        torch reports a GPU, otherwise to 'cpu'.
        """
        self.model = SentenceTransformer(model_name)
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'
        self.model.to(self.device)
        print(f"DocumentEncoder loaded model {model_name} on {self.device}")

    def encode(self, texts: List[str]) -> np.ndarray:
        """
        Embed ``texts`` with the wrapped model (progress bar disabled) and
        return the result requested as a NumPy array.
        """
        return self.model.encode(
            texts,
            show_progress_bar=False,
            convert_to_numpy=True,
        )

class VectorStore:
    """
    FAISS index plus parallel lists of document IDs and metadata.

    Row ``i`` of the FAISS index corresponds to ``doc_ids[i]`` and
    ``metadata[i]``; the three are only ever appended to together
    (see ``add_documents``).
    """

    def __init__(self, embedding_dim: int):
        """
        Initializes an empty flat L2 FAISS index.

        Args:
            embedding_dim: Dimensionality of the vectors that will be stored.
        """
        self.index = faiss.IndexFlatL2(embedding_dim)  # L2 distance: smaller is closer
        self.doc_ids = []
        self.metadata = []
        print(f"VectorStore initialized with embedding dimension: {embedding_dim}")

    @staticmethod
    def _as_faiss_matrix(vectors) -> np.ndarray:
        """Return ``vectors`` as a C-contiguous float32 2-D array (the layout FAISS works on)."""
        matrix = np.ascontiguousarray(vectors, dtype=np.float32)
        if matrix.ndim == 1:
            matrix = matrix.reshape(1, -1)  # a single vector becomes a 1-row batch
        return matrix

    def add_documents(self, embeddings: np.ndarray, doc_ids: List[str], metadata: List[Dict]):
        """
        Adds document embeddings, IDs, and metadata to the store.

        Args:
            embeddings: One row per document; coerced to contiguous float32.
            doc_ids: One ID per embedding row.
            metadata: One metadata dict per embedding row.

        Raises:
            ValueError: If the three inputs do not have the same number of entries.
        """
        embeddings = self._as_faiss_matrix(embeddings)
        # Validate before touching the index so a failed call leaves the
        # index and the parallel lists in sync.
        if embeddings.shape[0] != len(doc_ids) or embeddings.shape[0] != len(metadata):
            raise ValueError("Mismatched dimensions for embeddings, doc_ids, and metadata.")
        self.index.add(embeddings)
        self.doc_ids.extend(doc_ids)
        self.metadata.extend(metadata)
        print(f"Added {embeddings.shape[0]} documents to VectorStore.")

    def search(self, query_embedding: np.ndarray, k: int = 5) -> List[Tuple[str, float, Dict]]:
        """
        Searches the vector store for the top-k nearest neighbors of one query.

        Args:
            query_embedding: A single query vector (1-D) or a batch (2-D);
                only the results for the first row are returned.
            k: Maximum number of neighbors to return.

        Returns:
            A list of ``(doc_id, l2_distance, metadata)`` tuples. The distance
            is the raw value reported by the L2 index (smaller is better), as a
            plain Python float. Empty when ``k <= 0`` or nothing is indexed.
        """
        if k <= 0 or self.index.ntotal == 0:
            return []

        query = self._as_faiss_matrix(query_embedding)
        distances, indices = self.index.search(query, k)

        results = []
        for row, dist in zip(indices[0], distances[0]):
            if row == -1:  # FAISS pads with -1 when fewer than k vectors exist
                continue
            row = int(row)
            # Convert numpy scalars to builtins so results match the declared
            # return type and stay JSON-serializable.
            results.append((self.doc_ids[row], float(dist), self.metadata[row]))
        return results

    def save_index(self, path: str):
        """
        Writes the FAISS index to ``path`` and the ID/metadata lists to the
        sidecar files ``<path>.doc_ids.json`` and ``<path>.metadata.json``.
        """
        faiss.write_index(self.index, path)
        # The index itself holds only vectors; IDs and metadata are saved separately.
        with open(f"{path}.doc_ids.json", 'w', encoding='utf-8') as f:
            json.dump(self.doc_ids, f)
        with open(f"{path}.metadata.json", 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f)
        print(f"VectorStore index saved to {path}")

    def load_index(self, path: str):
        """
        Replaces the current index, IDs and metadata with the ones stored at
        ``path`` and its two sidecar JSON files (see ``save_index``).
        """
        self.index = faiss.read_index(path)
        with open(f"{path}.doc_ids.json", 'r', encoding='utf-8') as f:
            self.doc_ids = json.load(f)
        with open(f"{path}.metadata.json", 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        print(f"VectorStore index loaded from {path}")


# --- Script to build initial document index (conceptual) ---
# This script would be run once to prepare your document embeddings
def build_document_index(documents: List[Dict], output_dir: str = 'index_data'):
    """
    Encodes ``documents`` and saves a vector index for them under ``output_dir``.

    Args:
        documents: Preprocessed documents. Each must have 'id' and 'text';
            'timestamp' is optional and stored as None when absent.
        output_dir: Directory that receives 'document_index.faiss' (and the
            sidecar files written by ``VectorStore.save_index``). Created if
            it does not exist.

    Raises:
        ValueError: If ``documents`` is empty.
    """
    # Fail fast, before creating directories or loading the encoder model.
    if not documents:
        raise ValueError("Cannot build a document index from an empty document list.")

    os.makedirs(output_dir, exist_ok=True)

    # Gather inputs first so a malformed record fails before the model is loaded.
    texts = [doc['text'] for doc in documents]
    doc_ids = [doc['id'] for doc in documents]
    # The full text is kept in the metadata so it can be returned at retrieval time.
    metadata = [{'timestamp': doc.get('timestamp'), 'text': doc['text']} for doc in documents]

    doc_encoder = DocumentEncoder()
    embeddings = doc_encoder.encode(texts)

    # Take the dimension from the embeddings themselves rather than hard-coding it.
    vector_store = VectorStore(embedding_dim=embeddings.shape[1])
    vector_store.add_documents(embeddings, doc_ids, metadata)

    index_path = os.path.join(output_dir, 'document_index.faiss')
    vector_store.save_index(index_path)
    print("Document index built and saved.")

# Example usage:
# Assumes 'processed_documents' was built in Section 2 (Data Loading & Preprocessing)
# build_document_index(processed_documents)

4. Modular Component: Time-Aware Module


In [None]:
import re
from datetime import datetime

class TimeAwareModule:
    """
    Regex-based detector of temporal cues in text, plus a timestamp-distance score.

    This is a placeholder heuristic; a fine-tuned classifier would be more robust.
    """

    # Timestamp layouts tried in order, most specific first.
    _TIMESTAMP_FORMATS = ('%Y-%m-%d', '%Y-%m', '%Y')

    def __init__(self):
        # Basic regex patterns for temporal cues. Expand this significantly!
        self.temporal_keywords = [
            r'\bwhen\b', r'\bafter\b', r'\bbefore\b', r'\bduring\b', r'\bin\s+\d{4}\b',
            r'\b(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{4}\b',
            r'\d{2}/\d{2}/\d{4}', r'\d{4}-\d{2}-\d{2}', r'\d{4}s\b' # 2000s
        ]
        self.keyword_patterns = [re.compile(p, re.IGNORECASE) for p in self.temporal_keywords]
        print("TimeAwareModule initialized with basic temporal keyword patterns.")

    def get_temporal_score(self, text: str) -> float:
        """
        Returns the fraction of temporal patterns that occur in ``text``.

        Each pattern counts at most once, so the result lies in [0, 1].
        """
        matched = sum(1 for pattern in self.keyword_patterns if pattern.search(text))
        return matched / len(self.keyword_patterns)

    def is_temporal_query(self, query_text: str, threshold: float = 0.1) -> bool:
        """
        Returns True when the temporal score of ``query_text`` is strictly
        greater than ``threshold``.
        """
        return self.get_temporal_score(query_text) > threshold

    def get_temporal_relevance_from_timestamps(self, query_timestamp_str: str, doc_timestamp_str: str,
                                               scale_days: float = 365 * 5) -> float:
        """
        Scores how close two timestamps are, using a linear decay.

        Inspired by TempRALM. Timestamps may be 'YYYY-MM-DD', 'YYYY-MM' or 'YYYY'.

        Args:
            query_timestamp_str: Timestamp associated with the query (may be None/empty).
            doc_timestamp_str: Timestamp associated with the document (may be None/empty).
            scale_days: Number of days over which relevance decays from 1.0 to
                0.0. Defaults to five 365-day years. Tune for your data.

        Returns:
            1.0 for identical dates, decreasing linearly with the day difference
            and clamped at 0.0. Also 0.0 when either timestamp cannot be parsed.

        Raises:
            ValueError: If ``scale_days`` is not positive.
        """
        if scale_days <= 0:
            raise ValueError("scale_days must be positive.")

        query_time = self._parse_timestamp(query_timestamp_str)
        doc_time = self._parse_timestamp(doc_timestamp_str)
        if query_time is None or doc_time is None:
            return 0.0  # No usable timestamp pair, so no temporal signal from this method

        gap_days = abs((query_time - doc_time).days)
        return max(0.0, 1.0 - (gap_days / scale_days))

    def _parse_timestamp(self, ts_str: str):
        """
        Parses ``ts_str`` into a ``datetime``; returns None when it is empty,
        not a string, or matches none of the supported layouts.
        """
        if not ts_str or not isinstance(ts_str, str):
            return None
        candidate = ts_str.strip()
        for fmt in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(candidate, fmt)
            except ValueError:
                continue  # try the next, less specific layout
        return None

# --- Standalone Test Script for Time-Aware Module ---
# Save this in a separate file like test_time_aware_module.py
# Example usage:
if __name__ == '__main__':
    # Quick smoke run of TimeAwareModule: prints scores instead of asserting.
    module = TimeAwareModule()
    sample_queries = (
        'What happened in 2008?',
        'Who is the president?',
        'Events after March 2020?',
    )

    # Temporal score for each sample query
    print("\n--- Testing Temporal Score ---")
    for sample in sample_queries:
        print(f"'{sample}' Score: {module.get_temporal_score(sample)}")

    # Classification; the last query is checked against a stricter threshold
    print("\n--- Testing Temporal Query Classification ---")
    classification_kwargs = ({}, {}, {'threshold': 0.2})
    for sample, extra in zip(sample_queries, classification_kwargs):
        print(f"'{sample}' Is temporal: {module.is_temporal_query(sample, **extra)}")

    # Timestamp relevance for (label, query timestamp, document timestamp) cases
    print("\n--- Testing Timestamp Relevance ---")
    timestamp_cases = (
        ('Query 2020-01-15, Doc 2020-01-20', '2020-01-15', '2020-01-20'),
        ('Query 2020-01-15, Doc 2021-01-15', '2020-01-15', '2021-01-15'),
        ('Query 2010, Doc 2020', '2010', '2020'),
        ('Query no-date, Doc 2020', '', '2020'),
    )
    for label, query_ts, doc_ts in timestamp_cases:
        print(f"{label}: {module.get_temporal_relevance_from_timestamps(query_ts, doc_ts)}")

    # For robust evaluation, you'd load a test dataset with ground truth temporal labels
    # Example (conceptual):
    # test_questions = load_json_dataset('path/to/temporal_test_questions.json')
    # correct_predictions = 0
    # for q_data in test_questions:
    #     question_text = q_data['text']
    #     ground_truth = q_data['ground_truth_temporal'] # True/False
    #     prediction = module.is_temporal_query(question_text)
    #     if prediction == ground_truth:
    #         correct_predictions += 1
    # accuracy = correct_predictions / len(test_questions)
    # print(f"\nTimeAwareModule Classification Accuracy: {accuracy*100:.2f}%")

5. Modular Component: Query Encoders & Query Router


In [None]:
from transformers import AutoModel, AutoTokenizer # For Contriever or other HuggingFace models

class NormalQueryEncoder:
    """Encodes non-temporal queries with a SentenceTransformer model."""

    def __init__(self, model_name: str = 'sentence-transformers/all-MiniLM-L6-v2'):
        """Load ``model_name`` and place it on 'cuda' when available, else 'cpu'."""
        self.model = SentenceTransformer(model_name)
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'
        self.model.to(self.device)
        print(f"NormalQueryEncoder loaded model {model_name} on {self.device}")

    def encode(self, query_text: str) -> np.ndarray:
        """Embed a single query string; the result is requested as a NumPy array."""
        return self.model.encode(query_text, convert_to_numpy=True)

class TemporalQueryEncoder:
    """Encodes temporal queries with a fine-tuned Contriever checkpoint."""

    def __init__(self, model_path: str = 'path/to/your/fine_tuned_contriever.bin'):
        """
        Load the base 'facebook/contriever' tokenizer and model, then overwrite
        the model weights with the state dict stored at ``model_path``.

        Any failure during loading is reported on stdout and leaves
        ``self.model`` set to None; ``encode`` then raises RuntimeError.
        Adjust the loading logic if your checkpoint is not a plain state dict.
        """
        base_checkpoint = 'facebook/contriever'
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(base_checkpoint)
            self.model = AutoModel.from_pretrained(base_checkpoint)
            # Weights are read onto the CPU first and moved to the target device below.
            # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
            cpu = torch.device('cpu')
            fine_tuned_weights = torch.load(model_path, map_location=cpu)
            self.model.load_state_dict(fine_tuned_weights)
            if torch.cuda.is_available():
                self.device = 'cuda'
            else:
                self.device = 'cpu'
            self.model.to(self.device)
            self.model.eval()  # inference only
            print(f"TemporalQueryEncoder loaded fine-tuned Contriever from {model_path} on {self.device}")
        except Exception as load_error:
            print(f"Error loading fine-tuned Contriever model: {load_error}")
            print("Please ensure 'model_path' is correct and compatible with AutoModel.from_pretrained or provide custom loading logic.")
            self.model = None  # marks the encoder as unusable

    def encode(self, query_text: str) -> np.ndarray:
        """
        Embed ``query_text`` as the mean of the model's last hidden state over
        the token dimension, returned as a flattened NumPy array.

        Raises:
            RuntimeError: If the model failed to load in ``__init__``.
        """
        if self.model is None:
            raise RuntimeError("TemporalQueryEncoder model not loaded.")

        encoded = self.tokenizer(query_text, return_tensors='pt', padding=True, truncation=True)
        encoded = encoded.to(self.device)
        with torch.no_grad():
            outputs = self.model(**encoded)

        # Mean over dim=1 of the last hidden state (all token positions).
        pooled = outputs.last_hidden_state.mean(dim=1)
        return pooled.cpu().numpy().flatten()

class QueryRouter:
    """Sends each query to the temporal or the normal encoder."""

    def __init__(self, time_aware_module: TimeAwareModule,
                 normal_encoder: NormalQueryEncoder,
                 temporal_encoder: TemporalQueryEncoder,
                 temporal_threshold: float = 0.1):
        """
        Store the collaborators and the threshold passed to
        ``time_aware_module.is_temporal_query`` when routing.
        """
        self.temporal_threshold = temporal_threshold
        self.time_aware_module = time_aware_module
        self.temporal_encoder = temporal_encoder
        self.normal_encoder = normal_encoder
        print("QueryRouter initialized.")

    def route_and_encode(self, query_text: str) -> np.ndarray:
        """
        Classify ``query_text`` as temporal or not, log the decision, and
        return the embedding produced by the matching encoder.
        """
        # NOTE(review): the two encoders default to different base models;
        # confirm both produce embeddings of the dimension the vector store indexes.
        use_temporal = self.time_aware_module.is_temporal_query(query_text, self.temporal_threshold)
        if not use_temporal:
            print(f"Query '{query_text}' identified as non-temporal. Using NormalQueryEncoder.")
            return self.normal_encoder.encode(query_text)

        print(f"Query '{query_text}' identified as temporal. Using TemporalQueryEncoder.")
        return self.temporal_encoder.encode(query_text)

# Example usage (conceptual):
# time_aware = TimeAwareModule()
# normal_enc = NormalQueryEncoder()
# temporal_enc = TemporalQueryEncoder(model_path='path/to/your/fine_tuned_contriever.bin') # REPLACE WITH ACTUAL PATH
# router = QueryRouter(time_aware, normal_enc, temporal_enc)

# query_embedding_temporal = router.route_and_encode("What happened in the stock market after 2008?")
# query_embedding_normal = router.route_and_encode("What is the capital of France?")

6. Modular Component: Large Language Model (LLM) Wrapper


In [None]:
from transformers import pipeline

class LLMGenerator:
    """Answer generator built on a Hugging Face "text-generation" pipeline."""

    def __init__(self, model_name: str = "distilbert/distilgpt2", device: str = None):
        """
        Create the text-generation pipeline for ``model_name``.

        ``device`` is used as given when truthy; otherwise 'cuda' is chosen when
        torch reports a GPU and 'cpu' if not. If the pipeline cannot be created,
        the error is printed and ``self.generator`` is set to None.
        Other models (e.g. 'meta-llama/Llama-2-7b-chat-hf', 'google/flan-t5-large')
        can be substituted; larger models require more resources, and chat
        models may need a different pipeline setup.
        """
        if device:
            self.device = device
        elif torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'

        try:
            self.generator = pipeline("text-generation", model=model_name, device=self.device)
            print(f"LLMGenerator loaded model {model_name} on {self.device}")
        except Exception as load_error:
            print(f"Error loading LLM model {model_name}: {load_error}")
            print("Please ensure the model name is correct and you have enough resources.")
            self.generator = None

    def generate_answer(self, question: str, context: List[str], max_length: int = 200) -> str:
        """
        Build a "Context / Question / Answer" prompt and return the model's answer.

        ``max_length`` is passed to the pipeline as ``max_new_tokens``.
        Failures are reported through the returned string rather than raised:
        an unloaded model, an empty pipeline response, and any exception during
        generation each yield a descriptive message.
        """
        if self.generator is None:
            return "Error: LLM model not loaded."

        # The prompt layout matters a lot for RAG quality and can be tuned.
        joined_context = "\n".join(context)
        prompt = f"Context: {joined_context}\nQuestion: {question}\nAnswer:"

        try:
            response = self.generator(
                prompt,
                max_new_tokens=max_length,
                do_sample=False,
                num_return_sequences=1,
                return_full_text=False,
            )
            if not response or len(response) == 0:
                return "No answer generated."

            text = response[0]['generated_text'].strip()
            # Drop anything from a follow-up "Question:" the model may have invented.
            if "Question:" in text:
                text = text.partition("Question:")[0].strip()
            # If the model echoed an "Answer:" label, keep only what follows the first one.
            if "Answer:" in text:
                text = text.partition("Answer:")[2].strip()
            return text
        except Exception as generation_error:
            return f"Error during answer generation: {generation_error}"

7. Simple RAG Baseline Implementation


In [None]:
class SimpleRAG:
    """Baseline retrieve-then-generate pipeline with no temporal handling."""

    def __init__(self, document_encoder: DocumentEncoder, vector_store: VectorStore, llm_generator: LLMGenerator):
        """Store the encoder, vector store and generator used by ``answer_question``."""
        self.llm_generator = llm_generator
        self.vector_store = vector_store
        self.document_encoder = document_encoder
        print("SimpleRAG system initialized.")

    def answer_question(self, query_text: str, k: int = 5) -> str:
        """
        Answer ``query_text`` in three steps: embed the query with the document
        encoder, fetch the top-``k`` hits from the vector store, and let the LLM
        answer from the hits' stored text.

        Returns "No relevant documents found." when retrieval yields nothing.
        """
        # The encoder works on batches, so wrap the query and take the only row.
        batch = self.document_encoder.encode([query_text])
        query_vector = batch[0]

        hits = self.vector_store.search(query_vector, k=k)

        # Each hit's third element is its metadata dict, which carries the
        # document text under 'text'.
        contexts = []
        for hit in hits:
            contexts.append(hit[2]['text'])

        if len(contexts) == 0:
            return "No relevant documents found."

        return self.llm_generator.generate_answer(question=query_text, context=contexts)

8. Full Proposed Pipeline Implementation (TemporalRAGPipeline)


In [None]:
from datetime import datetime

class TemporalRAGPipeline:
    """
    RAG pipeline that routes queries by temporality and re-ranks retrieved
    documents with a weighted mix of semantic and temporal scores.
    """

    def __init__(self, time_aware_module: TimeAwareModule, 
                 query_router: QueryRouter, 
                 vector_store: VectorStore, 
                 llm_generator: LLMGenerator,
                 re_ranking_weights: Dict[str, float] = None):
        """
        Initializes the full Temporal RAG Pipeline.

        Args:
            time_aware_module: Provides temporal classification and scoring.
            query_router: Encodes the query with the appropriate encoder.
            vector_store: Store searched for candidate documents.
            llm_generator: Generates the final answer from the selected context.
            re_ranking_weights: Weights for combining scores, keyed by
                'semantic', 'temporal_metadata' and 'temporal_content'
                (default 0.5 / 0.3 / 0.2). Missing keys count as 0. The weights
                are rescaled to sum to 1.0 whenever their total is positive.
        """
        self.time_aware_module = time_aware_module
        self.query_router = query_router
        self.vector_store = vector_store
        self.llm_generator = llm_generator

        weights = re_ranking_weights if re_ranking_weights else {
            'semantic': 0.5,
            'temporal_metadata': 0.3,
            'temporal_content': 0.2
        }
        # Normalize weights to sum to 1.0 (builds a new dict; the caller's is not mutated)
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}
        self.re_ranking_weights = weights
        print(f"TemporalRAGPipeline initialized with re-ranking weights: {self.re_ranking_weights}")

    def _extract_query_timestamp(self, query_text: str):
        """Return the first 4-digit number in a temporal query (e.g. "2008"), else None."""
        # Simple heuristic; a real system might use a dedicated component to
        # extract query timestamps.
        if not self.time_aware_module.is_temporal_query(query_text):
            return None
        year_match = re.search(r'\b\d{4}\b', query_text)
        return year_match.group(0) if year_match else None

    def answer_question(self, query_text: str, k_retrieve: int = 10, k_rerank: int = 5) -> str:
        """
        Answers a question using the full Temporal RAG pipeline.
        1. Routes and encodes the query.
        2. Retrieves top-k_retrieve documents.
        3. Re-ranks documents using semantic, temporal (metadata), and temporal (content) scores.
        4. Selects top-k_rerank documents after re-ranking.
        5. Generates an answer using the LLM based on re-ranked context.

        Returns:
            The generated answer, or a short explanatory message when no
            documents are retrieved or none remain after re-ranking.
        """
        # 1. Route and encode the query
        query_embedding = self.query_router.route_and_encode(query_text)

        # 2. Retrieve top-k_retrieve documents
        # search returns: (doc_id, distance_score_L2, metadata_dict)
        retrieved_results = self.vector_store.search(query_embedding, k=k_retrieve)

        if not retrieved_results:
            return "No documents retrieved for re-ranking."

        # The query timestamp and the weights depend only on the query/pipeline,
        # so compute them once instead of once per retrieved document.
        query_timestamp = self._extract_query_timestamp(query_text)
        semantic_weight = self.re_ranking_weights.get('semantic', 0)
        metadata_weight = self.re_ranking_weights.get('temporal_metadata', 0)
        content_weight = self.re_ranking_weights.get('temporal_content', 0)

        # 3. Re-rank documents
        scored_documents = []
        for doc_id, l2_distance, metadata in retrieved_results:
            doc_text = metadata['text']
            doc_timestamp = metadata.get('timestamp')  # optional; None scores 0.0 below

            # Semantic score: map L2 distance (smaller is better) into (0, 1]
            # with a simple inverse; negative distances are treated as invalid.
            semantic_score = 1.0 / (1.0 + l2_distance) if l2_distance >= 0 else 0.0

            # Temporal score from metadata: closeness of query year and document timestamp
            temporal_metadata_score = self.time_aware_module.get_temporal_relevance_from_timestamps(
                query_timestamp, doc_timestamp
            )

            # Temporal score from content: density of temporal cues in the document text
            temporal_content_score = self.time_aware_module.get_temporal_score(doc_text)

            final_re_ranking_score = (
                semantic_weight * semantic_score +
                metadata_weight * temporal_metadata_score +
                content_weight * temporal_content_score
            )

            scored_documents.append({'id': doc_id, 'text': doc_text, 'score': final_re_ranking_score})

        # Sort by final re-ranking score (highest score first)
        scored_documents.sort(key=lambda x: x['score'], reverse=True)

        # 4. Select top-k_rerank documents
        top_reranked_contexts = [doc['text'] for doc in scored_documents[:k_rerank]]

        if not top_reranked_contexts:
            return "No documents left after re-ranking for answer generation."

        # 5. Generate answer using LLM
        answer = self.llm_generator.generate_answer(question=query_text, context=top_reranked_contexts)
        return answer

9. Evaluation Framework


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from typing import Callable

class Evaluator:
    """Computes answer-quality and temporal-classification metrics."""

    def __init__(self):
        print("Evaluator initialized.")

    def calculate_metrics(self, predictions: List[str], ground_truths: List[str], questions: List[Dict]) -> Dict:
        """
        Calculates evaluation metrics for generated answers.

        This is a conceptual implementation: only case-insensitive,
        whitespace-trimmed exact match is computed. For real RAG evaluation
        you'd add more sophisticated metrics (LLM judges, hallucination or
        missing-answer rates, dataset-specific scripts such as CRAG's).

        Args:
            predictions: Generated answers.
            ground_truths: Reference answers, aligned with ``predictions``.
            questions: Original question dicts; currently unused, kept for
                future metrics.

        Returns:
            ``{'exact_match_accuracy': <fraction of exact matches>}`` (0 when
            there are no predictions). The results are also printed.

        Raises:
            ValueError: If ``predictions`` and ``ground_truths`` differ in length.
        """
        # zip() would silently drop the unmatched tail and skew the accuracy.
        if len(predictions) != len(ground_truths):
            raise ValueError("predictions and ground_truths must have the same length.")

        results = {}
        # Simple exact match (highly stringent)
        correct_answers = sum(
            1 for pred, gt in zip(predictions, ground_truths)
            if pred.strip().lower() == gt.strip().lower()
        )
        results['exact_match_accuracy'] = correct_answers / len(predictions) if predictions else 0

        # Placeholder for more complex metrics:
        # results['hallucination_rate'] = ...
        # results['missing_answer_rate'] = ...

        print("\n--- Evaluation Results ---")
        for metric, value in results.items():
            print(f"{metric}: {value:.4f}")

        return results

    def evaluate_temporal_classification(self, time_aware_module: "TimeAwareModule", temporal_test_questions: List[Dict]) -> Dict:
        """
        Evaluates the TimeAwareModule's temporal/non-temporal classification.

        Args:
            time_aware_module: Module whose ``is_temporal_query`` is evaluated.
            temporal_test_questions: Question dicts with 'text' and
                'ground_truth_temporal' (True/False). Questions whose label is
                missing or None are skipped.

        Returns:
            Dict with 'accuracy', 'precision', 'recall' and 'f1_score', or an
            empty dict when there are no labelled questions.
        """
        if not temporal_test_questions:
            print("No temporal test questions provided for TimeAwareModule evaluation.")
            return {}

        # Filter once so labels and predictions are guaranteed to stay aligned.
        labelled = [q for q in temporal_test_questions if q.get('ground_truth_temporal') is not None]
        if not labelled:
            print("No ground truth temporal labels found in the test questions.")
            return {}

        true_labels = [q['ground_truth_temporal'] for q in labelled]
        predictions = [time_aware_module.is_temporal_query(q['text']) for q in labelled]

        metrics = {
            'accuracy': accuracy_score(true_labels, predictions),
            'precision': precision_score(true_labels, predictions, zero_division=0),
            'recall': recall_score(true_labels, predictions, zero_division=0),
            'f1_score': f1_score(true_labels, predictions, zero_division=0)
        }

        print("\n--- TimeAwareModule Temporal Classification Results ---")
        for metric, value in metrics.items():
            print(f"{metric}: {value:.4f}")
        return metrics

    # The pipeline annotation is quoted so that defining this class does not
    # require the pipeline classes (or ``Union``) to be in scope at that moment.
    def run_evaluation_suite(self, 
                             model_or_pipeline_instance: "Union[SimpleRAG, TemporalRAGPipeline]", 
                             test_dataset: List[Dict]) -> Dict:
        """
        Runs the full evaluation for a given model/pipeline on a test dataset.

        Args:
            model_or_pipeline_instance: Object exposing ``answer_question(text)``.
            test_dataset: Question dicts, each with 'text' and 'answer'
                (the ground-truth answer).

        Returns:
            The metrics dict produced by ``calculate_metrics``.
        """
        print(f"\n--- Running Evaluation for: {model_or_pipeline_instance.__class__.__name__} ---")
        predictions = []
        ground_truths = []

        total = len(test_dataset)
        for i, q_data in enumerate(test_dataset, start=1):
            question_text = q_data['text']
            ground_truth_answer = q_data['answer']

            print(f"Processing question {i}/{total}: {question_text[:50]}...")
            predictions.append(model_or_pipeline_instance.answer_question(question_text))
            ground_truths.append(ground_truth_answer)

        return self.calculate_metrics(predictions, ground_truths, test_dataset)