# Pipeline Operations - NYC Mobility & Weather Analytics

This notebook walks through each step of the data pipeline, allowing you to:
- Run the complete pipeline or individual stages
- Validate data quality at each step
- Troubleshoot issues
- Monitor execution and performance

## Pipeline Architecture

```
DLT Ingestion (Bronze)
    ↓
dbt Transformation (Silver)
    ↓
Great Expectations (Gold)
```

**Total Processing:** ~12.5M records transformed through 12 dbt models with 108 tests

---
## Setup & Environment Verification

In [None]:
import subprocess
import json
from pathlib import Path
import duckdb
from datetime import datetime
import pandas as pd

# Set project root (assumes this notebook lives one directory below it, e.g. in notebooks/)
PROJECT_ROOT = Path.cwd().parent
print(f"Project Root: {PROJECT_ROOT}")

# Verify key directories exist
dirs_to_check = ['src/ingestion', 'dbt', 'great_expectations', 'orchestration', 'data']
for dir_path in dirs_to_check:
    full_path = PROJECT_ROOT / dir_path
    status = "✅" if full_path.exists() else "❌"
    print(f"{status} {dir_path}")

# Check if database exists
DB_PATH = PROJECT_ROOT / "data" / "nyc_mobility.duckdb"
db_exists = DB_PATH.exists()
print(f"\n{'✅' if db_exists else '⚠️'} Database: {DB_PATH}")
if db_exists:
    print(f"   Size: {DB_PATH.stat().st_size / 1024**3:.2f} GB")

---
## Step 1: DLT Data Ingestion (Bronze Layer)

Ingest raw data from external sources:
- NYC TLC Yellow Taxi (~8.6M trips)
- NYC TLC For-Hire Vehicle (FHV) trips
- CitiBike System Data (~1.4M trips)
- Open-Meteo Weather API (~1.5K hourly observations)

**Expected Duration:** 3-5 minutes

In [None]:
# Run DLT ingestion pipeline
print("üöÄ Starting DLT data ingestion...\n")
print("This will download and load data from:")
print("  - NYC TLC (Yellow Taxi, FHV)")
print("  - CitiBike System Data")
print("  - Open-Meteo Weather API\n")

start_time = datetime.now()

result = subprocess.run(
    ["poetry", "run", "python", "src/ingestion/run_pipeline.py"],
    cwd=PROJECT_ROOT,
    capture_output=True,
    text=True
)

end_time = datetime.now()
duration = (end_time - start_time).total_seconds()

if result.returncode == 0:
    print("✅ Ingestion completed successfully!")
    print(f"⏱️  Duration: {duration:.1f} seconds\n")
    # Show the last 20 lines of output
    print("Output (last 20 lines):")
    print("="*60)
    print("\n".join(result.stdout.splitlines()[-20:]))
else:
    print(f"❌ Ingestion failed (exit code {result.returncode})!")
    print("\nError:")
    print(result.stderr)
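
The ingestion, dbt, and Great Expectations cells all repeat one pattern: run a command, time it, and show the tail of its output. A minimal sketch of a shared helper, assuming the same `poetry run` commands used in this notebook; `run_stage` and `run_all` are hypothetical names that do not exist in the project. `run_all` stops at the first failing stage, so dbt never runs against a half-loaded Bronze layer.

```python
import subprocess
import time
from pathlib import Path


def run_stage(name, cmd, cwd, tail=20):
    """Run one pipeline stage as a subprocess (hypothetical helper).

    Returns (ok, seconds, output_tail).
    """
    start = time.perf_counter()
    result = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    seconds = time.perf_counter() - start
    ok = result.returncode == 0
    # On failure prefer stderr, falling back to stdout if stderr is empty
    stream = result.stdout if ok else (result.stderr or result.stdout)
    output_tail = "\n".join(stream.splitlines()[-tail:])
    print(f"{'✅' if ok else '❌'} {name}: {seconds:.1f}s (exit code {result.returncode})")
    return ok, seconds, output_tail


def run_all(stages, root):
    """Run (name, cmd, subdir) stages in order, stopping at the first failure."""
    timings = {}
    for name, cmd, subdir in stages:
        ok, seconds, output_tail = run_stage(name, cmd, Path(root) / subdir)
        timings[name] = seconds
        if not ok:
            print(output_tail)
            return False, timings
    return True, timings


# Usage with this notebook's commands (PROJECT_ROOT comes from the setup cell):
# STAGES = [
#     ("DLT ingestion", ["poetry", "run", "python", "src/ingestion/run_pipeline.py"], "."),
#     ("dbt build", ["poetry", "run", "dbt", "build"], "dbt"),
#     ("Great Expectations", ["poetry", "run", "python", "great_expectations/run_validations.py"], "."),
# ]
# ok, timings = run_all(STAGES, PROJECT_ROOT)
```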

### Validate Ingestion

In [None]:
# Connect to DuckDB and check raw data
conn = duckdb.connect(str(DB_PATH))

print("üìä Raw Data Validation\n")
print("="*60)

# Check each raw table
tables = [
    ('yellow_taxi', 8_000_000, 9_000_000),
    ('fhv_taxi', 2_000_000, 3_000_000),
    ('trips', 1_000_000, 2_000_000),  # CitiBike
    ('hourly_weather', 1_400, 1_500),
]

results = []
for table, min_expected, max_expected in tables:
    expected = f"{min_expected:,} - {max_expected:,}"
    try:
        count = conn.execute(f"SELECT COUNT(*) FROM raw_data.{table}").fetchone()[0]
        status = "✅" if min_expected <= count <= max_expected else "⚠️"
        results.append({
            'Table': f"raw_data.{table}",
            'Row Count': f"{count:,}",
            'Expected': expected,
            'Status': status
        })
    except Exception as e:
        # e.g. the table does not exist because its source was not ingested
        results.append({
            'Table': f"raw_data.{table}",
            'Row Count': f"ERROR: {e}",
            'Expected': expected,
            'Status': '❌'
        })

df_validation = pd.DataFrame(results)
display(df_validation)

all_passed = all(r['Status'] == '✅' for r in results)
print(f"\n{'✅ All checks passed!' if all_passed else '⚠️ Some checks failed!'}")

conn.close()
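
The same range check is repeated for the staging, dimension, and fact tables further down. As a hedged sketch, it could be factored into one helper (`check_row_counts` is a hypothetical name). It only relies on `conn.execute(sql).fetchone()`, which both a DuckDB connection and the standard library's `sqlite3` connection provide, so it can be tried without the project database. Table names are interpolated directly into the SQL, so this is only suitable for trusted, hard-coded lists like the ones in this notebook.

```python
def check_row_counts(conn, schema, expectations):
    """Compare COUNT(*) for each table against an inclusive [low, high] range.

    `conn` is any connection whose execute(sql) result supports fetchone(),
    e.g. duckdb.connect(...) or sqlite3.connect(...). Returns a list of dicts
    with the same keys as the validation table above.
    """
    rows = []
    for table, low, high in expectations:
        name = f"{schema}.{table}" if schema else table
        try:
            count = conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
            status = "✅" if low <= count <= high else "⚠️"
            shown = f"{count:,}"
        except Exception as exc:  # e.g. the table was never created
            status, shown = "❌", f"ERROR: {exc}"
        rows.append({
            "Table": name,
            "Row Count": shown,
            "Expected": f"{low:,} - {high:,}",
            "Status": status,
        })
    return rows
```

For example, `display(pd.DataFrame(check_row_counts(conn, "raw_data", tables)))` would rebuild the validation table from the cell above while the connection is open.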

---
## Step 2: dbt Transformation (Silver Layer)

Transform raw data through:
- **Staging** (4 models): Clean and standardize source data
- **Intermediate** (2 models): Lightweight transformations
- **Marts** (6 models): Dimension and fact tables
- **Tests** (108 tests): Data quality validation

**Expected Duration:** 1-2 minutes

In [None]:
# Run dbt build (models + tests)
print("üî® Starting dbt transformation...\n")

start_time = datetime.now()

result = subprocess.run(
    ["poetry", "run", "dbt", "build"],
    cwd=PROJECT_ROOT / "dbt",
    capture_output=True,
    text=True
)

end_time = datetime.now()
duration = (end_time - start_time).total_seconds()

if result.returncode == 0:
    print("✅ dbt build completed successfully!")
    print(f"⏱️  Duration: {duration:.1f} seconds\n")
else:
    print(f"❌ dbt build failed (exit code {result.returncode})!")

# Show last 30 lines of output
print("Output (last 30 lines):")
print("="*60)
print("\n".join(result.stdout.split("\n")[-30:]))

### Validate dbt Results

In [None]:
# Parse dbt run results
run_results_path = PROJECT_ROOT / "dbt" / "target" / "run_results.json"

if run_results_path.exists():
    with open(run_results_path) as f:
        run_results = json.load(f)
    
    print("üìä dbt Execution Summary\n")
    print("="*60)
    
    # Overall stats
    total = len(run_results['results'])
    passed = sum(1 for r in run_results['results'] if r['status'] == 'success')
    failed = sum(1 for r in run_results['results'] if r['status'] == 'error')
    
    print(f"Total runs: {total}")
    print(f"‚úÖ Passed: {passed}")
    print(f"‚ùå Failed: {failed}")
    print(f"Success rate: {100 * passed / total:.1f}%\n")
    
    # Break down by type
    models = [r for r in run_results['results'] if r['unique_id'].startswith('model')]
    tests = [r for r in run_results['results'] if r['unique_id'].startswith('test')]
    
    print(f"Models: {len(models)} ({sum(1 for m in models if m['status'] == 'success')} passed)")
    print(f"Tests: {len(tests)} ({sum(1 for t in tests if t['status'] == 'success')} passed)\n")
    
    # Show any failures
    failures = [r for r in run_results['results'] if r['status'] != 'success']
    if failures:
        print("‚ö†Ô∏è Failed runs:")
        for fail in failures:
            print(f"  - {fail['unique_id']}")
    else:
        print("‚úÖ All runs passed!")
else:
    print("‚ö†Ô∏è run_results.json not found. Did dbt run successfully?")
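
dbt writes different status values for different resource types: models report `success`, `error`, or `skipped`, while tests report `pass`, `fail`, `warn`, `error`, or `skipped`. A small sketch that tallies `run_results.json` entries by resource type and status makes that split visible; `summarize_run_results` is a hypothetical helper, and the `example` dict is made-up data in the same shape as the real file.

```python
from collections import Counter


def summarize_run_results(run_results):
    """Count dbt run_results.json entries by (resource type, status).

    The resource type is the first dot-separated part of unique_id,
    e.g. "model.my_project.fct_trips" -> "model".
    """
    return Counter(
        (r["unique_id"].split(".", 1)[0], r["status"])
        for r in run_results["results"]
    )


# Made-up example in the shape of run_results.json
example = {"results": [
    {"unique_id": "model.proj.stg_a", "status": "success"},
    {"unique_id": "model.proj.fct_b", "status": "success"},
    {"unique_id": "test.proj.not_null_a", "status": "pass"},
    {"unique_id": "test.proj.unique_b", "status": "fail"},
]}

for (resource_type, status), n in sorted(summarize_run_results(example).items()):
    print(f"{resource_type:10} {status:8} {n}")
```

Against the real artifact, call `summarize_run_results(run_results)` after loading it in the cell above.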

### Check Transformed Data

In [None]:
# Validate transformed tables
conn = duckdb.connect(str(DB_PATH))

print("üìä Transformed Data Validation\n")
print("="*60)

# Check staging models
print("\nü•â Staging Models (Bronze)")
staging_tables = [
    ('stg_tlc__yellow_taxi', 8_000_000, 9_000_000),
    ('stg_tlc__fhv_taxi', 2_000_000, 3_000_000),
    ('stg_citibike__trips', 1_000_000, 2_000_000),
    ('stg_weather__hourly', 1_400, 1_500),
]

for table, min_exp, max_exp in staging_tables:
    count = conn.execute(f"SELECT COUNT(*) FROM core.{table}").fetchone()[0]
    status = "✅" if min_exp <= count <= max_exp else "⚠️"
    print(f"{status} {table:30} {count:>12,} rows")

# Check dimension tables
print("\nü•à Dimension Tables (Silver)")
dim_tables = [
    ('dim_date', 120, 125),
    ('dim_time', 24, 24),
    ('dim_weather', 1_400, 1_500),
    ('dim_location', 260, 270),
]

for table, min_exp, max_exp in dim_tables:
    count = conn.execute(f"SELECT COUNT(*) FROM core_core.{table}").fetchone()[0]
    status = "✅" if min_exp <= count <= max_exp else "⚠️"
    print(f"{status} {table:30} {count:>12,} rows")

# Check fact tables
print("\nü•á Fact Tables (Gold)")
fact_tables = [
    ('fct_trips', 12_000_000, 13_000_000),
    ('fct_hourly_mobility', 4_000, 5_000),
]

for table, min_exp, max_exp in fact_tables:
    count = conn.execute(f"SELECT COUNT(*) FROM core_core.{table}").fetchone()[0]
    status = "✅" if min_exp <= count <= max_exp else "⚠️"
    print(f"{status} {table:30} {count:>12,} rows")

conn.close()
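
Per-table ranges can all pass while rows are silently lost between layers. A hedged reconciliation sketch compares `fct_trips` with the sum of the three staging trip models. This assumes `fct_trips` is a union of those models, which the expected ranges suggest but the notebook does not state; `reconcile` and its `rel_tol` tolerance are illustrative choices, and a nonzero tolerance leaves room for intentional filtering.

```python
def reconcile(parts, total, rel_tol=0.0):
    """Check that `total` matches the sum of `parts` within a relative tolerance.

    Returns (ok, difference) where difference = total - sum(parts).
    """
    expected = sum(parts.values())
    diff = total - expected
    ok = abs(diff) <= rel_tol * expected
    return ok, diff


# Usage with an open DuckDB connection `conn` (assumes fct_trips unions the staging trips):
# staging = {t: conn.execute(f"SELECT COUNT(*) FROM core.{t}").fetchone()[0]
#            for t in ("stg_tlc__yellow_taxi", "stg_tlc__fhv_taxi", "stg_citibike__trips")}
# fact = conn.execute("SELECT COUNT(*) FROM core_core.fct_trips").fetchone()[0]
# ok, diff = reconcile(staging, fact, rel_tol=0.001)
# print(f"{'✅' if ok else '⚠️'} fct_trips vs staging: difference {diff:+,} rows")
```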

---
## Step 3: Great Expectations Validation (Gold Layer)

Run comprehensive data quality checks:
- **10 validation suites**
- **56 individual expectations**
- Checks for completeness, consistency, and correctness

**Expected Duration:** ~30 seconds
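
As a rough illustration of what a single expectation evaluates, here is a plain-Python analog of two common Great Expectations checks, `expect_column_values_to_not_be_null` and `expect_column_values_to_be_between`. This is not the Great Expectations API; the functions, the `trips` rows, and the `trip_distance` column are hypothetical. The `mostly` argument mirrors the GE idea of passing when at least that fraction of non-null values meets the condition.

```python
def expect_not_null(rows, column):
    """Plain-Python analog of a not-null expectation (not the GE API)."""
    bad = [i for i, row in enumerate(rows) if row.get(column) is None]
    return {"success": not bad, "unexpected_count": len(bad)}


def expect_between(rows, column, min_value, max_value, mostly=1.0):
    """Pass if at least `mostly` of the non-null values fall in [min_value, max_value]."""
    values = [row[column] for row in rows if row.get(column) is not None]
    in_range = sum(1 for v in values if min_value <= v <= max_value)
    fraction = in_range / len(values) if values else 1.0
    return {"success": fraction >= mostly, "unexpected_count": len(values) - in_range}


# Hypothetical sample rows
trips = [
    {"trip_key": 1, "trip_distance": 2.5},
    {"trip_key": 2, "trip_distance": -1.0},
    {"trip_key": None, "trip_distance": 3.0},
]
print(expect_not_null(trips, "trip_key"))  # {'success': False, 'unexpected_count': 1}
print(expect_between(trips, "trip_distance", 0, 100, mostly=0.6))  # {'success': True, 'unexpected_count': 1}
```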

In [None]:
# Run Great Expectations validation
print("üîç Starting Great Expectations validation...\n")

start_time = datetime.now()

result = subprocess.run(
    ["poetry", "run", "python", "great_expectations/run_validations.py"],
    cwd=PROJECT_ROOT,
    capture_output=True,
    text=True
)

end_time = datetime.now()
duration = (end_time - start_time).total_seconds()

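if result.returncode == 0:
    print("✅ Validation completed successfully!")
    print(f"⏱️  Duration: {duration:.1f} seconds\n")
else:
    print(f"⚠️ Validation script exited with code {result.returncode}; some validations may have failed")

# Show output (stderr too, since tracebacks usually go there)
print("Output:")
print("="*60)
print(result.stdout)
if result.stderr:
    print("\nstderr:")
    print(result.stderr)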

### View Data Quality Report

In [None]:
# Check if validation results exist
ge_docs_path = PROJECT_ROOT / "great_expectations" / "uncommitted" / "data_docs" / "local_site" / "index.html"

if ge_docs_path.exists():
    print("✅ Data quality report available!")
    print(f"\n📄 Open in browser: {ge_docs_path.as_uri()}")
    print("\nOr run (macOS): open great_expectations/uncommitted/data_docs/local_site/index.html")
    print("On Linux, use xdg-open instead of open.")
else:
    print("⚠️ Data quality report not found")
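
The `open` command is macOS-specific. A cross-platform sketch using the standard library's `webbrowser` module; `open_report` is a hypothetical helper name, and whether a browser actually appears depends on where the notebook kernel runs (it will not on a headless server).

```python
import webbrowser
from pathlib import Path


def open_report(path, launch=True):
    """Open a local HTML report in the default browser (macOS, Linux, or Windows).

    Returns the file:// URI; pass launch=False to only build it.
    """
    uri = Path(path).resolve().as_uri()  # as_uri() needs an absolute path
    if launch:
        webbrowser.open(uri)
    return uri


# open_report(ge_docs_path)
```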

---
## Step 4: Overall Pipeline Validation

Final checks to ensure the complete pipeline executed correctly.

In [None]:
# Comprehensive pipeline validation
conn = duckdb.connect(str(DB_PATH))

print("üéØ Pipeline Health Check\n")
print("="*60)

checks = []

# 1. Report data freshness (informational: no staleness threshold is applied)
try:
    latest_trip = conn.execute("""
        SELECT MAX(pickup_datetime) as latest
        FROM core_core.fct_trips
    """).fetchone()[0]
    checks.append(('Data Freshness', f"Latest trip: {latest_trip}", '✅'))
except Exception as e:
    checks.append(('Data Freshness', 'ERROR', '❌'))

# 2. Check weather coverage
try:
    coverage = conn.execute("""
        SELECT ROUND(100.0 * SUM(CASE WHEN weather_key IS NOT NULL THEN 1 ELSE 0 END) / COUNT(*), 4)
        FROM core_core.fct_trips
    """).fetchone()[0]
    checks.append(('Weather Join Coverage', f"{coverage}%", '✅' if coverage >= 99 else '⚠️'))
except Exception as e:
    checks.append(('Weather Join Coverage', 'ERROR', '❌'))

# 3. Check for null primary keys
try:
    null_keys = conn.execute("""
        SELECT COUNT(*) FROM core_core.fct_trips WHERE trip_key IS NULL
    """).fetchone()[0]
    checks.append(('Null Primary Keys', f"{null_keys} found", '✅' if null_keys == 0 else '❌'))
except Exception as e:
    checks.append(('Null Primary Keys', 'ERROR', '❌'))

# 4. Check duplicate keys
try:
    duplicates = conn.execute("""
        SELECT COUNT(*) - COUNT(DISTINCT trip_key)
        FROM core_core.fct_trips
    """).fetchone()[0]
    checks.append(('Duplicate Keys', f"{duplicates} found", '✅' if duplicates == 0 else '❌'))
except Exception as e:
    checks.append(('Duplicate Keys', 'ERROR', '❌'))

# 5. Check date dimension completeness
try:
    date_range = conn.execute("""
        SELECT 
            MIN(date_actual) as min_date,
            MAX(date_actual) as max_date,
            COUNT(*) as total_dates
        FROM core_core.dim_date
    """).fetchone()
    checks.append(('Date Dimension', f"{date_range[2]} dates ({date_range[0]} to {date_range[1]})", '✅'))
except Exception as e:
    checks.append(('Date Dimension', 'ERROR', '❌'))

# Display results
for check_name, result, status in checks:
    print(f"{status} {check_name:25} {result}")

all_passed = all(c[2] == '✅' for c in checks)
print("\n" + "="*60)
print(f"{'✅ Pipeline health check PASSED!' if all_passed else '⚠️ Some checks failed'}")

conn.close()
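
Two details in the checks above are easy to miss: `COUNT(column)` skips NULLs, and so does `COUNT(DISTINCT trip_key)`, which means `COUNT(*) - COUNT(DISTINCT trip_key)` also counts NULL keys as duplicates. A sketch of one query that separates null keys, true duplicates, and weather coverage. It runs against a tiny in-memory `sqlite3` table standing in for `core_core.fct_trips`; the same SQL should work in DuckDB with the schema-qualified table name. `KEY_HEALTH_SQL` is a hypothetical name.

```python
import sqlite3

# COUNT(col) ignores NULLs, so one pass can separate null keys from true duplicates
KEY_HEALTH_SQL = """
    SELECT
        COUNT(*) - COUNT(trip_key)                 AS null_keys,
        COUNT(trip_key) - COUNT(DISTINCT trip_key) AS duplicate_keys,
        100.0 * COUNT(weather_key) / COUNT(*)      AS weather_coverage_pct
    FROM fct_trips
"""

# In-memory stand-in for core_core.fct_trips (sqlite3 only so this runs anywhere)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fct_trips (trip_key TEXT, weather_key TEXT)")
conn.executemany(
    "INSERT INTO fct_trips VALUES (?, ?)",
    [("a", "w1"), ("b", "w1"), ("b", None), (None, "w2")],
)
null_keys, duplicate_keys, coverage = conn.execute(KEY_HEALTH_SQL).fetchone()
print(null_keys, duplicate_keys, coverage)  # 1 1 75.0
conn.close()
```

On the same four rows, the original duplicate check would report 2 (4 rows minus 2 distinct keys), mixing the NULL key into the duplicate count.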

---
## Step 5: Pipeline Performance Metrics

In [None]:
# Get overall pipeline stats
conn = duckdb.connect(str(DB_PATH))

print("üìà Pipeline Performance Summary\n")
print("="*60)

# Total records processed
stats = conn.execute("""
    SELECT
        (SELECT COUNT(*) FROM raw_data.yellow_taxi) as raw_yellow,
        (SELECT COUNT(*) FROM raw_data.fhv_taxi) as raw_fhv,
        (SELECT COUNT(*) FROM raw_data.trips) as raw_citibike,
        (SELECT COUNT(*) FROM core.stg_tlc__yellow_taxi) as stg_yellow,
        (SELECT COUNT(*) FROM core.stg_tlc__fhv_taxi) as stg_fhv,
        (SELECT COUNT(*) FROM core.stg_citibike__trips) as stg_citibike,
        (SELECT COUNT(*) FROM core_core.fct_trips) as fact_trips
""").fetchone()

total_raw = stats[0] + stats[1] + stats[2]
total_staging = stats[3] + stats[4] + stats[5]
total_fact = stats[6]

print(f"Raw Records Ingested:     {total_raw:>15,}")
print(f"Staging Records:          {total_staging:>15,}")
print(f"Fact Table Records:       {total_fact:>15,}")
print(f"\nData Retention Rate:      {100 * total_fact / total_raw:>14.2f}%")

# Database size
db_size_gb = DB_PATH.stat().st_size / 1024**3
print(f"\nDatabase Size:            {db_size_gb:>14.2f} GB")

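# dbt stats from run results (path redefined so this cell also runs on its own)
run_results_path = PROJECT_ROOT / "dbt" / "target" / "run_results.json"
if run_results_path.exists():
    with open(run_results_path) as f:
        run_results = json.load(f)
    results = run_results['results']

    # Summed per-node time exceeds wall-clock time when dbt runs with several threads
    node_time = sum(r.get('execution_time', 0) for r in results)
    models_built = sum(1 for r in results if r['unique_id'].startswith('model.') and r['status'] == 'success')
    tests_passed = sum(1 for r in results if r['unique_id'].startswith('test.') and r['status'] == 'pass')
    print(f"dbt Wall-Clock Time:      {run_results.get('elapsed_time', 0):>14.1f} seconds")
    print(f"dbt Node Time (summed):   {node_time:>14.1f} seconds")
    print(f"Models Built:             {models_built:>15}")
    print(f"Tests Passed:             {tests_passed:>15}")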

print("\n" + "="*60)
print("✅ Pipeline metrics calculated successfully!")

conn.close()
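
Summed node time shows how much work dbt did, but not where it went. A short sketch that lists the slowest nodes from the same `run_results.json` data; `slowest_nodes` is a hypothetical helper, and `execution_time` is the per-node field the cell above already sums.

```python
def slowest_nodes(run_results, n=5):
    """Return the n slowest dbt nodes as (unique_id, seconds), slowest first."""
    timed = [
        (r["unique_id"], r.get("execution_time", 0.0))
        for r in run_results["results"]
    ]
    return sorted(timed, key=lambda item: item[1], reverse=True)[:n]


# Usage with the run_results dict loaded in the cell above:
# for unique_id, seconds in slowest_nodes(run_results):
#     print(f"{seconds:>8.1f}s  {unique_id}")
```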

---
## Dagster Integration

For production usage, run the pipeline through Dagster for:
- Visual lineage graph
- Dependency tracking
- Execution history
- Asset versioning
- Scheduling

In [None]:
# Check Dagster definitions
print("üîß Dagster Configuration Check\n")
print("="*60)

try:
    import sys
    sys.path.insert(0, str(PROJECT_ROOT))
    from orchestration import defs
    
    asset_count = len(list(defs.get_all_asset_specs()))
    job_count = len(list(defs.get_all_job_defs()))
    
    print("✅ Dagster definitions loaded successfully\n")
    print(f"Assets defined: {asset_count}")
    print(f"Jobs defined: {job_count}")
    
    print("\n📝 Available Jobs:")
    for job in defs.get_all_job_defs():
        if not job.name.startswith('__'):
            print(f"  - {job.name}")
    
    print("\n🚀 To run via Dagster UI:")
    print("   poetry run dagster dev -w orchestration/workspace.yaml")
    print("   Then open: http://localhost:3000")
    
except Exception as e:
    print(f"⚠️ Could not load Dagster definitions: {e}")
    print("   This is optional - pipeline can run without Dagster")
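
Dependency tracking comes down to deriving an execution order from declared dependencies. Dagster does this from its asset definitions; as a Dagster-free illustration of the same idea, the standard library's `graphlib` can order a simplified, hypothetical version of this pipeline's asset graph and show which assets could run in parallel. The asset names are made up for the sketch; the real ones are defined in `orchestration/`.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical, simplified asset graph: each key maps to the assets it depends on
ASSET_DEPS = {
    "raw_yellow_taxi": set(),
    "raw_fhv_taxi": set(),
    "raw_citibike_trips": set(),
    "raw_hourly_weather": set(),
    "dbt_staging": {"raw_yellow_taxi", "raw_fhv_taxi", "raw_citibike_trips", "raw_hourly_weather"},
    "dbt_marts": {"dbt_staging"},
    "ge_validations": {"dbt_marts"},
}

# One valid sequential order (dependencies always come before dependents)
order = list(TopologicalSorter(ASSET_DEPS).static_order())
print(order)

# Batches of assets whose dependencies are all complete, i.e. could run in parallel
ts = TopologicalSorter(ASSET_DEPS)
ts.prepare()
while ts.is_active():
    ready = sorted(ts.get_ready())
    print("can run in parallel:", ready)
    ts.done(*ready)
```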

---
## Quick Commands Reference

### Via Pipeline Script
```bash
# Full pipeline (DLT + dbt + GE)
./scripts/run_pipeline.sh full

# Ingestion only
./scripts/run_pipeline.sh ingestion

# Quick test
./scripts/run_pipeline.sh quick

# Validation only
./scripts/run_pipeline.sh validate
```

### Via Direct Commands
```bash
# DLT ingestion
poetry run python src/ingestion/run_pipeline.py

# dbt build
cd dbt && poetry run dbt build

# Great Expectations
poetry run python great_expectations/run_validations.py
```

### Via Dagster
```bash
# Start Dagster UI
poetry run dagster dev -w orchestration/workspace.yaml

# Then in UI: Jobs → full_pipeline → Launch Run
```

---
## Troubleshooting

### Common Issues

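**Database locked error:**

DuckDB is embedded, so the lock is held by whichever process has the file open (another notebook kernel, a Dagster run, or the `duckdb` CLI) rather than by a server. Close that connection (`conn.close()`, or restart the kernel), or find the process holding the file:
```bash
lsof data/nyc_mobility.duckdb  # run from the project root; lists processes with the file open
kill <PID>                     # stop the process reported by lsof
```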
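
DuckDB allows one process to open a database file read-write, while several processes can open it read-only at the same time (as long as no process holds it read-write). Since the validation cells in this notebook only run `SELECT` queries, here is a hedged sketch that opens connections read-only and always closes them, even when a cell raises. `open_db` is a hypothetical helper; `duckdb.connect(path, read_only=True)` is DuckDB's read-only mode.

```python
from contextlib import contextmanager


@contextmanager
def open_db(path, connect, read_only=True):
    """Open a connection and always close it, even if the block raises.

    `connect` is the driver's connect function, e.g. duckdb.connect, which
    accepts read_only=...; read-only is the default because the validation
    cells only read.
    """
    conn = connect(path, read_only=read_only)
    try:
        yield conn
    finally:
        conn.close()


# Usage in a validation cell:
# with open_db(str(DB_PATH), duckdb.connect) as conn:
#     print(conn.execute("SELECT COUNT(*) FROM core_core.fct_trips").fetchone()[0])
```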

**dbt compilation error:**
```bash
cd dbt
rm -rf target/ dbt_packages/
poetry run dbt deps
poetry run dbt compile
```

**Missing dependencies:**
```bash
poetry install
```