# Import Libraries

In [8]:
import pandas as pd
import numpy as np
import json
import random
from typing import Dict, List, Tuple, Any, Optional
from groq import Groq
import re
from datetime import datetime
import statistics

# Define Functions

In [None]:
# Configuration
# Read the API key from a local file. The context manager closes the handle
# as soon as the key is read instead of leaving it to the garbage collector.
with open("/Users/ani/Documents/0_API_KEYS/groq.txt") as _key_file:
    GROQ_API_KEY = _key_file.read().strip()
# Model identifier passed to every chat-completion request in this file
GROQ_LLM_MODEL = "llama-3.3-70b-versatile"
# Initialize Groq client
client = Groq(api_key=GROQ_API_KEY)

In [None]:
def load_stock_data(csv_path: str) -> pd.DataFrame:
    """Read a stock CSV into a DataFrame and tidy its header row.

    Leading and trailing whitespace is removed from every column name;
    the cell values are returned exactly as pandas parsed them.
    """
    frame = pd.read_csv(csv_path)
    stripped_names = frame.columns.str.strip()
    frame.columns = stripped_names
    return frame

In [None]:
def generate_groq_response(prompt: str, max_tokens: int = 1000) -> str:
    """Send a single-turn user prompt to the Groq chat API and return the reply text.

    The request uses the module-level client and model name with a
    temperature of 0.1. Any exception raised while building or sending the
    request is not propagated: it is converted into a string of the form
    "Error generating response: <message>" and returned instead.
    """
    try:
        request_kwargs = {
            'messages': [{"role": "user", "content": prompt}],
            'model': GROQ_LLM_MODEL,
            'max_tokens': max_tokens,
            'temperature': 0.1,
        }
        completion = client.chat.completions.create(**request_kwargs)
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as err:
        return f"Error generating response: {str(err)}"

##### Test Cases

In [11]:
def generate_sector_queries(stock_df: pd.DataFrame) -> List[Dict]:
    """Build one test case per (sector, question pattern) pair.

    Sectors are the distinct non-null values of the 'Sector' column. Each
    case records the formatted query, the sector it targets, and every
    'Ticker' value whose row belongs to that sector.
    """
    question_patterns = (
        "What are the best {sector} stocks for a conservative portfolio?",
        "Show me high-growth {sector} companies with good fundamentals",
        "Which {sector} stocks have the highest dividend yields?",
        "Find undervalued {sector} stocks with strong performance",
        "What are the top 5 {sector} stocks by market cap?",
    )
    distinct_sectors = stock_df['Sector'].dropna().unique()

    cases = []
    for sector in distinct_sectors:
        in_sector = stock_df['Sector'] == sector
        sector_tickers = stock_df[in_sector]['Ticker'].tolist()
        cases.extend(
            {
                'query': pattern.format(sector=sector),
                'query_type': 'sector_filter',
                'expected_sector': sector,
                # Each case gets its own list so callers may mutate one safely
                'expected_stocks': list(sector_tickers),
                'evaluation_criteria': ['sector_relevance', 'stock_recommendations', 'financial_reasoning'],
            }
            for pattern in question_patterns
        )

    return cases

In [12]:
def generate_performance_queries(stock_df: pd.DataFrame) -> List[Dict]:
    """Return the fixed set of performance, risk, valuation and dividend test cases.

    Each case carries a 'filter_condition' callable that takes a DataFrame
    and returns a boolean mask for the rows matching the query. The
    stock_df argument is not read here.
    """
    # (query text, query type, row predicate, evaluation criteria)
    case_specs = (
        (
            "Find stocks with the highest annualized returns over 20%",
            'performance_filter',
            lambda df: df['Annualized_Return'] > 20,
            ['numerical_accuracy', 'performance_ranking', 'return_analysis'],
        ),
        (
            "Show me stocks with low volatility and positive YTD returns",
            'risk_return',
            lambda df: (df['Annualized_Volatility'] < 25) & (df['YTD_Return'] > 0),
            ['risk_assessment', 'return_analysis', 'portfolio_suitability'],
        ),
        (
            "What are the best value stocks with low P/E ratios under 15?",
            'valuation',
            lambda df: df['Trailing_PE'] < 15,
            ['valuation_analysis', 'value_reasoning', 'pe_accuracy'],
        ),
        (
            "Find high dividend yield stocks above 3% with stable performance",
            'dividend_focus',
            lambda df: df['Dividend_Yield'] > 3,
            ['dividend_analysis', 'stability_assessment', 'income_focus'],
        ),
    )

    return [
        {
            'query': text,
            'query_type': kind,
            'filter_condition': predicate,
            'evaluation_criteria': criteria,
        }
        for text, kind, predicate, criteria in case_specs
    ]

