In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
from rouge_score import rouge_scorer
from textstat import flesch_reading_ease, flesch_kincaid_grade
import re
import json
from typing import List, Dict, Tuple
import warnings
warnings.filterwarnings('ignore')

class WealthManagementEvaluator:
    """Side-by-side evaluator for a base and a fine-tuned causal language model.

    Both models answer the same wealth-management prompts. Each answer is then
    scored with lightweight heuristics (keyword-based domain specificity,
    textstat readability, regex "hallucination" flags, disclaimer detection)
    plus sentence-embedding cosine similarity. Every score is a heuristic
    signal, not a ground-truth quality measure.
    """

    # Regexes for concrete-looking figures that a model may have invented.
    _SUSPICIOUS_PATTERNS = (
        r'\d+\.\d+%\s+return',  # Specific return percentages
        r'\$\d+,\d+',  # Specific dollar amounts
        r'studies show that \d+%',  # Unattributed statistics
    )

    # Lower-case phrases counted towards the safety score.
    _SAFETY_INDICATORS = (
        'not financial advice',
        'consult a financial advisor',
        'past performance',
        'risk',
        'disclaimer',
        'professional advice',
    )

    # Only the first three indicators count as an explicit disclaimer.
    _DISCLAIMER_INDICATOR_COUNT = 3

    def __init__(self, base_model_path: str, finetuned_model_path: str):
        """
        Initialize evaluator with base and fine-tuned models

        Loads both tokenizers and both models eagerly, plus the
        'all-MiniLM-L6-v2' sentence-transformer used for similarity scoring.

        Args:
            base_model_path: Path to base Phi-3 model
            finetuned_model_path: Path to fine-tuned model
        """
        self.base_model_path = base_model_path
        self.finetuned_model_path = finetuned_model_path

        # Load models and tokenizers
        print("Loading models...")
        self.base_tokenizer = AutoTokenizer.from_pretrained(base_model_path)
        self.base_model = AutoModelForCausalLM.from_pretrained(base_model_path)

        self.ft_tokenizer = AutoTokenizer.from_pretrained(finetuned_model_path)
        self.ft_model = AutoModelForCausalLM.from_pretrained(finetuned_model_path)

        # Load sentence transformer for semantic similarity
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Initialize ROUGE scorer (kept for callers; no method of this class uses it)
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

        # Wealth management keywords for domain-specific evaluation
        self.wealth_keywords = [
            'portfolio', 'diversification', 'risk', 'return', 'investment', 'asset',
            'allocation', 'stocks', 'bonds', 'mutual funds', 'etf', 'tax',
            'retirement', '401k', 'ira', 'dividend', 'capital gains', 'market',
            'volatility', 'hedge', 'financial planning', 'wealth management'
        ]

    def generate_response(self, model, tokenizer, prompt: str, max_length: int = 512) -> str:
        """Sample one completion for ``prompt`` and return only the newly generated text.

        Args:
            model: Object exposing a Hugging Face style ``generate`` method.
            tokenizer: Matching tokenizer exposing ``encode``, ``decode`` and
                ``eos_token_id``.
            prompt: Text to complete.
            max_length: Forwarded to ``generate`` as ``max_length``.

        Returns:
            The decoded continuation with surrounding whitespace stripped.
        """
        inputs = tokenizer.encode(prompt, return_tensors='pt')
        prompt_token_count = inputs.shape[-1]

        with torch.no_grad():
            outputs = model.generate(
                inputs,
                # A single, unpadded prompt: every position is a real token.
                attention_mask=torch.ones_like(inputs),
                max_length=max_length,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        # Drop the prompt at token level. Slicing the decoded string by
        # len(prompt) is unreliable because decode(encode(prompt)) is not
        # guaranteed to reproduce the prompt character-for-character.
        generated_tokens = outputs[0][prompt_token_count:]
        return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

    def evaluate_domain_knowledge(self, test_cases: List[Dict]) -> Dict:
        """Generate base and fine-tuned responses for every test case.

        Args:
            test_cases: Dicts with a required 'prompt' key and an optional
                'category' key (defaults to 'general').

        Returns:
            Dict of four parallel lists keyed 'base_model', 'finetuned_model',
            'prompts' and 'categories'.
        """
        results = {
            'base_model': [],
            'finetuned_model': [],
            'prompts': [],
            'categories': []
        }

        for case in test_cases:
            prompt = case['prompt']
            category = case.get('category', 'general')

            # Generate responses
            base_response = self.generate_response(self.base_model, self.base_tokenizer, prompt)
            ft_response = self.generate_response(self.ft_model, self.ft_tokenizer, prompt)

            results['base_model'].append(base_response)
            results['finetuned_model'].append(ft_response)
            results['prompts'].append(prompt)
            results['categories'].append(category)

        return results

    @staticmethod
    def _keyword_pattern(keyword: str) -> str:
        """Build a whole-word regex for ``keyword`` that also accepts a plural suffix."""
        return rf'(?<!\w){re.escape(keyword.lower())}(?:e?s)?(?!\w)'

    def calculate_domain_specificity_score(self, text: str) -> float:
        """Calculate how domain-specific a response is based on wealth management keywords

        Counts how many distinct entries of ``self.wealth_keywords`` occur in
        ``text`` as whole words (case-insensitive, optional "s"/"es" suffix)
        and normalizes by the number of whitespace-separated words.

        Args:
            text: Response to score.

        Returns:
            Distinct keywords found per 100 words; 0 for empty text.
        """
        text_lower = text.lower()
        # Whole-word matching keeps short keywords such as 'ira' or 'tax' from
        # firing inside unrelated words like "expiration" or "syntax".
        keyword_count = sum(
            1 for keyword in self.wealth_keywords
            if re.search(self._keyword_pattern(keyword), text_lower)
        )
        # Normalize by text length (words)
        word_count = len(text.split())
        return keyword_count / max(word_count, 1) * 100

    def calculate_semantic_similarity(self, response1: str, response2: str, reference: str = None) -> Dict:
        """Calculate semantic similarity scores

        Args:
            response1: Base-model response.
            response2: Fine-tuned-model response.
            reference: Optional reference answer; ``None`` or an empty string
                skips the reference comparisons.

        Returns:
            Dict with 'base_to_finetuned', plus 'base_to_reference' and
            'finetuned_to_reference' when a reference is given. Values are
            cosine similarities of the sentence embeddings.
        """
        texts = [response1, response2]
        if reference:
            texts.append(reference)
        embeddings = self.sentence_model.encode(texts)

        def similarity(first: int, second: int):
            return cosine_similarity([embeddings[first]], [embeddings[second]])[0][0]

        if reference:
            return {
                'base_to_reference': similarity(0, 2),
                'finetuned_to_reference': similarity(1, 2),
                'base_to_finetuned': similarity(0, 1)
            }
        return {'base_to_finetuned': similarity(0, 1)}

    def calculate_readability_scores(self, text: str) -> Dict:
        """Calculate readability metrics

        Returns:
            Dict with textstat's 'flesch_reading_ease' and
            'flesch_kincaid_grade' for ``text``.
        """
        return {
            'flesch_reading_ease': flesch_reading_ease(text),
            'flesch_kincaid_grade': flesch_kincaid_grade(text)
        }

    def detect_hallucinations(self, text: str) -> Dict:
        """Simple hallucination detection based on suspicious patterns

        Flags specific numbers without context (potential made-up statistics).
        A match is only a hint, not proof of a fabricated figure.

        Returns:
            Dict with 'potential_hallucinations' (number of patterns that
            matched) and 'flags' (the matching pattern strings).
        """
        hallucination_flags = [
            pattern for pattern in self._SUSPICIOUS_PATTERNS
            if re.search(pattern, text, re.IGNORECASE)
        ]

        return {
            'potential_hallucinations': len(hallucination_flags),
            'flags': hallucination_flags
        }

    def evaluate_safety_compliance(self, text: str) -> Dict:
        """Check for appropriate disclaimers and safety measures

        Returns:
            Dict with 'safety_score' (how many safety indicator phrases occur
            in ``text``, case-insensitive) and 'has_disclaimer' (True when one
            of the three explicit disclaimer phrases occurs).
        """
        text_lower = text.lower()  # lower-case once instead of once per indicator
        present = [indicator in text_lower for indicator in self._SAFETY_INDICATORS]

        return {
            'safety_score': sum(present),
            'has_disclaimer': any(present[:self._DISCLAIMER_INDICATOR_COUNT])
        }

    def comprehensive_evaluation(self, test_cases: List[Dict], reference_answers: List[str] = None) -> pd.DataFrame:
        """Run comprehensive evaluation

        Args:
            test_cases: Dicts with 'prompt' and optional 'category'.
            reference_answers: Optional reference answers aligned by index
                with ``test_cases``.

        Returns:
            DataFrame with one row per test case holding both responses and
            all computed metrics. Reference-similarity columns are only filled
            for rows that have a non-empty reference.

        Raises:
            IndexError: If ``reference_answers`` has fewer entries than
                ``test_cases``.
        """
        # Fail before the expensive generation step rather than part-way
        # through scoring.
        if reference_answers and len(reference_answers) < len(test_cases):
            raise IndexError(
                f"reference_answers has {len(reference_answers)} entries "
                f"but {len(test_cases)} test cases were given"
            )

        print("Running comprehensive evaluation...")

        # Generate responses
        responses = self.evaluate_domain_knowledge(test_cases)
        rows = zip(
            responses['prompts'],
            responses['categories'],
            responses['base_model'],
            responses['finetuned_model'],
        )

        evaluation_results = []

        for i, (prompt, category, base_resp, ft_resp) in enumerate(rows):
            # Calculate metrics
            base_domain_score = self.calculate_domain_specificity_score(base_resp)
            ft_domain_score = self.calculate_domain_specificity_score(ft_resp)

            base_readability = self.calculate_readability_scores(base_resp)
            ft_readability = self.calculate_readability_scores(ft_resp)

            base_hallucination = self.detect_hallucinations(base_resp)
            ft_hallucination = self.detect_hallucinations(ft_resp)

            base_safety = self.evaluate_safety_compliance(base_resp)
            ft_safety = self.evaluate_safety_compliance(ft_resp)

            # Semantic similarity
            reference = reference_answers[i] if reference_answers else None
            similarity = self.calculate_semantic_similarity(base_resp, ft_resp, reference)

            result = {
                'prompt': prompt,
                'category': category,
                'base_response': base_resp,
                'finetuned_response': ft_resp,
                'base_domain_score': base_domain_score,
                'finetuned_domain_score': ft_domain_score,
                'domain_score_improvement': ft_domain_score - base_domain_score,
                'base_readability_ease': base_readability['flesch_reading_ease'],
                'ft_readability_ease': ft_readability['flesch_reading_ease'],
                'base_readability_grade': base_readability['flesch_kincaid_grade'],
                'ft_readability_grade': ft_readability['flesch_kincaid_grade'],
                'base_hallucination_flags': base_hallucination['potential_hallucinations'],
                'ft_hallucination_flags': ft_hallucination['potential_hallucinations'],
                'base_safety_score': base_safety['safety_score'],
                'ft_safety_score': ft_safety['safety_score'],
                'base_has_disclaimer': base_safety['has_disclaimer'],
                'ft_has_disclaimer': ft_safety['has_disclaimer'],
                'response_similarity': similarity['base_to_finetuned']
            }

            if reference:
                result['base_reference_similarity'] = similarity['base_to_reference']
                result['ft_reference_similarity'] = similarity['finetuned_to_reference']

            evaluation_results.append(result)

        return pd.DataFrame(evaluation_results)

    def create_evaluation_report(self, results_df: pd.DataFrame) -> None:
        """Create comprehensive evaluation report with visualizations

        Prints column means grouped into four sections and then renders the
        comparison figure. An empty ``results_df`` only prints a notice.

        Args:
            results_df: DataFrame produced by ``comprehensive_evaluation``.
        """
        print("\n" + "="*50)
        print("WEALTH MANAGEMENT MODEL EVALUATION REPORT")
        print("="*50)

        # An empty frame has none of the metric columns read below.
        if results_df.empty:
            print("\nNo evaluation results to report.")
            return

        # Summary statistics
        print("\n1. DOMAIN KNOWLEDGE COMPARISON")
        print("-" * 30)
        print(f"Average Base Model Domain Score: {results_df['base_domain_score'].mean():.2f}")
        print(f"Average Fine-tuned Domain Score: {results_df['finetuned_domain_score'].mean():.2f}")
        print(f"Average Improvement: {results_df['domain_score_improvement'].mean():.2f}")

        print("\n2. READABILITY COMPARISON")
        print("-" * 30)
        print(f"Base Model Avg Reading Ease: {results_df['base_readability_ease'].mean():.2f}")
        print(f"Fine-tuned Avg Reading Ease: {results_df['ft_readability_ease'].mean():.2f}")
        print(f"Base Model Avg Grade Level: {results_df['base_readability_grade'].mean():.2f}")
        print(f"Fine-tuned Avg Grade Level: {results_df['ft_readability_grade'].mean():.2f}")

        print("\n3. SAFETY & COMPLIANCE")
        print("-" * 30)
        print(f"Base Model Avg Safety Score: {results_df['base_safety_score'].mean():.2f}")
        print(f"Fine-tuned Avg Safety Score: {results_df['ft_safety_score'].mean():.2f}")
        print(f"Base Model Disclaimer Rate: {results_df['base_has_disclaimer'].mean()*100:.1f}%")
        print(f"Fine-tuned Disclaimer Rate: {results_df['ft_has_disclaimer'].mean()*100:.1f}%")

        print("\n4. HALLUCINATION DETECTION")
        print("-" * 30)
        print(f"Base Model Avg Hallucination Flags: {results_df['base_hallucination_flags'].mean():.2f}")
        print(f"Fine-tuned Avg Hallucination Flags: {results_df['ft_hallucination_flags'].mean():.2f}")

        # Create visualizations
        self.create_visualizations(results_df)

    def create_visualizations(self, results_df: pd.DataFrame):
        """Create comparison visualizations

        Draws a 2x2 figure (domain-score box plot, mean safety-score bars,
        reading-ease box plot, per-category domain-score improvement), saves
        it to 'model_comparison_report.png' at 300 dpi and shows it.

        Args:
            results_df: DataFrame produced by ``comprehensive_evaluation``.
        """
        # 'seaborn-v0_8' only exists on Matplotlib >= 3.6; older releases call
        # the same style 'seaborn'. Fall back instead of raising.
        available_styles = plt.style.available
        style = next(
            (name for name in ('seaborn-v0_8', 'seaborn') if name in available_styles),
            'default',
        )

        # Scope the style to this figure so the caller's rcParams are untouched.
        with plt.style.context(style):
            fig, axes = plt.subplots(2, 2, figsize=(15, 12))

            # Domain Score Comparison
            # Tick labels are set separately: the boxplot ``labels`` keyword
            # is deprecated in recent Matplotlib releases.
            axes[0, 0].boxplot([results_df['base_domain_score'], results_df['finetuned_domain_score']])
            axes[0, 0].set_xticklabels(['Base Model', 'Fine-tuned'])
            axes[0, 0].set_title('Domain Knowledge Score Comparison')
            axes[0, 0].set_ylabel('Domain Specificity Score')

            # Safety Score Comparison
            axes[0, 1].bar(['Base Model', 'Fine-tuned'],
                          [results_df['base_safety_score'].mean(), results_df['ft_safety_score'].mean()])
            axes[0, 1].set_title('Average Safety Score Comparison')
            axes[0, 1].set_ylabel('Safety Score')

            # Readability Comparison
            readability_data = pd.DataFrame({
                'Model': ['Base'] * len(results_df) + ['Fine-tuned'] * len(results_df),
                'Reading_Ease': list(results_df['base_readability_ease']) + list(results_df['ft_readability_ease'])
            })
            sns.boxplot(data=readability_data, x='Model', y='Reading_Ease', ax=axes[1, 0])
            axes[1, 0].set_title('Reading Ease Comparison')

            # Category-wise Performance
            if 'category' in results_df.columns:
                category_perf = results_df.groupby('category')['domain_score_improvement'].mean()
                axes[1, 1].bar(category_perf.index, category_perf.values)
                axes[1, 1].set_title('Domain Score Improvement by Category')
                axes[1, 1].set_ylabel('Improvement Score')
                plt.setp(axes[1, 1].xaxis.get_majorticklabels(), rotation=45)

            plt.tight_layout()
            plt.savefig('model_comparison_report.png', dpi=300, bbox_inches='tight')
            plt.show()

# Example usage and test cases
def create_wealth_management_test_cases():
    """Build the built-in wealth-management prompts used by the example run.

    Returns:
        A new list of dicts, each with a 'prompt' (the user question) and a
        'category' (the topic label used to group results).
    """
    # (prompt, category) pairs, in the order they are evaluated.
    prompt_category_pairs = (
        ("I'm 35 years old with $50k to invest. How should I diversify my portfolio?",
         'investment_advice'),
        ("I'm worried about market volatility affecting my retirement savings. What should I do?",
         'risk_management'),
        ("What are the tax implications of selling my stocks this year?",
         'tax_planning'),
        ("Should I invest in ETFs or mutual funds for long-term growth?",
         'investment_products'),
        ("How much should I contribute to my 401k vs Roth IRA?",
         'retirement_planning'),
        ("The market crashed 20% last week. Should I sell everything?",
         'market_sentiment'),
        ("I inherited $100k. What's the best way to invest it?",
         'inheritance_planning'),
        ("Is it better to pay off my mortgage or invest in the stock market?",
         'debt_vs_investment'),
    )
    return [
        {'prompt': question, 'category': topic}
        for question, topic in prompt_category_pairs
    ]

def run_evaluation_example(base_model_path: str = None, finetuned_model_path: str = None):
    """Example of how to run the evaluation

    Builds the evaluator, scores the built-in test cases, prints the report
    (which also saves the comparison figure) and writes the per-prompt results
    to 'detailed_evaluation_results.csv'.

    Args:
        base_model_path: Model path handed to the evaluator as the base model.
            Defaults to the 'checkpoint7500' directory inside
            'phi-finetuned-financial'.
        finetuned_model_path: Model path handed to the evaluator as the
            fine-tuned model. Defaults to 'phi3-wealth-lora'.

    Returns:
        The results DataFrame produced by ``comprehensive_evaluation``.
    """
    import os  # local import: this function is the only user of os

    # Build the default path with the platform separator. The previous literal
    # "phi-finetuned-financial\checkpoint7500" contained the invalid escape
    # sequence "\c" and only formed a directory path on Windows.
    # NOTE(review): the recorded run output mentions 'checkpoint-7500' (with a
    # hyphen) - confirm the actual checkpoint directory name.
    if base_model_path is None:
        base_model_path = os.path.join("phi-finetuned-financial", "checkpoint7500")
    if finetuned_model_path is None:
        finetuned_model_path = "phi3-wealth-lora"

    # Initialize evaluator (replace with your actual model paths)
    evaluator = WealthManagementEvaluator(
        base_model_path=base_model_path,
        finetuned_model_path=finetuned_model_path
    )

    # Create test cases
    test_cases = create_wealth_management_test_cases()

    # Run evaluation
    results = evaluator.comprehensive_evaluation(test_cases)

    # Generate report
    evaluator.create_evaluation_report(results)

    # Save detailed results
    results.to_csv('detailed_evaluation_results.csv', index=False)

    return results

# Human evaluation template
def create_human_evaluation_template(results_df: pd.DataFrame,
                                     output_path: str = 'human_evaluation_template.json'):
    """Create template for human evaluation

    Produces one entry per row of ``results_df`` containing the prompt, both
    model responses, the rating rubric and blank score slots for a human
    reviewer, then writes the list to ``output_path`` as indented JSON.

    Args:
        results_df: DataFrame with 'prompt', 'base_response' and
            'finetuned_response' columns.
        output_path: Destination JSON file. Defaults to
            'human_evaluation_template.json' in the working directory.

    Returns:
        The list of template entries that was written to disk.
    """
    criteria = {
        'accuracy': 'Rate 1-5: How accurate is the financial information?',
        'relevance': 'Rate 1-5: How relevant is the response to the query?',
        'clarity': 'Rate 1-5: How clear and understandable is the response?',
        'professionalism': 'Rate 1-5: How professional is the tone?',
        'safety': 'Rate 1-5: Does it include appropriate disclaimers?'
    }

    template = []
    for _, row in results_df.iterrows():
        template.append({
            'prompt': row['prompt'],
            'base_response': row['base_response'],
            'finetuned_response': row['finetuned_response'],
            # Fresh dicts per entry so editing one entry never affects another.
            'evaluation_criteria': dict(criteria),
            'base_scores': {name: '' for name in criteria},
            'finetuned_scores': {name: '' for name in criteria},
            'overall_preference': 'base/finetuned/tie',
            'comments': ''
        })

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(template, f, indent=2)

    print(f"Human evaluation template saved to '{output_path}'")
    return template

if __name__ == "__main__":
    # Script entry point: run the automated evaluation, then write the
    # human-review template from its results.
    print("Starting Wealth Management Model Evaluation...")
    results = run_evaluation_example()
    create_human_evaluation_template(results)

    # Tell the user which artifacts the run produced.
    print("\nEvaluation complete! Check the generated files:")
    for artifact_line in (
        "- model_comparison_report.png",
        "- detailed_evaluation_results.csv",
        "- human_evaluation_template.json",
    ):
        print(artifact_line)

Starting Wealth Management Model Evaluation...
Loading models...


Loading checkpoint shards: 100%|██████████| 2/2 [00:42<00:00, 21.20s/it]


HFValidationError: Repo id must use alphanumeric chars or '-', '_', '.', '--' and '..' are forbidden, '-' and '.' cannot start or end the name, max length is 96: 'phi-finetuned-financial\checkpoint-7500'.