## Imports

In [None]:
!pip install pyarrow fastparquet linearmodels -q

In [None]:

import os
import glob
import zipfile
import logging
from collections import defaultdict
from io import BytesIO
from typing import List, Dict, Any, Tuple, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import requests
from tqdm.auto import tqdm
from google.colab import drive

# Mount Google Drive; every data directory below lives under it.
drive.mount('/content/drive')
print("✅ Google Drive mounted successfully.")

pd.set_option('display.max_columns', None)
# NOTE(review): with max_rows=None, displaying a large DataFrame prints every
# row; prefer .head()/.sample() on full tables.
pd.set_option('display.max_rows', None)
sns.set_style("whitegrid")

GOOGLE_DRIVE_BASE_DIR = '/content/drive/My Drive/AirlineData/'
RAW_DATA_DIR = os.path.join(GOOGLE_DRIVE_BASE_DIR, 'raw')                        # downloaded CSVs
PROCESSED_DATA_DIR = os.path.join(GOOGLE_DRIVE_BASE_DIR, 'processed')            # per-quarter Parquet
FINAL_DATAFRAMES_DIR = os.path.join(GOOGLE_DRIVE_BASE_DIR, 'processed_tables')   # aggregated outputs

for _data_dir in (RAW_DATA_DIR, PROCESSED_DATA_DIR, FINAL_DATAFRAMES_DIR):
    os.makedirs(_data_dir, exist_ok=True)

print("✅ Directory structure created in Google Drive:")
print(f"   - Raw data path: {RAW_DATA_DIR}")
print(f"   - Processed data path: {PROCESSED_DATA_DIR}")
print(f"   - Final dataframes path: {FINAL_DATAFRAMES_DIR}")
print("✅ Environment setup complete. All libraries are installed and imported.")

## Fetch Data

In [None]:
START_YEAR = 2008
END_YEAR = 2016
# Inclusive range of calendar years to download and analyze.
STUDY_PERIOD = range(START_YEAR, END_YEAR + 1)


# Airport groups used in the study. Their union (ALL_HUBS, below) is the
# origin/destination whitelist applied when filtering markets.
DELTA_HUBS = [
    'ATL',  # Atlanta Hartsfield-Jackson
    'DTW',  # Detroit Metropolitan
    'MSP',  # Minneapolis-Saint Paul
    'SLC',  # Salt Lake City
    'JFK',  # New York - JFK
    'LGA',  # New York - LaGuardia
    #'MEM',  # Memphis (De-hubbed September 3, 2013 )
]
AMERICAN_HUBS = [
    'DFW',  # Dallas/Fort Worth
    'ORD',  # Chicago O'Hare
    'MIA',  # Miami
    'LAX',  # Los Angeles International
    'JFK',  # New York - JFK
]
US_AIRLINES_HUBS = [
    'CLT',  # Charlotte Douglas
    'PHL',  # Philadelphia
    'DCA',  # Washington Reagan
    'PHX',  # Phoenix Sky Harbor
]
UNITED_HUBS = [
    'ORD',  # Chicago O'Hare
    'DEN',  # Denver
    'SFO',  # San Francisco
    'IAD',  # Washington Dulles
    'LAX',  # Los Angeles International
    'CLE',  # Cleveland (De-hubbed April 2014 - June 2014 )
]
CONTINENTAL_HUBS = [
    'EWR',  # Newark
    'IAH',  # Houston Intercontinental
    'CLE',  # Cleveland
]
# NOTE(review): "LLC" looks like a typo for "LCC" (low-cost carrier, as in
# LCC_CARRIERS); the name is kept so existing references keep working.
LLC_BASES = [
    'ACY',  # Atlantic City
    'LAS',  # Las Vegas
    'BOS',  # Boston Logan
    'JFK',  # New York - JFK
    'IAD',  # Washington Dulles
    'ORD',  # Chicago O'Hare
    'DFW',  # Dallas/Fort Worth
    'DEN',  # Denver
    'FLL',  # Fort Lauderdale
    'MCO',  # Orlando International
]
TREATMENT_AIRPORTS = [
    'DTW',  # Detroit Metropolitan
    'FLL',  # Fort Lauderdale
    'MCO',  # Orlando International
    'TPA',  # Tampa
    'RSW',  # Fort Myers
]
NON_FLORIDA_LEISURE = [
    'ACY',  # Atlantic City
    'LAS',  # Las Vegas
    'PHX',  # Phoenix Sky Harbor
    'MSY',  # New Orleans
]
NON_LEISURE = [
    'LGA',  # New York - LaGuardia
    'DCA',  # Washington Reagan
    'MSP',  # Minneapolis-Saint Paul
    'ORD',  # Chicago O'Hare
    'ATL',  # Atlanta Hartsfield-Jackson
]
FLORIDA_LEISURE = [
    'PBI',  # West Palm Beach
    'EYW',  # Key West
    'SFB',  # Orlando Sanford
    'PIE',  # St. Pete-Clearwater
    'ECP',  # Panama City
]
FLORIDA_NON_LEISURE = [
    'MIA',  # Miami
    'JAX',  # Jacksonville
    'DAB',  # Daytona Beach
    'PNS',  # Pensacola
    'TLH',  # Tallahassee
    'MLB',  # Melbourne/Orlando
]
CARIBBEAN_BAHAMAS = [
    'CUN',  # Cancun, Mexico
    'NAS',  # Nassau, Bahamas
    'PUJ',  # Punta Cana, Dominican Republic
    'MBJ',  # Montego Bay, Jamaica
    'SJU',  # San Juan, Puerto Rico
    'GCM',  # Grand Cayman, Cayman Islands
    'AUA',  # Aruba
]
LATIN_AMERICAN = [
    'BOG',  # Bogota, Colombia
    'PTY',  # Panama City, Panama
    'GRU',  # Sao Paulo, Brazil
    'LIM',  # Lima, Peru
    'MDE',  # Medellin, Colombia
    'SJO',  # San Jose, Costa Rica
]


