### Imports

In [None]:
import pandas as pd
import polars as pl

import requests
import zipfile
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from datetime import datetime
from meteostat import Point, Daily, Hourly
from pathlib import Path
import time

### BTS Data for Southwest Region

In [None]:
# Origin airports (IATA codes) whose Southwest (WN) departures are kept from
# the BTS on-time data, listed state by state.
SOUTHWEST_STATE_AIRPORTS = set(
    (
        "PHX TUS "                          # Arizona
        "ABQ "                              # New Mexico
        "DAL HOU AUS SAT ELP LBB MAF HRL "  # Texas
        "OKC TUL "                          # Oklahoma
        "LAX SAN OAK SJC BUR SNA ONT SMF"   # California
    ).split()
)

# Destination folder for the per-month filtered CSVs and the combined file;
# created up front so worker threads can write into it immediately.
OUTPUT_FOLDER = "new_bts_data"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

def download_bts_data(year, month):
    """Download one month of BTS on-time data and keep Southwest (WN) rows.

    Fetches the monthly PREZIP archive from transtats.bts.gov, reads the first
    ``.csv`` member, and keeps rows whose ``Reporting_Airline`` is ``"WN"`` and
    whose ``Origin`` is in ``SOUTHWEST_STATE_AIRPORTS``. A non-empty result is
    also written to ``OUTPUT_FOLDER/bts_wn_{year}_{month:02d}.csv``.

    Args:
        year: Four-digit year.
        month: Month number 1-12 (unpadded in the URL, zero-padded in the
            output filename).

    Returns:
        A ``(year, month, df, error)`` tuple. On success ``df`` is the filtered
        DataFrame (possibly empty) and ``error`` is None. On failure ``df`` is
        None and ``error`` is a short description. Errors are reported in the
        tuple rather than raised, so callers can fan this out over a thread pool.
    """
    url = f'https://www.transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_{year}_{month}.zip'

    try:
        # With stream=True the connection is only released once the body is
        # consumed or the response is closed. The `with` block guarantees the
        # close, including when raise_for_status() raises before the body is read.
        with requests.get(
            url,
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=300,
            stream=True,
        ) as response:
            response.raise_for_status()
            payload = response.content

        with zipfile.ZipFile(io.BytesIO(payload)) as z:
            csv_file = next((name for name in z.namelist() if name.endswith('.csv')), None)
            if not csv_file:
                return (year, month, None, "No CSV in zip")

            with z.open(csv_file) as f:
                df = pd.read_csv(f, encoding='utf-8', low_memory=False)

        # Keep only Southwest departures from the airports of interest.
        df = df[
            (df["Reporting_Airline"] == "WN") &
            (df["Origin"].isin(SOUTHWEST_STATE_AIRPORTS))
        ]

        if not df.empty:
            month_filename = os.path.join(OUTPUT_FOLDER, f"bts_wn_{year}_{month:02d}.csv")
            df.to_csv(month_filename, index=False)
            print(f"Saved {year}-{month:02d}: {len(df):,} rows to {month_filename}")

        return (year, month, df, None)

    except requests.exceptions.HTTPError as e:
        return (year, month, None, f"HTTP {e.response.status_code}")
    except Exception as e:
        # Best-effort per month: any other failure (bad zip, missing column,
        # network error) is reported to the caller instead of aborting the batch.
        return (year, month, None, str(e)[:50])

