In [None]:

                                                 # Ontology Guided KG evaluation
# To assess KG quality, we implemented a two-layered LLM-based validation framework. The first layer utilises
# DeepSeek R1 to compare triples to the source text, identifying false positives and false negatives. The second layer applies
# the KGValidator method to verify ontology conformity, assigning Pass/Not Pass labels and confidence scores. These layers yield
# precision, recall, F1-score, and semantic validity metrics.


import pandas as pd
import requests
import json
import time
from typing import List, Dict, Tuple, Any, Optional
import logging
from dataclasses import dataclass
from collections import defaultdict
import re

# Configure logging: INFO level, each record formatted as "timestamp - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)

@dataclass
class EvaluationResult:
    """Data class to store evaluation results for a chunk.

    One instance is built per chunk by ``KnowledgeGraphEvaluator.evaluate_chunk``
    and later consumed by the report and CSV/JSON writers.
    """
    chunk_id: str  # identifier of the evaluated chunk
    precision: float  # true_positives / total_extracted (0.0 when nothing was extracted)
    recall: float  # true_positives / total_ground_truth (0.0 when that is zero)
    f1_score: float  # harmonic mean of precision and recall (0.0 when both are zero)
    true_positives: int  # total_extracted minus false_positives
    false_positives: int  # number of extracted triples flagged as unsupported
    false_negatives: int  # number of facts reported as missing from the triples
    total_extracted: int  # number of triples extracted for this chunk
    total_ground_truth: int  # true_positives + false_negatives
    false_positive_triples: List[str]  # the flagged triples, as "subject -> predicate -> object" text
    false_negative_facts: List[str]  # the missing facts, as returned by the evaluator
    ontology_alignment: str  # ontology judgment label, e.g. "Pass" or "Not Pass"
    ontology_confidence: float  # confidence attached to the ontology judgment, within [0.0, 1.0]