# Every airport code from the groups above (hubs, bases, treatment and
# comparison airports), de-duplicated and sorted alphabetically.
ALL_HUBS = sorted(set(
    DELTA_HUBS
    + AMERICAN_HUBS
    + US_AIRLINES_HUBS
    + UNITED_HUBS
    + CONTINENTAL_HUBS
    + LLC_BASES
    + TREATMENT_AIRPORTS
    + NON_FLORIDA_LEISURE
    + NON_LEISURE
    + FLORIDA_LEISURE
    + FLORIDA_NON_LEISURE
    + CARIBBEAN_BAHAMAS
    + LATIN_AMERICAN
))

# Carrier groups; ALL_CARRIERS is matched against 'RPCarrier' when raw files
# are processed.
LEGACY_CARRIERS = ['DL', 'AA', 'UA']  # Delta, American, United
MERGED_CARRIERS = ['NW', 'US', 'CO', 'CS', 'FL']  # Northwest, US Airways, Continental, Continental Micronesia, AirTran
LCC_CARRIERS = ['WN', 'B6']  # Southwest, JetBlue
ULCC_CARRIERS = ['NK', 'G4', 'F9']  # Spirit, Allegiant, Frontier

# Combined list of all carriers included in the study.
ALL_CARRIERS = LEGACY_CARRIERS + MERGED_CARRIERS + LCC_CARRIERS + ULCC_CARRIERS

# Number of CSV rows read per chunk when streaming raw DB1B files.
CHUNKSIZE = 100_000

print("✅ Global constants and configuration loaded.")
print(f"   - Study Period: {START_YEAR} - {END_YEAR}")
print(f"   - Target Hubs for Filtering: {ALL_HUBS}")
print(f"   - Target Carriers for Filtering: {ALL_CARRIERS}")
print(f"   - Chunk Size for Processing: {CHUNKSIZE}")

# Columns to load from each raw DB1B table, and the dtypes used to store them.
# Columns mapped to 'category' are parsed as text and converted after all
# chunks are concatenated.
#
# NOTE(review): DB1B ItinID/MktID are large identifiers that may exceed the
# uint32 maximum (4,294,967,295); int64 is used to avoid overflow or silent
# wraparound. Confirm against the raw files if memory becomes a concern.

COUPON_REQUIRED_COLS = [
    'ItinID', 'MktID', 'Year', 'Quarter', 'Origin', 'Dest', 'RPCarrier',
    'OpCarrier', 'SeqNum', 'Coupons', 'FareClass', 'Break',
    'CouponType', 'Distance',
]

COUPON_OPTIMIZED_SCHEMA = {
    'ItinID': 'int64',
    'MktID': 'int64',
    'Year': 'uint16',
    'Quarter': 'uint8',
    'Origin': 'category',
    'Dest': 'category',
    'RPCarrier': 'category',
    'OpCarrier': 'category',
    'SeqNum': 'uint8',
    'Coupons': 'uint8',
    'FareClass': 'category',
    'Break': 'category',
    'CouponType': 'category',
    'Distance': 'uint16',
}

MARKET_REQUIRED_COLS = [
    'ItinID', 'MktID', 'MktCoupons', 'Year', 'Quarter', 'Origin', 'OriginState',
    'Dest', 'DestState', 'RPCarrier', 'OpCarrier', 'Passengers',
    'MktFare', 'MktMilesFlown', 'OpCarrierChange',
]

MARKET_OPTIMIZED_SCHEMA = {
    'ItinID': 'int64',
    'MktID': 'int64',
    'MktCoupons': 'uint8',
    'Year': 'uint16',
    'Quarter': 'uint8',
    'Origin': 'category',
    'OriginState': 'category',
    'Dest': 'category',
    'DestState': 'category',
    'RPCarrier': 'category',
    'OpCarrier': 'category',
    'Passengers': 'uint16',
    'MktFare': 'float32',
    'MktMilesFlown': 'uint16',
    'OpCarrierChange': 'uint8',
}

# Each column is listed once ('RoundTrip' was previously duplicated in both
# the list and the dict, which inflated the reported column count).
TICKET_REQUIRED_COLS = [
    'ItinID', 'Coupons', 'Passengers', 'Year', 'Quarter',
    'Origin', 'RPCarrier', 'RoundTrip', 'FarePerMile', 'ItinFare', 'MilesFlown',
    'OnLine',
]