def download_years_parallel(start_year, end_year, max_workers=12):
    """Download and combine WN/Southwest BTS data for a range of years.

    Every month from January of ``start_year`` through December of ``end_year``
    (inclusive) is fetched concurrently with ``download_bts_data``. Months after
    the current calendar month are skipped. Months that fail or have no
    matching rows are reported and left out.

    Args:
        start_year: First year to download (inclusive).
        end_year: Last year to download (inclusive).
        max_workers: Number of download threads.

    Returns:
        One DataFrame holding every successful month in chronological order,
        or None if nothing was downloaded.
    """
    # Snapshot "now" once so every month is compared against the same date.
    now = datetime.now()
    months_to_download = [
        (year, month)
        for year in range(start_year, end_year + 1)
        for month in range(1, 13)
        if (year, month) <= (now.year, now.month)  # skip future months
    ]

    total_months = len(months_to_download)
    print(f"Downloading {total_months} months ({start_year}-{end_year})")
    print(f"Using {max_workers} parallel workers\n")

    results = {}
    start_time = datetime.now()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(download_bts_data, year, month)
            for year, month in months_to_download
        ]
        for future in as_completed(futures):
            year, month, df, error = future.result()
            if df is not None and not df.empty:
                results[(year, month)] = df
            else:
                print(f"✗ {year}-{month:02d}: {error or 'No matching rows'}")

    elapsed = (datetime.now() - start_time).total_seconds() / 60
    print(f"\nFinished in {elapsed:.1f} minutes")

    if not results:
        print("No data downloaded")
        return None

    # as_completed() yields futures in completion order. Sorting by
    # (year, month) makes the combined frame chronological and identical
    # across runs.
    combined_df = pd.concat(
        [results[key] for key in sorted(results)], ignore_index=True
    )
    print(f"Combined total: {len(combined_df):,} rows")
    return combined_df

def save_to_csv_fast(df, filename):
    """Write *df* to *filename* as CSV without the index and report the cost.

    Prints a start message, then the wall-clock seconds taken and the size of
    the written file in MiB.
    """
    print(f"\nSaving combined dataset to {filename}...")
    t0 = datetime.now()
    df.to_csv(filename, index=False)
    took = datetime.now() - t0
    megabytes = os.path.getsize(filename) / 1024 ** 2
    print(f"Saved in {took.total_seconds():.1f}s ({megabytes:.1f} MB)")

if __name__ == "__main__":
    # Inclusive year range: 2015 through 2025 covers 11 calendar years.
    START_YEAR, END_YEAR = 2015, 2025
    MAX_WORKERS = 10

    combined_data = download_years_parallel(START_YEAR, END_YEAR, max_workers=MAX_WORKERS)

    if combined_data is None:
        print("No data available after filtering.")
    else:
        # Persist the combined frame next to the per-month files, then preview it.
        output_file = os.path.join(
            OUTPUT_FOLDER,
            f"bts_wn_southwest_combined_{START_YEAR}_{END_YEAR}.csv",
        )
        save_to_csv_fast(combined_data, output_file)
        print(f"\nCombined file saved as {output_file} ({len(combined_data):,} rows)")
        print("\nHead of combined dataset:")
        print(combined_data.head())

### Meteostat Weather Data

In [None]:
# Airport IATA code -> (latitude, longitude), used to build meteostat Points.
# Insertion order is preserved and determines the worker submission order.
SWA_AIRPORTS = dict(
    # Arizona
    PHX=(33.4342, -112.0116),
    TUS=(32.1161, -110.9410),
    # New Mexico
    ABQ=(35.0494, -106.6172),
    # Texas
    DAL=(32.8471, -96.8517),
    HOU=(29.6454, -95.2789),
    AUS=(30.1975, -97.6664),
    SAT=(29.5337, -98.4698),
    ELP=(31.8070, -106.3779),
    LBB=(33.6609, -101.8214),
    MAF=(31.9369, -102.2016),
    HRL=(26.2285, -97.6544),
    # Oklahoma
    OKC=(35.3931, -97.6008),
    TUL=(36.1986, -95.8880),
    # California
    LAX=(33.9425, -118.4081),
    SAN=(32.7338, -117.1933),
    OAK=(37.7126, -122.2197),
    SJC=(37.3639, -121.9289),
    BUR=(34.2007, -118.3587),
    SNA=(33.6757, -117.8682),
    ONT=(34.0559, -117.6009),
    SMF=(38.6950, -121.5908),
)

# Per-airport hourly CSVs (and the combined weather file) are written here.
OUTPUT_DIR = Path("meteostat_hourly")
OUTPUT_DIR.mkdir(exist_ok=True)

# Inclusive range of years fetched per airport, and the thread-pool size.
START_YEAR, END_YEAR = 2015, 2025
MAX_WORKERS = 4

