In [1]:
import pandas as pd
import numpy as np
import math
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import HistGradientBoostingRegressor
import statistics as sts
from sliceline.slicefinder import Slicefinder
import optbinning
from dt import *
import psycopg2

(CVXPY) Jan 17 07:09:18 PM: Encountered unexpected exception importing solver GLOP:
RuntimeError('Unrecognized new version of ortools (9.8.3296). Expected < 9.8.0. Please open a feature request on cvxpy to enable support for this version.')
(CVXPY) Jan 17 07:09:18 PM: Encountered unexpected exception importing solver PDLP:
RuntimeError('Unrecognized new version of ortools (9.8.3296). Expected < 9.8.0. Please open a feature request on cvxpy to enable support for this version.')


SyntaxError: invalid syntax (pipeline.py, line 57)

## Dataset

We'll be testing the Department of Transportation Airline On-Time Statistics dataset. Each year from 2018 to 2023 is treated as a separate data source. (November and December 2023 are not available yet.) The training columns will be:

1. Year — note: excluded from training (by the stationarity assumption)
2. Month (int)
3. Day (int)
4. Day of week (int)
5. Marketing airline (categorical, total 9): This is the airline which sold the ticket. 
6. Operating airline (categorical, total 21): This is the airline which operates the airplane. Could be the same as marketing airline, could be a different regional operator.
7. Origin & destination states (categorical, total 51)
8. Origin & destination airport coordinates (continuous): The original data is categorical with high number of options, it is probably more meaningful & tractable to extract the coordinates.
9. Distance (in miles)
10. Scheduled arrival & departure time (integer from 0000 to 2400). 

The prediction column is arrival delay (in minutes) which is encoded as integer, where negative is early and positive is late arrival. 

This dataset is very large: There are almost 40 million rows in total. 

This dataset is also very high-dimensional: there are 10 ordinal features and 132 categorical one-hot encoded features (30 airlines + 102 states). It is a good stress test for the system. 


 - NOTE: 2018-08 CSV is malformed and gives a parse error; needs fixing
 - NOTE: There may be issues where a flight has no arrival time due to cancellation, with unexpected handling of the arrival_delay column

**Reproducibility Notes**

The PyPI `sliceline` package requires Python 3.7 through 3.10.0, which is not the most recent Python version. Creating a virtual environment with Python 3.9 should work. The conda environment should also have the following packages installed:

* sliceline
* pandas
* scikit-learn
* optbinning

## Defining Reusable Methods

### Data Manipulation

In [None]:
def grab_data_psql(db_source):
    """Fetch a source's query result and split it into features and target.

    Args:
        db_source: Object exposing ``get_query_result()`` and a ``y``
            attribute; ``y`` is handed to ``dt.split_xy`` as its second
            argument.

    Returns:
        A 3-tuple: the two values produced by ``dt.split_xy`` followed by the
        unsplit query result.
    """
    full_df = db_source.get_query_result()
    features, target = dt.split_xy(full_df, db_source.y)
    return features, target, full_df

### Model Training

In [None]:
def train_model(train_x, train_y):
    """Fit a ``HistGradientBoostingRegressor`` on the given training data.

    The regressor is created with ``random_state=42`` and otherwise default
    settings.

    Args:
        train_x: Training features, passed straight to ``fit``.
        train_y: Training targets, passed straight to ``fit``.

    Returns:
        The fitted regressor instance.
    """
    regressor = HistGradientBoostingRegressor(random_state=42)
    regressor.fit(train_x, train_y)
    return regressor

### Error Analysis

In [None]:
def get_errors(model, x, y):
    """Return the element-wise squared error of ``model`` on ``(x, y)``.

    Args:
        model: Object with a ``predict(x)`` method.
        x: Features to predict on.
        y: True target values; must support ``y - predictions``.

    Returns:
        ``(y - model.predict(x)) ** 2``. Despite being used for training
        errors elsewhere in the notebook, this works for any split.
    """
    residuals = y - model.predict(x)
    return residuals ** 2