TICKET_OPTIMIZED_SCHEMA = {
    'ItinID': 'int64',
    'Coupons': 'uint8',
    'Passengers': 'uint16',
    'Year': 'uint16',
    'Quarter': 'uint8',
    'Origin': 'category',
    'RPCarrier': 'category',
    'RoundTrip': 'uint8',
    'FarePerMile': 'float32',
    'ItinFare': 'float32',
    'MilesFlown': 'uint16',
    'OnLine': 'uint8',
}


print("✅ Required columns and optimized schemas defined for Coupon, Market, and Ticket tables.")
print("\n--- Coupon Table Definitions ---")
print(f"   - COUPON_REQUIRED_COLS ({len(COUPON_REQUIRED_COLS)} columns): {COUPON_REQUIRED_COLS}")
print(f"   - COUPON_OPTIMIZED_SCHEMA ({len(COUPON_OPTIMIZED_SCHEMA)} columns): {COUPON_OPTIMIZED_SCHEMA}")

print("\n--- Market Table Definitions ---")
print(f"   - MARKET_REQUIRED_COLS ({len(MARKET_REQUIRED_COLS)} columns): {MARKET_REQUIRED_COLS}")
print(f"   - MARKET_OPTIMIZED_SCHEMA ({len(MARKET_OPTIMIZED_SCHEMA)} columns): {MARKET_OPTIMIZED_SCHEMA}")

print("\n--- Ticket Table Definitions ---")
print(f"   - TICKET_REQUIRED_COLS ({len(TICKET_REQUIRED_COLS)} columns): {TICKET_REQUIRED_COLS}")
print(f"   - TICKET_OPTIMIZED_SCHEMA ({len(TICKET_OPTIMIZED_SCHEMA)} columns): {TICKET_OPTIMIZED_SCHEMA}")




# Helper Functions for Data Acquisition and Processing
# Root logger setup: INFO and above, timestamped and level-tagged, written via
# a StreamHandler. (logging.basicConfig is a no-op if the root logger already
# has handlers.)
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)

def download_file(url: str, dest_path: str) -> bool:
    """
    Stream a file from ``url`` to ``dest_path`` with a tqdm progress bar.

    The response body is written in 8 KiB chunks. If the request fails (HTTP
    error status, connection error, timeout) or writing fails, the error is
    logged and False is returned. In that case any partially written file is
    deleted, so a truncated archive is not mistaken for a complete one.

    Args:
        url: The URL of the file to download.
        dest_path: Local path the downloaded bytes are written to
            (overwritten if it already exists).

    Returns:
        True if the download completed, False otherwise.
    """
    filename = os.path.basename(dest_path)
    started_writing = False
    try:
        logging.info(f"Attempting to download: {filename} from {url}")
        with requests.get(url, stream=True, timeout=120) as r:
            r.raise_for_status()
            # No Content-Length header means an unknown size: pass None so
            # tqdm shows a running byte count instead of a 0-byte total.
            total_size = int(r.headers.get('content-length', 0)) or None
            with open(dest_path, 'wb') as f, tqdm.wrapattr(
                f, "write", total=total_size, unit='B', unit_scale=True,
                desc=f"Downloading {filename}"
            ) as file_obj:
                started_writing = True
                for chunk in r.iter_content(chunk_size=8192):
                    file_obj.write(chunk)
        logging.info(f"Successfully downloaded: {filename}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to download {filename}. Error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during download of {filename}: {e}")

    # Only remove a file this call actually started writing; never delete a
    # pre-existing file when the request failed before the file was opened.
    if started_writing and os.path.exists(dest_path):
        try:
            os.remove(dest_path)
            logging.info(f"Removed partial download: {filename}")
        except OSError as cleanup_error:
            logging.warning(f"Could not remove partial download {filename}: {cleanup_error}")
    return False


def unzip_file(zip_path: str, extract_dir: str, expected_csv_name: str):
    """
    Extract the first CSV from a zip archive, rename it, and delete the archive.

    The first archive member whose name ends in '.csv' (case-insensitive) is
    extracted under ``extract_dir`` and moved to
    ``os.path.join(extract_dir, expected_csv_name)``, replacing any existing
    file there. The zip is deleted only after a successful extract and rename.
    Errors are logged rather than raised, and the archive is then left in place.

    Args:
        zip_path: Path to the input zip file.
        extract_dir: Directory where the CSV should be extracted.
        expected_csv_name: The desired name for the extracted CSV file.

    Returns:
        True if the CSV was extracted and renamed, False otherwise. Existing
        callers that ignore the return value are unaffected.
    """
    filename = os.path.basename(zip_path)
    logging.info(f"Unzipping {filename}")
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            csv_to_extract = next(
                (member for member in zip_ref.namelist() if member.lower().endswith('.csv')),
                None,
            )
            if csv_to_extract is None:
                logging.error(f"No CSV file found in zip archive: {filename}")
                return False
            extracted_path = zip_ref.extract(csv_to_extract, extract_dir)

        final_csv_path = os.path.join(extract_dir, expected_csv_name)
        # dirname is '' when extract_dir is '' and the name has no directory part.
        os.makedirs(os.path.dirname(final_csv_path) or '.', exist_ok=True)
        # os.replace overwrites an existing target on every platform (os.rename
        # raises on Windows if the destination exists).
        os.replace(extracted_path, final_csv_path)
        logging.info(f"Successfully extracted and renamed '{csv_to_extract}' to '{expected_csv_name}' from: {filename}")

        if os.path.exists(zip_path):
            os.remove(zip_path)
            logging.info(f"Removed zip archive: {filename}")
        return True
    except (zipfile.BadZipFile, FileNotFoundError, OSError) as e:
        logging.error(f"Error unzipping or renaming file from {filename}: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during unzip of {filename}: {e}")
    return False


