In [None]:
pip install pandas plot duckdb ipywidgets SQLAlchemy psycopg2-binary

In [None]:
# recommended resources for this notebook: >= 8 cpu and >= 4GB ram
import pandas as pd
import matplotlib.pyplot as plt
import duckdb
import glob
from sqlalchemy import create_engine
import os

# use duck db for analytical queries
# https://duckdb.org/docs/api/python/overview
# https://duckdb.org/docs/guides/performance/environment
# https://duckdb.org/docs/configuration/pragmas#resource-management
duckdb = duckdb.connect(":memory:", config={'threads': 2, 'memory_limit': '1.5GB', 'max_memory': '1.5GB', 'max_temp_directory_size': '50GB'})

# use a postgres database to share data between this notebook and observable-framework
psql = create_engine(os.environ['PGURL'])

In [None]:
# data source: https://geodata.bts.gov/datasets/usdot::t-100-domestic-market-and-segment-data/explore
duckdb.query("""
create or replace table airport_stats as 
select 
origin as airport
, passengers as total_passengers_2023 
from read_csv('airport-data/T100_Domestic_Market_and_Segment_Data_*.csv')
""")
duckdb.query("select * from airport_stats")

In [None]:
# data source: https://data.bts.gov/Aviation/Airports/kfcv-nyy3/about_data
duckdb.query("""
create or replace table top_airports as 
select 
airport_id as airport_id
, airport as code
, display_airport_name as display_airport_name
, display_airport_city_name_full as display_airport_city_name_full
, latitude as latitude
, longitude as longitude
, total_passengers_2023
from read_csv('airport-data/airports.csv') 

-- only load the top airports
join (select * from airport_stats order by total_passengers_2023 desc limit 50) using(airport)

where
airport_is_latest = 1
and airport_is_closed = 0
""")
duckdb.query("select * from top_airports")

In [None]:
duckdb.query("select * from top_airports").df().to_sql('airports', psql, if_exists='replace', index=False)

In [None]:
# data source: https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FHK
duckdb.query("""
create or replace table trips as 
select 
Year as year
,Quarter as quarter
,OriginAirportID as origin_airport_id
,DestAirportID as dest_airport_id
,Passengers::int as passengers
,MktFare::int as mkt_fare
,MktMilesFlown::int as mkt_miles_flown

from read_csv('airport-data/Origin_and_Destination_Survey_DB1BMarket_*.csv')
""")
duckdb.query("select * from trips")

In [None]:
duckdb.query("""
create or replace table historical_routes as 
select 
    CONCAT(year,'-Q',quarter) as date
    , passengers
    , mkt_fare/passengers as fare
    , fare/mkt_miles_flown as fare_per_mile
    
    , oa.code as origin_airport_code
    , da.code as dest_airport_code
    , oa.airport_id as origin_airport_id
    , da.airport_id as dest_airport_id

    , avg(fare) over(partition by date, origin_airport_id, dest_airport_id) as avg_fare
    , stddev(fare) over(partition by date, origin_airport_id, dest_airport_id) as stddev_fare

from trips t 

-- only load routes for the top airports
join top_airports oa on oa.airport_id = t.origin_airport_id 
join top_airports da on da.airport_id = t.dest_airport_id
""")
duckdb.query("select * from historical_routes")

In [None]:
duckdb.query("""
create or replace table arcs as 
select
  array_to_string(list_sort(list_value(origin_airport_id, dest_airport_id)),'-') as arc_id
, case when origin_airport_id < dest_airport_id then origin_airport_id else dest_airport_id end as min_airport_id
, case when origin_airport_id > dest_airport_id then origin_airport_id else dest_airport_id end as max_airport_id
, mode(mkt_miles_flown) as distance

from trips t

-- only load data for the top airports
join top_airports oa on oa.airport_id = t.origin_airport_id 
join top_airports da on da.airport_id = t.dest_airport_id

group by 1,2,3
order by 1,2,3
""")
duckdb.query("select * from arcs")

In [None]:
duckdb.query("select * from arcs").df().to_sql('arcs', psql, if_exists='replace', index=False)

In [None]:
normalized_routes = duckdb.query("""
with route_passengers as (
    -- select passengers separately because we want the total count regardless of fare outliers
    -- source is only a 10% sample of airline tickets, so multiply passenger count by 10 to get a better total
    select date, origin_airport_id, dest_airport_id, sum(passengers)*10 as total_passengers from historical_routes
    group by 1,2,3
)
, route_aggs as (
    select 
          date
        , origin_airport_id
        , dest_airport_id
        , round(avg(fare),2) as average_fare
        
    from historical_routes
    
    where 
    -- remove fare outliers (first class, discount tickets)
    fare between avg_fare - stddev_fare and avg_fare + stddev_fare
    group by 1,2,3
)

select
hra.*
, p.total_passengers
, array_to_string(list_sort(list_value(hra.origin_airport_id, hra.dest_airport_id)),'-') as arc_id
from route_aggs hra
join route_passengers p using(date, origin_airport_id, dest_airport_id)

order by 1,2,3
""").df()
normalized_routes

In [None]:
normalized_routes.to_sql('historical_routes', psql, if_exists='replace', index=False)

In [None]:
normalized_routes['average_fare'].plot(kind='hist', bins=100)

In [None]:
normalized_routes['total_passengers'].plot(kind='hist', bins=100)

In [None]:
# testing: compare published passenger count to the aggregated counts
duckdb.query("""
with route_total as (
    select year, airport_id, sum(total_passengers)*10 as total_passengers
    from (
        select year, origin_airport_id as airport_id, sum(passengers) as total_passengers
        from trips
        where year = 2023
        group by 1,2
        union
        select year, dest_airport_id as airport_id, sum(passengers) as total_passengers
        from trips
        where year = 2023
        group by 1,2
    )
    group by 1,2
)
select airport_id, total_passengers_2023, total_passengers
from top_airports
join route_total using(airport_id)
order by 1
""")