# In [None]:
import json
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
import re
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns

# Import our RAG system
from multilingual_rag import MultilingualRAG, DocumentChunk

@dataclass
class EvaluationResult:
    """Stores evaluation results for a single query.

    Holds the raw outputs of one RAG run (generated answer and retrieved
    chunks) together with the metric values computed from them.
    """
    # Question text sent to the RAG system.
    query: str
    # Ground-truth answer taken from the test case.
    expected_answer: str
    # Answer text returned by the RAG system's chat call.
    generated_answer: str
    # (chunk, retrieval score) pairs returned by the retriever for the query.
    retrieved_chunks: List[Tuple[DocumentChunk, float]]
    # Output of RAGEvaluator.evaluate_groundedness for this answer/context.
    groundedness_score: float
    # Output of RAGEvaluator.evaluate_relevance (mean retrieval score).
    relevance_score: float
    # Output of RAGEvaluator.evaluate_answer_similarity (expected vs generated).
    answer_similarity: float
    # Output of RAGEvaluator.evaluate_retrieval_precision (fraction of chunks kept).
    retrieval_precision: float
    # Elapsed time of retrieval plus answer generation, in seconds.
    response_time: float
    # Language tag copied from the test case ("unknown" when the case has none).
    language: str

class RAGEvaluator:
    """Comprehensive RAG evaluation system"""

    def __init__(self, rag_system: MultilingualRAG):
        """Bind the evaluator to a RAG system and load the scoring model.

        Args:
            rag_system: The RAG pipeline whose retrieval and answers are scored.
        """
        # Multilingual sentence-embedding model used by every similarity metric.
        embedding_model_name = 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'

        self.rag = rag_system
        self.similarity_model = SentenceTransformer(embedding_model_name)
        # Filled by run_evaluation(); empty until an evaluation has been run.
        self.evaluation_results: List[EvaluationResult] = []

    def load_test_dataset(self, dataset_path: str = None) -> List[Dict[str, Any]]:
        """Load test dataset with ground truth answers"""

        # Default test cases based on your requirements
        default_test_cases = [
            {
                "query": "অনুপমের ভাষায় সুপুরুষ কাকে বলা হয়েছে?",
                "expected_answer": "শুম্ভুনাথ",
                "category": "character_identification",
                "language": "bn"
            },
            {
                "query": "কাকে অনুপমের ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে?",
                "expected_answer": "মামাকে",
                "category": "character_relationship",
                "language": "bn"
            },
            {
                "query": "বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?",
                "expected_answer": "১৫ বছর",
                "category": "factual_information",
                "language": "bn"
            },
            {
                "query": "Who is described as a handsome man in Anupam's words?",
                "expected_answer": "Shumbhunath",
                "category": "character_identification",
                "language": "en"
            },
            {
                "query": "What was Kalyani's actual age at the time of marriage?",
                "expected_answer": "15 years",
                "category": "factual_information",
                "language": "en"
            },
            {
                "query": "অনুপম কার সাথে বিয়ে করেছিল?",
                "expected_answer": "কল্যাণী",
                "category": "plot_detail",
                "language": "bn"
            },
            {
                "query": "গল্পের মূল চরিত্র কে?",
                "expected_answer": "অনুপম",
                "category": "main_character",
                "language": "bn"
            },
            {
                "query": "অনুপমের মামা কী ধরনের মানুষ ছিলেন?",
                "expected_answer": "প্রভাবশালী এবং বিত্তবান",
                "category": "character_description",
                "language": "bn"
            },
            {
                "query": "কল্যাণীর বিয়েতে কোন সমস্যা হয়েছিল?",
                "expected_answer": "বয়সের মিথ্যা তথ্য",
                "category": "plot_conflict",
                "language": "bn"
            },
            {
                "query": "What is the main conflict in the story?",
                "expected_answer": "Age deception in marriage",
                "category": "plot_conflict",
                "language": "en"
            }
        ]

        if dataset_path:
            try:
                with open(dataset_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except FileNotFoundError:
                print(f"Dataset file {dataset_path} not found. Using default test cases.")

        return default_test_cases

    def evaluate_groundedness(self, answer: str, retrieved_chunks: List[Tuple[DocumentChunk, float]]) -> float:
        """
        Evaluate if the answer is grounded in the retrieved context
        Returns score between 0 and 1
        """
        if not retrieved_chunks or not answer.strip():
            return 0.0

        # Combine all retrieved context
        context = " ".join([chunk.text for chunk, _ in retrieved_chunks])

        # Encode answer and context
        answer_embedding = self.similarity_model.encode([answer])
        context_embedding = self.similarity_model.encode([context])

        # Calculate similarity
        similarity = cosine_similarity(answer_embedding, context_embedding)[0][0]

        # Additional checks for direct content overlap
        answer_words = set(answer.lower().split())
        context_words = set(context.lower().split())

        # Word overlap ratio
        if len(answer_words) > 0:
            word_overlap = len(answer_words.intersection(context_words)) / len(answer_words)
        else:
            word_overlap = 0.0

        # Combine semantic similarity and word overlap
        groundedness_score = 0.7 * similarity + 0.3 * word_overlap

        return min(groundedness_score, 1.0)

    def evaluate_relevance(self, query: str, retrieved_chunks: List[Tuple[DocumentChunk, float]]) -> float:
        """
        Evaluate if retrieved documents are relevant to the query
        Returns average relevance score
        """
        if not retrieved_chunks:
            return 0.0

        query_embedding = self.similarity_model.encode([query])
        relevance_scores = []

        for chunk, retrieval_score in retrieved_chunks:
            # Use the retrieval score as primary relevance indicator
            relevance_scores.append(retrieval_score)

        return np.mean(relevance_scores)

    def evaluate_answer_similarity(self, expected_answer: str, generated_answer: str) -> float:
        """
        Evaluate similarity between expected and generated answers
        """
        if not expected_answer.strip() or not generated_answer.strip():
            return 0.0

        # Exact match check (case insensitive)
        if expected_answer.lower().strip() in generated_answer.lower().strip():
            return 1.0

        # Semantic similarity
        expected_embedding = self.similarity_model.encode([expected_answer])
        generated_embedding = self.similarity_model.encode([generated_answer])

        similarity = cosine_similarity(expected_embedding, generated_embedding)[0][0]

        return similarity

    def evaluate_retrieval_precision(self, query: str, retrieved_chunks: List[Tuple[DocumentChunk, float]],
                                   expected_answer: str, threshold: float = 0.3) -> float:
        """
        Evaluate precision of retrieval (how many retrieved chunks are actually relevant)
        """
        if not retrieved_chunks:
            return 0.0

        relevant_count = 0
        expected_embedding = self.similarity_model.encode([expected_answer])

        for chunk, score in retrieved_chunks:
            chunk_embedding = self.similarity_model.encode([chunk.text])
            chunk_relevance = cosine_similarity(expected_embedding, chunk_embedding)[0][0]

            if chunk_relevance >= threshold or score >= threshold:
                relevant_count += 1

        return relevant_count / len(retrieved_chunks)

    def evaluate_single_query(self, test_case: Dict[str, Any]) -> EvaluationResult:
        """Evaluate a single query.

        Runs retrieval and answer generation for the test case, times them,
        and computes all per-query metrics.

        Args:
            test_case: Dict with the required keys ``query`` and
                ``expected_answer`` and the optional key ``language``.

        Returns:
            An ``EvaluationResult`` holding the outputs and metric values.

        Raises:
            KeyError: If ``query`` or ``expected_answer`` is missing.
        """
        import time  # local import: perf_counter is only needed here

        query = test_case["query"]
        expected_answer = test_case["expected_answer"]

        # perf_counter is monotonic, so the measurement is not distorted by
        # system clock adjustments the way wall-clock timestamps can be.
        started = time.perf_counter()

        # Retrieve chunks
        retrieved_chunks = self.rag.retrieve_relevant_chunks(query, k=5)

        # Generate answer
        # NOTE(review): if chat() performs its own retrieval internally, the
        # retrieval cost is counted twice in response_time - confirm.
        generated_answer = self.rag.chat(query)

        response_time = time.perf_counter() - started

        # Calculate metrics
        groundedness_score = self.evaluate_groundedness(generated_answer, retrieved_chunks)
        relevance_score = self.evaluate_relevance(query, retrieved_chunks)
        answer_similarity = self.evaluate_answer_similarity(expected_answer, generated_answer)
        retrieval_precision = self.evaluate_retrieval_precision(query, retrieved_chunks, expected_answer)

        return EvaluationResult(
            query=query,
            expected_answer=expected_answer,
            generated_answer=generated_answer,
            retrieved_chunks=retrieved_chunks,
            groundedness_score=groundedness_score,
            relevance_score=relevance_score,
            answer_similarity=answer_similarity,
            retrieval_precision=retrieval_precision,
            response_time=response_time,
            language=test_case.get("language", "unknown")
        )

    def run_evaluation(self, dataset_path: str = None) -> Dict[str, Any]:
        """Run complete evaluation on test dataset"""

        print("Starting RAG System Evaluation...")
        print("=" * 50)

        # Load test dataset
        test_cases = self.load_test_dataset(dataset_path)

        # Evaluate each test case
        results = []
        for i, test_case in enumerate(test_cases, 1):
            print(f"Evaluating {i}/{len(test_cases)}: {test_case['query'][:50]}...")

            result = self.evaluate_single_query(test_case)
            results.append(result)

            # Print individual result
            print(f"  Expected: {result.expected_answer}")
            print(f"  Generated: {result.generated_answer}")
            print(f"  Answer Similarity: {result.answer_similarity:.3f}")
            print(f"  Groundedness: {result.groundedness_score:.3f}")
            print(f"  Relevance: {result.relevance_score:.3f}")
            print(f"  Response Time: {result.response_time:.3f}s")
            print("-" * 30)

        self.evaluation_results = results

        # Calculate aggregate metrics
        aggregate_metrics = self.calculate_aggregate_metrics(results)

        # Generate detailed report
        report = self.generate_evaluation_report(results, aggregate_metrics)

        return report

    def calculate_aggregate_metrics(self, results: List[EvaluationResult]) -> Dict[str, float]:
        """Calculate aggregate metrics across all test cases"""

        if not results:
            return {}

        metrics = {
            "avg_groundedness": np.mean([r.groundedness_score for r in results]),
            "avg_relevance": np.mean([r.relevance_score for r in results]),
            "avg_answer_similarity": np.mean([r.answer_similarity for r in results]),
            "avg_retrieval_precision": np.mean([r.retrieval_precision for r in results]),
            "avg_response_time": np.mean([r.response_time for r in results]),
            "exact_match_rate": sum(1 for r in results if r.answer_similarity >= 0.9) / len(results),
            "good_groundedness_rate": sum(1 for r in results if r.groundedness_score >= 0.7) / len(results),
            "good_relevance_rate": sum(1 for r in results if r.relevance_score >= 0.5) / len(results)
        }

        return metrics

    def generate_evaluation_report(self, results: List[EvaluationResult],
                                 aggregate_metrics: Dict[str, float]) -> Dict[str, Any]:
        """Generate comprehensive evaluation report"""

        report = {
            "evaluation_summary": {
                "total_test_cases": len(results),
                "evaluation_date": datetime.now().isoformat(),
                "aggregate_metrics": aggregate_metrics
            },
            "detailed_results": [],
            "language_breakdown": {},
            "performance_analysis": {}
        }

        # Detailed results
        for result in results:
            report["detailed_results"].append({
                "query": result.query,
                "expected_answer": result.expected_answer,
                "generated_answer": result.generated_answer,
                "metrics": {
                    "groundedness_score": result.groundedness_score,
                    "relevance_score": result.relevance_score,
                    "answer_similarity": result.answer_similarity,
                    "retrieval_precision": result.retrieval_precision,
                    "response_time": result.response_time
                },
                "language": result.language,
                "retrieved_chunks_count": len(result.retrieved_chunks)
            })

        # Language breakdown
        language_groups = {}
        for result in results:
            lang = result.language
            if lang not in language_groups:
                language_groups[lang] = []
            language_groups[lang].append(result)

        for lang, lang_results in language_groups.items():
            report["language_breakdown"][lang] = {
                "count": len(lang_results),
                "avg_groundedness": np.mean([r.groundedness_score for r in lang_results]),
                "avg_relevance": np.mean([r.relevance_score for r in lang_results]),
                "avg_answer_similarity": np.mean([r.answer_similarity for r in lang_results]),
                "avg_response_time": np.mean([r.response_time for r in lang_results])
            }

        # Performance analysis
        report["performance_analysis"] = {
            "best_performing_queries": sorted(
                [(r.query, r.answer_similarity) for r in results],
                key=lambda x: x[1], reverse=True
            )[:3],
            "worst_performing_queries": sorted(
                [(r.query, r.answer_similarity) for r in results],
                key=lambda x: x[1]
            )[:3],
            "response_time_distribution": {
                "min": min(r.response_time for r in results),
                "max": max(r.response_time for r in results),
                "median": np.median([r.response_time for r in results])
            }
        }

        return report

    def save_evaluation_report(self, report: Dict[str, Any], filename: str = None):
        """Save evaluation report to JSON file"""

        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"rag_evaluation_report_{timestamp}.json"

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        print(f"Evaluation report saved to {filename}")

    def visualize_results(self, save_plots: bool = True):
        """Generate visualization plots for evaluation results"""

        if not self.evaluation_results:
            print("No evaluation results to visualize. Run evaluation first.")
            return

        # Create subplots
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('RAG System Evaluation Results', fontsize=16)

        # Extract data
        results = self.evaluation_results
        metrics = ['groundedness_score', 'relevance_score', 'answer_similarity',
                  'retrieval_precision', 'response_time']

        # 1. Overall metrics bar chart
        avg_scores = [np.mean([getattr(r, metric) for r in results]) for metric in metrics]
        axes[0, 0].bar(metrics, avg_scores, color=['skyblue', 'lightgreen', 'salmon', 'gold', 'plum'])
        axes[0, 0].set_title('Average Metrics')
        axes[0, 0].set_ylabel('Score')
        axes[0, 0].tick_params(axis='x', rotation=45)

        # 2. Language comparison
        languages = list(set(r.language for r in results))
        if len(languages) > 1:
            lang_data = {}
            for lang in languages:
                lang_results = [r for r in results if r.language == lang]
                lang_data[lang] = np.mean([r.answer_similarity for r in lang_results])

            axes[0, 1].bar(lang_data.keys(), lang_data.values(), color=['coral', 'lightblue'])
            axes[0, 1].set_title('Answer Similarity by Language')
            axes[0, 1].set_ylabel('Average Similarity Score')

        # 3. Response time distribution
        response_times = [r.response_time for r in results]
        axes[0, 2].hist(response_times, bins=10, color='lightgreen', alpha=0.7)
        axes[0, 2].set_title('Response Time Distribution')
        axes[0, 2].set_xlabel('Response Time (seconds)')
        axes[0, 2].set_ylabel('Frequency')

        # 4. Groundedness vs Relevance scatter
        groundedness = [r.groundedness_score for r in results]
        relevance = [r.relevance_score for r in results]
        axes[1, 0].scatter(groundedness, relevance, alpha=0.6, color='purple')
        axes[1, 0].set_xlabel('Groundedness Score')
        axes[1, 0].set_ylabel('Relevance Score')
        axes[1, 0].set_title('Groundedness vs Relevance')

        # 5. Answer similarity distribution
        similarities = [r.answer_similarity for r in results]
        axes[1, 1].hist(similarities, bins=10, color='orange', alpha=0.7)
        axes[1, 1].set_title('Answer Similarity Distribution')
        axes[1, 1].set_xlabel('Similarity Score')
        axes[1, 1].set_ylabel('Frequency')

        # 6. Performance by query category (if available)
        # This requires category information in test cases
        categories = {}
        for i, result in enumerate(results):
            # Try to extract category from test case if available
            category = "General"  # Default category
            categories[category] = categories.get(category, []) + [result.answer_similarity]

        if len(categories) > 1:
            cat_names = list(categories.keys())
            cat_scores = [np.mean(scores) for scores in categories.values()]
            axes[1, 2].bar(cat_names, cat_scores, color='lightcoral')
            axes[1, 2].set_title('Performance by Category')
            axes[1, 2].set_ylabel('Average Similarity Score')
            axes[1, 2].tick_params(axis='x', rotation=45)
        else:
            # Show overall performance summary
            summary_data = {
                'Excellent (>0.9)': sum(1 for s in similarities if s > 0.9),
                'Good (0.7-0.9)': sum(1 for s in similarities if 0.7 <= s <= 0.9),
                'Fair (0.5-0.7)': sum(1 for s in similarities if 0.5 <= s < 0.7),
                'Poor (<0.5)': sum(1 for s in similarities if s < 0.5)
            }
            axes[1, 2].pie(summary_data.values(), labels=summary_data.keys(), autopct='%1.1f%%')
            axes[1, 2].set_title('Performance Distribution')

        plt.tight_layout()

        if save_plots:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            plt.savefig(f"rag_evaluation_plots_{timestamp}.png", dpi=300, bbox_inches='tight')
            print(f"Plots saved to rag_evaluation_plots_{timestamp}.png")

        plt.show()

    def generate_human_evaluation_template(self, sample_size: int = 5) -> Dict[str, Any]:
        """Generate template for human evaluation"""

        if not self.evaluation_results:
            print("No evaluation results available. Run evaluation first.")
            return {}

        # Select sample for human evaluation
        sample_results = np.random.choice(self.evaluation_results,
                                        min(sample_size, len(self.evaluation_results)),
                                        replace=False)

        template = {
            "instructions": {
                "groundedness": "Rate how well the answer is supported by the provided context (1-5 scale)",
                "relevance": "Rate how relevant the retrieved information is to the question (1-5 scale)",
                "accuracy": "Rate the factual accuracy of the answer (1-5 scale)",
                "fluency": "Rate the language fluency and readability (1-5 scale)"
            },
            "evaluation_samples": []
        }

        for i, result in enumerate(sample_results):
            context = "\n".join([chunk.text[:200] + "..." for chunk, _ in result.retrieved_chunks[:3]])

            template["evaluation_samples"].append({
                "sample_id": i + 1,
                "query": result.query,
                "generated_answer": result.generated_answer,
                "expected_answer": result.expected_answer,
                "retrieved_context": context,
                "system_scores": {
                    "groundedness": result.groundedness_score,
                    "relevance": result.relevance_score,
                    "answer_similarity": result.answer_similarity
                },
                "human_evaluation": {
                    "groundedness": None,
                    "relevance": None,
                    "accuracy": None,
                    "fluency": None,
                    "comments": ""
                }
            })

        return template

    def compare_with_baseline(self, baseline_results: List[EvaluationResult]) -> Dict[str, Any]:
        """Compare current results with baseline results"""

        if not self.evaluation_results or not baseline_results:
            return {"error": "Missing evaluation results for comparison"}

        current_metrics = self.calculate_aggregate_metrics(self.evaluation_results)
        baseline_metrics = self.calculate_aggregate_metrics(baseline_results)

        comparison = {
            "current_performance": current_metrics,
            "baseline_performance": baseline_metrics,
            "improvements": {},
            "regressions": {}
        }

        for metric in current_metrics:
            if metric in baseline_metrics:
                diff = current_metrics[metric] - baseline_metrics[metric]
                if diff > 0:
                    comparison["improvements"][metric] = {
                        "improvement": diff,
                        "percentage_change": (diff / baseline_metrics[metric]) * 100
                    }
                elif diff < 0:
                    comparison["regressions"][metric] = {
                        "regression": abs(diff),
                        "percentage_change": (diff / baseline_metrics[metric]) * 100
                    }

        return comparison

class BenchmarkRunner:
    """Runs a fixed, built-in benchmark against a RAG system."""

    def __init__(self, rag_system: MultilingualRAG):
        """Keep the RAG system and build the evaluator that scores it.

        Args:
            rag_system: The RAG pipeline under test.
        """
        self.rag = rag_system
        self.evaluator = RAGEvaluator(rag_system)

    def run_multilingual_benchmark(self) -> Dict[str, Any]:
        """Evaluate the built-in Bengali and English literature questions.

        Returns:
            A dict with the benchmark name and version, the per-query
            results, the aggregate metrics over all results, and aggregate
            metrics per language for each of 'bn' and 'en' that has results.
        """
        print("Running Multilingual RAG Benchmark...")

        # Two Bengali questions followed by one English counterpart; they are
        # evaluated in exactly this order.
        benchmark_cases = [
            {
                "query": "রবীন্দ্রনাথ ঠাকুরের কোন উপন্যাসে গোরা চরিত্রটি আছে?",
                "expected_answer": "গোরা",
                "category": "literature_knowledge",
                "language": "bn",
            },
            {
                "query": "বাংলা সাহিত্যের আধুনিক যুগের প্রবর্তক কে?",
                "expected_answer": "মাইকেল মধুসূদন দত্ত",
                "category": "literature_history",
                "language": "bn",
            },
            {
                "query": "Who is considered the pioneer of modern Bengali literature?",
                "expected_answer": "Michael Madhusudan Dutt",
                "category": "literature_history",
                "language": "en",
            },
        ]

        outcomes = [self.evaluator.evaluate_single_query(case) for case in benchmark_cases]
        overall_metrics = self.evaluator.calculate_aggregate_metrics(outcomes)

        # Per-language metrics; a language without any result is left out.
        per_language = {}
        for language_code in ('bn', 'en'):
            matching = [outcome for outcome in outcomes if outcome.language == language_code]
            if not matching:
                continue
            per_language[language_code] = self.evaluator.calculate_aggregate_metrics(matching)

        return {
            "benchmark_name": "Multilingual Bengali Literature RAG",
            "version": "1.0",
            "results": outcomes,
            "aggregate_metrics": overall_metrics,
            "language_specific_performance": per_language,
        }

# Example usage and testing
def main():
    """Run the full example workflow end to end.

    Builds the RAG system and evaluator, runs the evaluation, saves the
    report, plots and human-evaluation template, prints a metric summary and
    finally runs the multilingual benchmark.

    Returns:
        A tuple ``(evaluation_report, benchmark_results)``.
    """
    # The RAG system must already have its knowledge base built.
    rag_system = MultilingualRAG()
    rag_evaluator = RAGEvaluator(rag_system)

    # Evaluation, report file and plots
    print("Running RAG System Evaluation...")
    report = rag_evaluator.run_evaluation()
    rag_evaluator.save_evaluation_report(report)
    rag_evaluator.visualize_results()

    # Template for manual rating of a small random sample
    template = rag_evaluator.generate_human_evaluation_template(sample_size=3)
    with open("human_evaluation_template.json", 'w', encoding='utf-8') as template_file:
        json.dump(template, template_file, indent=2, ensure_ascii=False)
    print("Human evaluation template saved to human_evaluation_template.json")

    # Console summary of the aggregate metrics
    banner = "=" * 50
    print("\n" + banner)
    print("EVALUATION SUMMARY")
    print(banner)

    summary = report["evaluation_summary"]["aggregate_metrics"]
    print(f"Average Groundedness: {summary['avg_groundedness']:.3f}")
    print(f"Average Relevance: {summary['avg_relevance']:.3f}")
    print(f"Average Answer Similarity: {summary['avg_answer_similarity']:.3f}")
    print(f"Average Response Time: {summary['avg_response_time']:.3f}s")
    print(f"Exact Match Rate: {summary['exact_match_rate']:.3f}")

    # Benchmark on the same RAG system
    benchmark_report = BenchmarkRunner(rag_system).run_multilingual_benchmark()

    print("\nBenchmark completed!")
    return report, benchmark_report

# Run the example workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()