def process_db1b_data(
    csv_path: str,
    output_path: str,
    required_cols: List[str],
    optimized_schema: Dict[str, str],
    table_name: str
):
    """
    Read a raw DB1B CSV in chunks, keep the study carriers, and save Parquet.

    Steps:
    - Read the CSV header and keep the columns from ``required_cols`` that are
      present. Missing ones are logged as a warning.
    - Abort (log an error, write nothing) if a critical column is missing:
      'ItinID' for every table, plus 'ItinFare'/'Passengers'/'Year'/'Quarter'
      for 'Ticket', 'MktFare'/'Passengers'/'Year'/'Quarter' for 'Market', and
      'Coupons' for 'Coupon'.
    - Stream the file in CHUNKSIZE-row chunks, applying the non-categorical
      dtypes from ``optimized_schema`` at parse time.
    - Keep only rows whose 'RPCarrier' is in ALL_CARRIERS (when that column
      exists). No airport, fare, or coupon filters are applied here.
    - Concatenate the surviving chunks, convert 'category' columns, and write
      Snappy-compressed Parquet to ``output_path``, creating its directory.

    Failures are logged, not raised; the function always returns None.

    Args:
        csv_path: Path to the raw input CSV file.
        output_path: Path to save the processed Parquet file.
        required_cols: Columns to load from the CSV.
        optimized_schema: Mapping of column name to target dtype.
        table_name: DB1B table name ('Coupon', 'Market', or 'Ticket').
    """
    filename = os.path.basename(csv_path)
    logging.info(f"Processing {filename} for {table_name} table...")

    # Columns without which the output table is unusable.
    critical_cols = ['ItinID']
    if table_name == 'Ticket':
        critical_cols.extend(['ItinFare', 'Passengers', 'Year', 'Quarter'])
    elif table_name == 'Market':
        critical_cols.extend(['MktFare', 'Passengers', 'Year', 'Quarter'])
    elif table_name == 'Coupon':
        critical_cols.extend(['Coupons'])

    try:
        # nrows=0 reads only the header row.
        csv_cols = set(pd.read_csv(csv_path, nrows=0).columns.str.strip())
    except Exception as e:
        logging.error(f"Could not read columns from {csv_path} for {table_name}. Error: {e}")
        return

    final_schema = {k: v for k, v in optimized_schema.items() if k in csv_cols}
    # dict.fromkeys removes duplicate names while preserving the requested order.
    cols_to_use = [col for col in dict.fromkeys(required_cols) if col in csv_cols]

    missing_required_cols = set(required_cols) - set(cols_to_use)
    if missing_required_cols:
        logging.warning(f"Required columns not found in {filename} for {table_name}: {missing_required_cols}. Processing with available columns.")

    missing_critical = set(critical_cols) - set(cols_to_use)
    if missing_critical:
        logging.error(f"Critical columns missing in {filename} for {table_name}: {missing_critical}. Cannot process.")
        return

    # 'category' is applied after concatenation: categoricals built per chunk
    # would have differing categories and concatenate back to object dtype.
    parse_dtypes = {k: v for k, v in final_schema.items() if v != 'category'}

    chunk_list = []
    try:
        with pd.read_csv(
            csv_path,
            usecols=cols_to_use,
            dtype=parse_dtypes,
            chunksize=CHUNKSIZE,
            encoding='utf-8',
            low_memory=False
        ) as reader:
            for chunk in tqdm(reader, desc=f"Filtering {filename}"):
                # Boolean indexing already returns a new frame; no extra copy needed.
                if 'RPCarrier' in chunk.columns:
                    chunk = chunk[chunk['RPCarrier'].isin(ALL_CARRIERS)]
                if not chunk.empty:
                    chunk_list.append(chunk)
    except Exception as e:
        logging.error(f"Error processing chunks for {csv_path} for {table_name}. Error: {e}")
        return

    if not chunk_list:
        logging.warning(f"No relevant data found in {filename} for {table_name} after filtering. No output file generated.")
        return

    logging.info(f"Concatenating {len(chunk_list)} chunks for {filename} for {table_name}...")
    df = pd.concat(chunk_list, ignore_index=True)
    for col, dtype in final_schema.items():
        if dtype == 'category' and col in df.columns:
            df[col] = df[col].astype('category')

    logging.info(f"Final DataFrame for {filename} for {table_name} has {len(df):,} rows. Optimizing memory.")
    logging.info(f"Memory usage before saving: {df.memory_usage(deep=True).sum() / 1e6:.2f} MB")

    try:
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        df.to_parquet(output_path, engine='pyarrow', compression='snappy')
        logging.info(f"Successfully saved processed data to: {output_path}")
    except Exception as e:
        logging.error(f"Failed to save Parquet file {output_path} for {table_name}. Error: {e}")