In [13]:
def generate_comparison_queries(
    stock_df: pd.DataFrame,
    num_queries: int = 10,
    group_size: int = 3,
) -> List[Dict]:
    """Generate comparative analysis queries over randomly chosen tickers.

    Args:
        stock_df: Stock table; only the 'Ticker' column is read.
        num_queries: How many comparison cases to produce (default 10).
        group_size: How many tickers to compare in each case (default 3).

    Returns:
        A list of test-case dicts with 'query', 'query_type', 'target_stocks'
        and 'evaluation_criteria'. When the table has fewer unique tickers
        than group_size, every available ticker is compared instead; when
        fewer than two tickers exist no comparison is possible and an empty
        list is returned.
    """
    tickers = list(stock_df['Ticker'].dropna().unique())

    # random.sample raises ValueError if asked for more items than exist,
    # so cap the group at the population size.
    picks = min(group_size, len(tickers))
    if picks < 2:
        return []

    test_cases = []
    for _ in range(num_queries):
        selected_stocks = random.sample(tickers, picks)

        test_cases.append({
            'query': f"Compare {', '.join(selected_stocks)} and recommend which is best for growth investing",
            'query_type': 'stock_comparison',
            'target_stocks': selected_stocks,
            'evaluation_criteria': ['comparative_analysis', 'growth_metrics', 'recommendation_quality']
        })

    return test_cases

In [14]:
def generate_portfolio_queries() -> List[Dict]:
    """Return the three fixed portfolio-construction test cases.

    Every case has query type 'portfolio_construction' and carries the
    investor profile the query describes plus the criteria to judge it by.
    """
    # (query text, investor profile, evaluation criteria)
    scenarios = [
        (
            "Build a balanced portfolio for a 35-year-old with moderate risk tolerance",
            {'age': 35, 'risk_tolerance': 'moderate'},
            ['asset_allocation', 'risk_management', 'age_appropriate'],
        ),
        (
            "Create a retirement portfolio for someone 60 years old focused on income",
            {'age': 60, 'focus': 'income'},
            ['income_generation', 'capital_preservation', 'age_appropriate'],
        ),
        (
            "Design an aggressive growth portfolio for a young investor",
            {'age': 25, 'risk_tolerance': 'aggressive'},
            ['growth_focus', 'risk_taking', 'long_term_strategy'],
        ),
    ]

    portfolio_cases = []
    for text, profile, criteria in scenarios:
        portfolio_cases.append({
            'query': text,
            'query_type': 'portfolio_construction',
            'investor_profile': profile,
            'evaluation_criteria': criteria,
        })
    return portfolio_cases

##### Evaluation Functions

