Set up environment

In [None]:
import dspy
from typing import List, Literal
from pydantic import BaseModel, Field
from statistics import mean
import json
import os
from pathlib import Path
from typing import List, Literal
import random
from collections import defaultdict
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

True

Configuration

In [11]:
# Model configuration (parameterized)
PROVIDER = "gemini"  # Options: "openai", "gemini", "anthropic"
TASK_MODEL_TEMP = 0.0  # Low temp for deterministic extraction
REFLECTION_MODEL_TEMP = 1.0  # High temp for diverse reflections

# Paths
QUESTIONS = "quetions"
MARKDOWN_DIR = Path("fin_docs")
PDF_DIR = Path("financial_pdfs/raw")

# Data split
TRAIN_RATIO = 0.75
RANDOM_SEED = 42

# GEPA configuration
GEPA_BUDGET = "light"  # Start with light, can increase to "medium" or "heavy"

Initialize DSPY

In [14]:
# Get API key from environment
if PROVIDER == "openai":
    api_key = os.environ.get("OPENAI_API_KEY")
    TASK_MODEL = "gpt-4.1-mini"  # Model for the extraction task
    REFLECTION_MODEL = "gpt-4.1"  # Model for GEPA reflection
elif PROVIDER == "gemini":
    api_key = os.environ.get("GEMINI_API_KEY")
    TASK_MODEL = "gemini-2.5-flash"  # Model for the extraction task
    REFLECTION_MODEL = "gemini-2.5.pro"  # Model for GEPA reflection
elif PROVIDER == "anthropic":
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    TASK_MODEL = "claude-3"  # Model for the extraction task
    REFLECTION_MODEL = "claude-3"  # Model for GEPA reflection
else:
    raise ValueError(f"Unsupported provider: {PROVIDER}")
if not api_key:
    raise ValueError("API key not found in environment variables")

# Configure task model
task_lm = dspy.LM(
    model=f"{PROVIDER}/{TASK_MODEL}",
    temperature=TASK_MODEL_TEMP,
    api_key=api_key
)

# Configure reflection model for GEPA
reflection_lm = dspy.LM(
    model=f"{PROVIDER}/{REFLECTION_MODEL}",
    temperature=REFLECTION_MODEL_TEMP,
    max_tokens=32000,
    api_key=api_key
)

# Set default model
dspy.configure(lm=task_lm)

print(f"✅ Task Model: {TASK_MODEL}")
print(f"✅ Reflection Model: {REFLECTION_MODEL}")

✅ Task Model: gemini-2.5-flash
✅ Reflection Model: gemini-2.5.pro


Load Markdown file

In [13]:
# List Markdown files
markdown_files = list(MARKDOWN_DIR.glob("*.md"))

# Load Markdown file
def load_markdown_file(file_path: Path) -> str:
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()

# Load all Markdown files
markdown_contents = {md_file.name: load_markdown_file(md_file) for md_file in markdown_files}

# Count loaded files
print(f"✅ Loaded {len(markdown_contents)} Markdown files.")

✅ Loaded 60 Markdown files.


# Evaluator

## Pydantic Models

In [17]:
class InvestorQuestion(BaseModel):
    """A single question an investor would ask"""
    question: str = Field(description="The question text")
    category: str = Field(description="Category: financial, competitive, management, or risk")
    importance: float = Field(description="Importance weight 0-1", ge=0, le=1)
    reasoning: str = Field(description="Why this question matters for the investment decision")


class QuestionList(BaseModel):
    """Collection of investor questions"""
    questions: List[InvestorQuestion]


class QuestionEvaluation(BaseModel):
    """Evaluation of how well a report answers a question"""
    answerable: Literal["yes", "partial", "no"] = Field(
        description="Can the question be answered from the report?"
    )
    answer: str = Field(description="The answer extracted from the report, or explanation of what's missing")
    evidence: List[str] = Field(description="Direct quotes/references from the report supporting the answer")
    missing_information: List[str] = Field(description="Key information needed but not found in report")
    quality_rating: int = Field(description="Quality score 0-10", ge=0, le=10)
    reasoning: str = Field(description="Explanation of the rating")


class ReportMetrics(BaseModel):
    """Aggregate metrics for a report"""
    coverage_rate: float = Field(description="Percentage of questions answered (0-1)")
    quality_score: float = Field(description="Average quality rating (0-10)")
    answerable_fully: int = Field(description="Count of fully answerable questions")
    answerable_partial: int = Field(description="Count of partially answerable questions")
    not_answerable: int = Field(description="Count of unanswerable questions")
    critical_gaps: List[str] = Field(description="Most important missing information")

## DSPY Signatures

In [18]:
class GenerateExpertQuestions(dspy.Signature):
    """Generate critical questions that a specific investment persona would ask about a company"""
    
    persona_name: str = dspy.InputField(description="Name of the investment persona (e.g., Warren Buffett)")
    persona_philosophy: str = dspy.InputField(description="Investment philosophy and approach")
    company_summary: str = dspy.InputField(description="Basic company information and context")
    n_questions: int = dspy.InputField(description="Number of questions to generate")
    
    questions: QuestionList = dspy.OutputField(description="List of critical investment questions")


