# End-to-End Event-Driven Pipeline Test

This notebook tests the complete event-driven trigger chain:

```
Bronze Ingestion (manual trigger)
       ↓
   [Completes]
       ↓
Silver Transformation (auto-triggered)
       ↓
   [Completes]
       ↓
Gold Transformation (auto-triggered)
```

**Purpose**: Verify that the event-driven architecture works correctly:
- Bronze completes successfully
- Silver auto-triggers and completes
- Gold auto-triggers and completes
- All data flows through the complete pipeline

**Note**: This uses actual Prefect deployments and monitors them in real-time.

In [None]:
%load_ext autoreload
%autoreload 2

import asyncio
import time
from datetime import datetime, timedelta
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    FlowRunFilter,
)
from prefect.client.schemas.sorting import FlowRunSort

# Confirm the import cell ran, then list the steps this notebook walks through.
print("‚úì All imports successful")
print("\nThis notebook tests the event-driven trigger chain:")
for _step_line in (
    "  1. Manually trigger Bronze deployment",
    "  2. Monitor Bronze completion",
    "  3. Verify Silver auto-triggers",
    "  4. Monitor Silver completion",
    "  5. Verify Gold auto-triggers",
    "  6. Monitor Gold completion",
    "  7. Verify complete data flow",
):
    print(_step_line)

## Step 1: Verify Deployments Exist

Check that all required deployments exist. Only their presence is verified here; trigger configuration is exercised by the later steps.

In [None]:
async def check_deployments() -> bool:
    """Verify that every deployment the trigger chain needs is registered.

    Prints a per-deployment report (name and ID, or NOT FOUND) and a hint
    for redeploying when something is missing.

    Returns:
        True when all required deployments were found, False otherwise.
    """
    required = [
        "bronze-ingestion-all-states-daily",
        "silver-transformation-all-states-triggered",
        "gold-transformation-all-states-triggered",
    ]

    async with get_client() as client:
        # Ask only for the names we care about (same filter shape used when
        # triggering Bronze). An unfiltered listing may be truncated by the
        # API's default page size on a server with many deployments.
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(name={"any_": required})
        )

    deployment_names = {d.name: d for d in deployments}

    print("=" * 60)
    print("DEPLOYMENT CHECK")
    print("=" * 60)

    all_exist = True
    for name in required:
        dep = deployment_names.get(name)
        if dep is None:
            print(f"‚úó {name} - NOT FOUND")
            all_exist = False
            continue
        print(f"‚úì {name}")
        print(f"  ID: {dep.id}")

    print("=" * 60)

    if not all_exist:
        print("\n‚ö†Ô∏è  WARNING: Missing deployments!")
        print("Run: poetry run python scripts/deploy.py --all")
        return False

    print("\n‚úì All required deployments exist")
    return True

# Run check
# Top-level ``await`` works here because this is a notebook cell; the boolean
# result is kept so the final summary cell can report on it.
deployments_ok = await check_deployments()

## Step 2: Trigger Bronze Ingestion

Manually trigger the Bronze deployment to start the cascade.

**Note**: This will run actual extraction from the FEC API. To limit scope for testing, we use a small committee list.

In [None]:
async def trigger_bronze_deployment(committee_ids: Optional[list[str]] = None):
    """Start a run of the Bronze deployment and hand back the new flow run's ID.

    The deployment named ``bronze-ingestion-all-states-daily`` is looked up and
    a flow run is created with ``election_cycle=2026`` and
    ``full_refresh=False``. A non-empty ``committee_ids`` list is forwarded as
    the ``committee_ids`` parameter to narrow the run.

    Returns:
        The created flow run's ID, or None when the deployment is not found.
    """
    bronze_filter = DeploymentFilter(
        name={"any_": ["bronze-ingestion-all-states-daily"]}
    )
    divider = "=" * 60

    async with get_client() as client:
        matches = await client.read_deployments(deployment_filter=bronze_filter)

        # Nothing to trigger without the deployment.
        if not matches:
            print("‚úó Bronze deployment not found!")
            return None

        run_parameters = {"election_cycle": 2026, "full_refresh": False}
        if committee_ids:
            # Restrict the run to the given committees for a fast test.
            run_parameters["committee_ids"] = committee_ids

        flow_run = await client.create_flow_run_from_deployment(
            matches[0].id,
            parameters=run_parameters,
        )

        print(divider)
        print("BRONZE DEPLOYMENT TRIGGERED")
        print(divider)
        print(f"Flow Run ID: {flow_run.id}")
        print(f"Flow Run Name: {flow_run.name}")
        print(f"State: {flow_run.state.type}")

        if not committee_ids:
            print("\nScope: ALL 51 states (will take ~2-4 hours)")
        else:
            print(f"\nTest scope: {len(committee_ids)} committees")

        print(divider)

        return flow_run.id

