# In [None]:
import os
import time
import requests
import zipfile
import glob
import urllib3
import duckdb

# =============================================================================
# 1. CONFIGURATION (Environment-Agnostic)
# =============================================================================
# The BTS download requests run with TLS verification disabled (verify=False),
# so mute urllib3's InsecureRequestWarning that would otherwise fire per request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Resolve the project root: the script's own directory for a standalone .py run,
# or the current working directory inside Jupyter (where __file__ is undefined).
try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    BASE_DIR = os.getcwd()

# Raw inputs (ZIPs/CSVs) and exported Parquet outputs both live under BASE_DIR.
DATA_RAW, DATA_PROCESSED = (
    os.path.join(BASE_DIR, folder) for folder in ('data_raw', 'data_processed')
)

# Create both folders up front; exist_ok makes this a no-op on later runs.
os.makedirs(DATA_RAW, exist_ok=True)
os.makedirs(DATA_PROCESSED, exist_ok=True)

# Ingestion window: every calendar month (1 through 12) of YEAR.
YEAR = 2024
MONTHS = range(1, 12 + 1)

# =============================================================================
# 2. INGESTION (BTS DOWNLOAD & EXTRACTION)
# =============================================================================
def _fetch_zip(url, zip_path, attempts=3):
    """Download ``url`` to ``zip_path`` with simple retries; return True on success.

    The body is streamed into a ``.part`` sibling file and only renamed onto
    ``zip_path`` once it has been written completely. An interrupted transfer
    therefore never leaves a truncated ZIP at ``zip_path``, which later runs
    would otherwise treat as already downloaded.
    """
    part_path = zip_path + ".part"
    for attempt in range(1, attempts + 1):
        try:
            time.sleep(2)  # pace requests to the BTS server
            # NOTE(review): verify=False disables TLS certificate checking for
            # this download; presumably a workaround for the server's cert
            # chain -- confirm, and re-enable verification if possible.
            with requests.get(url, verify=False, stream=True, timeout=60) as r:
                if r.status_code != 200:
                    print(f"[HTTP {r.status_code}]", end=" ", flush=True)
                    continue
                with open(part_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            # Atomic publish: zip_path only ever holds a complete download.
            os.replace(part_path, zip_path)
            return True
        except Exception as exc:
            # Best-effort retry: report the failure, drop any partial file, back off.
            print(f"[attempt {attempt} error: {exc}]", end=" ", flush=True)
            if os.path.exists(part_path):
                os.remove(part_path)
            time.sleep(5)
    return False


def _extract_csvs(raw_dir):
    """Extract each ``.csv`` member of every ZIP in ``raw_dir`` not already on disk.

    An unreadable archive is reported and skipped so that it does not block
    extraction of the remaining months.
    """
    for archive in sorted(glob.glob(os.path.join(raw_dir, "*.zip"))):
        try:
            with zipfile.ZipFile(archive, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    if not member.endswith('.csv'):
                        continue
                    if os.path.exists(os.path.join(raw_dir, member)):
                        continue  # extracted on a previous run
                    print(f"Extracting: {member}")
                    zip_ref.extract(member, raw_dir)
        except zipfile.BadZipFile:
            print(f"Skipping unreadable archive (delete it to re-download): "
                  f"{os.path.basename(archive)}")


def download_and_extract():
    """Download each month's BTS ZIP for YEAR into DATA_RAW and extract its CSVs.

    Months whose ZIP already exists in DATA_RAW are not downloaded again. Each
    missing month gets up to three attempts, and failures are reported rather
    than raised. Extraction then runs over every ZIP in DATA_RAW and skips CSVs
    that already exist.
    """
    base_url = "https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_{}_{}.zip"
    print(f"\n--- [STEP 1] Starting Ingestion for {YEAR} ---")

    for month in MONTHS:
        zip_path = os.path.join(DATA_RAW, f"flights_{YEAR}_{month}.zip")
        if os.path.exists(zip_path):
            continue  # already downloaded
        print(f"Downloading {month}/{YEAR}...", end=" ", flush=True)
        ok = _fetch_zip(base_url.format(YEAR, month), zip_path)
        print("Success." if ok else "Failed.")

    print("\nChecking Extraction status...")
    _extract_csvs(DATA_RAW)

# =============================================================================
# 3. VECTORIZED ETL & FINANCIAL MODELING (DUCKDB)
# =============================================================================
def _sql_str(value):
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def run_aviation_pipeline():
    """Transform the raw BTS CSVs with DuckDB and export Parquet assets.

    Steps:
    - Reads every ``*.csv`` in DATA_RAW, with all columns read as text.
    - Derives the cleaned fact columns (surrogate ID, standardized tail number,
      typed AirTime/Cancelled, MRL liability, zero-filled delay causes).
    - Writes ``Aviation_Fact_Table.parquet`` to DATA_PROCESSED.
    - If ``FAA_Registry_Master.parquet`` exists in DATA_RAW, also writes
      ``Master_Dim.parquet``, restricted to tail numbers seen in the fact table.
    - Prints the row count and total MRL liability, both computed from the
      exported fact file.
    """
    print("\n--- [STEP 2] Initializing DuckDB Pipeline ---")

    # Paths are embedded in SQL text (views and COPY cannot take bound
    # parameters), so quote them to survive apostrophes in directory names.
    csv_glob = _sql_str(f"{DATA_RAW}/*.csv")
    fact_path = f"{DATA_PROCESSED}/Aviation_Fact_Table.parquet"
    dim_path = f"{DATA_PROCESSED}/Master_Dim.parquet"
    master_parquet = os.path.join(DATA_RAW, 'FAA_Registry_Master.parquet')

    con = duckdb.connect(':memory:')
    try:
        # Process Flight Logs: Generate IDs, Standardize Tails, and Apply Financial Logic
        print("Processing Fact Table (Generating UUIDs & Financial Logic)...")
        con.execute(f"""
            CREATE OR REPLACE VIEW bts_flights AS
            SELECT
                -- Upstream Surrogate Key Generation. uuid() is re-evaluated on
                -- every scan of this view, so IDs are fixed only once exported.
                uuid() AS Flight_ID,
                CAST(FlightDate AS DATE) AS FlightDate,

                -- Standardize Tail Number: Force 'N' prefix to prevent join failures
                CASE
                    WHEN Tail_Number IS NULL OR TRIM(Tail_Number) = '' THEN 'UNKNOWN'
                    WHEN TRIM(Tail_Number) NOT LIKE 'N%' THEN 'N' || TRIM(Tail_Number)
                    ELSE TRIM(Tail_Number)
                END AS Clean_Tail,

                Origin, Dest, Reporting_Airline,
                TRY_CAST(AirTime AS DOUBLE) AS AirTime,
                -- Parse via DOUBLE so decimal-formatted flags such as '1.00' are
                -- read as integers without relying on lenient VARCHAR->INTEGER casts.
                TRY_CAST(TRY_CAST(Cancelled AS DOUBLE) AS INTEGER) AS Cancelled,

                -- MRL Liability: 250 per hour of AirTime (AirTime / 60) plus a flat
                -- 180; 0 for cancelled flights or rows with no usable AirTime.
                CASE
                    WHEN TRY_CAST(TRY_CAST(Cancelled AS DOUBLE) AS INTEGER) = 1
                         OR TRY_CAST(AirTime AS DOUBLE) IS NULL THEN 0.0
                    ELSE ((TRY_CAST(AirTime AS DOUBLE) / 60.0) * 250.0) + 180.0
                END AS MRL_Liability,

                -- Zero-Fill Root Cause Delays
                COALESCE(TRY_CAST(CarrierDelay AS DOUBLE), 0.0) AS CarrierDelay,
                COALESCE(TRY_CAST(WeatherDelay AS DOUBLE), 0.0) AS WeatherDelay,
                COALESCE(TRY_CAST(NASDelay AS DOUBLE), 0.0) AS NASDelay,
                COALESCE(TRY_CAST(SecurityDelay AS DOUBLE), 0.0) AS SecurityDelay,
                COALESCE(TRY_CAST(LateAircraftDelay AS DOUBLE), 0.0) AS LateAircraftDelay

            FROM read_csv_auto({csv_glob}, all_varchar=true, header=true, union_by_name=true)
        """)

        # Mount the FAA master registry if it has been provided.
        if not os.path.exists(master_parquet):
            print("WARNING: 'FAA_Registry_Master.parquet' not found in 'data_raw'. Asset Audit will skip.")
            has_master = False
        else:
            print("Mounting Optimized FAA Parquet Registry...")
            con.execute(f"""
                CREATE OR REPLACE VIEW faa_master AS
                SELECT * FROM read_parquet({_sql_str(master_parquet)})
            """)
            has_master = True

        # Export the fact table. This is the only pass over the raw CSVs;
        # later steps read the columnar Parquet output instead.
        print("Exporting optimized Parquet assets to 'data_processed'...")
        con.execute(
            f"COPY bts_flights TO {_sql_str(fact_path)} (FORMAT PARQUET, CODEC 'SNAPPY')"
        )

        if has_master:
            # Master Dimension: only aircraft that appear in the exported flights.
            # NOTE(review): assumes the registry's key column is named Tail_Number.
            con.execute(f"""
                COPY (
                    SELECT *
                    FROM faa_master
                    WHERE Tail_Number IN (
                        SELECT DISTINCT Clean_Tail FROM read_parquet({_sql_str(fact_path)})
                    )
                ) TO {_sql_str(dim_path)} (FORMAT PARQUET, CODEC 'SNAPPY')
            """)

        # Audit the artifact that was actually written. COALESCE keeps the
        # total numeric when there are no rows (SUM would return NULL).
        total_liability, total_rows = con.execute(f"""
            SELECT
                COALESCE(SUM(MRL_Liability), 0.0) AS Total_Liability,
                COUNT(*) AS Total_Rows
            FROM read_parquet({_sql_str(fact_path)})
        """).fetchone()
    finally:
        con.close()

    print("\n" + "=" * 60)
    print(" 📊 FINAL PIPELINE REPORT 📊")
    print("=" * 60)
    print(f"TOTAL PROCESSED ROWS   : {total_rows:,}")
    print(f"VERIFIED MRL LIABILITY : ${total_liability:,.2f}")
    print("=" * 60)

# =============================================================================
# 4. EXECUTION GATE
# =============================================================================
if __name__ == "__main__":
    # 1. Download & extract the raw BTS files. This step is off by default so
    #    repeated runs reuse what is already in data_raw. Set BTS_DOWNLOAD=1
    #    (or true/yes) in the environment to fetch any months whose ZIP is
    #    missing and extract their CSVs first.
    if os.environ.get("BTS_DOWNLOAD", "").strip().lower() in {"1", "true", "yes"}:
        download_and_extract()

    # 2. Execute the vectorized transformation pipeline
    run_aviation_pipeline()