def fetch_airport_hourly(airport, coords):
    """Fetch hourly Meteostat observations for one airport and save them to CSV.

    Queries ``Hourly`` one calendar year at a time for START_YEAR..END_YEAR
    (inclusive). Each row is tagged with the airport code, and the concatenation
    is written to ``OUTPUT_DIR/{airport}.csv``. Years that fail or return no
    rows are logged and skipped. If no year returns data, no file is written.

    Args:
        airport: IATA code stored in the ``airport`` column and used as the
            output file name.
        coords: ``(latitude, longitude)`` passed to ``meteostat.Point``.
    """
    lat, lon = coords
    point = Point(lat, lon)
    all_data = []

    for year in range(START_YEAR, END_YEAR + 1):
        start = datetime(year, 1, 1)
        # Hourly bounds are timestamps. datetime(year, 12, 31) would stop at
        # 00:00 on Dec 31 and silently drop the last 23 hours of every year.
        end = datetime(year, 12, 31, 23, 59)
        try:
            data = Hourly(point, start, end).fetch()
            if not data.empty:
                # NOTE(review): meteostat hourly timestamps are presumably UTC
                # unless a timezone is requested; confirm before joining with
                # local flight dates.
                data = data.reset_index().rename(columns={"time": "date"})
                data['airport'] = airport
                all_data.append(data)
                print(f"Fetched {airport} {year} ({len(data)} rows)")
        except Exception as e:
            # Best-effort: one failed year must not abort the whole airport.
            print(f"Error fetching {airport} {year}: {e}")
        time.sleep(0.05)  # brief pause between successive requests

    if all_data:
        # ignore_index gives a unique row index; the index column is still
        # written because main() reads these files back with index_col=0.
        df = pd.concat(all_data, ignore_index=True)
        out_file = OUTPUT_DIR / f"{airport}.csv"
        df.to_csv(out_file, index=True)
        print(f"Saved {airport} to {out_file}")

def main():
    """Fetch hourly weather for every airport in parallel, then combine the files.

    Each worker writes ``OUTPUT_DIR/{airport}.csv``. Afterwards every
    per-airport CSV in OUTPUT_DIR is concatenated into
    ``OUTPUT_DIR/weather_data.csv``.
    """
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(fetch_airport_hourly, airport, coords)
                   for airport, coords in SWA_AIRPORTS.items()]
        for fut in as_completed(futures):
            fut.result()  # surface any unexpected worker exception here

    combined_path = OUTPUT_DIR / "weather_data.csv"
    # The combined file lives in the same folder. Exclude it so a re-run does
    # not fold the previous (index-less) output back in as an airport file.
    # Sorting keeps the concatenation order stable across runs.
    airport_files = sorted(
        f for f in OUTPUT_DIR.glob("*.csv") if f.name != combined_path.name
    )
    if not airport_files:
        print("\nNo airport files found; nothing to combine.")
        return

    # Per-airport files were written with their row index as the first column.
    combined = pd.concat(
        (pd.read_csv(f, index_col=0) for f in airport_files), ignore_index=True
    )
    combined.to_csv(combined_path, index=False)
    print(f"\nAll airports combined: {len(combined)} rows")


if __name__ == "__main__":
    main()

### Joining BTS and Meteostat Data

In [None]:
# Inner-join BTS flight records with Meteostat weather on (origin airport,
# flight date) using lazy polars frames, and sink the result to joined_data.csv.

with zipfile.ZipFile("combined_data.zip") as z:
    # First CSV member of the archive (StopIteration if there is none).
    csv_name = next(filter(lambda name: name.endswith(".csv"), z.namelist()))
    # z.read() decompresses the whole member into memory, so the flights side
    # is scanned from an in-memory buffer rather than streamed from disk.
    flights = pl.scan_csv(io.BytesIO(z.read(csv_name)), low_memory=True)

weather = pl.scan_csv("Meteostat_daily_2000-2025.csv", low_memory=True)

# NOTE(review): assumes FlightDate and the weather "date" column share the
# same textual format (e.g. YYYY-MM-DD) and inferred dtype; verify, otherwise
# the inner join silently drops rows.
joined = flights.join(
    weather,
    left_on=["Origin", "FlightDate"],
    right_on=["airport", "date"],
    how="inner",
)
joined.sink_csv("joined_data.csv")

print("Streaming join completed successfully. File saved as 'joined_data.csv'")