class EvaluateReportCoverage(dspy.Signature):
    """Evaluate whether and how well a report answers a specific investment question"""
    
    question: str = dspy.InputField(description="The investment question to evaluate")
    question_category: str = dspy.InputField(description="Question category for context")
    report: str = dspy.InputField(description="The investment report to evaluate")
    persona_name: str = dspy.InputField(description="Investor persona for context")
    
    evaluation: QuestionEvaluation = dspy.OutputField(
        description="Detailed evaluation of question coverage"
    )


class AssessPersonaAlignment(dspy.Signature):
    """Assess whether a report reflects the investment philosophy of the persona"""
    
    persona_name: str = dspy.InputField(description="Investment persona")
    persona_philosophy: str = dspy.InputField(description="Core investment principles")
    report: str = dspy.InputField(description="Investment report")
    questions_coverage: str = dspy.InputField(description="Summary of question coverage")
    
    alignment_score: float = dspy.OutputField(description="Alignment score 0-10")
    alignment_reasoning: str = dspy.OutputField(description="Explanation of alignment assessment")
    philosophy_evidence: List[str] = dspy.OutputField(description="Report sections reflecting philosophy")
    philosophy_gaps: List[str] = dspy.OutputField(description="Missing philosophy elements")

## DSPY Modules

In [19]:
class QuestionGenerator(dspy.Module):
    """Module for generating expert investor questions"""
    
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateExpertQuestions)
    
    def forward(self, persona_name: str, persona_philosophy: str, 
                company_summary: str, n_questions: int = 15) -> QuestionList:
        result = self.generate(
            persona_name=persona_name,
            persona_philosophy=persona_philosophy,
            company_summary=company_summary,
            n_questions=n_questions
        )
        return result.questions


class CoverageEvaluator(dspy.Module):
    """Module for evaluating report coverage of a single question"""
    
    def __init__(self):
        super().__init__()
        self.evaluate = dspy.ChainOfThought(EvaluateReportCoverage)
    
    def forward(self, question: InvestorQuestion, report: str, 
                persona_name: str) -> QuestionEvaluation:
        result = self.evaluate(
            question=question.question,
            question_category=question.category,
            report=report,
            persona_name=persona_name
        )
        return result.evaluation


class PersonaAlignmentChecker(dspy.Module):
    """Module for checking persona alignment"""
    
    def __init__(self):
        super().__init__()
        self.assess = dspy.ChainOfThought(AssessPersonaAlignment)
    
    def forward(self, persona_name: str, persona_philosophy: str,
                report: str, questions_coverage: str):
        return self.assess(
            persona_name=persona_name,
            persona_philosophy=persona_philosophy,
            report=report,
            questions_coverage=questions_coverage
        )

## Main Evaluator

In [20]:
class ReportEvaluator(dspy.Module):
    """Complete report evaluation pipeline"""
    
    def __init__(self, persona_definitions: dict):
        """
        Args:
            persona_definitions: Dict mapping persona names to their philosophies
                e.g., {"Warren Buffett": "Focus on quality businesses with moats..."}
        """
        super().__init__()
        self.persona_definitions = persona_definitions
        
        # Initialize sub-modules
        self.question_generator = QuestionGenerator()
        self.coverage_evaluator = CoverageEvaluator()
        self.alignment_checker = PersonaAlignmentChecker()
        
        # Storage
        self.questions = []
        self.evaluation_results = []
    
    def generate_expert_questions(self, persona_name: str, 
                                 company_summary: str, n: int = 15) -> List[InvestorQuestion]:
        """Generate expert questions for the persona"""
        persona_philosophy = self.persona_definitions[persona_name]
        
        question_list = self.question_generator(
            persona_name=persona_name,
            persona_philosophy=persona_philosophy,
            company_summary=company_summary,
            n_questions=n
        )
        
        self.questions = question_list.questions
        return self.questions
    
    def evaluate_report_coverage(self, report: str, persona_name: str) -> List[QuestionEvaluation]:
        """Evaluate how well the report covers each question"""
        self.evaluation_results = []
        
        for question in self.questions:
            evaluation = self.coverage_evaluator(
                question=question,
                report=report,
                persona_name=persona_name
            )
            self.evaluation_results.append(evaluation)
        
        return self.evaluation_results
    
    def compute_metrics(self, persona_name: str, report: str) -> ReportMetrics:
        """Compute aggregate metrics"""
        if not self.evaluation_results:
            raise ValueError("Must run evaluate_report_coverage first")
        
        # Count answerability
        answerable_counts = {
            "yes": sum(1 for r in self.evaluation_results if r.answerable == "yes"),
            "partial": sum(1 for r in self.evaluation_results if r.answerable == "partial"),
            "no": sum(1 for r in self.evaluation_results if r.answerable == "no")
        }
        
        total = len(self.evaluation_results)
        coverage_rate = (answerable_counts["yes"] + 0.5 * answerable_counts["partial"]) / total
        
        # Average quality
        quality_score = mean([r.quality_rating for r in self.evaluation_results])
        
        # Identify critical gaps (questions with high importance and low coverage)
        critical_gaps = []
        for q, eval_result in zip(self.questions, self.evaluation_results):
            if q.importance > 0.7 and eval_result.quality_rating < 5:
                critical_gaps.extend(eval_result.missing_information)
        
        # Get persona alignment
        persona_philosophy = self.persona_definitions[persona_name]
        coverage_summary = self._summarize_coverage()
        
        alignment = self.alignment_checker(
            persona_name=persona_name,
            persona_philosophy=persona_philosophy,
            report=report,
            questions_coverage=coverage_summary
        )
        
        return ReportMetrics(
            coverage_rate=coverage_rate,
            quality_score=quality_score,
            answerable_fully=answerable_counts["yes"],
            answerable_partial=answerable_counts["partial"],
            not_answerable=answerable_counts["no"],
            critical_gaps=list(set(critical_gaps))[:5]  # Top 5 unique gaps
        ), alignment
    
    def _summarize_coverage(self) -> str:
        """Create a summary of question coverage for persona alignment"""
        summary_parts = []
        for q, eval_result in zip(self.questions, self.evaluation_results):
            summary_parts.append(
                f"Q: {q.question}\n"
                f"Coverage: {eval_result.answerable}\n"
                f"Quality: {eval_result.quality_rating}/10\n"
            )
        return "\n".join(summary_parts)
    
    def forward(self, persona_name: str, company_summary: str, 
                report: str, n_questions: int = 15):
        """Complete evaluation pipeline"""
        # Generate questions
        self.generate_expert_questions(persona_name, company_summary, n_questions)
        
        # Evaluate coverage
        self.evaluate_report_coverage(report, persona_name)
        
        # Compute metrics
        metrics, alignment = self.compute_metrics(persona_name, report)
        
        return {
            "questions": self.questions,
            "evaluations": self.evaluation_results,
            "metrics": metrics,
            "alignment": alignment
        }