print("✅ Adapted helper functions for data acquisition and processing defined for all three tables.")


# Data Acquisition & Processing Pipeline Execution
# For each table and quarter: skip if the processed Parquet exists; otherwise
# download and unzip the raw CSV if it is missing, then process it.

DB1B_URL_TEMPLATE = "https://transtats.bts.gov/PREZIP/Origin_and_Destination_Survey_DB1B{table}_{year}_{quarter}.zip"

DB1B_TABLES = {
    'Coupon': {'required_cols': COUPON_REQUIRED_COLS, 'optimized_schema': COUPON_OPTIMIZED_SCHEMA},
    'Market': {'required_cols': MARKET_REQUIRED_COLS, 'optimized_schema': MARKET_OPTIMIZED_SCHEMA},
    'Ticket': {'required_cols': TICKET_REQUIRED_COLS, 'optimized_schema': TICKET_OPTIMIZED_SCHEMA}
}


print("🚀 Starting data engineering pipeline for Coupon, Market, and Ticket tables...")
print(f"   Processing data for the study period {START_YEAR}-Q1 to {END_YEAR}-Q4.")

for table_name, config in DB1B_TABLES.items():
    required_cols = config['required_cols']
    optimized_schema = config['optimized_schema']

    print(f"\n--- Processing {table_name} table ---")

    # STUDY_PERIOD already ends at END_YEAR, so no extra year bound is needed.
    for year in STUDY_PERIOD:
        for quarter in range(1, 5):
            raw_file_name = f'db1b_{table_name.lower()}_{year}Q{quarter}.csv'
            raw_file_path = os.path.join(RAW_DATA_DIR, raw_file_name)

            zip_file_name = f'Origin_and_Destination_Survey_DB1B{table_name}_{year}_{quarter}.zip'
            zip_file_path = os.path.join(RAW_DATA_DIR, zip_file_name)

            processed_file_name = f'processed_{table_name.lower()}_{year}Q{quarter}.parquet'
            processed_file_path = os.path.join(PROCESSED_DATA_DIR, processed_file_name)

            if os.path.exists(processed_file_path):
                print(f"   - Processed file already exists, skipping: {processed_file_name}")
                continue

            try:
                if os.path.exists(raw_file_path):
                    print(f"   - Raw CSV already exists in Google Drive raw directory: {raw_file_name}. Skipping download/unzip.")
                else:
                    print(f"   - Raw CSV not found for {table_name} {year}Q{quarter}. Attempting download and unzip to Google Drive raw directory...")
                    download_url = DB1B_URL_TEMPLATE.format(table=table_name, year=year, quarter=quarter)

                    if not download_file(download_url, zip_file_path):
                        print(f"   - WARNING: Download failed for {table_name} {year}Q{quarter}. Skipping processing.")
                        # Remove any partially written archive left behind.
                        if os.path.exists(zip_file_path):
                            os.remove(zip_file_path)
                        continue

                    if os.path.exists(zip_file_path):
                        unzip_file(zip_file_path, RAW_DATA_DIR, raw_file_name)

                    if not os.path.exists(raw_file_path):
                        print(f"   - ERROR: Raw CSV '{raw_file_name}' not found in Google Drive raw directory after unzipping '{zip_file_name}'. Skipping processing.")
                        if os.path.exists(zip_file_path):
                            os.remove(zip_file_path)
                        continue

                # The raw CSV is guaranteed to exist at this point.
                process_db1b_data(raw_file_path, processed_file_path, required_cols, optimized_schema, table_name)

            except Exception as e:
                print(f"   - ERROR processing {table_name} {year}Q{quarter}: {e}")

print("\n✅ Data engineering pipeline complete for all three tables.")
print("🔄 Loading processed data from Google Drive and aggregating Coupon, Market, and Ticket tables...")

parquet_files = sorted(glob.glob(os.path.join(PROCESSED_DATA_DIR, '*.parquet')))

coupon_df_list = []
market_df_list = []
ticket_df_list = []
# Processed files are named processed_<table>_<YYYY>Q<q>.parquet; route each
# one to its table by filename prefix (not a substring of the full path).
table_file_lists = {
    'processed_coupon_': coupon_df_list,
    'processed_market_': market_df_list,
    'processed_ticket_': ticket_df_list,
}

# Define the result frames up front so they exist even when no files are found.
coupon_df = pd.DataFrame()
market_df = pd.DataFrame()
ticket_df = pd.DataFrame()

