# ETL Master Flight Table Using Dask
The master flight table combines attributes from external tables into a single flat table that serves as the starting point for training our ML models. External attribute tables such as airport demand and weather can be pre-partitioned by airport name. Each external table is typically joined to the master flight table twice: once for arrivals and once for departures. Merging requires the tables to be aligned on the join keys, so each join can trigger multiple shuffle operations. Pre-partitioning the data lets the dominant join pattern use cheaper in-block operations and avoid unnecessary data movement, which becomes critical for large datasets on a distributed cluster.  

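The double join described above can be sketched on toy data. This is a minimal pandas illustration, not the pipeline itself: the `attach` helper, the `FLIGHT_ID` column, and the `TEMP_F` attribute are hypothetical, and the time key used in the real joins is left out for brevity. The `add_prefix` and `merge` calls have the same names on `dask.dataframe`.

```python
import pandas as pd

# Toy flight table: one row per flight (FLIGHT_ID is a hypothetical column).
flights = pd.DataFrame({
    "FLIGHT_ID": [1, 2, 3],
    "ORIGIN": ["ATL_GA", "ORD_IL", "ATL_GA"],
    "DEST": ["ORD_IL", "ATL_GA", "JFK_NY"],
})

# Toy attribute table keyed by airport (TEMP_F is a hypothetical attribute).
weather = pd.DataFrame({
    "ARPT_NAME": ["ATL_GA", "ORD_IL"],
    "TEMP_F": [75.0, 58.0],
})

def attach(flights_df, attrs, side):
    """Left-join airport attributes onto the ORIGIN or DEST column."""
    prefixed = attrs.add_prefix(f"{side}_")
    merged = flights_df.merge(
        prefixed, left_on=side, right_on=f"{side}_ARPT_NAME", how="left"
    )
    # The prefixed key duplicates the ORIGIN/DEST column, so drop it.
    return merged.drop(columns=[f"{side}_ARPT_NAME"])

# The same attribute table is joined twice: once per end of the flight.
master = attach(attach(flights, weather, "ORIGIN"), weather, "DEST")
print(master)
```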
Dask 2021.06.0 does not support MultiIndex, so merging on the key pair [ARPT_NAME, DEP_TIME_DT_LOCAL_HR] is not optimal. When the join attributes are not indexed, creating a Dask index or running a groupby requires a shuffle. Indexing before the merge should reduce memory usage, at the cost of the extra computation needed to build the index. The Dask documentation provides some recommendations: https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead  
> dd.merge(a, pandas_df)  # fast  
dd.merge(a, b, left_index=True, right_index=True)  # fast  
dd.merge(a, b, left_index=True, right_on='id')  # half-fast, half-slow  
dd.merge(a, b, left_on='id', right_on='id')  # slow  

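One possible workaround for the missing MultiIndex is to fold the two join columns into a single string key and merge on that index. The sketch below is an assumption, not something the notebook does. It uses pandas on toy data so it runs without a cluster, and `composite_key`, `JOIN_KEY`, and `WIND_KT` are hypothetical names. The column arithmetic, `set_index`, and index-based `merge` are also available on `dask.dataframe`, where `set_index` itself triggers a shuffle.

```python
import pandas as pd

flights = pd.DataFrame({
    "ORIGIN": ["ATL_GA", "ATL_GA", "ORD_IL"],
    "DEP_TIME_DT_LOCAL_HR": pd.to_datetime(
        ["2003-01-01 08:00", "2003-01-01 09:00", "2003-01-01 08:00"]
    ),
    "DEP_DELAY": [5, 12, 0],
})

# Toy weather table (WIND_KT is a hypothetical attribute).
weather = pd.DataFrame({
    "ARPT_NAME": ["ATL_GA", "ORD_IL"],
    "DT_LOCAL_HR": pd.to_datetime(["2003-01-01 08:00", "2003-01-01 08:00"]),
    "WIND_KT": [10, 22],
})

def composite_key(arpt, hour):
    """Combine airport name and local hour into one string key."""
    return arpt.astype(str) + "|" + hour.dt.strftime("%Y%m%d%H")

flights_ix = flights.assign(
    JOIN_KEY=composite_key(flights["ORIGIN"], flights["DEP_TIME_DT_LOCAL_HR"])
).set_index("JOIN_KEY")

weather_ix = weather.assign(
    JOIN_KEY=composite_key(weather["ARPT_NAME"], weather["DT_LOCAL_HR"])
).set_index("JOIN_KEY")[["WIND_KT"]]

# Both sides are now indexed on the same single-column key.
merged = flights_ix.merge(weather_ix, left_index=True, right_index=True, how="left")
print(merged)
```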
Hand-tuning data flows requires a deep understanding of internal data structures, and operations that work on small data samples may not scale to the full dataset. This motivates using libraries such as Spark, Dask, or NVTabular, which let us reuse simple code at full scale without writing custom code or patches to resolve performance issues. Even so, some fine-tuning is required to get Dask to run well at scale.

In [1]:
import os
import shutil
from time import time
import pandas as pd
import numpy as np

import dask
import dask.dataframe as dd
from dask.distributed import Client, wait, progress, get_worker

print('dask version', dask.__version__)

# Load external tables for merging:
staging_dir = './data/staging_tbl/'
etl_output_dir = './data/encoded/NAS/' # Directory will be wiped!

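# Recursively delete the output directory (if present), then recreate it.
# ignore_errors avoids a bare except; makedirs also creates missing parent directories.
shutil.rmtree(etl_output_dir, ignore_errors=True)
os.makedirs(etl_output_dir, exist_ok=True)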

# Small number of workers okay for ETL to reserve more mem per worker. Most tasks are threaded.
client = Client(n_workers=1, threads_per_worker=32) # Good for DSWS.
# client = Client('tcp://192.168.1.232:8786') # Connect to dask-scheduler

# Working config for full 17 year data set: 1w32t, 256GB RAM. Peak mem usage around 225GB, including spill with repartition between joins.
# Default spill is at 60% worker's mem usage. Spilling to disk adversely affects performance. Can also disable spill.
# Currently use repartitioning before each join to optimize joins.

def disable_spill():
    dask.config.set({'distributed.worker.memory.target': False, 
    'distributed.worker.memory.spill': False, 
    'distributed.worker.memory.pause': 0.95,
    'distributed.worker.memory.terminate': 0.97}
    )
    print(dask.config.config)
    
client.register_worker_callbacks(setup=disable_spill)

client

dask version 2021.06.1


Perhaps you already have a cluster running?
Hosting the HTTP server on port 43583 instead


Connection method: Cluster object
Cluster type: LocalCluster
Dashboard: http://127.0.0.1:43583/status

Status: running
Using processes: True
Workers: 1
Total threads: 32
Total memory: 251.65 GiB

Comm: tcp://127.0.0.1:38995
Started: Just now

Comm: tcp://127.0.0.1:33757
Total threads: 32
Dashboard: http://127.0.0.1:34887/status
Memory: 251.65 GiB
Nanny: tcp://127.0.0.1:35957
Local directory: /data/airline_delay_causal/dask-worker-space/worker-7du86sc8
GPU: Quadro RTX 8000
GPU memory: 47.46 GiB


The airport demand and airport weather tables are partitioned by airport. This keeps the external tables smaller than the main table for big.merge(small)-patterned joins on sorted data, so the smaller table can fit in memory and be broadcast. Partitioning by year with 8 parquet files per year (17*8 files) was also tried, but it failed on the first full-year merge.

In [2]:
%%time

run_type = 'cpu'

if run_type == 'gpu':
    import dask_cudf as hw
else:
    import dask.dataframe as hw

arpt_demand_dd = hw.read_parquet(staging_dir+'arpt_demand')
arpt_weather_dd = hw.read_parquet(staging_dir+'arpt_weather')
nas_flights_dd = hw.read_parquet(staging_dir+'nas_flights')


# # Pre-filter data for testing:
# test_arpt = 'ATL_GA'
# arpt_demand_dd = arpt_demand_dd[arpt_demand_dd['ARPT_NAME']==test_arpt]
# arpt_weather_dd = arpt_weather_dd[arpt_weather_dd['ARPT_NAME']==test_arpt]
# nas_flights_dd = nas_flights_dd[nas_flights_dd['ORIGIN']==test_arpt]


# Cast partitioned key columns to category for merging:
arpt_demand_dd['ARPT_NAME'] = arpt_demand_dd['ARPT_NAME'].astype('category')
arpt_weather_dd['ARPT_NAME'] = arpt_weather_dd['ARPT_NAME'].astype('category')
nas_flights_dd['ORIGIN'] = nas_flights_dd['ORIGIN'].astype('category')

arpt_demand_dd['YEAR'] = arpt_demand_dd['YEAR'].astype('int16')
arpt_weather_dd['YEAR'] = arpt_weather_dd['YEAR'].astype('int16')
nas_flights_dd['YEAR'] = nas_flights_dd['YEAR'].astype('int16')

CPU times: user 142 ms, sys: 95.5 ms, total: 238 ms
Wall time: 376 ms


In [3]:
%%time

if run_type == 'gpu':
    # BLOCKING issue: GPU has merge issues related to categorical/object columns. It's trying to cast everything to numeric.
    # Possible issue with datetime object and categoricals in cudf?
    
    arpt_demand_dd = hw.read_parquet(staging_dir+'arpt_demand', columns=['YEAR', 'ARR_PER_QTHR'])
    nas_flights_dd = hw.read_parquet(staging_dir+'nas_flights', columns=['YEAR', 'ORIGIN', 'DEP_DELAY'])
    
    # Fix dtype for autotype YEAR partition:
    arpt_demand_dd['YEAR'] = arpt_demand_dd['YEAR'].astype('int16')
    nas_flights_dd['YEAR'] = nas_flights_dd['YEAR'].astype('int16')
    
    arpt_demand_dd = arpt_demand_dd[arpt_demand_dd['YEAR']==2003]
    nas_flights_dd = nas_flights_dd[nas_flights_dd['YEAR']==2003]
    
    arpt_demand_dd = arpt_demand_dd.reset_index()
    arpt_demand_dd['ARPT_NAME'] = arpt_demand_dd['ARPT_NAME'].astype(str)
    nas_flights_dd['ORIGIN'] = nas_flights_dd['ORIGIN'].astype(str)
    
    nas_flights_mg = nas_flights_dd.merge(arpt_demand_dd, left_on=['YEAR', 'ORIGIN'], 
                                          right_on=['YEAR', 'ARPT_NAME'], how='left')

else:
    # Generate computational graph for entire merging pipeline. Let dask manage tasks.
    
    # Repartition data after each join to reduce overhead: https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead
    # "Reducing partitions is very helpful just before shuffling, which creates n log(n) tasks relative to the number of partitions. 
    # DataFrames with less than 100 partitions are much easier to shuffle than DataFrames with tens of thousands."
    npartitions = 256 # Number of airports ~400 in full data.
    
    #############################
    ## Merge ORIGIN Attributes ##
    #############################
    # dd.add_prefix() works on dask 2021.06.1 but not 2021.05.1?
    nas_flights_mg = nas_flights_dd.merge(arpt_demand_dd.add_prefix('ORIGIN_'), left_on=['ORIGIN', 'DEP_TIME_DT_LOCAL_QTHR'], 
                                      right_on=['ORIGIN_ARPT_NAME', 'ORIGIN_DT_LOCAL_QTHR'], how='left').drop(columns=['ORIGIN_YEAR', 'ORIGIN_ARPT_NAME'])
#     nas_flights_mg = nas_flights_mg.repartition(npartitions=npartitions).persist()
    
    # Merge weather data at origin and destination. INNER join used to remove records without weather data. Or left join to keep and remove NA's later.
    # Assumed that hourly weather data at all airports are complete. A small fraction of airports don't have corresponding weather station.
    # Weather data merge is most time consuming step. Need to scatter weather tables?
    wx_join_type = 'left'
    nas_flights_mg = nas_flights_mg.merge(arpt_weather_dd.add_prefix('ORIGIN_'), left_on=['ORIGIN', 'DEP_TIME_DT_LOCAL_HR'],
                                          right_on=['ORIGIN_ARPT_NAME', 'ORIGIN_DT_LOCAL_HR'], how=wx_join_type).drop(columns=['ORIGIN_YEAR', 'ORIGIN_ARPT_NAME'])
#     nas_flights_mg = nas_flights_mg.repartition(npartitions=npartitions).persist()
    
    ###########################
    ## Merge DEST Attributes ##
    ###########################
    nas_flights_mg = nas_flights_mg.merge(arpt_demand_dd.add_prefix('DEST_'), left_on=['DEST', 'ARR_TIME_DT_LOCAL_QTHR'], 
                                          right_on=['DEST_ARPT_NAME', 'DEST_DT_LOCAL_QTHR'], how='left').drop(columns=['DEST_YEAR', 'DEST_ARPT_NAME'])
#     nas_flights_mg = nas_flights_mg.repartition(npartitions=npartitions).persist()
    
    nas_flights_mg = nas_flights_mg.merge(arpt_weather_dd.add_prefix('DEST_'), left_on=['DEST', 'ARR_TIME_DT_LOCAL_HR'], 
                                      right_on=['DEST_ARPT_NAME', 'DEST_DT_LOCAL_HR'], how=wx_join_type).drop(columns=['DEST_YEAR', 'DEST_ARPT_NAME'])
#     nas_flights_mg = nas_flights_mg.repartition(npartitions=npartitions).persist() # Don't need to repartition after last merge.
    
    arpt_demand_attr_cols = [cc for cc in arpt_demand_dd.columns if cc not in ['YEAR', 'ARPT_NAME', 'DT_LOCAL_QTHR']]
    arpt_demand_attr_cols = [prefix + dcols for prefix in ['ORIGIN_', 'DEST_'] for dcols in arpt_demand_attr_cols]
#     print('Airport demand cols:', arpt_demand_attr_cols)

    # Fill missing data due to merging:
    nas_flights_mg[arpt_demand_attr_cols] = nas_flights_mg[arpt_demand_attr_cols].fillna(0).astype('int8')
    
    # Due to sporadic weather data, missing fields can only be dropped instead of imputed.

    # Remove certain cols with ORIGIN_/DEST_ prefix:
    od_remove_cols = ['DT_LOCAL_QTHR', 'DT_LOCAL_HR']
    od_remove_cols = [pfix+cc for pfix in ['ORIGIN_', 'DEST_'] for cc in od_remove_cols]

    # Remove certain cols with ARR/DEP_ prefix:
    ad_remove_cols = ['TIME_DT_LOCAL', 'TIME_DT_LOCAL_DAY', 'TIME_DT_LOCAL_HR', 'TIME_DT_LOCAL_QTHR']
    ad_remove_cols = [pfix+cc for pfix in ['ARR_', 'DEP_'] for cc in ad_remove_cols]

    # Drop unnecessary columns to prevent leakage and duplicate data:
    nas_flights_mg = nas_flights_mg.drop(columns=ad_remove_cols + od_remove_cols)

    # Cast to float32 so parquet file can be written. Newer version of pyarrow supports float16.
    fp16_cols = list(nas_flights_mg.select_dtypes('float16').columns)
    nas_flights_mg[fp16_cols] = nas_flights_mg[fp16_cols].astype('float32')
    
    # Performance penalty of 25-50% when external table (e.g., weather data) has categorical features. Should leave them as objects then categorize() after merge.
    # categorize forces computation so would need to checkpoint data before .categorize(). 
    # Defer running .categorize() until final merge and/or encoding step.
#     nas_flights_mg = nas_flights_mg.categorize() # categorize() converts all 'object' cols to 'category'. And unknown categories to known.
    
    # Data was aligned on airport for ease of merging. Need to partition data on YYYYMM to quickly access files for time-series test/train split and cross-validation. 
    # Use write_metadata_file=False to avoid writing the two metadata folders. Dask cannot read partitioned directory with metadata files included.
    nas_flights_mg.repartition(npartitions=1).to_parquet(etl_output_dir, write_metadata_file=False, partition_on=['YYYYMM'], flavor='spark')
#     nas_flights_mg.to_parquet(etl_output_dir, write_metadata_file=False, partition_on=['YYYYMM'], flavor='spark')

    
# len(nas_flights_mg)
# nas_flights_mg.head(5) # Head completes much faster than len(). Dask may just be computing a small subset of data to display.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/IPython/core/magics/execution.py", line 1321, in time
    exec(code, glob, local_ns)
  File "<timed exec>", line 88, in <module>
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/dask/dataframe/core.py", line 4389, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 653, in to_parquet
    out = out.compute(**compute_kwargs)
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/btong/miniconda3/envs/rapids-dev/lib/python3.8/site-packages/distributed/c



TypeError: object of type 'NoneType' has no len()

In [None]:
# TargetEncoder needs to be applied after test/train split to avoid leakage. 
# TargetEncoder from category_encoder doesn't understand dask.dataframe. Pandas required, but entire data may not fit within mem.
# Need to slice UID and categorical/object cols to encode. Then merge with full data prior to running training.

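The ordering described in the comments above (split first, then encode) can be sketched on toy data. This is a hand-rolled mean encoding in pandas, not the `TargetEncoder` from `category_encoders`, and it applies no smoothing. The cutoff month, the `ORIGIN_TE` column name, and the choice of `DEP_DELAY` as the target are assumptions made for illustration.

```python
import pandas as pd

df = pd.DataFrame({
    "YYYYMM": [200301, 200301, 200302, 200302, 200303, 200303],
    "ORIGIN": ["ATL_GA", "ORD_IL", "ATL_GA", "ORD_IL", "ATL_GA", "JFK_NY"],
    "DEP_DELAY": [10.0, 30.0, 20.0, 50.0, 0.0, 5.0],
})

# Time-ordered split: earlier months train, the latest month tests.
cutoff = 200303  # hypothetical cutoff month
train = df[df["YYYYMM"] < cutoff].copy()
test = df[df["YYYYMM"] >= cutoff].copy()

# Fit the encoding on training rows only so test targets never leak in.
global_mean = train["DEP_DELAY"].mean()
origin_means = train.groupby("ORIGIN")["DEP_DELAY"].mean()

# Apply to both splits; airports unseen in training fall back to the global mean.
train["ORIGIN_TE"] = train["ORIGIN"].map(origin_means)
test["ORIGIN_TE"] = test["ORIGIN"].map(origin_means).fillna(global_mean)

print(test[["ORIGIN", "ORIGIN_TE"]])
```

Only the key and the categorical columns need to be loaded into pandas for this step; the encoded column can then be merged back onto the full data before training.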

In [None]:
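# Presumably an intentional stop: `break` outside a loop raises a SyntaxError, which halts "Run All" before the cells below.
break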

In [None]:
enc_df = dd.read_parquet(etl_output_dir)
print(len(enc_df))
enc_df

In [None]:
# Make sure data doesn't contain NA's:
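# .compute() materializes the lazy Dask result as a pandas Series so it aligns with enc_df.dtypes below.
na_count = enc_df.isna().sum().compute()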

# Encoded data quality check:
qc_out = pd.DataFrame({'dtype': enc_df.dtypes, 'NA_cnt': na_count})
qc_out.sort_values('NA_cnt', ascending=False)

# Certain airports do not have weather data. These can be dropped before running the ML pipeline.

In [None]:
# Check dtypes. Should not have datetime after pre-encoding. Okay to have category and object cols.
qc_out['dtype'].value_counts()

In [None]:
list(zip(nas_flights_mg.columns, nas_flights_mg.dtypes))