In [8]:
%pip install -U langchain-community
%pip install -U langchain-chroma
import pandas as pd
import numpy as np
import json
import requests
import time
from typing import List, Dict, Tuple, Optional, Any
from sklearn.metrics.pairwise import cosine_similarity
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.llms.base import LLM
import re
from dataclasses import dataclass
from datetime import datetime

@dataclass
class EvaluationResult:
    """Class to store evaluation results"""
    question: str
    answer: str
    expected_answer: str
    retrieved_contexts: List[str]
    groundedness_score: float
    relevance_score: float
    cosine_similarity_score: float
    human_judgment: Optional[str] = None
    timestamp: str = None

class HuggingFaceLLM(LLM):
    """Custom LLM wrapper for Hugging Face Inference API"""
    model_name: str = "microsoft/DialoGPT-medium"
    api_url: str = ""

    def __init__(self, model_name: str = "microsoft/DialoGPT-medium"):
        super().__init__()
        self.model_name = model_name
        self.api_url = f"https://api-inference.huggingface.co/models/{model_name}"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Make API call to Hugging Face"""
        api_url = "https://api-inference.huggingface.co/models/google/flan-t5-large"

        headers = {"Content-Type": "application/json"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 150,
                "temperature": 0.1,
                "return_full_text": False
            }
        }

        try:
            response = requests.post(api_url, headers=headers, json=payload)
            if response.status_code == 200:
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    return result[0].get('generated_text', '').strip()
                elif isinstance(result, dict):
                    return result.get('generated_text', '').strip()
            else:
                return self._simple_answer_extraction(prompt)
        except Exception as e:
            print(f"API Error: {e}")
            return self._simple_answer_extraction(prompt)

    def _simple_answer_extraction(self, prompt: str) -> str:
        """Simple fallback method to extract answers from context"""
        if "Question:" in prompt and "Context:" in prompt:
            context_start = prompt.find("Context:") + 8
            question_start = prompt.find("Question:") + 9
            context = prompt[context_start:prompt.find("Question:")].strip()
            question = prompt[question_start:].strip()

            question_lower = question.lower()
            context_lines = context.split('\n')

            for line in context_lines:
                if any(word in line for word in question.split() if len(word) > 2):
                    words = line.split()
                    if len(words) > 0:
                        return line.strip()[:100]

        return "তথ্য পাওয়া যায়নি"

    @property
    def _llm_type(self) -> str:
        return "huggingface"

class RAGEvaluator:
    """Class to evaluate RAG system performance"""

    def __init__(self, embeddings_model):
        self.embeddings_model = embeddings_model
        self.evaluation_results = []

    def calculate_cosine_similarity(self, text1: str, text2: str) -> float:
        """Calculate cosine similarity between two texts"""
        try:
            # Get embeddings for both texts
            emb1 = self.embeddings_model.embed_query(text1)
            emb2 = self.embeddings_model.embed_query(text2)

            # Calculate cosine similarity
            similarity = cosine_similarity([emb1], [emb2])[0][0]
            return float(similarity)
        except Exception as e:
            print(f"Error calculating cosine similarity: {e}")
            return 0.0

    def evaluate_groundedness(self, answer: str, contexts: List[str]) -> float:
        """
        Evaluate if the answer is grounded in the retrieved contexts
        Returns a score between 0 and 1
        """
        if not answer or not contexts:
            return 0.0

        combined_context = " ".join(contexts)

        answer_words = set(answer.lower().split())
        context_words = set(combined_context.lower().split())

        overlap = len(answer_words.intersection(context_words))
        total_answer_words = len(answer_words)

        if total_answer_words == 0:
            return 0.0

        word_overlap_score = overlap / total_answer_words

        cosine_score = self.calculate_cosine_similarity(answer, combined_context)

        pattern_score = self._evaluate_bengali_patterns(answer, combined_context)

        groundedness_score = (
            0.4 * word_overlap_score +
            0.4 * cosine_score +
            0.2 * pattern_score
        )

        return min(1.0, groundedness_score)

    def _evaluate_bengali_patterns(self, answer: str, context: str) -> float:
        """Evaluate Bengali-specific patterns"""
        bengali_numbers = ['১', '২', '৩', '৪', '৫', '৬', '৭', '৮', '৯', '০']
        bengali_words = ['বছর', 'সাল', 'মাস', 'দিন']

        score = 0.0

        for num in bengali_numbers:
            if num in answer and num in context:
                score += 0.2

        for word in bengali_words:
            if word in answer and word in context:
                score += 0.1

        return min(1.0, score)

    def evaluate_relevance(self, question: str, retrieved_docs: List[Document]) -> float:
        """
        Evaluate the relevance of retrieved documents to the question
        Returns a score between 0 and 1
        """
        if not question or not retrieved_docs:
            return 0.0

        relevance_scores = []

        for doc in retrieved_docs:
            doc_relevance = self.calculate_cosine_similarity(question, doc.page_content)
            relevance_scores.append(doc_relevance)

        return np.mean(relevance_scores) if relevance_scores else 0.0

    def evaluate_answer_accuracy(self, predicted_answer: str, expected_answer: str) -> float:
        """
        Evaluate how accurately the predicted answer matches the expected answer
        """
        if not predicted_answer or not expected_answer:
            return 0.0

        cosine_score = self.calculate_cosine_similarity(predicted_answer, expected_answer)

        if expected_answer.lower() in predicted_answer.lower():
            exact_match_score = 1.0
        else:
            exact_match_score = 0.0

        accuracy_score = 0.6 * cosine_score + 0.4 * exact_match_score

        return accuracy_score

    def comprehensive_evaluation(self, question: str, answer: str, expected_answer: str,
                               retrieved_docs: List[Document]) -> EvaluationResult:
        """Perform comprehensive evaluation of a single query"""

        contexts = [doc.page_content for doc in retrieved_docs]

        groundedness_score = self.evaluate_groundedness(answer, contexts)
        relevance_score = self.evaluate_relevance(question, retrieved_docs)
        accuracy_score = self.evaluate_answer_accuracy(answer, expected_answer)

        result = EvaluationResult(
            question=question,
            answer=answer,
            expected_answer=expected_answer,
            retrieved_contexts=contexts,
            groundedness_score=groundedness_score,
            relevance_score=relevance_score,
            cosine_similarity_score=accuracy_score,
            timestamp=datetime.now().isoformat()
        )

        self.evaluation_results.append(result)
        return result

class EnhancedBilingualRAGPipeline:
    """Enhanced RAG Pipeline with evaluation capabilities"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.vectorstore = None
        self.retriever = None
        self.qa_chain = None

        print("Initializing multilingual embeddings...")
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
            model_kwargs={'device': 'cpu'}
        )

        print("Initializing language model...")
        self.llm = HuggingFaceLLM()

        # Initialize evaluator
        self.evaluator = RAGEvaluator(self.embeddings)

    def load_and_prepare_documents(self) -> List[Document]:
        """Load documents from file"""
        print(f"Loading documents from: {self.file_path}")
        documents = []

        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                content = file.read()

            # Split content by separator
            chunks = content.split('---')
            chunks = [chunk.strip() for chunk in chunks if chunk.strip() and len(chunk.strip()) > 10]

            for idx, chunk in enumerate(chunks):
                documents.append(Document(
                    page_content=chunk,
                    metadata={
                        'chunk_id': idx,
                        'source': f"chunk_{idx}",
                        'length': len(chunk)
                    }
                ))

            print(f"Loaded {len(documents)} document chunks")
            return documents

        except Exception as e:
            print(f"Error loading file: {e}")
            return []

    def create_vectorstore(self, documents: List[Document]):
        """Create vector store from documents"""
        print("Creating vector store...")

        if not documents:
            print("No documents to process!")
            return

        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory="/content/chroma_db"
        )

        self.retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 3}
        )

        print("Vector store created successfully!")

    def setup_qa_chain(self):
        """Set up QA chain"""
        print("Setting up QA chain...")

        prompt_template = """Based on the following context, answer the question concisely.

Context:
{context}

Question: {question}

Provide a direct, short answer. If the question is in Bengali, answer in Bengali. If in English, answer in English.

Answer:"""

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            chain_type_kwargs={"prompt": PROMPT},
            return_source_documents=True
        )

        print("QA chain setup complete!")

    def query_with_evaluation(self, question: str, expected_answer: str = None) -> Dict:
        """Query with comprehensive evaluation"""
        if not self.qa_chain:
            raise ValueError("QA chain not initialized. Run setup() first.")

        print(f"Processing query: {question}")

        try:
            relevant_docs = self.retriever.invoke(question)

            answer = self._extract_answer_from_context(question, relevant_docs)

            evaluation_result = None
            if expected_answer:
                evaluation_result = self.evaluator.comprehensive_evaluation(
                    question, answer, expected_answer, relevant_docs
                )

            return {
                "question": question,
                "answer": answer,
                "expected_answer": expected_answer,
                "source_documents": relevant_docs,
                "evaluation": evaluation_result
            }

        except Exception as e:
            print(f"Error processing query: {e}")
            return {
                "question": question,
                "answer": "উত্তর খুঁজে পাওয়া যায়নি",
                "expected_answer": expected_answer,
                "source_documents": [],
                "evaluation": None
            }

    def _extract_answer_from_context(self, question: str, docs: List[Document]) -> str:
        """Enhanced answer extraction with better pattern matching"""
        context = " ".join([doc.page_content for doc in docs])

        if any(word in question for word in ["কার", "কোন", "কি", "কত", "কে", "কোথায়", "কখন"]):
            lines = context.split('।')

            for line in lines:
                line = line.strip()

                if "বয়স" in question:
                    age_match = re.search(r'(\d+|[০-৯]+)\s*(বছর|বয়স)', line)
                    if age_match:
                        return age_match.group(0)

                if "সুপুরুষ" in question or "সপুরুষ" in question:
                    if "শুম্ভনাথ" in line:
                        return "শুম্ভনাথ"

                if "ভাগ্য দেবতা" in question or "মামো" in question:
                    if "মামো" in line:
                        return "মামো"

                question_words = [word for word in question.split() if len(word) > 2]
                if any(word in line for word in question_words):
                    if len(line) > 10:
                        return line[:80]

        sentences = context.split('।')
        for sentence in sentences[:3]:
            sentence = sentence.strip()
            if len(sentence) > 5:
                return sentence[:50]

        return "তথ্য পাওয়া যায়নি"

    def batch_evaluate(self, test_cases: List[Dict]) -> List[EvaluationResult]:
        """Evaluate multiple test cases at once"""
        results = []

        print(f"Running batch evaluation on {len(test_cases)} test cases...")

        for i, test_case in enumerate(test_cases):
            print(f"Evaluating test case {i+1}/{len(test_cases)}")

            question = test_case['question']
            expected_answer = test_case['expected_answer']

            result = self.query_with_evaluation(question, expected_answer)
            if result['evaluation']:
                results.append(result['evaluation'])

        return results

    def generate_evaluation_report(self, results: List[EvaluationResult]) -> str:
        """Generate comprehensive evaluation report"""
        if not results:
            return "No evaluation results available."

        avg_groundedness = np.mean([r.groundedness_score for r in results])
        avg_relevance = np.mean([r.relevance_score for r in results])
        avg_accuracy = np.mean([r.cosine_similarity_score for r in results])

        report = f"""
=== RAG SYSTEM EVALUATION REPORT ===
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

📊 AGGREGATE METRICS:
- Average Groundedness Score: {avg_groundedness:.3f}
- Average Relevance Score: {avg_relevance:.3f}
- Average Answer Accuracy: {avg_accuracy:.3f}

📋 DETAILED RESULTS:
"""

        for i, result in enumerate(results, 1):
            report += f"""
Test Case {i}:
Question: {result.question}
Expected: {result.expected_answer}
Got: {result.answer}
Groundedness: {result.groundedness_score:.3f}
Relevance: {result.relevance_score:.3f}
Accuracy: {result.cosine_similarity_score:.3f}
{'='*50}
"""

        return report

    def setup(self):
        """Complete setup of the RAG pipeline"""
        print("Setting up Enhanced RAG Pipeline with Evaluation...")

        documents = self.load_and_prepare_documents()
        if not documents:
            print("No documents loaded. Please check the file path.")
            return False

        self.create_vectorstore(documents)
        self.setup_qa_chain()

        print("Enhanced RAG Pipeline setup complete!")
        return True