# Trigger bronze with small test scope
# Use committee IDs that we know have data
# The same list is reused by the data-flow verification cell to scope its
# row counts, so keep it in sync with what was actually ingested.
test_committees = [
    "C00840017",  # ALSOBROOKS FOR SENATE
    "C00435974",  # ANDY HARRIS FOR CONGRESS
]

# ``bronze_run_id`` is None when the Bronze deployment could not be found.
bronze_run_id = await trigger_bronze_deployment(committee_ids=test_committees)

## Step 3: Monitor Bronze Completion

Wait for Bronze to complete and check its status.

In [None]:
async def monitor_flow_run(
    flow_run_id: str,
    flow_name: str,
    timeout: int = 600,
    poll_interval: float = 5,
) -> bool:
    """Poll a flow run until it reaches a final state or the timeout elapses.

    State transitions are printed as they are observed.

    Args:
        flow_run_id: ID of the flow run to watch.
        flow_name: Human-readable label used in the printed output.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to wait between polls.

    Returns:
        True only when the run finished in a completed state. False when it
        ended in any other final state (failed, crashed, cancelled, ...) or
        when the timeout was reached first.
    """
    async with get_client() as client:
        print(f"\n{'=' * 60}")
        print(f"MONITORING {flow_name.upper()}")
        print("=" * 60)

        # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
        started = time.monotonic()
        last_state = None

        while time.monotonic() - started < timeout:
            flow_run = await client.read_flow_run(flow_run_id)
            state = flow_run.state
            state_type = state.type if state is not None else None

            # Print state changes
            if state_type != last_state:
                elapsed = int(time.monotonic() - started)
                print(f"[{elapsed}s] State: {state_type}")
                last_state = state_type

            # Stop on ANY final state. Checking only is_completed() would keep
            # polling a failed/crashed/cancelled run until the timeout.
            if state is not None and state.is_final():
                elapsed = int(time.monotonic() - started)

                if state.is_completed():
                    print(f"\n‚úì {flow_name} completed in {elapsed}s")
                    print(f"  Final state: {state.type}")
                    return True

                print(f"\n‚úó {flow_name} ended without completing after {elapsed}s")
                print(f"  Final state: {state.type}")
                print(f"  ‚úó FAILED: {state.message}")
                return False

            # Wait before next check
            await asyncio.sleep(poll_interval)

        print(f"\n‚è±Ô∏è  Timeout after {timeout}s")
        return False

# Monitor bronze completion
bronze_success = await monitor_flow_run(
    bronze_run_id,
    "Bronze Ingestion",
    timeout=600  # 10 minutes should be enough for 2 committees
)

## Step 4: Verify Silver Auto-Triggered

Check that Silver transformation was automatically triggered by Bronze completion.