if parquet_files:
    print(f"Found {len(parquet_files)} processed parquet files in {PROCESSED_DATA_DIR}.")
    for f in parquet_files:
        filename = os.path.basename(f)
        try:
            parts = filename.replace('.parquet', '').split('_')
            file_year_q = parts[-1]
            if len(parts) < 3 or 'Q' not in file_year_q:
                print(f"   - Skipping file with unexpected naming format: {filename}")
                continue

            file_year = int(file_year_q.split('Q')[0])
            if not (START_YEAR <= file_year <= END_YEAR):
                print(f"   - Skipping file outside study period: {filename}")
                continue

            for prefix, target_list in table_file_lists.items():
                if filename.startswith(prefix):
                    target_list.append(pd.read_parquet(f))
                    break

        except Exception as e:
            print(f"   - Error processing file {f}: {e}")

    coupon_df = pd.concat(coupon_df_list, ignore_index=True) if coupon_df_list else pd.DataFrame()
    market_df = pd.concat(market_df_list, ignore_index=True) if market_df_list else pd.DataFrame()
    ticket_df = pd.concat(ticket_df_list, ignore_index=True) if ticket_df_list else pd.DataFrame()

    print(f"✅ Successfully loaded and aggregated data for the study period {START_YEAR}-{END_YEAR}.")
    print(f"   - Coupon records: {len(coupon_df):,}")
    print(f"   - Market records: {len(market_df):,}")
    print(f"   - Ticket records: {len(ticket_df):,}")

    print("\n--- Coupon Data Preview ---")
    display(coupon_df.head())
    print("\n--- Market Data Preview ---")
    display(market_df.head())
    print("\n--- Ticket Data Preview ---")
    display(ticket_df.head())

else:
    print("❌ No processed Parquet files found in Google Drive. Cannot proceed with analysis.")

print(f"\n💾 Saving final aggregated dataframes to {FINAL_DATAFRAMES_DIR}...")

# Each aggregated table is written only if it exists and has rows.
if 'coupon_df' in locals() and not coupon_df.empty:
    final_coupon_path = os.path.join(FINAL_DATAFRAMES_DIR, 'final_coupon_df.parquet')
    coupon_df.to_parquet(final_coupon_path, engine='pyarrow', compression='snappy')
    print(f"✅ final_coupon_df.parquet saved to {final_coupon_path}.")
else:
    print("⚠️ coupon_df is empty or not defined, skipping save.")

if 'market_df' in locals() and not market_df.empty:
    final_market_path = os.path.join(FINAL_DATAFRAMES_DIR, 'final_market_df.parquet')
    market_df.to_parquet(final_market_path, engine='pyarrow', compression='snappy')
    print(f"✅ final_market_df.parquet saved to {final_market_path}.")
else:
    print("⚠️ market_df is empty or not defined, skipping save.")

if 'ticket_df' in locals() and not ticket_df.empty:
    final_ticket_path = os.path.join(FINAL_DATAFRAMES_DIR, 'final_ticket_df.parquet')
    ticket_df.to_parquet(final_ticket_path, engine='pyarrow', compression='snappy')
    print(f"✅ final_ticket_df.parquet saved to {final_ticket_path}.")
else:
    print("⚠️ ticket_df is empty or not defined, skipping save.")

print("✅ Attempted to save final dataframes.")

## Helper Functions

In [None]:
def pre_filter_check(df):
    """
    Print the frame's shape and its count of distinct 'ItinID' values.

    Intended as a diagnostic step before filtering; ``df`` is returned
    unchanged so the call can sit inside a ``.pipe(...)`` chain.
    """
    print("Shape of data before filter:", df.shape)
    distinct_itins = df['ItinID'].nunique()
    print("Number of unique ItinIDs before filter:", distinct_itins)
    return df

def add_time_index(df: pd.DataFrame) -> pd.DataFrame:
    """Prepares a DB1B DataFrame for CausalPy."""
    df_copy = df.copy()

    df_copy['QuarterPeriod'] = pd.PeriodIndex.from_fields(
        year=df_copy['Year'],
        quarter=df_copy['Quarter'],
        freq='Q'
    )

    min_year = df_copy['Year'].min()
    df_copy['QuarterID'] = (df_copy['Year'] - min_year) * 4 + df_copy['Quarter']

    return df_copy

def add_period_indicator(df: pd.DataFrame,
                         intervention_point: pd.Period,
                         begin_study: pd.Period,
                         end_study: pd.Period) -> pd.DataFrame:
    """
    Return a copy of ``df`` with 0/1 period flags based on 'QuarterPeriod'.

    Added columns:
        ind_post_treatment: 1 when intervention_point <= QuarterPeriod <= end_study.
        ind_study_period: 1 when begin_study <= QuarterPeriod <= end_study.

    Both bounds are inclusive. The input frame is left unmodified.
    """
    flagged = df.copy()
    period = flagged['QuarterPeriod']
    not_after_end = period <= end_study

    flagged['ind_post_treatment'] = ((period >= intervention_point) & not_after_end).astype(int)
    flagged['ind_study_period'] = ((period >= begin_study) & not_after_end).astype(int)
    return flagged