In [15]:
def evaluate_sector_relevance(response: str, expected_sector: str, stock_df: pd.DataFrame) -> Dict:
    """Ask the LLM judge how well a response stays focused on the expected sector.

    Args:
        response: Text produced by the system under evaluation.
        expected_sector: Sector name the response is supposed to focus on.
        stock_df: Accepted for interface compatibility; not read here.

    Returns:
        Dict with 'metric' ('sector_relevance'), 'score', 'max_score' (5) and
        'reasoning'. If the judge's reply cannot be parsed into a JSON object
        with a numeric score, the score is 0 and the reasoning says so.
    """
    
    eval_prompt = f"""
    Evaluate if this stock recommendation response correctly focuses on the {expected_sector} sector.
    
    Response: {response}
    Expected Sector: {expected_sector}
    
    Rate on a scale of 1-5:
    1 = No mention of sector or completely wrong sector
    2 = Minimal sector relevance
    3 = Some sector relevance but mixed with other sectors
    4 = Mostly correct sector focus
    5 = Perfect sector focus with detailed sector analysis
    
    Provide your rating and brief reasoning in JSON format:
    {{"score": <1-5>, "reasoning": "<explanation>"}}
    """
    
    evaluation = generate_groq_response(eval_prompt, max_tokens=200)
    
    try:
        # Judges often wrap the JSON in prose or code fences, so parse the
        # outermost {...} span when one exists instead of the raw reply.
        json_match = re.search(r'\{.*\}', evaluation, re.DOTALL)
        eval_result = json.loads(json_match.group(0) if json_match else evaluation)
        score = eval_result.get('score', 0)
        if not isinstance(score, (int, float)):
            # e.g. "4" returned as a string; downstream code sums scores
            score = float(score)
        return {
            'metric': 'sector_relevance',
            'score': score,
            'max_score': 5,
            'reasoning': eval_result.get('reasoning', 'No reasoning provided')
        }
    except (ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError; TypeError/AttributeError
        # cover a non-string reply or a JSON value that is not an object.
        return {
            'metric': 'sector_relevance',
            'score': 0,
            'max_score': 5,
            'reasoning': 'Failed to parse evaluation response'
        }

In [16]:
def evaluate_stock_recommendations(response: str, expected_stocks: List[str]) -> Dict:
    """Score the stock recommendations in a response for quality and relevance.

    Relevance is the mean of precision and recall between the distinct
    ticker-like tokens found in the response and the expected tickers,
    scaled to 0-5. Quality is a 1-5 rating requested from the LLM judge
    (0 when its reply cannot be parsed). The final score is their average.

    Args:
        response: Text produced by the system under evaluation.
        expected_stocks: Tickers considered relevant for the query.

    Returns:
        Dict with 'metric' ('stock_recommendations'), 'score' (rounded to
        two decimals), 'max_score' (5) and a 'reasoning' string that lists
        the component values.
    """
    
    # Ticker candidates are 1-5 letter all-caps words. De-duplicate them
    # (keeping first-seen order) so a symbol repeated in the text does not
    # inflate the precision denominator while the overlap is set-based.
    mentioned_tickers = list(dict.fromkeys(re.findall(r'\b[A-Z]{1,5}\b', response)))
    
    # Calculate overlap with expected stocks
    expected_set = set(expected_stocks)
    overlap = len(expected_set.intersection(mentioned_tickers))
    precision = overlap / len(mentioned_tickers) if mentioned_tickers else 0
    recall = overlap / len(expected_set) if expected_set else 0
    
    eval_prompt = f"""
    Evaluate the quality of stock recommendations in this response:
    
    Response: {response}
    Mentioned Tickers: {mentioned_tickers}
    
    Rate the overall quality of recommendations on 1-5:
    1 = No specific stock recommendations
    2 = Poor recommendations with little justification
    3 = Adequate recommendations with basic reasoning
    4 = Good recommendations with solid analysis
    5 = Excellent recommendations with comprehensive analysis
    
    Provide rating in JSON: {{"score": <1-5>, "reasoning": "<explanation>"}}
    """
    
    evaluation = generate_groq_response(eval_prompt, max_tokens=200)
    
    try:
        # Judges often wrap the JSON in prose or code fences, so parse the
        # outermost {...} span when one exists instead of the raw reply.
        json_match = re.search(r'\{.*\}', evaluation, re.DOTALL)
        eval_result = json.loads(json_match.group(0) if json_match else evaluation)
        quality_score = eval_result.get('score', 0)
        if not isinstance(quality_score, (int, float)):
            # e.g. "4" returned as a string; it is used in arithmetic below
            quality_score = float(quality_score)
    except (ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError; TypeError/AttributeError
        # cover a non-string reply or a JSON value that is not an object.
        quality_score = 0
    
    # Combine quality and relevance scores
    relevance_score = (precision + recall) / 2 * 5  # Convert to 0-5 scale
    final_score = (quality_score + relevance_score) / 2
    
    return {
        'metric': 'stock_recommendations',
        'score': round(final_score, 2),
        'max_score': 5,
        'reasoning': f"Quality: {quality_score}/5, Relevance: {relevance_score:.1f}/5, Precision: {precision:.2f}, Recall: {recall:.2f}"
    }

In [17]:
def evaluate_financial_reasoning(response: str) -> Dict:
    """Ask the LLM judge to rate the financial analysis in a response.

    Args:
        response: Text produced by the system under evaluation.

    Returns:
        Dict with 'metric' ('financial_reasoning'), 'score', 'max_score' (5)
        and 'reasoning'. If the judge's reply cannot be parsed into a JSON
        object with a numeric score, the score is 0 and the reasoning says so.
    """
    
    eval_prompt = f"""
    Evaluate the financial reasoning and analysis quality in this response:
    
    Response: {response}
    
    Consider:
    - Use of relevant financial metrics (P/E, ROE, debt ratios, etc.)
    - Understanding of risk-return tradeoffs
    - Market context and economic factors
    - Investment strategy coherence
    
    Rate on 1-5:
    1 = No financial reasoning
    2 = Basic mentions of financial concepts
    3 = Adequate financial analysis
    4 = Strong financial reasoning with multiple metrics
    5 = Sophisticated financial analysis with deep insights
    
    Provide rating in JSON: {{"score": <1-5>, "reasoning": "<explanation>"}}
    """
    
    evaluation = generate_groq_response(eval_prompt, max_tokens=300)
    
    try:
        # Judges often wrap the JSON in prose or code fences, so parse the
        # outermost {...} span when one exists instead of the raw reply.
        json_match = re.search(r'\{.*\}', evaluation, re.DOTALL)
        eval_result = json.loads(json_match.group(0) if json_match else evaluation)
        score = eval_result.get('score', 0)
        if not isinstance(score, (int, float)):
            # e.g. "4" returned as a string; downstream code sums scores
            score = float(score)
        return {
            'metric': 'financial_reasoning',
            'score': score,
            'max_score': 5,
            'reasoning': eval_result.get('reasoning', 'No reasoning provided')
        }
    except (ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError; TypeError/AttributeError
        # cover a non-string reply or a JSON value that is not an object.
        return {
            'metric': 'financial_reasoning',
            'score': 0,
            'max_score': 5,
            'reasoning': 'Failed to parse evaluation response'
        }

In [18]:
def evaluate_numerical_accuracy(response: str, stock_df: pd.DataFrame) -> Dict:
    """Heuristically check P/E figures quoted in a response against the stock table.

    When the response mentions "P/E" or "PE", each distinct ticker-like
    token that exists in stock_df['Ticker'] is looked up. If its positive
    'Trailing_PE' (formatted to one decimal, first three characters) does
    not appear anywhere in the response, one error is recorded. The score
    starts at 5 and loses one point per error, never dropping below 1.

    Args:
        response: Text produced by the system under evaluation.
        stock_df: Stock table with 'Ticker' and 'Trailing_PE' columns.

    Returns:
        Dict with 'metric' ('numerical_accuracy'), 'score', 'max_score' (5)
        and a 'reasoning' string that lists the errors found, if any.
    """
    
    # Distinct ticker candidates in first-seen order. De-duplicating keeps a
    # ticker mentioned several times from being penalised once per mention.
    candidates = dict.fromkeys(re.findall(r'\b([A-Z]{1,5})\b', response))
    
    errors = []
    
    # Only P/E claims are checked, so skip the lookup entirely when the
    # response never mentions the metric (loop-invariant test hoisted).
    if 'P/E' in response or 'PE' in response:
        known_tickers = set(stock_df['Ticker'].values)
        
        for ticker in candidates:
            if ticker not in known_tickers:
                continue
            stock_data = stock_df[stock_df['Ticker'] == ticker].iloc[0]
            
            # NaN or non-positive P/E values cannot be verified; skip them
            if stock_data['Trailing_PE'] > 0:
                pe_str = f"{stock_data['Trailing_PE']:.1f}"
                if pe_str[:3] not in response:
                    errors.append(f"PE ratio for {ticker} may be inaccurate")
    
    # Perfect score of 5 when there are no errors; floor at 1
    accuracy_score = max(1, 5 - len(errors))
    
    return {
        'metric': 'numerical_accuracy',
        'score': accuracy_score,
        'max_score': 5,
        'reasoning': f"Errors found: {'; '.join(errors)}" if errors else "No obvious numerical errors detected"
    }

In [19]:
def evaluate_response_completeness(response: str, query: str) -> Dict:
    """Ask the LLM judge how completely a response addresses the query.

    Args:
        response: Text produced by the system under evaluation.
        query: The original question the response is answering.

    Returns:
        Dict with 'metric' ('response_completeness'), 'score', 'max_score'
        (5) and 'reasoning'. If the judge's reply cannot be parsed into a
        JSON object with a numeric score, the score is 0 and the reasoning
        says so.
    """
    
    eval_prompt = f"""
    Evaluate how completely this response addresses the original query:
    
    Query: {query}
    Response: {response}
    
    Rate completeness on 1-5:
    1 = Doesn't address the query at all
    2 = Partially addresses some aspects
    3 = Addresses main points but lacks depth
    4 = Thoroughly addresses most aspects
    5 = Comprehensively addresses all aspects with detail
    
    Provide rating in JSON: {{"score": <1-5>, "reasoning": "<explanation>"}}
    """
    
    evaluation = generate_groq_response(eval_prompt, max_tokens=200)
    
    try:
        # Judges often wrap the JSON in prose or code fences, so parse the
        # outermost {...} span when one exists instead of the raw reply.
        json_match = re.search(r'\{.*\}', evaluation, re.DOTALL)
        eval_result = json.loads(json_match.group(0) if json_match else evaluation)
        score = eval_result.get('score', 0)
        if not isinstance(score, (int, float)):
            # e.g. "4" returned as a string; downstream code sums scores
            score = float(score)
        return {
            'metric': 'response_completeness',
            'score': score,
            'max_score': 5,
            'reasoning': eval_result.get('reasoning', 'No reasoning provided')
        }
    except (ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError; TypeError/AttributeError
        # cover a non-string reply or a JSON value that is not an object.
        return {
            'metric': 'response_completeness',
            'score': 0,
            'max_score': 5,
            'reasoning': 'Failed to parse evaluation response'
        }


##### Evaluation Pipeline

In [20]:
def run_single_evaluation(test_case: Dict, rag_response: str, stock_df: pd.DataFrame) -> Dict:
    """Score one response against the evaluators its test case asks for.

    The test case's 'evaluation_criteria' list selects which of the sector,
    recommendation, reasoning and numerical evaluators run; completeness is
    evaluated for every case. The overall score is the total of the earned
    scores divided by the total of the maximum scores, as a percentage.

    Returns a dict with 'test_case', 'response', 'evaluations', 'timestamp'
    (ISO string taken before any evaluator runs) and 'overall_score'.
    """
    started_at = datetime.now().isoformat()
    requested = test_case.get('evaluation_criteria', [])
    evaluations = []

    # Sector relevance additionally needs a sector to compare against
    if 'sector_relevance' in requested and 'expected_sector' in test_case:
        evaluations.append(
            evaluate_sector_relevance(rag_response, test_case['expected_sector'], stock_df)
        )

    if 'stock_recommendations' in requested:
        evaluations.append(
            evaluate_stock_recommendations(rag_response, test_case.get('expected_stocks', []))
        )

    if 'financial_reasoning' in requested:
        evaluations.append(evaluate_financial_reasoning(rag_response))

    if 'numerical_accuracy' in requested:
        evaluations.append(evaluate_numerical_accuracy(rag_response, stock_df))

    # Completeness is judged regardless of the requested criteria
    evaluations.append(evaluate_response_completeness(rag_response, test_case['query']))

    if evaluations:
        earned = sum([item['score'] for item in evaluations])
        possible = sum([item['max_score'] for item in evaluations])
        overall = earned / possible * 100
    else:
        overall = 0

    return {
        'test_case': test_case,
        'response': rag_response,
        'evaluations': evaluations,
        'timestamp': started_at,
        'overall_score': overall,
    }

In [21]:
def generate_test_suite(stock_df: pd.DataFrame, sample_size: int = 20) -> List[Dict]:
    """Generate a mixed test suite drawn from every query category.

    Up to 8 sector, 4 performance, 5 comparison and 3 portfolio cases are
    sampled at random. If that is no more than sample_size, the cases are
    returned grouped by category in that order. If the suite has to be cut
    down, the categories are interleaved round-robin before truncating so
    that a small sample_size still contains cases from each category rather
    than only the first ones.

    Args:
        stock_df: Stock table passed to the per-category generators.
        sample_size: Maximum number of cases to return; values below zero
            are treated as zero.

    Returns:
        A list of at most sample_size test-case dicts.
    """
    from itertools import zip_longest

    # (all cases for the category, how many of them to sample)
    category_pools = [
        (generate_sector_queries(stock_df), 8),
        (generate_performance_queries(stock_df), 4),
        (generate_comparison_queries(stock_df), 5),
        (generate_portfolio_queries(), 3),
    ]

    # Sample from each category
    sampled = [
        random.sample(pool, min(quota, len(pool)))
        for pool, quota in category_pools
    ]
    test_cases = [case for group in sampled for case in group]

    if sample_size >= len(test_cases):
        return test_cases

    # Truncation needed: take one case per category per round so the kept
    # prefix spans all categories. Cases are dicts, so None only ever marks
    # padding added by zip_longest for exhausted categories.
    interleaved = [
        case
        for round_cases in zip_longest(*sampled)
        for case in round_cases
        if case is not None
    ]
    return interleaved[:max(sample_size, 0)]

In [22]:
def run_evaluation_suite(test_cases: List[Dict], rag_system_function, stock_df: pd.DataFrame) -> Dict:
    """Run every test case through the RAG system and aggregate the scores.

    Args:
        test_cases: Test-case dicts; each needs a 'query' and may carry a
            'query_type' and 'evaluation_criteria'.
        rag_system_function: Callable taking the query string and returning
            the response text. If it raises, the response becomes
            "Error: <message>" and is still evaluated.
        stock_df: Stock table forwarded to the evaluators.

    Returns:
        Dict with 'summary' (counts, timing, per-type counts and score
        statistics), 'individual_results' (one entry per test case) and
        'aggregate_metrics' (average/count/std per metric, in percent).
        With no test cases, all score statistics are reported as 0 instead
        of raising.
    """
    
    results = {
        'summary': {
            'total_tests': len(test_cases),
            'start_time': datetime.now().isoformat(),
            'test_types': {}
        },
        'individual_results': [],
        'aggregate_metrics': {}
    }
    summary = results['summary']
    
    for i, test_case in enumerate(test_cases):
        print(f"Running test {i+1}/{len(test_cases)}: {test_case['query'][:50]}...")
        
        # Get RAG system response; a failing system is scored, not fatal
        try:
            rag_response = rag_system_function(test_case['query'])
        except Exception as e:
            rag_response = f"Error: {str(e)}"
        
        # Evaluate response
        eval_result = run_single_evaluation(test_case, rag_response, stock_df)
        results['individual_results'].append(eval_result)
        
        # Track test types
        test_type = test_case.get('query_type', 'unknown')
        summary['test_types'][test_type] = summary['test_types'].get(test_type, 0) + 1
    
    # Calculate aggregate metrics
    all_scores = [result['overall_score'] for result in results['individual_results']]
    summary['end_time'] = datetime.now().isoformat()
    if all_scores:
        summary['average_score'] = statistics.mean(all_scores)
        summary['median_score'] = statistics.median(all_scores)
        summary['min_score'] = min(all_scores)
        summary['max_score'] = max(all_scores)
        # stdev needs at least two data points
        summary['std_score'] = statistics.stdev(all_scores) if len(all_scores) > 1 else 0
    else:
        # statistics.mean/median and min/max all raise on empty input;
        # report zeros so the summary keys always exist and stay numeric.
        for key in ('average_score', 'median_score', 'min_score', 'max_score', 'std_score'):
            summary[key] = 0
    
    # Calculate metric-specific aggregates (each score as a percentage)
    metric_scores = {}
    for result in results['individual_results']:
        for evaluation in result['evaluations']:
            score = evaluation['score'] / evaluation['max_score'] * 100
            metric_scores.setdefault(evaluation['metric'], []).append(score)
    
    for metric, scores in metric_scores.items():
        results['aggregate_metrics'][metric] = {
            'average': statistics.mean(scores),
            'count': len(scores),
            'std': statistics.stdev(scores) if len(scores) > 1 else 0
        }
    
    return results

In [23]:
def save_evaluation_results(results: Dict, output_path: str):
    """Write the results dict to output_path as 2-space-indented JSON.

    Values the JSON encoder cannot serialise natively are written as their
    str() form. A confirmation line is printed once the file is written.
    """
    with open(output_path, mode='w') as out_file:
        json.dump(results, out_file, default=str, indent=2)
    print(f"Results saved to {output_path}")

In [24]:
def print_evaluation_summary(results: Dict):
    """Print a formatted report for the dict returned by run_evaluation_suite.

    The report shows the overall score statistics, the number of tests per
    query type, the per-metric averages, and the three best and three worst
    individual tests. The best list is ordered highest score first and the
    worst list lowest score first, so entry 1 is always the extreme.

    Args:
        results: Dict with 'summary', 'aggregate_metrics' and
            'individual_results' keys.
    """
    
    summary = results['summary']
    
    print("\n" + "="*50)
    print("RAG-LLM EVALUATION SUMMARY")
    print("="*50)
    
    print(f"Total Tests: {summary['total_tests']}")
    print(f"Average Score: {summary['average_score']:.1f}%")
    print(f"Median Score: {summary['median_score']:.1f}%")
    print(f"Score Range: {summary['min_score']:.1f}% - {summary['max_score']:.1f}%")
    print(f"Standard Deviation: {summary['std_score']:.1f}%")
    
    print("\nTest Types:")
    for test_type, count in summary['test_types'].items():
        print(f"  {test_type}: {count}")
    
    print("\nMetric Performance:")
    for metric, stats in results['aggregate_metrics'].items():
        print(f"  {metric}: {stats['average']:.1f}% (±{stats['std']:.1f}%)")
    
    print("\nTop 3 Best Performing Tests:")
    sorted_results = sorted(results['individual_results'], 
                          key=lambda x: x['overall_score'], reverse=True)
    for i, result in enumerate(sorted_results[:3]):
        print(f"  {i+1}. Score: {result['overall_score']:.1f}% - {result['test_case']['query'][:60]}...")
    
    print("\nTop 3 Worst Performing Tests:")
    # sorted_results is in descending order, so reverse the tail to list the
    # lowest-scoring test as number 1.
    for i, result in enumerate(reversed(sorted_results[-3:])):
        print(f"  {i+1}. Score: {result['overall_score']:.1f}% - {result['test_case']['query'][:60]}...")

##### Run Evaluation

In [None]:
# ========================
# USAGE EXAMPLE
# ========================

def dummy_rag_system(query: str) -> str:
    """Placeholder RAG system that echoes the query in a canned sentence.

    Swap this for the real RAG entry point when running evaluations.
    """
    canned_reply = "This is a dummy response to: {}".format(query)
    return canned_reply

# Example usage:
if __name__ == "__main__":
    # End-to-end example: load data, build a test suite, evaluate, report.
    # Load the stock table from a CSV in the working directory
    stock_df = load_stock_data("sample_data.csv")
    
    # Generate test cases (at most 10 are returned for sample_size=10)
    test_cases = generate_test_suite(stock_df, sample_size=10)
    
    # Run evaluations (replace dummy_rag_system with your actual system);
    # each query is passed to the given function and its reply is scored
    results = run_evaluation_suite(test_cases, dummy_rag_system, stock_df)
    
    # Print the summary to stdout and write the full results as JSON
    print_evaluation_summary(results)
    save_evaluation_results(results, "rag_evaluation_results.json")