class DeepSeekEvaluator:
    """Main evaluator class using DeepSeek API"""
    
    def __init__(self, api_key: str, api_url: str):
        self.api_key = api_key
        self.api_url = api_url
        self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-70B"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def call_deepseek_api(self, prompt: str, max_retries: int = 3) -> str:
        """Make API call to DeepSeek with retry logic"""
        for attempt in range(max_retries):
            try:
                payload = {
                    "model": self.model,
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are an expert knowledge graph evaluator. Provide precise, structured responses following the exact format requested."
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    "temperature": 0.1,
                    "max_tokens": 2000
                }
                
                response = requests.post(self.api_url, headers=self.headers, json=payload, timeout=60)
                response.raise_for_status()
                
                result = response.json()
                return result['choices'][0]['message']['content'].strip()
                
            except requests.exceptions.RequestException as e:
                logger.warning(f"API call attempt {attempt + 1} failed: {str(e)}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff
        
        raise Exception("Max retries exceeded")
    
    def identify_false_positives(self, original_text: str, extracted_triples: List[Dict]) -> List[str]:
        """Identify incorrectly extracted triples (false positives)"""
        if not extracted_triples:
            return []
            
        triples_str = "\n".join([f"{t['subject']} -> {t['predicate']} -> {t['object']}" 
                                for t in extracted_triples])
        
        prompt = f"""
TASK: Identify FALSE POSITIVE triples - triples that are incorrectly extracted from the text.

ORIGINAL TEXT:
{original_text}

EXTRACTED TRIPLES:
{triples_str}

INSTRUCTIONS:
1. Carefully read the original text
2. For each extracted triple, determine if it accurately represents information from the text
3. A triple is a FALSE POSITIVE if:
   - The relationship doesn't exist in the text
   - The entities are not mentioned in the text
   - The relationship is misrepresented or incorrect
   - The triple makes assumptions not supported by the text

OUTPUT FORMAT:
List each false positive triple exactly as shown above, one per line.
If no false positives, respond with "NONE"

FALSE POSITIVES:
"""
        
        try:
            response = self.call_deepseek_api(prompt)
            logger.debug(f"False positives response: {response[:200]}...")
            
            if "NONE" in response.upper():
                return []
            
            # Extract false positive triples from response
            false_positives = []
            lines = response.split('\n')
            for line in lines:
                line = line.strip()
                if '->' in line and not line.startswith('FALSE POSITIVES:'):
                    false_positives.append(line)
            
            # Validate that we don't have more FPs than total triples
            if len(false_positives) > len(extracted_triples):
                logger.warning(f"Found {len(false_positives)} FPs but only {len(extracted_triples)} triples exist. Limiting to actual triples.")
                # Only keep FPs that match actual extracted triples
                validated_fps = []
                for fp in false_positives:
                    for triple in extracted_triples:
                        triple_str = f"{triple['subject']} -> {triple['predicate']} -> {triple['object']}"
                        if fp.strip() == triple_str.strip():
                            validated_fps.append(fp)
                            break
                false_positives = validated_fps
            
            logger.info(f"Identified {len(false_positives)} false positives")
            return false_positives
            
        except Exception as e:
            logger.error(f"Error identifying false positives: {e}")
            return []
    
    def identify_false_negatives(self, original_text: str, extracted_triples: List[Dict]) -> List[str]:
        """Identify missing facts (false negatives)"""
        triples_str = "\n".join([f"{t['subject']} -> {t['predicate']} -> {t['object']}" 
                                for t in extracted_triples]) if extracted_triples else "No triples extracted"
        
        prompt = f"""
TASK: Identify FALSE NEGATIVE facts - important facts present in the original text but missing from extracted triples.

ORIGINAL TEXT:
{original_text}

EXTRACTED TRIPLES:
{triples_str}

INSTRUCTIONS:
1. Carefully read the original text and identify all factual relationships
2. Compare with the extracted triples
3. List facts from the text that are NOT captured by any extracted triple
4. Focus on important relationships, entities, and attributes mentioned in the text
5. Express missing facts in the format: "Entity -> Relationship -> Entity/Value"

OUTPUT FORMAT:
List each missing fact as a potential triple, one per line.
If no missing facts, respond with "NONE"

MISSING FACTS:
"""
        
        try:
            response = self.call_deepseek_api(prompt)
            logger.debug(f"False negatives response: {response[:200]}...")
            
            if "NONE" in response.upper():
                return []
            
            # Extract missing facts from response
            missing_facts = []
            lines = response.split('\n')
            for line in lines:
                line = line.strip()
                if '->' in line and not line.startswith('MISSING FACTS:'):
                    missing_facts.append(line)
            
            logger.info(f"Identified {len(missing_facts)} false negatives")
            return missing_facts
            
        except Exception as e:
            logger.error(f"Error identifying false negatives: {e}")
            return []
    
    def evaluate_ontology_alignment(self, original_text: str, extracted_triples: List[Dict], 
                                  ontology_content: str) -> Tuple[str, float]:
        """Evaluate if triples align with the ontology"""
        if not extracted_triples:
            return "Not Pass", 0.0
            
        triples_str = "\n".join([f"{t['subject']} -> {t['predicate']} -> {t['object']}" 
                                for t in extracted_triples])
        
        prompt = f"""
TASK: Judge if the QUESTION (original text) and ANSWER (extracted triples) align well with the ONTOLOGY.

ONTOLOGY:
{ontology_content[:2000]}  # Truncate if too long

QUESTION (Original Text):
{original_text}

ANSWER (Extracted Triples):
{triples_str}

EVALUATION CRITERIA:
The QUESTION and ANSWER align well with the ONTOLOGY if:
1. They are in the same knowledge domain as the ONTOLOGY
2. The ANSWER follows the relationships defined in the ONTOLOGY
3. The entities and predicates used are consistent with the ontology structure
4. The semantic relationships respect the ontology constraints

OUTPUT FORMAT (exactly as specified):
(your judgment: Pass/Not Pass, confidence score)

Where confidence score is between 0.0 and 1.0

JUDGMENT:
"""
        
        try:
            response = self.call_deepseek_api(prompt)
            logger.debug(f"Ontology alignment response: {response}")
            
            # Parse the response to extract judgment and confidence
            # Look for pattern like "(Pass, 0.85)" or "(Not Pass, 0.6)"
            pattern = r'\((.*?),\s*([\d.]+)\)'
            match = re.search(pattern, response)
            
            if match:
                judgment = match.group(1).strip()
                confidence = float(match.group(2))
                # Validate confidence is in range
                confidence = max(0.0, min(1.0, confidence))
                return judgment, confidence
            else:
                # Fallback parsing
                if "Pass" in response and "Not Pass" not in response:
                    judgment = "Pass"
                else:
                    judgment = "Not Pass"
                
                # Try to extract confidence score
                conf_pattern = r'[\d.]+(?=\s*\)|\s*$)'
                conf_match = re.search(conf_pattern, response)
                confidence = float(conf_match.group()) if conf_match else 0.5
                confidence = max(0.0, min(1.0, confidence))
                
                return judgment, confidence
                
        except Exception as e:
            logger.warning(f"Error parsing ontology alignment response: {e}")
            return "Not Pass", 0.0

class KnowledgeGraphEvaluator:
    """Main evaluation orchestrator"""
    
    def __init__(self, chunks_file: str, triples_file: str, ontology_file: str, api_key: str,
                 api_url: str = "https://inference.api.nscale.com/v1/chat/completions"):
        """Load the input files and create the LLM evaluator.

        Args:
            chunks_file: Path of the CSV file holding the text chunks.
            triples_file: Path of the CSV file holding the extracted triples.
            ontology_file: Path of the ontology file, read as UTF-8 text.
            api_key: API key handed to ``DeepSeekEvaluator``.
            api_url: Chat-completions endpoint handed to ``DeepSeekEvaluator``.
                Defaults to the endpoint that was previously hard-coded, so
                existing callers are unaffected.

        Raises:
            Whatever ``pandas.read_csv`` raises when a CSV file cannot be
            read. A missing ontology file is not an error here: it is logged
            and an empty ontology is used.
        """
        self.chunks_file = chunks_file
        self.triples_file = triples_file
        self.ontology_file = ontology_file
        self.evaluator = DeepSeekEvaluator(api_key, api_url)

        # Load data
        self.chunks_df = pd.read_csv(chunks_file)
        self.triples_df = pd.read_csv(triples_file)
        self.ontology_content = self.load_ontology()

        logger.info(f"Loaded {len(self.chunks_df)} chunks and {len(self.triples_df)} triples")
    
    def load_ontology(self) -> str:
        """Load RDF ontology file"""
        try:
            with open(self.ontology_file, 'r', encoding='utf-8') as f:
                content = f.read()
                logger.info(f"Loaded ontology file: {len(content)} characters")
                return content
        except Exception as e:
            logger.error(f"Error loading ontology: {e}")
            return ""
    
    def get_triples_for_chunk(self, chunk_id: str) -> List[Dict]:
        """Get all triples associated with a chunk ID"""
        chunk_triples = self.triples_df[self.triples_df['chunk_id'] == chunk_id]
        triples = [
            {
                'subject': str(row['subject']),
                'predicate': str(row['predicate']),
                'object': str(row['object'])
            }
            for _, row in chunk_triples.iterrows()
        ]
        logger.debug(f"Found {len(triples)} triples for chunk {chunk_id}")
        return triples
    
    def calculate_metrics(self, total_extracted: int, false_positives: int, 
                         false_negatives: int) -> Tuple[float, float, float]:
        """Derive precision, recall and F1 from the three raw counts.

        True positives are ``total_extracted - false_positives`` and the
        ground truth is ``true_positives + false_negatives``. A
        ``false_positives`` count larger than ``total_extracted`` is logged
        and capped. Every ratio with a zero denominator is reported as 0.0.

        Returns:
            ``(precision, recall, f1_score)``.
        """
        if false_positives > total_extracted:
            logger.error(f"Invalid metrics: {false_positives} FPs > {total_extracted} total extracted")
            false_positives = total_extracted

        # Never negative, even for inconsistent inputs.
        true_positives = max(0, total_extracted - false_positives)
        total_ground_truth = true_positives + false_negatives

        precision = true_positives / total_extracted if total_extracted != 0 else 0.0
        recall = true_positives / total_ground_truth if total_ground_truth != 0 else 0.0

        if precision + recall != 0:
            f1_score = 2 * (precision * recall) / (precision + recall)
        else:
            f1_score = 0.0

        logger.debug(f"Metrics calculation: TP={true_positives}, FP={false_positives}, FN={false_negatives}, "
                    f"Precision={precision:.3f}, Recall={recall:.3f}, F1={f1_score:.3f}")

        return precision, recall, f1_score
    
    def evaluate_chunk(self, chunk_id: str) -> Optional[EvaluationResult]:
        """Evaluate a single chunk"""
        logger.info(f"Evaluating chunk: {chunk_id}")
        
        try:
            # Get chunk text
            chunk_row = self.chunks_df[self.chunks_df['chunk_id'] == chunk_id]
            if chunk_row.empty:
                logger.error(f"Chunk ID {chunk_id} not found")
                return None
            
            original_text = str(chunk_row.iloc[0]['text'])
            
            # Get associated triples
            extracted_triples = self.get_triples_for_chunk(chunk_id)
            
            if not extracted_triples:
                logger.warning(f"No triples found for chunk {chunk_id}")
                return EvaluationResult(
                    chunk_id=chunk_id,
                    precision=0.0, recall=0.0, f1_score=0.0,
                    true_positives=0, false_positives=0, false_negatives=0,
                    total_extracted=0, total_ground_truth=0,
                    false_positive_triples=[], false_negative_facts=[],
                    ontology_alignment="Not Pass", ontology_confidence=0.0
                )
            
            # Identify false positives
            false_positive_triples = self.evaluator.identify_false_positives(original_text, extracted_triples)
            
            # Identify false negatives
            false_negative_facts = self.evaluator.identify_false_negatives(original_text, extracted_triples)
            
            # Evaluate ontology alignment
            ontology_judgment, ontology_confidence = self.evaluator.evaluate_ontology_alignment(
                original_text, extracted_triples, self.ontology_content
            )
            
            # Calculate metrics
            total_extracted = len(extracted_triples)
            false_positives = len(false_positive_triples)
            false_negatives = len(false_negative_facts)
            
            # Validate false positives
            if false_positives > total_extracted:
                logger.warning(f"Capping FPs: {false_positives} > {total_extracted}")
                false_positives = total_extracted
                false_positive_triples = false_positive_triples[:total_extracted]
            
            precision, recall, f1_score = self.calculate_metrics(
                total_extracted, false_positives, false_negatives
            )
            
            true_positives = max(0, total_extracted - false_positives)
            total_ground_truth = true_positives + false_negatives
            
            result = EvaluationResult(
                chunk_id=chunk_id,
                precision=precision,
                recall=recall,
                f1_score=f1_score,
                true_positives=true_positives,
                false_positives=false_positives,
                false_negatives=false_negatives,
                total_extracted=total_extracted,
                total_ground_truth=total_ground_truth,
                false_positive_triples=false_positive_triples,
                false_negative_facts=false_negative_facts,
                ontology_alignment=ontology_judgment,
                ontology_confidence=ontology_confidence
            )
            
            logger.info(f"Chunk {chunk_id} evaluated: F1={f1_score:.3f}, P={precision:.3f}, R={recall:.3f}, "
                       f"TP={true_positives}, FP={false_positives}, FN={false_negatives}")
            
            return result
            
        except Exception as e:
            logger.error(f"Error evaluating chunk {chunk_id}: {e}")
            import traceback
            traceback.print_exc()
            return None
    
    def evaluate_all(self, max_chunks: int = None) -> List[EvaluationResult]:
        # iterate over every chunk, even if it has no triples
        unique_chunk_ids = self.triples_df['chunk_id'].unique()
        total_chunks    = len(unique_chunk_ids)

        
        results = []
        total_chunks = len(unique_chunk_ids)
        
        for i, chunk_id in enumerate(unique_chunk_ids):
            try:
                logger.info(f"Processing chunk {i+1}/{total_chunks}: {chunk_id}")
                result = self.evaluate_chunk(chunk_id)
                if result is not None:
                    results.append(result)
                
                # Add delay to avoid rate limiting
                if i < total_chunks - 1:  # Don't sleep after last chunk
                    time.sleep(1)
                
            except Exception as e:
                logger.error(f"Error evaluating chunk {chunk_id}: {e}")
                continue
        
        logger.info(f"Successfully evaluated {len(results)} out of {total_chunks} chunks")
        return results
    
    def generate_report(self, results: List[EvaluationResult]) -> Dict[str, Any]:
        """Generate comprehensive evaluation report"""
        if not results:
            return {"error": "No results to report"}
        
        # Calculate overall metrics
        total_tp = sum(r.true_positives for r in results)
        total_fp = sum(r.false_positives for r in results)
        total_fn = sum(r.false_negatives for r in results)
        total_extracted = sum(r.total_extracted for r in results)
        
        # Use the same validated calculation method
        overall_precision, overall_recall, overall_f1 = self.calculate_metrics(
            total_extracted, total_fp, total_fn
        )
        
        # Ground truth is TP + FN
        total_ground_truth = total_tp + total_fn
        
        # Ontology alignment statistics
        pass_count = sum(1 for r in results if r.ontology_alignment == "Pass")
        avg_confidence = sum(r.ontology_confidence for r in results) / len(results) if results else 0
        
        # Calculate per-chunk statistics
        chunk_metrics = []
        for result in results:
            chunk_metrics.append({
                'chunk_id': result.chunk_id,
                'precision': result.precision,
                'recall': result.recall,
                'f1_score': result.f1_score,
                'extracted_triples': result.total_extracted,
                'true_positives': result.true_positives,
                'false_positives': result.false_positives,
                'false_negatives': result.false_negatives,
                'ground_truth': result.total_ground_truth,
                'ontology_alignment': result.ontology_alignment,
                'ontology_confidence': result.ontology_confidence
            })
        
        # Sort chunks by F1 score
        chunk_metrics_sorted = sorted(chunk_metrics, key=lambda x: x['f1_score'], reverse=True)
        
        report = {
            'summary': {
                'total_chunks_evaluated': len(results),
                'average_f1_score': overall_f1,
                'average_precision': overall_precision,
                'average_recall': overall_recall,
                'ontology_pass_rate': pass_count / len(results) if results else 0
            },
            'overall_metrics': {
                'precision': overall_precision,
                'recall': overall_recall,
                'f1_score': overall_f1,
                'total_extracted_triples': total_extracted,
                'total_true_positives': total_tp,
                'total_false_positives': total_fp,
                'total_false_negatives': total_fn,
                'total_ground_truth_facts': total_ground_truth
            },
            'ontology_alignment': {
                'pass_rate': pass_count / len(results) if results else 0,
                'pass_count': pass_count,
                'total_chunks': len(results),
                'average_confidence': avg_confidence
            },
            'best_performing_chunks': chunk_metrics_sorted[:5],
            'worst_performing_chunks': chunk_metrics_sorted[-5:],
            'chunk_level_metrics': chunk_metrics,
            'detailed_results': results
        }
        
        return report
    
    def save_results(self, results: List[EvaluationResult], output_file: str):
        """Save detailed results to CSV"""
        data = []
        for result in results:
            data.append({
                'chunk_id': result.chunk_id,
                'precision': round(result.precision, 4),
                'recall': round(result.recall, 4),
                'f1_score': round(result.f1_score, 4),
                'true_positives': result.true_positives,
                'false_positives': result.false_positives,
                'false_negatives': result.false_negatives,
                'total_extracted': result.total_extracted,
                'total_ground_truth': result.total_ground_truth,
                'ontology_alignment': result.ontology_alignment,
                'ontology_confidence': round(result.ontology_confidence, 4),
                'false_positive_triples': '; '.join(result.false_positive_triples),
                'false_negative_facts': '; '.join(result.false_negative_facts)
            })
        
        df = pd.DataFrame(data)
        df.to_csv(output_file, index=False)
        logger.info(f"Results saved to {output_file}")

def main():
    """Main execution function.

    Loads the inputs named in the configuration below, evaluates the chunks,
    prints a report to stdout and writes ``evaluation_results.csv`` and
    ``evaluation_report.json`` to the current directory. Any failure is
    logged, its traceback printed, and the exception re-raised.
    """
    # Configuration
    API_KEY = "Put your api key here" 
    CHUNKS_FILE = "chunks.csv"
    TRIPLES_FILE = "Ontology_Guided_Triples.csv"
    ONTOLOGY_FILE = "EFRO.rdf"

    def _json_default(value):
        """Convert numpy scalars (anything exposing ``item()``) for JSON."""
        # Chunk IDs come from pandas ``unique()`` and are numpy scalars when
        # the ID column is numeric; ``json`` cannot encode those natively.
        if hasattr(value, "item"):
            return value.item()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    # Initialize evaluator
    try:
        evaluator = KnowledgeGraphEvaluator(
            chunks_file=CHUNKS_FILE,
            triples_file=TRIPLES_FILE,
            ontology_file=ONTOLOGY_FILE,
            api_key=API_KEY
        )

        # Run evaluation
        logger.info("Starting evaluation...")
        results = evaluator.evaluate_all()  

        if not results:
            logger.error("No results generated")
            return

        # Generate and print report
        report = evaluator.generate_report(results)
        summary = report['summary']
        overall = report['overall_metrics']
        alignment = report['ontology_alignment']

        print("\n" + "="*60)
        print("KNOWLEDGE GRAPH EVALUATION REPORT")
        print("="*60)

        print("\nSUMMARY:")
        print(f"Chunks Evaluated: {summary['total_chunks_evaluated']}")
        print(f"Average F1 Score: {summary['average_f1_score']:.3f}")
        print(f"Average Precision: {summary['average_precision']:.3f}")
        print(f"Average Recall: {summary['average_recall']:.3f}")
        print(f"Ontology Pass Rate: {summary['ontology_pass_rate']:.1%}")

        print("\nOVERALL METRICS:")
        print(f"Total Extracted Triples: {overall['total_extracted_triples']}")
        print(f"True Positives: {overall['total_true_positives']}")
        print(f"False Positives: {overall['total_false_positives']}")
        print(f"False Negatives: {overall['total_false_negatives']}")
        print(f"Ground Truth Facts: {overall['total_ground_truth_facts']}")

        print("\nONTOLOGY ALIGNMENT:")
        print(f"Pass Rate: {alignment['pass_rate']:.1%}")
        print(f"Passed Chunks: {alignment['pass_count']}/{alignment['total_chunks']}")
        print(f"Average Confidence: {alignment['average_confidence']:.3f}")

        print("\nTOP 5 BEST PERFORMING CHUNKS:")
        for i, chunk in enumerate(report['best_performing_chunks'][:5], 1):
            print(f"{i}. {chunk['chunk_id']}: F1={chunk['f1_score']:.3f}, P={chunk['precision']:.3f}, R={chunk['recall']:.3f}")

        print("\nBOTTOM 5 WORST PERFORMING CHUNKS:")
        for i, chunk in enumerate(report['worst_performing_chunks'][:5], 1):
            print(f"{i}. {chunk['chunk_id']}: F1={chunk['f1_score']:.3f}, P={chunk['precision']:.3f}, R={chunk['recall']:.3f}")

        # Save detailed results
        evaluator.save_results(results, "evaluation_results.csv")

        # Save report as JSON. EvaluationResult objects are not JSON
        # serializable, so they are replaced by plain dicts in a shallow copy.
        report_copy = report.copy()
        report_copy['detailed_results'] = [
            {
                'chunk_id': r.chunk_id,
                'precision': r.precision,
                'recall': r.recall,
                'f1_score': r.f1_score,
                'true_positives': r.true_positives,
                'false_positives': r.false_positives,
                'false_negatives': r.false_negatives,
                'ontology_alignment': r.ontology_alignment,
                'ontology_confidence': r.ontology_confidence,
                'false_positive_triples': r.false_positive_triples,
                'false_negative_facts': r.false_negative_facts
            }
            for r in results
        ]
        with open("evaluation_report.json", "w", encoding="utf-8") as f:
            json.dump(report_copy, f, indent=2, default=_json_default)

        logger.info("\nEvaluation completed successfully!")
        logger.info("Output files:")
        logger.info("- evaluation_results.csv (detailed metrics)")
        logger.info("- evaluation_report.json (complete report)")

    except Exception as e:
        logger.error(f"Evaluation failed: {e}")
        import traceback
        traceback.print_exc()
        raise

# Run the evaluation only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()