# Good Demo: Modular DSPy Agent Architecture

## Production-Grade Analytics with Semantic Layer

This notebook demonstrates the **correct** way to build LLM-powered analytics:

### Architecture Principles

1. **Semantic Layer First**: All metrics defined in `config/semantic.yml` with tested SQL
2. **Modular Agents**: Small, focused, testable components with clear contracts
3. **Local-First Logic**: Deterministic rules wherever possible; LLM only for ambiguity
4. **DSPy Signatures**: Structured prompts that are declarative and optimizable
5. **Observability**: Every decision logged with run_id, timings, and provenance

### Business Question

"Which channel mix change is most likely to improve CAC next month, given a recent anomaly in referral traffic?"

### Agent Pipeline

```
Question → Triage → Text-to-Semantic → Metric Compilation → Execution → 
          Hypothesis Simulation → Narration → Observability
```
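
The pipeline above can be read as a chain of callables that share one state dictionary. The following is a minimal sketch under that assumption; `run_pipeline`, the stage functions, and the `state` keys are hypothetical and are not part of this notebook's `utils` package.

```python
import time
import uuid
from typing import Any, Callable, Dict, List, Tuple

Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_pipeline(question: str, stages: List[Tuple[str, Stage]]) -> Dict[str, Any]:
    """Run stages in order, recording per-stage timings under one run_id."""
    state: Dict[str, Any] = {
        "run_id": uuid.uuid4().hex,
        "question": question,
        "timings_ms": {},
    }
    for name, stage in stages:
        start = time.perf_counter()
        # Each stage returns only the keys it adds or changes
        state.update(stage(state))
        state["timings_ms"][name] = (time.perf_counter() - start) * 1000
    return state


# Hypothetical stand-ins for the real agents
def triage(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"mode": "analysis"}


def to_semantic(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"metric": "cac_by_channel", "window_days": 90}


demo_state = run_pipeline(
    "Which channel has the best CAC?",
    [("triage", triage), ("text_to_semantic", to_semantic)],
)
print(demo_state["mode"], demo_state["metric"], sorted(demo_state["timings_ms"]))
```

Keeping each stage a plain function of the state makes it easy to test a stage in isolation and to swap an LLM-backed stage for a deterministic one.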

## 1. Bootstrap: Environment and Database

In [None]:
# Core imports
import sys
import os
from pathlib import Path

# Add utils to path
sys.path.insert(0, str(Path.cwd()))

# Standard libraries
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, Any, List, Optional

# Our utilities
from utils import (
    load_environment,
    get_db_connection,
    validate_schema,
    SemanticLayer,
    plot_channel_metric,
    RunRecord
)

# DSPy and OpenAI
import dspy
from openai import OpenAI

print("✓ Imports successful")

In [None]:
# Load environment
env_config = load_environment()
print("Environment loaded:")
print(f"  Model: {env_config['OPENAI_MODEL']}")
print(f"  DB Path: {env_config['DB_PATH']}")
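# Report only whether a key is present; avoid echoing any part of the secret into notebook output
print(f"  API Key: {'set' if env_config['OPENAI_API_KEY'] else 'missing'}")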

# Configure OpenAI
openai_client = OpenAI(api_key=env_config['OPENAI_API_KEY'])

# Configure DSPy
lm = dspy.LM(f"openai/{env_config['OPENAI_MODEL']}", api_key=env_config['OPENAI_API_KEY'])
dspy.settings.configure(lm=lm)

print("\n✓ LLM configured")

In [None]:
# Connect to database (read-only mode)
db_path = env_config['DB_PATH']
conn = get_db_connection(db_path, read_only=True)
print(f"✓ Connected to database: {db_path}")

# Validate schema
schema_validation = validate_schema(conn)
print(f"\n✓ Schema validation passed")
print(f"  Tables found: {len(schema_validation['tables'])}")
for table in sorted(schema_validation['tables']):
    print(f"    - {table}")

In [None]:
# Set defaults for the session
DEFAULT_WINDOW_DAYS = 90
DEFAULT_LIMIT = 1000
OFFLINE_MODE = False  # Set to True to disable LLM calls for rehearsals

print("Session defaults:")
print(f"  Window: {DEFAULT_WINDOW_DAYS} days")
print(f"  Limit: {DEFAULT_LIMIT} rows")
print(f"  Offline mode: {OFFLINE_MODE}")

## 2. Semantic Catalogue

Load the semantic layer that defines:
- Canonical dimensions (channel, campaign_name, device, region)
- Safe base queries (spend, conversions, revenue by channel)
- Derived metrics (ROAS, CAC by channel)
- Join rules (revenue attribution path)
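
To make the idea concrete, here is a minimal sketch of how a catalogue like this could be represented and fingerprinted. The dictionary layout, the SQL text, the table name `channel_daily`, the CAC formula, and the helpers `spec_hash` and `validate_dimension` are illustrative assumptions; the real structure lives in `config/semantic.yml` and `utils.SemanticLayer`, which may differ.

```python
import hashlib
import json

# Hypothetical catalogue: names, formula, and SQL are illustrative only
catalogue = {
    "version": "0.1",
    "dimensions": ["channel", "campaign_name", "device", "region"],
    "metrics": {
        "cac_by_channel": {
            "formula": "spend / conversions",
            "sql": (
                "SELECT channel, SUM(spend) * 1.0 / NULLIF(SUM(conversions), 0) AS cac "
                "FROM channel_daily GROUP BY channel"
            ),
        },
    },
}


def spec_hash(spec: dict) -> str:
    """Stable fingerprint: serialize with sorted keys, then SHA-256."""
    canonical = json.dumps(spec, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


def validate_dimension(spec: dict, name: str) -> bool:
    """A dimension is valid only if the catalogue declares it."""
    return name in spec["dimensions"]


print(spec_hash(catalogue))
print(validate_dimension(catalogue, "channel"), validate_dimension(catalogue, "browser"))
```

Logging such a hash with each run makes it possible to tell which version of the metric definitions produced a given result.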

In [None]:
# Load semantic layer
semantic = SemanticLayer('../config/semantic.yml')
print("✓ Semantic layer loaded")
print(f"  Spec version: {semantic.config.get('version')}")
print(f"  Spec hash: {semantic.spec_hash}")
print(f"\n{semantic.describe_catalogue()}")

In [None]:
# Show key metric definitions
print("\nCanonical Metric Definitions:")
print("=" * 60)
for metric, formula in semantic.get_metric_definitions().items():
    print(f"  {metric:15} = {formula}")

In [None]:
# Show join rules (critical for correctness)
print("\nJoin Rules (Revenue Attribution):")
print("=" * 60)
join_rules = semantic.get_join_rules()
revenue_rule = join_rules.get('revenue_attribution', {})
print(f"Rule: {revenue_rule.get('description')}")
print("Path:")
for step in revenue_rule.get('path', []):
    print(f"  → {step}")
print("\n⚠️ This is enforced in semantic.yml queries; LLM cannot override it")

## 3. Agent Architecture with DSPy

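Define the LLM-backed agents as thin classes around **DSPy signatures** with clear input/output contracts; the deterministic agents (MetricRunner, HypothesisAgent) are plain Python with no LLM call.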

### Agent Roles

1. **TriageAgent**: Classify question type (search vs analysis)
2. **TextToSemanticAgent**: Map natural language to semantic request
3. **MetricRunner**: Compile and execute queries (deterministic)
4. **HypothesisAgent**: Simulate budget scenarios (deterministic)
5. **NarratorAgent**: Generate decision memo with constraints

### Design Principles

- **Local rules first**: Use keywords/templates before LLM
- **Fallback only**: LLM called only when the local rules or templates do not produce a match
- **Validated outputs**: All LLM outputs validated against semantic layer
- **Observable**: Every decision logged with confidence and reasoning
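
The "validated outputs" principle amounts to treating whatever the model returns as untrusted input and checking it against an allow-list before use. The sketch below illustrates this in plain Python. `validate_request`, the hard-coded allow-lists, and the 1 to 365 day bound are hypothetical stand-ins for lookups against the semantic layer.

```python
from typing import Any, Dict, List

# Assumed allow-lists; in the notebook these would come from the semantic layer
ALLOWED_METRICS = {"cac_by_channel", "roas_by_channel"}
ALLOWED_DIMENSIONS = {"channel", "campaign_name", "device", "region"}


def validate_request(metric: str, dimensions: str, window_days: Any) -> Dict[str, Any]:
    """Normalize a raw model response and reject anything outside the allow-lists."""
    metric = metric.strip()
    if metric not in ALLOWED_METRICS:
        raise ValueError(f"Unknown metric: {metric!r}")

    # Split the comma-separated string and drop empty entries
    dims: List[str] = [d.strip() for d in dimensions.split(",") if d.strip()]
    unknown = [d for d in dims if d not in ALLOWED_DIMENSIONS]
    if unknown:
        raise ValueError(f"Unknown dimensions: {unknown}")

    days = int(window_days)
    if not 1 <= days <= 365:
        raise ValueError(f"window_days out of range: {days}")

    return {"metric": metric, "dimensions": dims, "window_days": days}


print(validate_request(" cac_by_channel ", "channel, region", "30"))

try:
    validate_request("revenue_per_user", "channel", 30)
except ValueError as exc:
    print(f"Rejected: {exc}")
```

Failing loudly on an unknown metric or dimension keeps a model mistake from turning into a silently wrong query.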

In [None]:
# DSPy Signatures

class TriageSignature(dspy.Signature):
    """Classify a user question as search or analysis."""
    question: str = dspy.InputField(desc="User's business question")
    mode: str = dspy.OutputField(desc="Either 'search' or 'analysis'")
    confidence: float = dspy.OutputField(desc="Confidence score 0-1")
    reason: str = dspy.OutputField(desc="Brief explanation of classification")


class TextToSemanticSignature(dspy.Signature):
    """Map natural language question to semantic request."""
    question: str = dspy.InputField(desc="User's business question")
    available_metrics: str = dspy.InputField(desc="List of available metrics")
    available_dimensions: str = dspy.InputField(desc="List of available dimensions")
    metric: str = dspy.OutputField(desc="Selected metric name")
    dimensions: str = dspy.OutputField(desc="Comma-separated dimension names")
    filters: str = dspy.OutputField(desc="Optional filters as JSON")
    window_days: int = dspy.OutputField(desc="Time window in days")


class NarratorSignature(dspy.Signature):
    """Generate concise decision memo from analysis results."""
    question: str = dspy.InputField(desc="Original business question")
    metrics_used: str = dspy.InputField(desc="Metrics that were queried")
    key_findings: str = dspy.InputField(desc="Key data findings")
    recommendation: str = dspy.InputField(desc="Proposed action with confidence interval")
    memo: str = dspy.OutputField(desc="Decision memo (max 150 words) with risks and next steps")


print("✓ DSPy signatures defined")

In [None]:
# Agent Implementations

class TriageAgent:
    """Classify questions as search or analysis."""
    
    def __init__(self, offline_mode: bool = False):
        self.offline_mode = offline_mode
        self.predictor = dspy.Predict(TriageSignature)
        
        # Local keyword rules
        self.analysis_keywords = [
            'cac', 'roas', 'improve', 'optimize', 'compare', 'which',
            'recommend', 'should', 'best', 'worst', 'trend', 'anomaly'
        ]
        self.search_keywords = [
            'what is', 'show me', 'list', 'find', 'get', 'display'
        ]
    
    def __call__(self, question: str) -> Dict[str, Any]:
        question_lower = question.lower()
        
        # Try local rules first
        analysis_score = sum(1 for kw in self.analysis_keywords if kw in question_lower)
        search_score = sum(1 for kw in self.search_keywords if kw in question_lower)
        
        if analysis_score > search_score and analysis_score >= 2:
            return {
                'mode': 'analysis',
                'confidence': 0.9,
                'reason': f'Matched {analysis_score} analysis keywords',
                'method': 'local_rules'
            }
        elif search_score > analysis_score and search_score >= 2:
            return {
                'mode': 'search',
                'confidence': 0.9,
                'reason': f'Matched {search_score} search keywords',
                'method': 'local_rules'
            }
        
        # Fallback to LLM
        if self.offline_mode:
            return {
                'mode': 'analysis',
                'confidence': 0.5,
                'reason': 'Default in offline mode',
                'method': 'offline_default'
            }
        
        result = self.predictor(question=question)
        return {
            'mode': result.mode,
            'confidence': float(result.confidence),
            'reason': result.reason,
            'method': 'dspy_fallback'
        }


class TextToSemanticAgent:
    """Map natural language to semantic request."""
    
    def __init__(self, semantic_layer: SemanticLayer, offline_mode: bool = False):
        self.semantic = semantic_layer
        self.offline_mode = offline_mode
        self.predictor = dspy.Predict(TextToSemanticSignature)
        
        # Template mappings (utterance → semantic request)
        self.templates = {
            'cac by channel': {
                'metric': 'cac_by_channel',
                'dimensions': ['channel'],
                'window_days': 90
            },
            'roas by channel': {
                'metric': 'roas_by_channel',
                'dimensions': ['channel'],
                'window_days': 90
            },
            'channel performance': {
                'metric': 'roas_by_channel',
                'dimensions': ['channel'],
                'window_days': 90
            }
        }
    
    def __call__(self, question: str) -> Dict[str, Any]:
        question_lower = question.lower()
        
        # Try template matching first
        for template_key, semantic_req in self.templates.items():
            if template_key in question_lower:
                return {
                    'metric': semantic_req['metric'],
                    'dimensions': semantic_req['dimensions'],
                    'filters': {},
                    'window_days': semantic_req['window_days'],
                    'method': 'template_match',
                    'matched_template': template_key
                }
        
        # For complex questions, we may need multiple metrics
        # Check if question mentions CAC optimization
        if 'cac' in question_lower and ('improve' in question_lower or 'optimize' in question_lower):
            # Need both CAC and ROAS for optimization
            return {
                'metric': 'cac_by_channel',  # Primary metric
                'secondary_metrics': ['roas_by_channel'],
                'dimensions': ['channel'],
                'filters': {},
                'window_days': 90,
                'method': 'pattern_match',
                'reason': 'CAC optimization requires both CAC and ROAS by channel'
            }
        
        # Fallback to LLM with constraints
        if self.offline_mode:
            return {
                'metric': 'cac_by_channel',
                'dimensions': ['channel'],
                'filters': {},
                'window_days': 90,
                'method': 'offline_default'
            }
        
        available_metrics = ', '.join(self.semantic.list_available_metrics())
        available_dimensions = ', '.join(self.semantic.get_dimension_names())
        
        result = self.predictor(
            question=question,
            available_metrics=available_metrics,
            available_dimensions=available_dimensions
        )
        
        # Validate against semantic layer
        metric = result.metric.strip()
        if metric not in self.semantic.list_available_metrics():
            raise ValueError(
                f"LLM proposed unknown metric '{metric}'. "
                f"Available: {available_metrics}"
            )
        
        # Drop empty entries so a blank or trailing-comma response does not fail validation
        dimensions = [d.strip() for d in result.dimensions.split(',') if d.strip()]
        for dim in dimensions:
            if not self.semantic.validate_dimension(dim):
                raise ValueError(
                    f"LLM proposed unknown dimension '{dim}'. "
                    f"Available: {available_dimensions}"
                )
        
        return {
            'metric': metric,
            'dimensions': dimensions,
            'filters': {},  # LLM-proposed filters are not parsed or applied in this demo
            'window_days': int(result.window_days),
            'method': 'dspy_constrained'
        }


class MetricRunner:
    """Compile and execute queries from semantic layer."""
    
    def __init__(self, db_conn, semantic_layer: SemanticLayer):
        self.conn = db_conn
        self.semantic = semantic_layer
    
    def __call__(self, metric: str, window_days: int, limit: int = 1000) -> Dict[str, Any]:
        # Compile query from semantic layer
        query_info = self.semantic.compile_query(metric, window_days, limit)
        
        # Execute with timing (perf_counter is monotonic, so it suits short intervals)
        start_time = time.perf_counter()
        result_df = self.conn.execute(query_info['sql']).df()
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        
        # Sanity checks
        row_count = len(result_df)
        
        return {
            'query_info': query_info,
            'df': result_df,
            'elapsed_ms': elapsed_ms,
            'row_count': row_count
        }


class HypothesisAgent:
    """Simulate budget reallocation scenarios."""
    
    def __init__(self, n_bootstrap: int = 1000):
        self.n_bootstrap = n_bootstrap
    
    def __call__(self, cac_df: pd.DataFrame, spend_col: str = 'spend',
                 cac_col: str = 'cac', channel_col: str = 'channel',
                 shift_percentage: float = 5.0) -> Dict[str, Any]:
        """
        Simulate budget shift from worst to best CAC channel.
        
        Args:
            cac_df: DataFrame with CAC by channel
            spend_col: Column name for spend
            cac_col: Column name for CAC
            channel_col: Column name for channel
            shift_percentage: Percentage points to shift (default 5pp)
        
        Returns:
            Dict with scenario results and confidence interval
        """
        # Filter valid CAC values
        valid_df = cac_df[cac_df[cac_col].notna()].copy()
        
        if len(valid_df) < 2:
            raise ValueError("Need at least 2 channels with valid CAC for hypothesis testing")
        
        # Sort by CAC (ascending = better)
        valid_df = valid_df.sort_values(cac_col)
        
        # Identify best and worst channels
        best_channel = valid_df.iloc[0][channel_col]
        worst_channel = valid_df.iloc[-1][channel_col]
        best_cac = valid_df.iloc[0][cac_col]
        worst_cac = valid_df.iloc[-1][cac_col]
        
        # Current blended CAC
        total_spend = valid_df[spend_col].sum()
        weights = valid_df[spend_col] / total_spend
        current_blended_cac = (weights * valid_df[cac_col]).sum()
        
        # Simulate shift: move shift_percentage points of spend share from worst to best
        shift_fraction = shift_percentage / 100.0
        new_weights = weights.copy()
        
        best_idx = valid_df[valid_df[channel_col] == best_channel].index[0]
        worst_idx = valid_df[valid_df[channel_col] == worst_channel].index[0]
        
        new_weights[worst_idx] -= shift_fraction
        new_weights[best_idx] += shift_fraction
        
        # Projected CAC
        projected_cac = (new_weights * valid_df[cac_col]).sum()
        delta_cac = projected_cac - current_blended_cac
        
        # Bootstrap confidence interval
        bootstrap_deltas = []
        for _ in range(self.n_bootstrap):
            # Resample channels with replacement; reset the index so duplicated
            # rows get unique labels and each weight update touches one row only
            sample_df = valid_df.sample(n=len(valid_df), replace=True).reset_index(drop=True)
            sample_weights = sample_df[spend_col] / sample_df[spend_col].sum()
            
            current_sample = (sample_weights * sample_df[cac_col]).sum()
            
            # Apply same shift
            new_sample_weights = sample_weights.copy()
            sample_best_idx = sample_df[cac_col].idxmin()
            sample_worst_idx = sample_df[cac_col].idxmax()
            
            new_sample_weights[sample_worst_idx] -= shift_fraction
            new_sample_weights[sample_best_idx] += shift_fraction
            
            projected_sample = (new_sample_weights * sample_df[cac_col]).sum()
            bootstrap_deltas.append(projected_sample - current_sample)
        
        # 95% CI
        ci_lower = current_blended_cac + np.percentile(bootstrap_deltas, 2.5)
        ci_upper = current_blended_cac + np.percentile(bootstrap_deltas, 97.5)
        
        return {
            'best_channel': best_channel,
            'best_cac': float(best_cac),
            'worst_channel': worst_channel,
            'worst_cac': float(worst_cac),
            'current_blended_cac': float(current_blended_cac),
            'shift_percentage': shift_percentage,
            'projected_blended_cac': float(projected_cac),
            'delta_cac': float(delta_cac),
            'ci_lower': float(ci_lower),
            'ci_upper': float(ci_upper),
            'n_bootstrap': self.n_bootstrap,
            'recommendation': f"Shift {shift_percentage}pp budget from {worst_channel} to {best_channel}"
        }


class NarratorAgent:
    """Generate decision memo from analysis results."""
    
    def __init__(self, offline_mode: bool = False):
        self.offline_mode = offline_mode
        self.predictor = dspy.Predict(NarratorSignature)
    
    def __call__(self, question: str, metrics_used: List[str],
                 key_findings: str, recommendation: str) -> Dict[str, str]:
        
        if self.offline_mode:
            memo = f"""Based on analysis of {', '.join(metrics_used)}, {key_findings}. 
            {recommendation}. 
            Risks: (1) Data quality assumptions, (2) Attribution model simplifications. 
            Next steps: (1) Pilot small shift, (2) Monitor for 14 days."""
            return {'memo': memo, 'word_count': len(memo.split())}
        
        result = self.predictor(
            question=question,
            metrics_used=', '.join(metrics_used),
            key_findings=key_findings,
            recommendation=recommendation
        )
        
        memo = result.memo
        word_count = len(memo.split())
        
        # Validate constraints: the prompt asks for 150 words; tolerate up to 200
        if word_count > 200:
            # Truncate to 150 words
            words = memo.split()[:150]
            memo = ' '.join(words) + '...'
        
        # Check that it references at least one metric
        memo_lower = memo.lower()
        metric_referenced = any(m.lower() in memo_lower for m in metrics_used)
        if not metric_referenced:
            memo = f"[Metrics: {', '.join(metrics_used)}] " + memo
        
        # Recount so the reported length matches the memo that is returned
        word_count = len(memo.split())
        
        return {
            'memo': memo,
            'word_count': word_count,
            'constraints_met': word_count <= 200 and metric_referenced
        }


print("✓ Agent implementations complete")

In [None]:
# Initialize agents
triage_agent = TriageAgent(offline_mode=OFFLINE_MODE)
text_to_semantic_agent = TextToSemanticAgent(semantic, offline_mode=OFFLINE_MODE)
metric_runner = MetricRunner(conn, semantic)
hypothesis_agent = HypothesisAgent(n_bootstrap=1000)
narrator_agent = NarratorAgent(offline_mode=OFFLINE_MODE)

print("✓ All agents initialized")

In [None]:
# Initialize observability
run_record = RunRecord(
    model_name=env_config['OPENAI_MODEL'],
    semantic_spec_hash=semantic.spec_hash
)

print(f"✓ Run record initialized: {run_record.run_id}")

## 4. Triage: Classify Question Type

In [None]:
# Business question
business_question = """Which channel mix change is most likely to improve CAC next month, 
given a recent anomaly in referral traffic?"""

print(f"Question: {business_question}")
print("\nRunning triage...")

triage_result = triage_agent(business_question)
run_record.record_triage(triage_result)

print("\nTriage Result:")
print("=" * 60)
for key, value in triage_result.items():
    print(f"  {key}: {value}")

assert triage_result['mode'] == 'analysis', "Expected analysis mode for this question"
print("\n✓ Triage complete: mode=analysis")

## 5. Semantic Mapping: NL → Structured Request

In [None]:
print("Mapping question to semantic request...")

semantic_request = text_to_semantic_agent(business_question)
run_record.record_semantic_request(semantic_request)

print("\nSemantic Request:")
print("=" * 60)
for key, value in semantic_request.items():
    print(f"  {key}: {value}")

print("\n✓ Semantic mapping complete")

## 6. Metric Compilation and Execution

Compile SQL from semantic layer (no LLM) and execute safely.
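
As an illustration of compiling from a fixed template rather than from model-generated SQL, the sketch below validates integer parameters, substitutes them into a template, and times the query. It uses the standard-library `sqlite3` module and an in-memory table purely so the example is self-contained. The table `channel_daily`, its columns, the fixed anchor date, and this `compile_query` helper are assumptions, not the notebook's actual database or `SemanticLayer.compile_query`.

```python
import sqlite3
import time

# Fixed template: only validated integers are ever substituted.
# The anchor date is hard-coded so the example is deterministic.
TEMPLATE = (
    "SELECT channel, SUM(spend) * 1.0 / NULLIF(SUM(conversions), 0) AS cac "
    "FROM channel_daily "
    "WHERE day >= date('2024-03-31', '-{window_days} days') "
    "GROUP BY channel ORDER BY channel LIMIT {limit}"
)


def compile_query(window_days: int, limit: int = 1000) -> str:
    """Return SQL with parameters checked for type and range."""
    if not isinstance(window_days, int) or not 1 <= window_days <= 365:
        raise ValueError("window_days must be an integer between 1 and 365")
    if not isinstance(limit, int) or not 1 <= limit <= 10_000:
        raise ValueError("limit must be an integer between 1 and 10000")
    return TEMPLATE.format(window_days=window_days, limit=limit)


conn_demo = sqlite3.connect(":memory:")
conn_demo.execute(
    "CREATE TABLE channel_daily (day TEXT, channel TEXT, spend REAL, conversions INTEGER)"
)
conn_demo.executemany(
    "INSERT INTO channel_daily VALUES (?, ?, ?, ?)",
    [
        ("2024-03-30", "search", 400.0, 10),
        ("2024-03-30", "social", 300.0, 5),
        ("2023-01-01", "social", 900.0, 1),  # outside a 90-day window
    ],
)

start = time.perf_counter()
rows = conn_demo.execute(compile_query(window_days=90)).fetchall()
elapsed_ms = (time.perf_counter() - start) * 1000
conn_demo.close()

print(rows)
print(f"{len(rows)} rows in {elapsed_ms:.2f} ms")
```

Because the template is fixed and the parameters are range-checked integers, a malformed or hostile value is rejected before any SQL is built.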

In [None]:
# We need both CAC and ROAS by channel for informed decision
metrics_to_run = ['cac_by_channel', 'roas_by_channel']
window_days = semantic_request.get('window_days', DEFAULT_WINDOW_DAYS)

results = {}

for metric in metrics_to_run:
    print(f"\nCompiling and executing: {metric}")
    result = metric_runner(metric, window_days)
    
    # Record in observability
    run_record.record_query(result['query_info'])
    run_record.record_execution(
        result['query_info']['query_id'],
        result['elapsed_ms'],
        result['row_count']
    )
    
    results[metric] = result
    
    print(f"  Query ID: {result['query_info']['query_id']}")
    print(f"  Execution time: {result['elapsed_ms']:.2f} ms")
    print(f"  Rows returned: {result['row_count']}")

print("\n✓ All metrics executed")

In [None]:
# Display CAC by channel
cac_df = results['cac_by_channel']['df']
print("\nCAC by Channel:")
print("=" * 60)
print(cac_df.to_string(index=False))

# Plot
fig1 = plot_channel_metric(
    cac_df,
    channel_col='channel',
    metric_col='cac',
    title=f'CAC by Channel (Last {window_days} Days)',
    ylabel='CAC ($)'
)
plt.show()

# Save
output_path = Path('./outputs')
output_path.mkdir(exist_ok=True)
fig1.savefig(output_path / f"{run_record.run_id}_cac_by_channel.png", dpi=100, bbox_inches='tight')
run_record.add_artifact('chart', str(output_path / f"{run_record.run_id}_cac_by_channel.png"))

In [None]:
# Display ROAS by channel
roas_df = results['roas_by_channel']['df']
print("\nROAS by Channel:")
print("=" * 60)
print(roas_df.to_string(index=False))

# Plot
fig2 = plot_channel_metric(
    roas_df,
    channel_col='channel',
    metric_col='roas',
    title=f'ROAS by Channel (Last {window_days} Days)',
    ylabel='ROAS (X)'
)
plt.show()

# Save
fig2.savefig(output_path / f"{run_record.run_id}_roas_by_channel.png", dpi=100, bbox_inches='tight')
run_record.add_artifact('chart', str(output_path / f"{run_record.run_id}_roas_by_channel.png"))

## 7. Hypothesis Simulation

Test a budget reallocation: shift 5 percentage points (pp) of spend share from the highest-CAC channel to the lowest-CAC channel.
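
A small worked example may help show what the shift does arithmetically. The channel names and numbers below are made up for illustration. The blended figure used in this notebook is the spend-weighted average of per-channel CAC, so moving a share `s` of spend from the worst channel to the best changes it by `s * (best_cac - worst_cac)`.

```python
# Made-up inputs: spend and CAC for three channels
spend = {"search": 500.0, "social": 300.0, "display": 200.0}
cac = {"search": 40.0, "social": 60.0, "display": 100.0}
shift = 0.05  # 5 percentage points of total spend

total_spend = sum(spend.values())
weights = {ch: s / total_spend for ch, s in spend.items()}

best = min(cac, key=cac.get)    # lowest CAC
worst = max(cac, key=cac.get)   # highest CAC

# Spend-weighted average CAC before the shift
current = sum(weights[ch] * cac[ch] for ch in cac)

# Move `shift` of spend share from the worst channel to the best
new_weights = dict(weights)
new_weights[worst] -= shift
new_weights[best] += shift
projected = sum(new_weights[ch] * cac[ch] for ch in cac)

print(f"{worst} -> {best}: {current:.2f} -> {projected:.2f} (delta {projected - current:+.2f})")

# For comparison: total spend divided by total conversions
conversions = {ch: spend[ch] / cac[ch] for ch in cac}
ratio_cac = total_spend / sum(conversions.values())
print(f"Spend / conversions: {ratio_cac:.2f}")
```

Here the delta is 0.05 × (40 − 100) = −3, taking the weighted average from 58 to 55. The spend-weighted average (58) differs from total spend over total conversions (about 51.28), so it is worth being explicit about which definition of blended CAC a projection uses.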

In [None]:
print("Running hypothesis simulation...\n")

hypothesis_result = hypothesis_agent(cac_df)

# Record
hypothesis_params = {
    'shift_percentage': hypothesis_result['shift_percentage'],
    'n_bootstrap': hypothesis_result['n_bootstrap']
}
run_record.record_hypothesis(hypothesis_params, hypothesis_result)

print("Hypothesis Results:")
print("=" * 60)
print(f"Best channel: {hypothesis_result['best_channel']} (CAC: ${hypothesis_result['best_cac']:.2f})")
print(f"Worst channel: {hypothesis_result['worst_channel']} (CAC: ${hypothesis_result['worst_cac']:.2f})")
print(f"\nCurrent blended CAC: ${hypothesis_result['current_blended_cac']:.2f}")
print(f"Projected blended CAC: ${hypothesis_result['projected_blended_cac']:.2f}")
print(f"Expected delta: ${hypothesis_result['delta_cac']:.2f}")
print(f"\n95% Confidence Interval: [${hypothesis_result['ci_lower']:.2f}, ${hypothesis_result['ci_upper']:.2f}]")
print(f"\nRecommendation: {hypothesis_result['recommendation']}")

print("\n✓ Hypothesis simulation complete")

In [None]:
# Plot hypothesis
from utils.plotting import plot_hypothesis_comparison

fig3 = plot_hypothesis_comparison(
    current_cac=hypothesis_result['current_blended_cac'],
    projected_cac=hypothesis_result['projected_blended_cac'],
    ci_lower=hypothesis_result['ci_lower'],
    ci_upper=hypothesis_result['ci_upper'],
    title='Projected CAC Impact with 95% CI'
)
plt.show()

# Save
fig3.savefig(output_path / f"{run_record.run_id}_hypothesis.png", dpi=100, bbox_inches='tight')
run_record.add_artifact('chart', str(output_path / f"{run_record.run_id}_hypothesis.png"))

## 8. Narration: Generate Decision Memo

In [None]:
# Prepare inputs for narrator
metrics_used = [r['query_info']['query_id'] for r in results.values()]

key_findings = f"""
Analyzed {len(cac_df)} channels over {window_days} days. 
Best CAC: {hypothesis_result['best_channel']} at ${hypothesis_result['best_cac']:.2f}. 
Worst CAC: {hypothesis_result['worst_channel']} at ${hypothesis_result['worst_cac']:.2f}. 
Current blended CAC: ${hypothesis_result['current_blended_cac']:.2f}.
""".strip()

recommendation_text = f"""
Shift {hypothesis_result['shift_percentage']}pp budget from {hypothesis_result['worst_channel']} 
to {hypothesis_result['best_channel']}. 
Projected CAC: ${hypothesis_result['projected_blended_cac']:.2f} 
(95% CI: [${hypothesis_result['ci_lower']:.2f}, ${hypothesis_result['ci_upper']:.2f}]).
""".strip()

print("Generating decision memo...\n")

narration_result = narrator_agent(
    question=business_question,
    metrics_used=metrics_used,
    key_findings=key_findings,
    recommendation=recommendation_text
)

# Record
run_record.record_narration(narration_result['memo'])

print("Decision Memo:")
print("=" * 60)
print(narration_result['memo'])
print("\n" + "=" * 60)
print(f"Word count: {narration_result['word_count']}")
print(f"Constraints met: {narration_result.get('constraints_met', 'N/A')}")

print("\n✓ Narration complete")

## 9. Observability: Run Record

In [None]:
# Finalize run record
run_record.finalize()

# Display
print("Run Record:")
print("=" * 60)
print(run_record.to_json())
print("\n" + "=" * 60)

# Save to file
saved_path = run_record.save(output_dir='./outputs')
print(f"\n✓ Run record saved: {saved_path}")

# Summary
print("\n" + run_record.summary())

## 10. Inline Tests and Smoke Checks

In [None]:
print("Running inline tests...\n")

# Test 1: Schema presence
try:
    schema_check = validate_schema(conn)
    run_record.record_test('schema_validation', True, 'All required tables and columns present')
    print("✓ Test 1: Schema validation passed")
except Exception as e:
    run_record.record_test('schema_validation', False, str(e))
    print(f"✗ Test 1: Schema validation failed: {e}")

# Test 2: SQL compilation for derived metrics
try:
    for metric in ['roas_by_channel', 'cac_by_channel']:
        compiled = semantic.compile_query(metric, DEFAULT_WINDOW_DAYS)
        assert 'sql' in compiled
        assert len(compiled['sql']) > 0
    run_record.record_test('sql_compilation', True, 'Both derived metrics compile successfully')
    print("✓ Test 2: SQL compilation passed")
except Exception as e:
    run_record.record_test('sql_compilation', False, str(e))
    print(f"✗ Test 2: SQL compilation failed: {e}")

# Test 3: No cartesian explosion (channel count sanity)
try:
    expected_max_channels = 10  # Reasonable upper bound
    for metric, result in results.items():
        row_count = result['row_count']
        assert row_count <= expected_max_channels, f"{metric} returned {row_count} rows (expected <= {expected_max_channels})"
    run_record.record_test('no_cartesian_explosion', True, f'All queries returned <= {expected_max_channels} rows')
    print("✓ Test 3: No cartesian explosion")
except Exception as e:
    run_record.record_test('no_cartesian_explosion', False, str(e))
    print(f"✗ Test 3: Cartesian explosion detected: {e}")

# Test 4: Triage accuracy on canned queries
try:
    test_cases = [
        ('What is the CAC by channel?', 'analysis'),
        ('Show me campaign performance', 'analysis'),
        ('Which channel should I optimize?', 'analysis'),
        ('List all campaigns', 'search'),
    ]
    correct = 0
    for query, expected_mode in test_cases:
        result = triage_agent(query)
        if result['mode'] == expected_mode:
            correct += 1
    
    accuracy = correct / len(test_cases)
    threshold = 0.75
    passed = accuracy >= threshold
    run_record.record_test('triage_accuracy', passed, f'Accuracy: {accuracy:.2%} (threshold: {threshold:.2%})')
    if passed:
        print(f"✓ Test 4: Triage accuracy {accuracy:.2%} >= {threshold:.2%}")
    else:
        print(f"✗ Test 4: Triage accuracy {accuracy:.2%} < {threshold:.2%}")
except Exception as e:
    run_record.record_test('triage_accuracy', False, str(e))
    print(f"✗ Test 4: Triage accuracy test failed: {e}")

# Test 5: Narration lint (length and metric reference)
try:
    memo = narration_result['memo']
    word_count = narration_result['word_count']
    
    # Check word count
    assert word_count <= 200, f"Narration too long: {word_count} words"
    
    # Check metric reference
    memo_lower = memo.lower()
    metric_referenced = any(m.lower() in memo_lower for m in metrics_used) or 'cac' in memo_lower or 'roas' in memo_lower
    assert metric_referenced, "Narration does not reference any metrics"
    
    run_record.record_test('narration_lint', True, f'Word count: {word_count}, metric referenced: {metric_referenced}')
    print(f"✓ Test 5: Narration lint passed ({word_count} words, metric referenced)")
except AssertionError as e:
    run_record.record_test('narration_lint', False, str(e))
    print(f"✗ Test 5: Narration lint failed: {e}")
except Exception as e:
    run_record.record_test('narration_lint', False, str(e))
    print(f"✗ Test 5: Narration lint error: {e}")

# Summary
total_tests = len(run_record.test_results)
passed_tests = sum(1 for t in run_record.test_results.values() if t['passed'])
print(f"\n{'=' * 60}")
print(f"Tests passed: {passed_tests}/{total_tests}")
print(f"{'=' * 60}")

## Summary: What We Built

### Architecture Components

1. **Semantic Layer** (`config/semantic.yml`)
   - Canonical metric definitions
   - Safe, tested base queries
   - Join rules enforced
   - Versioned with hash

2. **Modular Agents** (DSPy signatures for the LLM-backed steps)
   - TriageAgent: Local rules + LLM fallback
   - TextToSemanticAgent: Template matching + constrained LLM
   - MetricRunner: Deterministic SQL compilation
   - HypothesisAgent: Bootstrap confidence intervals
   - NarratorAgent: Structured output with constraints

3. **Observability**
   - Unique run_id for every execution
   - All decisions logged with provenance
   - Timings, row counts, SQL IDs tracked
   - Artifacts (charts, data) referenced
   - Test results recorded

### Key Differences from Bad Demo

| Bad Demo | Good Demo |
|----------|----------|
| One-shot LLM on raw data | Modular agents with semantic layer |
| No validation | Schema validation, sanity checks, tests |
| Wrong joins, time windows | Enforced in semantic.yml |
| No confidence intervals | Bootstrap CI for projections |
| Overconfident narratives | Constrained output with risks |
| No reproducibility | Full observability with run_id |

### Production-Ready Features

- ✓ All data access through semantic layer
- ✓ LLM used only for ambiguity resolution
- ✓ Deterministic logic wherever possible
- ✓ Comprehensive testing and validation
- ✓ Full observability and reproducibility
- ✓ Quantified uncertainty (confidence intervals)
- ✓ Clear decision memos with risks and next steps

In [None]:
# Cleanup
conn.close()
print("\n✓ Demo complete. Database connection closed.")
print(f"\nAll artifacts saved to: {output_path.absolute()}")