# Politician Trading Data Ingestion Orchestrator

Master orchestrator for all ingestion pipelines.

## CLI Commands
```bash
mcli run ingest run                  # Run all pipelines
mcli run ingest run --us-only        # Run US pipelines only
mcli run ingest run --intl-only      # Run international only
mcli run ingest run --pipeline us-house  # Run a single pipeline
mcli run ingest status               # Check status of all pipelines
mcli run ingest list-pipelines       # List available pipelines
```

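## Pipelines

Names are shown with hyphens; the CLI also accepts the underscore form printed by `list-pipelines` (e.g. `us-house` or `us_house`).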

### US Federal
- `us-house` - US House of Representatives
- `us-senate` - US Senate
- `third-party` - QuiverQuant, ProPublica, StockNear

### US States
- `us-states` - TX, NY, FL, IL, PA, MA
- `california` - California (NetFile/FPPC)

### International
- `uk-parliament` - UK Parliament Register of Interests
- `eu-parliament` - EU Parliament MEP Declarations
- `eu-member-states` - Germany, France, Italy, Spain, Netherlands

In [None]:
import asyncio
import json
import logging
import sys
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Optional

import click

# Repository root, assumed to be two levels above the notebook's working
# directory -- NOTE(review): depends on where the notebook/CLI is launched from.
project_root = Path.cwd().parent.parent
# Make the in-repo ``src`` packages importable. Test membership of the exact
# string that gets inserted; checking ``project_root`` itself never matched, so
# each re-execution of this cell prepended another duplicate entry.
_src_dir = str(project_root / 'src')
if _src_dir not in sys.path:
    sys.path.insert(0, _src_dir)

from politician_trading.config import WorkflowConfig
from politician_trading.scrapers.scrapers import CongressTradingScraper, QuiverQuantScraper, EUParliamentScraper
from politician_trading.scrapers.scrapers_uk import run_uk_parliament_collection
from politician_trading.scrapers.scrapers_eu import run_eu_member_states_collection
from politician_trading.scrapers.scrapers_us_states import run_us_states_collection
from politician_trading.scrapers.scrapers_california import run_california_collection

# Configure root logging at INFO so per-pipeline progress messages are emitted,
# then take a module-scoped logger for this orchestrator.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name=__name__)

In [None]:
# Root click group for the orchestrator; the `run`, `list-pipelines` and
# `status` subcommands in this file attach to it via ``@ingest.command``.
# Click renders the docstring below as the group's --help text.
@click.group(name="ingest")
def ingest():
    """Master orchestrator for all politician trading data ingestion."""

def click_async(f):
    """Adapt an ``async def`` callback so click can call it synchronously.

    The returned wrapper keeps ``f``'s metadata (via ``functools.wraps``), builds
    the coroutine from the given arguments, and drives it to completion with
    ``asyncio.run``, returning the coroutine's result.
    """
    @wraps(f)
    def _run_sync(*args, **kwargs):
        coroutine = f(*args, **kwargs)
        return asyncio.run(coroutine)

    return _run_sync

# Default workflow configuration; the code below reads only its ``scraping``
# section, which is handed to every pipeline.
config = WorkflowConfig.default()
scraping_config = config.scraping
# Raw-data root. Run summaries are written here by default, and `status` looks
# for per-pipeline subdirectories beneath it. Created eagerly at import time.
OUTPUT_DIR = project_root.joinpath('data', 'raw')
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

# Registry of runnable pipelines, in display/run order: key -> {'category', 'desc'}.
# Keys are the names accepted by `run --pipeline` (hyphens are normalised to
# underscores there). 'category' drives the --us-only / --intl-only filters and
# the grouping in `list-pipelines`.
PIPELINES = {
    key: {'category': category, 'desc': desc}
    for key, category, desc in (
        ('us_house', 'us_federal', 'US House of Representatives'),
        ('us_senate', 'us_federal', 'US Senate'),
        ('third_party', 'us_federal', 'QuiverQuant, ProPublica'),
        ('us_states', 'us_states', 'TX, NY, FL, IL, PA, MA'),
        ('california', 'us_states', 'California NetFile/FPPC'),
        ('uk_parliament', 'international', 'UK Parliament'),
        ('eu_parliament', 'international', 'EU Parliament MEPs'),
        ('eu_member_states', 'international', 'DE, FR, IT, ES, NL'),
    )
}

In [None]:
async def run_single_pipeline(name: str, scraping_config) -> tuple:
    """Run a single pipeline and return ``(name, count, status, disclosures)``.

    Args:
        name: Pipeline key in underscore form, as used in ``PIPELINES``
            (e.g. ``'us_house'``).
        scraping_config: Passed unchanged to the pipeline's scraper class or
            collection function.

    Returns:
        A 4-tuple ``(name, count, status, disclosures)`` where ``count`` is
        ``len(disclosures)``, ``status`` is ``'success'`` or
        ``'failed: <error>'``, and ``disclosures`` is whatever the pipeline
        returned (``[]`` on failure).

    Any ``Exception`` raised while running the pipeline -- including an
    unrecognised ``name`` -- is logged with its traceback and reported as a
    ``'failed: ...'`` status instead of propagating. One broken source
    therefore cannot abort a multi-pipeline run.
    """
    start = datetime.now()
    try:
        logger.info("Starting: %s", name)

        # Scraper-class pipelines are entered as async context managers around
        # the scrape call; the collection functions are awaited directly.
        if name == 'us_house':
            scraper = CongressTradingScraper(scraping_config)
            async with scraper:
                disclosures = await scraper.scrape_house_disclosures()
        elif name == 'us_senate':
            scraper = CongressTradingScraper(scraping_config)
            async with scraper:
                disclosures = await scraper.scrape_senate_disclosures()
        elif name == 'third_party':
            scraper = QuiverQuantScraper(scraping_config)
            async with scraper:
                disclosures = await scraper.scrape_congress_trades()
        elif name == 'us_states':
            disclosures = await run_us_states_collection(scraping_config)
        elif name == 'california':
            disclosures = await run_california_collection(scraping_config)
        elif name == 'uk_parliament':
            disclosures = await run_uk_parliament_collection(scraping_config)
        elif name == 'eu_parliament':
            scraper = EUParliamentScraper(scraping_config)
            async with scraper:
                disclosures = await scraper.scrape_mep_declarations()
        elif name == 'eu_member_states':
            disclosures = await run_eu_member_states_collection(scraping_config)
        else:
            # An unknown name must not be reported as a successful 0-record run.
            raise ValueError(f"Unknown pipeline: {name}")

        duration = (datetime.now() - start).total_seconds()
        logger.info("Completed %s: %d records in %.1fs", name, len(disclosures), duration)
        return (name, len(disclosures), 'success', disclosures)
    except Exception as e:
        # Top-level boundary for one pipeline: keep the traceback in the log,
        # but turn the error into a status so the caller can keep going.
        logger.exception("Failed %s: %s", name, e)
        return (name, 0, f'failed: {e}', [])

In [None]:
# `run` subcommand: executes the selected pipelines one after another and
# writes a JSON summary of per-pipeline record counts and statuses.
#
# Selection (first match wins):
#   --pipeline NAME   exactly that pipeline; hyphens accepted for underscores
#   --us-only         every pipeline with category 'us_federal' or 'us_states'
#   --intl-only       every 'international' pipeline
#   (none)            every pipeline in PIPELINES
# An unknown --pipeline name, or --us-only together with --intl-only, prints an
# error to stderr and exits with status 1. Individual pipeline failures are
# recorded in the summary and do not change the exit status.
#
# The summary goes to <output>/ingestion_summary_<YYYYmmdd_HHMMSS>.json (the
# directory is created if missing), defaulting to OUTPUT_DIR. Only counts and
# statuses are saved here; the disclosure records returned by each pipeline are
# not persisted by this command.
@ingest.command(name="run")
@click.option('--us-only', is_flag=True, help='Run only US pipelines')
@click.option('--intl-only', is_flag=True, help='Run only international pipelines')
@click.option('--pipeline', default=None, help='Run specific pipeline')
@click.option('--output', default=None, help='Output directory')
@click_async
async def run_ingestion(us_only: bool, intl_only: bool, pipeline: Optional[str], output: Optional[str]):
    """Run ingestion pipelines."""
    if pipeline:
        pipeline = pipeline.lower().replace('-', '_')
        if pipeline not in PIPELINES:
            click.echo(f"Unknown pipeline: {pipeline}", err=True)
            click.echo(f"Available: {', '.join(PIPELINES.keys())}", err=True)
            # Non-zero exit so scripts/schedulers notice the bad invocation.
            sys.exit(1)
        pipelines_to_run = [pipeline]
    elif us_only and intl_only:
        # Previously one flag silently won; reject the contradictory request.
        click.echo("--us-only and --intl-only are mutually exclusive", err=True)
        sys.exit(1)
    elif us_only:
        pipelines_to_run = [p for p, info in PIPELINES.items() if info['category'] in ('us_federal', 'us_states')]
    elif intl_only:
        pipelines_to_run = [p for p, info in PIPELINES.items() if info['category'] == 'international']
    else:
        pipelines_to_run = list(PIPELINES.keys())

    click.echo(f"Running {len(pipelines_to_run)} pipelines...\n")

    # (name, count, status) per pipeline, in run order.
    results = []
    for name in pipelines_to_run:
        click.echo(f"  [{name}] Starting...")
        name, count, status, _disclosures = await run_single_pipeline(name, scraping_config)
        results.append((name, count, status))
        status_icon = "OK" if status == 'success' else "FAIL"
        click.echo(f"  [{name}] {status_icon} - {count} records")

    # Summary
    click.echo("\n" + "="*50)
    click.echo("SUMMARY")
    click.echo("="*50)

    total = sum(r[1] for r in results)
    successful = sum(1 for r in results if r[2] == 'success')
    failed = len(results) - successful

    click.echo(f"Total records: {total}")
    click.echo(f"Pipelines: {successful} success, {failed} failed")

    # Save summary. One timestamp for both the filename and the JSON field so
    # they cannot disagree.
    finished_at = datetime.now()
    out_dir = Path(output) if output else OUTPUT_DIR
    # A user-supplied --output directory may not exist yet.
    out_dir.mkdir(parents=True, exist_ok=True)
    summary_file = out_dir / f'ingestion_summary_{finished_at.strftime("%Y%m%d_%H%M%S")}.json'

    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump({
            'run_timestamp': finished_at.isoformat(),
            'total_records': total,
            'successful': successful,
            'failed': failed,
            'results': [{'pipeline': n, 'records': c, 'status': s} for n, c, s in results],
        }, f, indent=2)
    click.echo(f"\nSaved summary: {summary_file}")

In [None]:
# `list-pipelines` subcommand: prints every entry of PIPELINES under a heading
# for its category, in a fixed category order. The docstring is the --help text.
@ingest.command(name="list-pipelines")
def list_pipelines():
    """List available ingestion pipelines."""
    click.echo("Available pipelines:\n")

    # (category key used in PIPELINES, heading shown to the user), in display order.
    headings = (
        ('us_federal', 'US Federal'),
        ('us_states', 'US States'),
        ('international', 'International'),
    )

    for category, heading in headings:
        click.echo(f"{heading}:")
        members = [(key, meta) for key, meta in PIPELINES.items() if meta['category'] == category]
        for key, meta in members:
            click.echo(f"  - {key}: {meta['desc']}")
        click.echo()

In [None]:
# `status` subcommand. For each pipeline in PIPELINES it inspects
# OUTPUT_DIR/<subdir> for *.json files and prints their count and the newest
# modification time. It then prints the timestamp and record total from the
# newest ingestion_summary_*.json in OUTPUT_DIR. Summaries written to a custom
# `run --output` directory are not seen. An unreadable or corrupt summary file
# is reported on stderr instead of crashing the command.
@ingest.command(name="status")
def check_status():
    """Check status of all ingestion pipelines."""
    click.echo("Checking pipeline data status...\n")

    # Pipelines whose data directory is not named after the pipeline itself.
    # Deriving the rest from PIPELINES keeps newly added pipelines visible here.
    # NOTE(review): california shares the us_states directory, so both lines
    # report the same files -- confirm this matches where the scrapers write.
    dir_overrides = {'california': 'us_states'}

    for pipeline in PIPELINES:
        data_dir = OUTPUT_DIR / dir_overrides.get(pipeline, pipeline)
        if not data_dir.exists():
            click.echo(f"  {pipeline}: No data directory")
            continue
        files = list(data_dir.glob('*.json'))
        if not files:
            click.echo(f"  {pipeline}: No data")
            continue
        newest = datetime.fromtimestamp(max(p.stat().st_mtime for p in files))
        click.echo(f"  {pipeline}: {len(files)} files, latest: {newest.strftime('%Y-%m-%d %H:%M')}")

    # Check for summary files
    summaries = list(OUTPUT_DIR.glob('ingestion_summary_*.json'))
    if not summaries:
        return
    latest = max(summaries, key=lambda p: p.stat().st_mtime)
    try:
        with open(latest, encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        # e.g. a summary truncated by an interrupted run.
        click.echo(f"\nCould not read latest summary {latest}: {exc}", err=True)
        return
    click.echo(f"\nLatest run: {data.get('run_timestamp', 'Unknown')}")
    click.echo(f"Total records: {data.get('total_records', 'Unknown')}")