def run_comprehensive_evaluation():
    """Run comprehensive evaluation of the RAG system"""

    file_path = "/content/drive/MyDrive/dataset/HSC26_chunks.txt"
    rag_pipeline = EnhancedBilingualRAGPipeline(file_path)

    success = rag_pipeline.setup()
    if not success:
        print("Failed to setup pipeline.")
        return

    test_cases = [
        {
            "question": "অনুপমের ভাষায় সুপুরুষ কাকে বলা হয়েছে?",
            "expected_answer": "শুম্ভনাথ"
        },
        {
            "question": "কাকে অনুপমের ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে?",
            "expected_answer": "মামো"
        },
        {
            "question": "বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?",
            "expected_answer": "১৫ বছর"
        }
    ]

    print("\n🔬 COMPREHENSIVE RAG EVALUATION")
    print("="*60)

    evaluation_results = rag_pipeline.batch_evaluate(test_cases)

    report = rag_pipeline.generate_evaluation_report(evaluation_results)
    print(report)

    results_data = []
    for result in evaluation_results:
        results_data.append({
            'question': result.question,
            'answer': result.answer,
            'expected_answer': result.expected_answer,
            'groundedness_score': result.groundedness_score,
            'relevance_score': result.relevance_score,
            'accuracy_score': result.cosine_similarity_score,
            'timestamp': result.timestamp
        })

    df = pd.DataFrame(results_data)
    print("\n📈 EVALUATION METRICS SUMMARY:")
    print(df.describe())

    return rag_pipeline, evaluation_results