In [None]:
def get_rms(arr):
    """Return the square root of the arithmetic mean of ``arr``.

    Callers in this notebook pass squared errors, which makes the result a
    root-mean-square error.

    Args:
        arr: Iterable of numbers accepted by ``statistics.mean``.

    Returns:
        ``math.sqrt(statistics.mean(arr))`` as a float.
    """
    return math.sqrt(sts.mean(arr))

In [None]:
def get_rms_error(model, x, y):
    """Return the root-mean-square error of ``model`` on ``(x, y)``.

    Composes ``get_errors`` (squared errors) with ``get_rms`` (square root of
    the mean).
    """
    squared_errors = get_errors(model, x, y)
    rms = get_rms(squared_errors)
    return rms

### Binning

In [None]:
def bin_xs(train_x, train_errors):
    """Bin every column of ``train_x`` against ``train_errors``.

    Each column is passed through ``ContinuousOptimalBinning.fit_transform``
    with ``metric="bins"`` (presumably yielding bin labels rather than numeric
    values; confirm against the optbinning documentation), using at most 20
    pre-bins and 20 bins.

    Args:
        train_x: DataFrame of features to bin.
        train_errors: Per-row target used to fit the binning of each column.

    Returns:
        A new DataFrame with the same column names as ``train_x`` holding the
        binned values. It is built from a plain array, so it does not carry
        over the index of ``train_x``.
    """
    binner = optbinning.ContinuousOptimalBinning(max_n_prebins=20, max_n_bins=20)

    # The same binner object is re-fitted on each column in turn.
    binned_columns = []
    for column in train_x.columns:
        binned_columns.append(
            binner.fit_transform(train_x[column], train_errors, metric="bins")
        )

    # One entry per feature so far; transpose so features become columns.
    binned_matrix = np.array(binned_columns).T
    return pd.DataFrame(binned_matrix, columns=train_x.columns)

### Sliceliner

In [None]:
def get_slices(train_x_binned, train_errors, alpha = 0.9, k=1, max_l = 3, min_sup = 0, verbose = False):
    """Fit a ``Slicefinder`` and return its top slices as a DataFrame.

    Args:
        train_x_binned: Binned feature DataFrame to search for slices.
        train_errors: Per-row errors the slices are scored on.
        alpha, k, max_l, min_sup, verbose: Forwarded unchanged to the
            ``Slicefinder`` constructor.

    Returns:
        DataFrame of ``top_slices_`` whose columns are the finder's input
        feature names and whose index is ``get_feature_names_out()``.

    Side effects:
        Prints the fitted finder.
    """
    finder = Slicefinder(
        alpha=alpha,
        k=k,
        max_l=max_l,
        min_sup=min_sup,
        verbose=verbose,
    )
    finder.fit(train_x_binned, train_errors)
    print(finder)
    return pd.DataFrame(
        finder.top_slices_,
        columns=finder.feature_names_in_,
        index=finder.get_feature_names_out(),
    )

In [None]:
# Convert the slice DataFrame returned by the sliceliner into a parsed list of rows
def reformat_slices(slice_df):
    """Fill unconstrained entries and hand the rows to ``dt.parse_slices``.

    Args:
        slice_df: DataFrame of slices; missing entries are treated as
            unconstrained and replaced with the string ``'(-inf, inf)'``.

    Returns:
        Whatever ``dt.parse_slices`` returns for the list-of-lists form of
        the filled DataFrame.

    Side effects:
        ``slice_df`` is filled in place, so the caller's DataFrame is
        modified.
    """
    slice_df.fillna(value='(-inf, inf)', inplace=True)
    rows = slice_df.values.tolist()
    return dt.parse_slices(rows)

In [None]:
# Get the number of times each slice already exists in xs
def get_slice_cnts(xs, slices):
    """Count, for every slice, how many rows of ``xs`` belong to it.

    Args:
        xs: DataFrame whose rows are tested for slice membership.
        slices: Iterable of slices in the form accepted by
            ``dt.belongs_to_slice``.

    Returns:
        A list with one integer per slice, in the same order as ``slices``.
    """
    # Materialize the rows once up front instead of once per slice.
    rows = xs.values.tolist()
    return [
        sum(1 for row in rows if dt.belongs_to_slice(slice_, row))
        for slice_ in slices
    ]

