Notebook for optimizing the trend analyzer code.
To run the notebook, run the command "jupyter notebook --ip=0.0.0.0 --port=5024 --allow-root --no-browser". Then take the webpage that opens in Replit and open it in a new browser tab, and enter the server token, which you can find by running the command "jupyter server list". If you enter the token in the Replit preview pane instead, it will give you a 403 error.

In [4]:
#import necessary modules
from modules import *
import os
import mlflow

In [None]:
#setup mlflow
# The tracking URI is a relative path, so it is presumably resolved against the
# working directory the notebook kernel was started in — confirm before moving
# this notebook.
mlflow_tracking_uri = "../../mlflow/experiments"
mlflow.set_tracking_uri(mlflow_tracking_uri)
# NOTE(review): the experiment name is spelled "Optmizer". It is a runtime
# identifier, so correcting the spelling would presumably select/create a
# different experiment — confirm before renaming.
mlflow.set_experiment("TrendFinderOptmizer")
# Enable MLflow's automatic logging integration for DSPy.
mlflow.dspy.autolog()

In [5]:
#setup dspy
# NOTE(review): `dspy` is only imported explicitly in a later cell; at this
# point it is presumably provided by `from modules import *` — confirm.
# The API key is read from the environment variable named "paul2"; if that
# variable is not set, this line raises KeyError.
api_key = os.environ['paul2']
# Language-model handle for Gemini 2.5 Flash, created with max_tokens=8000.
lm = dspy.LM('gemini/gemini-2.5-flash', api_key=api_key, max_tokens=8000)
# Register `lm` as the language model in DSPy's global configuration.
dspy.configure(lm=lm)

In [None]:
# Instantiate the trend analyzer. `trend_analyzer` is not imported by name in
# this notebook; presumably it comes from `from modules import *` — confirm.
analyze_trends = trend_analyzer()

## DSPy Optimization for doc_analyzer

Since we don't have labeled training data, we'll use DSPy's signature optimization and prompt-engineering techniques, driven by a small set of synthetic examples, to improve the performance of the `doc_analyzer` signature.

In [None]:
# Import optimization modules
import dspy
from dspy.teleprompt import BootstrapFewShot, SignatureOptimizer
from dspy.evaluate import Evaluate
import json
import pandas as pd

In [None]:
# Create synthetic examples for different document types and categories
# This helps DSPy understand the expected input/output patterns

def create_synthetic_examples():
    """Build the hand-written dspy.Example records used for optimization.

    Each example carries the four fields that are passed to the doc_analyzer
    module as keyword arguments (document, categories, in_csv, last_context)
    plus the two fields read back from its prediction (next_context, out_csv).

    The input fields are declared with ``with_inputs`` so that DSPy can tell
    inputs apart from labels when the examples are used as a trainset or
    validation set; without that declaration DSPy has no way to know which
    fields to feed to the module.

    Returns:
        list: three dspy.Example objects (financial report, employee record,
        inspection report), in that order.
    """
    # Fields supplied to the module at call time; everything else is a label.
    input_fields = ("document", "categories", "in_csv", "last_context")

    examples = []

    # Example 1: Financial document analysis
    examples.append(dspy.Example(
        document="Sample financial report with revenue: $50,000, expenses: $30,000, profit: $20,000",
        categories=["revenue", "expenses", "profit"],
        in_csv="",
        last_context="Analyzing quarterly financial reports",
        next_context="Found consistent profit margins across documents",
        out_csv="document,revenue,expenses,profit\nfinancial_report.pdf,$50000,$30000,$20000"
    ).with_inputs(*input_fields))

    # Example 2: Employee data analysis
    examples.append(dspy.Example(
        document="Employee record: John Doe, Department: Engineering, Salary: $75,000, Years: 3",
        categories=["name", "department", "salary", "experience"],
        in_csv="",
        last_context="Processing employee database for HR analysis",
        next_context="Engineering department shows higher average salaries",
        out_csv="document,name,department,salary,experience\nemployee_data.csv,John Doe,Engineering,$75000,3 years"
    ).with_inputs(*input_fields))

    # Example 3: Inspection report (similar to your use case)
    examples.append(dspy.Example(
        document="Inspection Report ID: 32037, Violations: 2, Details: Missing safety equipment, Improper storage",
        categories=["document", "number of violations", "list and details of violations"],
        in_csv="",
        last_context="Analyzing safety inspection reports for compliance trends",
        next_context="Safety equipment violations are common across multiple sites",
        out_csv="document,number of violations,list and details of violations\n32037_inspection.pdf,2,Missing safety equipment; Improper storage"
    ).with_inputs(*input_fields))

    return examples

synthetic_examples = create_synthetic_examples()
print(f"Created {len(synthetic_examples)} synthetic examples")