## Example

In [None]:
def example_usage():
    """Example of how to use the ReportEvaluator"""
    
    # Configure DSPy
    lm = dspy.LM(f'{PROVIDER/{TASK_MODEL}}', max_tokens=4000)
    dspy.configure(lm=lm)
    
    # Define personas
    persona_definitions = {
        "Warren Buffett": """
        Focus on high-quality businesses with durable competitive advantages.
        Key criteria: strong economic moat, high ROE (>15%), consistent earnings,
        low debt, excellent management. Buy and hold forever mentality.
        """,
        "Benjamin Graham": """
        Deep value with margin of safety. Focus on price < intrinsic value.
        Key criteria: P/B < 1.0, P/E < 10, strong balance sheet, low debt,
        tangible asset backing. Seek distressed opportunities.
        """
    }
    
    # Initialize evaluator
    evaluator = ReportEvaluator(persona_definitions)
    
    # Example data
    company_summary = """
    GraceKennedy Ltd: Jamaican conglomerate with operations in food trading,
    banking, insurance, and remittances. Market cap: $50B JMD.
    Established 1922, listed on JSE Main Market.
    """
    
    report = """
    [Your generated investment report here]
    """
    
    # Run evaluation
    results = evaluator(
        persona_name="Warren Buffett",
        company_summary=company_summary,
        report=report,
        n_questions=15
    )
    
    # Access results
    print(f"Coverage Rate: {results['metrics'].coverage_rate:.2%}")
    print(f"Quality Score: {results['metrics'].quality_score:.1f}/10")
    print(f"Persona Alignment: {results['alignment'].alignment_score:.1f}/10")
    
    return results

# Optimize

In [None]:
# ============================================================================
# OPTIMIZATION WITH DSPY
# ============================================================================

class ReportQualityMetric:
    """Metric for optimizing report evaluation"""
    
    def __call__(self, example, prediction, trace=None) -> float:
        """
        Score based on:
        - Coverage rate (40%)
        - Quality score (40%)
        - Persona alignment (20%)
        """
        metrics = prediction['metrics']
        alignment = prediction['alignment']
        
        coverage_score = metrics.coverage_rate
        quality_score = metrics.quality_score / 10  # Normalize to 0-1
        alignment_score = alignment.alignment_score / 10
        
        total_score = (
            0.4 * coverage_score +
            0.4 * quality_score +
            0.2 * alignment_score
        )
        
        return total_score


def optimize_evaluator(train_examples, evaluator):
    """
    Optimize the evaluator using DSPy's optimizer
    
    Args:
        train_examples: List of dspy.Example objects with:
            - persona_name
            - company_summary
            - report
            - (optionally) expected_metrics for supervision
    """
    from dspy.teleprompt import BootstrapFewShot
    
    metric = ReportQualityMetric()
    
    # Optimize with few-shot examples
    optimizer = BootstrapFewShot(
        metric=metric,
        max_bootstrapped_demos=4,
        max_labeled_demos=4
    )
    
    optimized_evaluator = optimizer.compile(
        evaluator,
        trainset=train_examples
    )
    
    return optimized_evaluator