### Putting the Pipeline Together

In [None]:
# Train a model, report errors, and return model & binned train set
def pipeline_train(train_x, train_y, test_x, test_y):
    """Train, report RMS errors, and bin the training features.

    Args:
        train_x, train_y: Training features and targets.
        test_x, test_y: Held-out features and targets, used only for the
            printed test error.

    Returns:
        Tuple ``(model, train_x_binned, train_errors)``: the fitted model,
        the output of ``bin_xs`` on the training features, and the squared
        training errors from ``get_errors``.

    Side effects:
        Prints the train and test RMS errors.
    """
    fitted = train_model(train_x, train_y)

    # Squared training errors are reused below as the binning target.
    train_sq_err = get_errors(fitted, train_x, train_y)
    train_rms = get_rms(train_sq_err)
    print("Train RMS error:", train_rms)

    test_sq_err = get_errors(fitted, test_x, test_y)
    test_rms = get_rms(test_sq_err)
    print("Test RMS error:", test_rms)

    binned = bin_xs(train_x, train_sq_err)
    return fitted, binned, train_sq_err

In [None]:
def pipeline_sliceline(train_x, train_x_binned, train_errors, alpha = 0.9, max_l = 3, min_sup = 0, k = 1):
    """Find slices, parse them, and report how often each already occurs.

    Args:
        train_x: Unbinned training features, used to count existing rows per
            slice.
        train_x_binned: Binned training features searched for slices.
        train_errors: Per-row errors the slices are scored on.
        alpha, max_l, min_sup, k: Forwarded to ``get_slices``.

    Returns:
        The parsed slices produced by ``reformat_slices``.

    Side effects:
        Prints the slice DataFrame and the per-slice existing row counts.
    """
    # Forward the caller's settings; hard-coding them here would silently
    # ignore whatever the caller passed.
    slices_df = get_slices(
        train_x_binned,
        train_errors,
        alpha=alpha,
        max_l=max_l,
        min_sup=min_sup,
        verbose=False,
        k=k,
    )
    slices = reformat_slices(slices_df)
    existing_cnts = get_slice_cnts(train_x, slices)
    print("Slices:")
    print(slices_df)
    print("Existing counts:", existing_cnts)
    return slices

In [None]:
# Obtain additional data
def pipeline_dt(sources, costs, slices, query_counts):
    """Build a ``DT`` instance and run it to acquire additional data.

    Args:
        sources, costs, slices: Passed positionally to ``DT``; its fourth
            positional argument is always ``None``.
        query_counts: Passed to ``DT.run``.

    Returns:
        Whatever ``DT.run(query_counts)`` returns.
    """
    # Named so the local does not reuse the `dt` name used elsewhere in the
    # notebook for the module.
    dt_instance = DT(sources, costs, slices, None)
    return dt_instance.run(query_counts)

In [None]:
# Combine existing dataset with additional data
# Additional data is shuffled in
def pipeline_augment(train_x, train_y, additional_data, features):
    """Append acquired rows to the training set and shuffle the result.

    Args:
        train_x: Existing training features.
        train_y: Existing training targets.
        additional_data: Row data for the new samples, used to build a
            DataFrame with ``features`` as its column names.
        features: Column names for ``additional_data``; the target column is
            split off by ``dt.split_xy`` using the name ``'arrival_delay'``.

    Returns:
        Tuple ``(aug_x, aug_y)`` of shuffled, concatenated features and
        targets. The shuffled objects keep their permuted index labels.
    """
    extra_df = pd.DataFrame(additional_data, columns=features)
    extra_x, extra_y = dt.split_xy(extra_df, 'arrival_delay')

    def _append_and_shuffle(base, extra):
        # Both calls use the same seed; presumably this keeps features and
        # targets row-aligned, which only holds when both have equal length.
        combined = pd.concat([base, extra], ignore_index=True)
        return combined.sample(frac=1, random_state=12345)

    shuffled_x = _append_and_shuffle(train_x, extra_x)
    shuffled_y = _append_and_shuffle(train_y, extra_y)
    return shuffled_x, shuffled_y