if __name__ == "__main__":
    pipeline, results = run_comprehensive_evaluation()

Initializing multilingual embeddings...
Initializing language model...
Setting up Enhanced RAG Pipeline with Evaluation...
Loading documents from: /content/drive/MyDrive/dataset/HSC26_chunks.txt
Loaded 141 document chunks
Creating vector store...
Vector store created successfully!
Setting up QA chain...
QA chain setup complete!
Enhanced RAG Pipeline setup complete!

🔬 COMPREHENSIVE RAG EVALUATION
Running batch evaluation on 3 test cases...
Evaluating test case 1/3
Processing query: অনুপমের ভাষায় সুপুরুষ কাকে বলা হয়েছে?
Evaluating test case 2/3
Processing query: কাকে অনুপমের ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে?
Evaluating test case 3/3
Processing query: বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?

=== RAG SYSTEM EVALUATION REPORT ===
Generated on: 2025-07-26 08:21:26

📊 AGGREGATE METRICS:
- Average Groundedness Score: 0.677
- Average Relevance Score: 0.849
- Average Answer Accuracy: 0.333

📋 DETAILED RESULTS:

Test Case 1:
Question: অনুপমের ভাষায় সুপুরুষ কাকে বলা হয়েছে?
Expected: শুম্ভনা