def filter_db1b_markets(df: pd.DataFrame,
                        origin_list: list,
                        dest_list: list,
                        min_fare: float = 50.0,
                        max_fare: float = 2000.0,
                        min_pas: int = 1,
                        max_pas: int = 10,
                        num_coupons: tuple = (1, 2, 3, 4),
                        switch_carrier: int = 0,
                        study_period: int = 1,
                        min_miles: float = 0.0) -> pd.DataFrame:
    """
    Filter a DB1B market DataFrame down to the records used in the analysis.

    A row is kept only if ALL of these hold:
      - 'Origin' is in ``origin_list`` and 'Dest' is in ``dest_list``
        (directional; origin and destination are not swapped).
      - 'OpCarrier' equals 'RPCarrier' (treated as "not a codeshare").
      - 'MktCoupons' is one of ``num_coupons``.
      - min_fare < 'MktFare' < max_fare (both bounds exclusive).
      - min_pas <= 'Passengers' <= max_pas (both bounds inclusive).
      - 'MktMilesFlown' > min_miles.
      - 'ind_study_period' == study_period.
      - 'OpCarrierChange' == switch_carrier.

    Args:
        df: DB1B market DataFrame. Besides the raw market columns it must
            contain 'ind_study_period' (e.g. from ``add_period_indicator``).
        origin_list: Allowed origin airport codes.
        dest_list: Allowed destination airport codes.
        min_fare: Exclusive lower bound on MktFare.
        max_fare: Exclusive upper bound on MktFare.
        min_pas: Inclusive lower bound on Passengers.
        max_pas: Inclusive upper bound on Passengers.
        num_coupons: Allowed MktCoupons values (any list-like accepted by
            ``Series.isin``). Default is a tuple to avoid a mutable default.
        switch_carrier: Required OpCarrierChange value (default 0, meaning
            no carrier change).
        study_period: Required ind_study_period value (default 1).
        min_miles: Exclusive lower bound on MktMilesFlown.

    Returns:
        A filtered copy of ``df``; original index labels are preserved.
    """
    is_route_match = (df['Origin'].isin(origin_list)
                      & df['Dest'].isin(dest_list))
    # Compare as objects: categorical columns with different category sets
    # would otherwise raise TypeError on '=='.
    is_not_codeshare = (df['OpCarrier'].astype(object)
                        == df['RPCarrier'].astype(object))
    is_valid_coupon = df['MktCoupons'].isin(num_coupons)
    is_valid_fare = (df['MktFare'] > min_fare) & (df['MktFare'] < max_fare)
    is_valid_passenger = ((df['Passengers'] >= min_pas)
                          & (df['Passengers'] <= max_pas))
    is_valid_distance = df['MktMilesFlown'] > min_miles
    is_in_study_period = df['ind_study_period'] == study_period
    is_single_carrier = df['OpCarrierChange'] == switch_carrier

    all_filters = (
        is_route_match
        & is_not_codeshare
        & is_valid_coupon
        & is_valid_fare
        & is_valid_passenger
        & is_valid_distance
        & is_in_study_period
        & is_single_carrier
    )
    return df[all_filters].copy()

def add_route(df):
    """
    Add a direction-independent 'Route' column such as 'ATL-JFK'.

    The two airport codes are ordered alphabetically, so ATL->JFK and
    JFK->ATL both become 'ATL-JFK'. Codes are compared as strings, which
    also supports categorical 'Origin'/'Dest' columns (unordered
    categoricals raise TypeError on '<').

    Returns a new DataFrame; ``df`` is not modified.
    """
    origin = df['Origin'].astype(str)
    dest = df['Dest'].astype(str)
    origin_first = origin < dest

    first = origin.where(origin_first, dest)
    second = dest.where(origin_first, origin)
    return df.assign(Route=first + '-' + second)

def add_yield(df):
    """
    Add a 'Yield' column: fare per mile flown for each market record.

    Yield = MktFare / MktMilesFlown, computed row by row. No weighting by
    'Passengers' is applied here (a per-row passenger count would cancel
    out of the ratio); weight by Passengers when aggregating if needed.

    Rows whose MktMilesFlown is not greater than 0 (including NaN) get a
    Yield of 0.0 instead of inf/NaN.

    Returns a new DataFrame; ``df`` is not modified.
    """
    fare = df['MktFare']
    miles = df['MktMilesFlown']

    yield_col = np.where(miles > 0, fare / miles, 0.0)
    return df.assign(Yield=yield_col)