In [None]:
# Define evaluation metrics for the doc_analyzer
def evaluate_doc_analyzer_output(example, pred, trace=None):
    """Score a doc_analyzer prediction on a 0.0-1.0 scale.

    Four checks are worth one point each and the total is divided by 4:

    1. ``pred.out_csv`` parses as CSV with pandas.
    2. ``pred.next_context`` is non-blank and differs from
       ``example.last_context``.
    3. Every entry of ``example.categories`` appears as a CSV column name.
    4. ``pred.out_csv`` is non-blank and contains fewer "N/A" markers than
       there are categories (or none at all).

    Args:
        example: object exposing ``categories`` (iterable of str) and
            ``last_context`` (str).
        pred: object exposing ``out_csv`` and ``next_context``. Missing or
            ``None`` values are treated as empty text instead of raising.
        trace: accepted so the function matches the metric call signature
            used by the optimizer; not used here.

    Returns:
        float: the fraction of checks passed, between 0.0 and 1.0.
    """
    import io

    max_score = 4.0
    score = 0.0

    # Normalise the prediction fields once so a None/missing output scores
    # zero on the relevant checks rather than crashing the whole evaluation.
    out_csv = getattr(pred, "out_csv", None)
    out_csv = "" if out_csv is None else str(out_csv)
    next_context = getattr(pred, "next_context", None)
    next_context = "" if next_context is None else str(next_context)

    # Parse the CSV a single time; both the format check and the column
    # check reuse the result. The metric is best-effort, so any parsing
    # failure simply means "not valid CSV" -- but unlike a bare `except`,
    # this does not swallow KeyboardInterrupt/SystemExit.
    try:
        csv_data = pd.read_csv(io.StringIO(out_csv))
    except Exception:
        csv_data = None

    if csv_data is not None:
        score += 1.0  # Valid CSV format
        # Check if all categories are addressed in CSV
        if set(example.categories).issubset(set(csv_data.columns)):
            score += 1.0

    # Check if context is updated (not blank and not identical to input)
    if next_context.strip() and next_context != example.last_context:
        score += 1.0

    # Check if output contains meaningful data (not just N/A). An empty
    # output is not meaningful, so it must not earn this point.
    na_count = out_csv.count("N/A")
    if out_csv.strip() and (na_count == 0 or na_count < len(example.categories)):
        score += 1.0

    return score / max_score

print("Evaluation metric defined")

In [None]:
# Optimize the doc_analyzer signature
# We'll use SignatureOptimizer to improve the prompt and reasoning

# Create an instance of the doc_analyzer module
# `doc_analyzer` is not defined in this notebook; presumably it is a DSPy
# signature provided by `from modules import *` — confirm.
doc_analyzer_module = dspy.ChainOfThought(doc_analyzer)

# Set up the optimizer
# NOTE(review): SignatureOptimizer appears to be an older name for DSPy's COPRO
# optimizer; confirm that it is still importable from dspy.teleprompt in the
# installed DSPy version and that it accepts the `verbose` keyword.
# The metric is the custom scorer defined earlier in this notebook.
optimizer = SignatureOptimizer(
    metric=evaluate_doc_analyzer_output,
    breadth=10,  # Number of candidate prompts to generate
    depth=2,     # Number of optimization rounds
    init_temperature=1.4,
    verbose=True
)

# Nothing is optimized in this cell; the optimizer is only constructed here
# and is run by a later `optimizer.compile(...)` call.
print("Starting signature optimization...")
print("This may take several minutes as it tests different prompt variations.")

In [None]:
# Run the optimization with MLflow tracking
# Everything inside the `with` block is recorded under a single MLflow run
# named "doc_analyzer_optimization".
with mlflow.start_run(run_name="doc_analyzer_optimization") as run:
    # Log parameters
    # NOTE(review): "breadth" and "depth" are hard-coded here and must be kept
    # in sync by hand with the values passed to the optimizer constructor.
    mlflow.log_param("optimizer_type", "SignatureOptimizer")
    mlflow.log_param("num_examples", len(synthetic_examples))
    mlflow.log_param("breadth", 10)
    mlflow.log_param("depth", 2)

    # Optimize the signature
    # With three synthetic examples this trains on two and validates on one.
    # NOTE(review): confirm that the optimizer's compile() accepts a `valset`
    # keyword in the installed DSPy version.
    optimized_module = optimizer.compile(
        doc_analyzer_module,
        trainset=synthetic_examples[:2],  # Use first 2 for training
        valset=synthetic_examples[2:],    # Use remaining for validation
    )

    # Log the optimized signature
    # Stored as a text artifact of the run so it can be inspected later.
    mlflow.log_text(str(optimized_module.signature), "optimized_signature.txt")

    print("Optimization completed!")
    print(f"Run ID: {run.info.run_id}")

In [None]:
# Test the optimized module with a sample
test_example = synthetic_examples[0]