## Flights Example

In [None]:
# Query: 10,000 randomly ordered flights (all years) that have a recorded arrival delay.
train_test_query_str = "SELECT year,month,day,weekday,origin_longitude,origin_latitude,dest_longitude,dest_latitude,distance,departure_scheduled,arrival_scheduled,arrival_delay FROM flights WHERE arrival_delay IS NOT NULL ORDER BY random() LIMIT 10000;"

# NOTE(review): connection details are hard-coded positional arguments here;
# consider reading them from environment variables instead. Confirm the
# meaning of each positional argument against DBSource.
orig_dbsource = DBSource('localhost','dtdemo','jwc','postgres',train_test_query_str,'arrival_delay')

# Yes, we will use simple uniform random subsampling even though it's bad practice for this proof of concept
# The dataset is very large so we can get away with it
# We also filter only rows that are not cancelled throughout this proof of concept
# (the filter is the `arrival_delay IS NOT NULL` condition in the query).
# NOTE(review): train and test come from two separate calls on the same
# source; presumably each call re-runs the random query, so the two samples
# are independent draws and may share rows. Confirm against DBSource.
train_x, train_y, train = grab_data_psql(orig_dbsource)
train_x = dt.process_df(train_x)
test_x, test_y, test = grab_data_psql(orig_dbsource)
test_x = dt.process_df(test_x)
train_x

In [None]:
# Column counts of the processed train and test features, one per line.
print(len(train_x.columns), len(test_x.columns), sep="\n")

In [None]:
# Fit the model, print train/test RMS errors, and bin the training features
# against the squared training errors.
model, train_x_binned, train_errors = pipeline_train(train_x, train_y, test_x, test_y)

In [2]:
# Run the slice-finding stage on the binned training data and squared errors
# produced by pipeline_train; the result is the parsed list of slices.
slices = pipeline_sliceline(train_x, train_x_binned, train_errors, alpha = 0.5, max_l = 1, min_sup = 0, k = 5)

NameError: name 'pipeline_sliceline' is not defined

In [19]:
# Constants for defining data sources
# Data sources are divided by year
SOURCE_YEARS = range(2018, 2024)  # 2018 through 2023 inclusive

# One query per year: 10,000 randomly ordered flights of that year that have a
# recorded arrival delay. Only the year differs between sources, so the query
# is generated from a single template instead of being copy-pasted.
SOURCE_QUERY_TEMPLATE = (
    "SELECT year,month,day,weekday,origin_longitude,origin_latitude,"
    "dest_longitude,dest_latitude,distance,departure_scheduled,"
    "arrival_scheduled,arrival_delay FROM flights "
    "WHERE arrival_delay IS NOT NULL AND year = {year} "
    "ORDER BY random() LIMIT 10000;"
)
source_query_strs = [SOURCE_QUERY_TEMPLATE.format(year=year) for year in SOURCE_YEARS]
sources = [dt.DBSource('localhost','dtdemo','jwc','postgres',query,'arrival_delay') for query in source_query_strs]
# Uniform unit cost for every source; sized from `sources` so the two lists
# cannot drift apart.
costs = [1.0] * len(sources)

In [20]:
# Arbitrary, for now
# NOTE(review): there are 5 counts here, while `sources` has 6 entries and the
# sliceline call above requests k = 5; presumably this is one count per slice.
# Confirm against DT.run.
query_counts = [ 100, 100, 100, 100, 100 ]

In [21]:
# Build a DT over the per-year sources and run it with the requested counts
# to acquire additional rows for the discovered slices.
additional_data = pipeline_dt(sources, costs, slices, query_counts)
additional_data

AttributeError: 'DT' object has no attribute 'batch'

In [None]:
# Append the acquired rows to the training set and shuffle; `train.columns`
# (the unsplit training frame's columns) names the columns of the new rows.
aug_x, aug_y = pipeline_augment(train_x, train_y, additional_data, train.columns)
aug_x