def recode_merged_carriers(df: pd.DataFrame,
                           verbose: bool = True) -> pd.DataFrame:
    """
    Recode 'RPCarrier' to the surviving carrier for completed mergers.

    Reporting-carrier codes in DB1B lag the actual merger finalization, so
    each record reported under an acquired carrier's code on or after the
    merger's effective quarter (per ``MERGER_DATA`` below) is reassigned to
    the acquiring carrier's code. Earlier records keep their original code.

    Args:
        df: DB1B DataFrame. Must contain 'Year', 'Quarter' and 'RPCarrier'.
        verbose: If True, print how many records were recoded per merger and
            the final 'RPCarrier' value counts.

    Returns:
        A new DataFrame (``df`` is not modified) whose 'RPCarrier' column is
        categorical with unused categories removed.
    """
    # (acquired code, surviving code, effective year, effective quarter, label)
    MERGER_DATA: List[Tuple[str, str, int, int, str]] = [
        ('NW', 'DL', 2008, 4, 'Delta-Northwest'),
        ('CO', 'UA', 2010, 4, 'United-Continental'),
        ('FL', 'WN', 2011, 2, 'Southwest-AirTran'),
        # Disabled: ('US', 'AA', 2014, 1, 'American-US Airways'),
        ('CS', 'UA', 2011, 1, 'Continental Micronesia-United')
    ]

    df_recoded = df.copy()
    # Integer quarter ordinal (Year*4 + Quarter): exact and monotonic, so the
    # ">= effective quarter" test needs no float encoding.
    quarter_ordinal = df_recoded['Year'] * 4 + df_recoded['Quarter']

    if not isinstance(df_recoded['RPCarrier'].dtype, pd.CategoricalDtype):
        df_recoded['RPCarrier'] = df_recoded['RPCarrier'].astype('category')

    present_carriers = set(df_recoded['RPCarrier'].cat.categories)

    if verbose:
        print("--- Starting Airline Merger Recoding ---")

    for old_code, new_code, year, quarter, name in MERGER_DATA:

        if old_code not in present_carriers:
            if verbose:
                print(f"Skipping {name}: Carrier '{old_code}' not found in data.")
            continue

        mask = ((df_recoded['RPCarrier'] == old_code)
                & (quarter_ordinal >= year * 4 + quarter))
        num_affected = int(mask.sum())

        if num_affected > 0:
            # A categorical rejects values outside its categories, so register
            # the surviving carrier's code first if this subset lacks it.
            if new_code not in df_recoded['RPCarrier'].cat.categories:
                df_recoded['RPCarrier'] = df_recoded['RPCarrier'].cat.add_categories([new_code])
            df_recoded.loc[mask, 'RPCarrier'] = new_code
            if verbose:
                print(f"Recoded {num_affected:,} records for {name} merger ({old_code} -> {new_code}) "
                      f"on or after {year} Q{quarter}.")
        elif verbose:
            print(f"No records found to recode for {name} ({old_code}) after {year} Q{quarter}.")

    df_recoded['RPCarrier'] = df_recoded['RPCarrier'].cat.remove_unused_categories()

    if verbose:
        print("\n--- Recoding Complete ---")
        print("Value counts for RPCarrier after recoding:")
        print(df_recoded['RPCarrier'].value_counts())

    return df_recoded

def post_filter_check(df):
    """
    Print the frame's shape and its count of distinct 'ItinID' values.

    Intended as a diagnostic step after filtering; ``df`` is returned
    unchanged so the call can close a ``.pipe(...)`` chain.
    """
    print("Shape of data after filter:", df.shape)
    distinct_itins = df['ItinID'].nunique()
    print("Number of unique ItinIDs after filter:", distinct_itins)
    return df


In [None]:
# Quarter boundaries for the analysis (all inclusive).
intervention_point = pd.Period('2012Q2', freq='Q')
begin_study = pd.Period('2008Q4', freq='Q')
# NOTE(review): end_study runs past END_YEAR, and market_df only holds years
# up to END_YEAR, so quarters after that simply have no rows. Confirm intended.
end_study = pd.Period('2018Q2', freq='Q')

# Clean the market table: add the time index and period flags, apply the
# market filters, add Route/Yield, then recode merged carriers. Row counts
# are printed before and after.
clean_market_df = (
    market_df
    .pipe(pre_filter_check)
    .pipe(add_time_index)
    .pipe(add_period_indicator,
          intervention_point=intervention_point,
          begin_study=begin_study,
          end_study=end_study)
    .pipe(filter_db1b_markets,
          origin_list=ALL_HUBS,
          dest_list=ALL_HUBS)
    .pipe(add_route)
    .pipe(add_yield)
    .pipe(recode_merged_carriers)
    .pipe(post_filter_check)
)

# FINAL_DATAFRAMES_DIR comes from the setup cell; no need to hardcode it again.
os.makedirs(FINAL_DATAFRAMES_DIR, exist_ok=True)
clean_market_path = os.path.join(FINAL_DATAFRAMES_DIR, 'clean_market_df.parquet')

try:
    clean_market_df.to_parquet(clean_market_path, engine='pyarrow', compression='snappy')
    print(f"✅ clean_market_df saved to: {clean_market_path}")
except Exception as e:
    print(f"❌ Error saving clean_market_df: {e}")

In [None]:
# DTW-FLL route panel up to and including 2015Q2, with Yield upcast to float64.
dtw_fll_df = clean_market_df.copy()
dtw_fll_df['Yield'] = dtw_fll_df['Yield'].astype('float64')

# Filter the frame prepared above (the original referenced an undefined
# `test_df`, which raised NameError and discarded the float64 cast).
dtw_fll_df = dtw_fll_df[
    (dtw_fll_df['QuarterPeriod'] <= pd.Period('2015Q2', freq='Q'))
    & (dtw_fll_df['Route'] == 'DTW-FLL')
]

# Rebuild the time columns so QuarterID counts from this subset's earliest year.
dtw_fll_df = add_time_index(dtw_fll_df)

# NOTE(review): dtw_fll_path was never defined in the original; this file
# name is a choice made here. Adjust it if a different name is expected.
dtw_fll_path = os.path.join(FINAL_DATAFRAMES_DIR, 'dtw_fll_df.parquet')

try:
    dtw_fll_df.to_parquet(dtw_fll_path, engine='pyarrow', compression='snappy')
    print(f"✅ dtw_fll_df saved to: {dtw_fll_path}")
except Exception as e:
    print(f"❌ Error saving dtw_fll_df: {e}")