def _run_and_report(heading, module):
    """Print `heading`, run `module` on test_example, then print its outputs and score.

    The module is called with the four input fields of `test_example`; the
    returned prediction's `next_context` and `out_csv` are printed together
    with the score from `evaluate_doc_analyzer_output`. The prediction is
    returned so the caller can keep it.
    """
    print(heading)
    print("=" * 40)
    outcome = module(
        document=test_example.document,
        categories=test_example.categories,
        in_csv=test_example.in_csv,
        last_context=test_example.last_context,
    )
    print(f"Context: {outcome.next_context}")
    print(f"CSV: {outcome.out_csv}")
    print(f"Score: {evaluate_doc_analyzer_output(test_example, outcome)}")
    return outcome


# Same example through the unoptimized and the optimized module, in that order.
original_result = _run_and_report("Testing Original Module:", doc_analyzer_module)
optimized_result = _run_and_report("\nTesting Optimized Module:", optimized_module)

In [None]:
# Save the optimized module for use in the main application
import pickle
import os

# Create optimized modules directory if it doesn't exist
os.makedirs("optimized_modules", exist_ok=True)

# Save the optimized module
# NOTE(review): pickle files execute code when loaded, so only load this file
# from a location you trust. DSPy also offers its own module save/load API,
# which may be a more portable choice — confirm against the installed version.
with open("optimized_modules/optimized_doc_analyzer.pkl", "wb") as f:
    pickle.dump(optimized_module, f)

# Also save the signature as text for inspection
# The encoding is explicit because the optimized instructions are generated
# text and may contain non-ASCII characters; relying on the platform default
# encoding could fail or produce different bytes on different machines.
with open("optimized_modules/optimized_signature.txt", "w", encoding="utf-8") as f:
    f.write(str(optimized_module.signature))

print("Optimized module saved!")
print("You can load it in your main application using:")
print("with open('optimized_modules/optimized_doc_analyzer.pkl', 'rb') as f:")
print("    optimized_doc_analyzer = pickle.load(f)")

In [None]:
# Test the optimized module with your actual documents
print("Testing with real documents from your training data...")

# Use the same setup as in your test.py
# Column headings the analyzer is asked to fill in for each document.
categories = [
    "document",
    "number of violations",
    "list and details of violations",
]

# A single inspection-report PDF, wrapped in Attachments, is the only input.
documents = [
    Attachments(
        "TrainingData/32037 ALR10C6BZ 097 07-10-2025 INSPR AEPACS NA.pdf"
    ),
]

# Create an optimized trend analyzer
class optimized_trend_analyzer(dspy.Module):
    """Feed documents one at a time through a supplied doc_analyzer module.

    The CSV text and the context returned for one document are passed in as
    `in_csv` and `last_context` for the next document.
    """

    def __init__(self, optimized_doc_analyzer):
        """Store the analyzer module that `forward` will call for each document."""
        super().__init__()
        self.doc_analyzer_sql = optimized_doc_analyzer

    def forward(self, documents: list[Attachments], categories: list[str], context: str):
        """Analyze `documents` in order and return the final CSV and context.

        Args:
            documents: the documents to analyze, processed in list order.
            categories: passed unchanged to the analyzer on every call.
            context: the `last_context` for the first document.

        Returns:
            A `(csv_text, context)` tuple taken from the last analyzer call.
            If `documents` is empty, this is `("", context)` with the context
            exactly as it was passed in.
        """
        running_csv = ""
        for doc in documents:
            step = self.doc_analyzer_sql(
                document=doc,
                categories=categories,
                in_csv=running_csv,
                last_context=context,
            )
            # Carry both outputs forward into the next iteration.
            context, running_csv = step.next_context, step.out_csv
        return running_csv, context

# Test with optimized module
optimized_analyzer = optimized_trend_analyzer(optimized_module)

# Only the first document is analyzed, starting from an empty context.
first_document_only = documents[:1]
result, context = optimized_analyzer(
    documents=first_document_only,
    categories=categories,
    context="",
)

# One print call with a newline separator emits the same three lines.
print(
    "Optimized Result:",
    f"CSV Output: {result}",
    f"Context: {context}",
    sep="\n",
)

## Optimization Summary

This notebook optimizes the `doc_analyzer` signature using:

1. **Synthetic Examples**: Created representative examples for different document types
2. **SignatureOptimizer**: Improved the prompt and reasoning chains
3. **Custom Metrics**: Evaluated CSV format, context updates, and data extraction quality
4. **MLflow Tracking**: Logged optimization experiments for comparison

The optimized module should perform better at:
- Extracting relevant information from documents
- Generating proper CSV format
- Maintaining meaningful context across documents
- Handling various document types and categories

To use the optimized module in production, load the saved pickle file and replace the original `doc_analyzer` in your `trend_analyzer` class.