In [None]:
async def find_triggered_flow_run(deployment_name: str, after_time: datetime, timeout: int = 60):
    """Wait for a flow run of ``deployment_name`` that started after ``after_time``.

    The API is polled every 5 seconds until a matching run appears or
    ``timeout`` seconds have passed.

    Args:
        deployment_name: Name of the deployment whose run is expected.
        after_time: Only runs with a start time after this instant match.
            A naive datetime is interpreted as local system time.
        timeout: Maximum number of seconds to keep looking.

    Returns:
        The ID of the most recently started matching run, or None when no
        run was found before the timeout.
    """
    # A naive value would be sent without an offset and compared against the
    # API's timezone-aware timestamps, shifting the window by the local UTC
    # offset. astimezone() on a naive datetime assumes local system time.
    if after_time.tzinfo is None:
        after_time = after_time.astimezone()

    # Filter by deployment on the server instead of fetching recent runs of
    # every deployment and resolving each run's deployment one call at a time.
    by_deployment = DeploymentFilter(name={"any_": [deployment_name]})

    async with get_client() as client:
        print(f"\nLooking for triggered run of: {deployment_name}")
        print(f"After: {after_time}")

        started = time.monotonic()

        while time.monotonic() - started < timeout:
            # NOTE(review): the filter is on start_time, so a run that is still
            # waiting to start presumably will not match until it begins - confirm.
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    start_time={"after_": after_time}
                ),
                deployment_filter=by_deployment,
                sort=FlowRunSort.START_TIME_DESC,
                limit=1,
            )

            if flow_runs:
                run = flow_runs[0]
                print(f"\n‚úì Found triggered run: {run.name}")
                print(f"  ID: {run.id}")
                print(f"  Started: {run.start_time}")
                print(f"  State: {run.state.type}")
                return run.id

            # Wait and retry
            await asyncio.sleep(5)

        print(f"\n‚úó No triggered run found within {timeout}s")
        return None

# Wait for Silver to trigger
print("\nWaiting for Silver transformation to auto-trigger...")
bronze_completion_time = datetime.now() - timedelta(minutes=2)  # Look back 2 min

silver_run_id = await find_triggered_flow_run(
    "silver-transformation-all-states-triggered",
    after_time=bronze_completion_time,
    timeout=60
)

## Step 5: Monitor Silver Completion

Wait for Silver transformation to complete.

In [None]:
if silver_run_id:
    silver_success = await monitor_flow_run(
        silver_run_id,
        "Silver Transformation",
        timeout=300  # 5 minutes
    )
else:
    print("\n‚ö†Ô∏è  Cannot monitor Silver - run not found")
    silver_success = False

## Step 6: Verify Gold Auto-Triggered

Check that Gold transformation was automatically triggered by Silver completion.

In [None]:
if silver_success:
    print("\nWaiting for Gold transformation to auto-trigger...")
    silver_completion_time = datetime.now() - timedelta(minutes=2)
    
    gold_run_id = await find_triggered_flow_run(
        "gold-transformation-all-states-triggered",
        after_time=silver_completion_time,
        timeout=60
    )
else:
    print("\n‚ö†Ô∏è  Skipping Gold check - Silver did not complete successfully")
    gold_run_id = None

## Step 7: Monitor Gold Completion

Wait for Gold transformation to complete.

In [None]:
if gold_run_id:
    gold_success = await monitor_flow_run(
        gold_run_id,
        "Gold Transformation",
        timeout=300  # 5 minutes
    )
else:
    print("\n‚ö†Ô∏è  Cannot monitor Gold - run not found")
    gold_success = False

## Step 8: Verify Data Flow

Check that data actually flowed through all layers.

In [None]:
from sqlalchemy import func, select
from fund_lens_etl.database import get_db_session
from fund_lens_etl.models.bronze.fec import (
    BronzeFECCommittee,
    BronzeFECCandidate,
    BronzeFECScheduleA,
)
from fund_lens_etl.models.silver.fec import (
    SilverFECCommittee,
    SilverFECCandidate,
    SilverFECContribution,
)
from fund_lens_etl.models.gold import (
    GoldCommittee,
    GoldCandidate,
    GoldContributor,
    GoldContribution,
)

def _count_rows(session, model, condition):
    """Return COUNT(*) over ``model`` restricted by ``condition``."""
    statement = select(func.count()).select_from(model).where(condition)
    return session.execute(statement).scalar()


print("\n" + "=" * 60)
print("DATA FLOW VERIFICATION")
print("=" * 60)

with get_db_session() as session:
    # Bronze: raw rows for the test committees.
    bronze_committees = _count_rows(
        session,
        BronzeFECCommittee,
        BronzeFECCommittee.committee_id.in_(test_committees),
    )
    bronze_contributions = _count_rows(
        session,
        BronzeFECScheduleA,
        BronzeFECScheduleA.committee_id.in_(test_committees),
    )

    # Silver: rows keyed back to the same committee IDs.
    silver_committees = _count_rows(
        session,
        SilverFECCommittee,
        SilverFECCommittee.source_committee_id.in_(test_committees),
    )
    silver_contributions = _count_rows(
        session,
        SilverFECContribution,
        SilverFECContribution.committee_id.in_(test_committees),
    )

    # Gold: committees are matched by FEC ID; contributions are matched through
    # a subquery selecting the Gold IDs of those committees.
    gold_committees = _count_rows(
        session,
        GoldCommittee,
        GoldCommittee.fec_committee_id.in_(test_committees),
    )
    _gold_committee_ids = select(GoldCommittee.id).where(
        GoldCommittee.fec_committee_id.in_(test_committees)
    )
    gold_contributions = _count_rows(
        session,
        GoldContribution,
        GoldContribution.recipient_committee_id.in_(_gold_committee_ids),
    )

    # Report the per-layer counts in pipeline order.
    for _header, _committees, _contributions in (
        ("\n‚úì BRONZE LAYER (Test Committees):", bronze_committees, bronze_contributions),
        ("\n‚úì SILVER LAYER (Test Committees):", silver_committees, silver_contributions),
        ("\n‚úì GOLD LAYER (Test Committees):", gold_committees, gold_contributions),
    ):
        print(_header)
        print(f"  Committees:    {_committees:,}")
        print(f"  Contributions: {_contributions:,}")

    # Verify consistency
    print("\n" + "=" * 60)
    print("CONSISTENCY CHECK")
    print("=" * 60)

    checks = [
        ("Bronze ‚Üí Silver contributions", bronze_contributions == silver_contributions),
        ("Silver ‚Üí Gold contributions", silver_contributions == gold_contributions),
        ("Data in all layers", bronze_contributions > 0 and silver_contributions > 0 and gold_contributions > 0),
    ]

    for check_name, passed in checks:
        if passed:
            status = "‚úì"
        else:
            status = "‚úó"
        print(f"{status} {check_name}")

    # The overall verdict is the conjunction of every individual check.
    all_passed = all(passed for _, passed in checks)

    print("=" * 60)

    if not all_passed:
        print("\n‚ö†Ô∏è  SOME CHECKS FAILED - Review data flow")
    else:
        print("\n‚úÖ ALL CHECKS PASSED - Data flowed through complete pipeline!")

## Final Summary

Summary of the complete trigger chain test.

In [None]:
print("\n" + "=" * 60)
print("END-TO-END TRIGGER CHAIN TEST SUMMARY")
print("=" * 60)

# Look every outcome up by name so the summary still renders when an earlier
# cell was skipped or failed before assigning its result. A missing name
# counts as a failed step instead of raising NameError.
_outcomes = globals()

results = [
    ("1. Deployments configured", bool(_outcomes.get("deployments_ok", False))),
    ("2. Bronze triggered", _outcomes.get("bronze_run_id") is not None),
    ("3. Bronze completed", bool(_outcomes.get("bronze_success", False))),
    ("4. Silver auto-triggered", _outcomes.get("silver_run_id") is not None),
    ("5. Silver completed", bool(_outcomes.get("silver_success", False))),
    ("6. Gold auto-triggered", _outcomes.get("gold_run_id") is not None),
    ("7. Gold completed", bool(_outcomes.get("gold_success", False))),
]

for step, success in results:
    status = "‚úÖ" if success else "‚ùå"
    print(f"{status} {step}")

print("=" * 60)

# The chain only counts as working when every single step succeeded.
all_success = all(success for _, success in results)

if all_success:
    print("\nüéâ SUCCESS! Event-driven trigger chain working correctly!")
    print("\nNext steps:")
    print("  - Production pipeline will run M-F at 1 AM")
    print("  - Silver will auto-trigger when Bronze completes")
    print("  - Gold will auto-trigger when Silver completes")
    print("  - Monthly bulk will also trigger Silver ‚Üí Gold chain")
else:
    print("\n‚ö†Ô∏è  Some steps failed - review logs above")
    print("\nTroubleshooting:")
    print("  - Check Prefect UI for flow run details")
    print("  - Verify trigger configuration: poetry run prefect deployment inspect <name>")
    print("  - Check worker is running: systemctl status prefect-worker")