Notebook goal:

- predict the tip size

Structure:

- load the data
- transform into required shape
- train (note will use a hard, time-based split)
- predict

In [None]:
# import coiled
# coiled.create_software_environment(
#     name="ml-with-dask",
#     account="coiled-examples",
#     conda="/Users/rpelgrim/Documents/git/coiled-resources/envs/ml-example.yml",
# )

In [1]:
from coiled import Cluster
from distributed import Client

cluster = Cluster(
    name="ml-with-dask",
    software="coiled-examples/ml-with-dask",
    n_workers=10,
    worker_cpu=8,
    worker_memory="31GiB",
)
client = Client(cluster)

Output()

# imports

In [2]:
import dask.dataframe as dd
from dask_optuna import DaskStorage
from joblib import parallel_backend
from numpy import exp, log1p
from optuna import create_study
from sklearn.compose import TransformedTargetRegressor, make_column_transformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, PolynomialFeatures
from xgboost import XGBRegressor

# load data and aggregate

In [3]:
def load_and_aggregate(year, month):
    """Load data for a particular year/month and aggregate to the required format."""

    # optimize data usage by loading columns of interest only
    use_cols = [
        "PULocationID",
        "DOLocationID",
        "tpep_pickup_datetime",
        # "tpep_dropoff_datetime",
        "passenger_count",
        "total_amount",
        # "trip_distance",
        # "tip_amount",
    ]

    PATH_DATA = "s3://nyc-tlc/trip data/yellow_tripdata_{year}-{month:02}*.parquet"

    ddf = dd.read_parquet(PATH_DATA.format(year=year, month=month), columns=use_cols)

    frequency = "1D"
    ddf["rounded_time"] = ddf["tpep_pickup_datetime"].dt.floor(frequency)

    ddf["trip_count"] = 1

    # aggregate the data to necessary format
    aggregations = {
        "trip_count": "sum",
        "passenger_count": "sum",
        "total_amount": "sum",
    }

    ddf_agg = ddf.groupby(["rounded_time", "PULocationID"]).agg(aggregations)
    for c in ddf_agg.columns:
        ddf_agg[c] = ddf_agg[c].clip(lower=0)

    return ddf_agg

In [4]:
ddf = dd.concat(
    [
        load_and_aggregate(year, month)
        for year in range(2012, 2016 + 1)
        for month in range(1, 12 + 1)
    ]
)

In [5]:
# persist to avoid recomputes when using optuna
ddf = ddf.persist()

In [6]:
# efficient parallel processing and aggregation
# the result is small, so can load it into memory
df = ddf.compute()

In [7]:
df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,trip_count,passenger_count,total_amount
rounded_time,PULocationID,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2012-01-01,1,41,66,2880.73
2012-01-01,2,1,2,9.9
2012-01-01,3,2,3,39.1
2012-01-01,4,994,1493,12158.62
2012-01-01,7,1096,1513,14682.94


# add features

In [8]:
@FunctionTransformer
def create_custom_features(df):
    """A custom transformer to generate useful features for the prediction."""

    df = df.copy()  # this is only useful during testing, so can be deleted

    for column in df.columns:
        df[f"prev_{column}"] = df.groupby("PULocationID")[column].shift(-1)

    time_var = df.index.get_level_values("rounded_time")
    df["year"] = time_var.year
    df["month"] = time_var.month
    df["dayofyear"] = time_var.dayofyear
    df["dayofmonth"] = time_var.day
    df["dayofweek"] = time_var.dayofweek

    df = df.reset_index()

    return df


def apply_log1p(df):
    """Also clips data to non-negative values."""
    return log1p(df)


def inverse_apply_log1p(df):
    """Also clips data to non-negative values."""
    return exp(df) - 1

In [9]:
ct = make_column_transformer(
    (
        SimpleImputer(strategy="constant", fill_value=0),
        ["prev_trip_count", "prev_passenger_count", "prev_total_amount"],
    ),
    (
        OneHotEncoder(
            categories=[range(265 + 1), range(6 + 1), range(31 + 1)],
            handle_unknown="error",
        ),
        ["PULocationID", "dayofweek", "dayofmonth"],
    ),
    (
        "passthrough",
        [
            "year",
            "month",
            "dayofyear",
        ],
    ),
)

model = make_pipeline(
    create_custom_features,
    ct,
    TransformedTargetRegressor(
        regressor=XGBRegressor(n_jobs=-1),
        func=apply_log1p,
        inverse_func=inverse_apply_log1p,
    ),
)

In [10]:
# split into train and test based on time
mask_train = df.index.get_level_values("rounded_time").year <= 2014
mask_test = df.index.get_level_values("rounded_time").year > 2014

y_column = "trip_count"

X = df
y = df[y_column]

X_train = X[mask_train].copy()
y_train = y[mask_train].copy()

X_test = X[mask_test].copy()
y_test = y[mask_test].copy()

In [11]:
# # the alternatives are: a simple train-test split like below
# # or the TimeSeriesSplit from sklearn, which is similar to the above, but introduces extra moving parts
# X = df
# y_column = "trip_count"
# y = df[y_column]

# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [12]:
model.fit(X_train, y_train)

Pipeline(steps=[('functiontransformer',
                 FunctionTransformer(func=<function create_custom_features at 0x16a1084c0>)),
                ('columntransformer',
                 ColumnTransformer(transformers=[('simpleimputer',
                                                  SimpleImputer(fill_value=0,
                                                                strategy='constant'),
                                                  ['prev_trip_count',
                                                   'prev_passenger_count',
                                                   'prev_total_amount']),
                                                 ('onehotencoder',
                                                  OneHotEncoder(categories=[range(0, 266),...
                                                                   interaction_constraints=None,
                                                                   learning_rate=None,
                                 

In [13]:
def model_performance(y_true, y_pred, index_label=0):

    from pandas import DataFrame
    from sklearn.metrics import (
        mean_absolute_error,
        mean_squared_error,
        median_absolute_error,
        r2_score,
    )

    median_ae = median_absolute_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    r2 = r2_score(y_true, y_pred)

    return DataFrame(
        {"Median AE": median_ae, "MAE": mae, "RMSE": rmse, "R2": r2},
        index=[index_label],
    )

In [14]:
y_pred = model.predict(X_test)

model_performance(y_test, y_pred)

Unnamed: 0,Median AE,MAE,RMSE,R2
0,6.783797,183.425914,565.447395,0.973802


In [15]:
print(model_performance(y_test, y_pred))
#    Median AE         MAE        RMSE        R2
# 0   6.770683  190.153663  583.813663  0.974528

   Median AE         MAE        RMSE        R2
0   6.783797  183.425914  565.447395  0.973802


In [16]:
df[y_column].describe()

count    414853.000000
mean       1894.150367
std        3920.652716
min           1.000000
25%           4.000000
50%          31.000000
75%         984.000000
max      100276.000000
Name: trip_count, dtype: float64

# hyperparameter tuning

In [17]:
def objective(trial):

    # Load our dataset, note ddf is persisted to avoid multiple recomputes
    df = ddf.compute()

    # split into train and test based on time
    mask_train = df.index.get_level_values("rounded_time").year <= 2014
    mask_test = df.index.get_level_values("rounded_time").year > 2014

    y_column = "trip_count"

    X = df
    y = df[y_column]

    X_train = X[mask_train].copy()
    y_train = y[mask_train].copy()

    X_test = X[mask_test].copy()
    y_test = y[mask_test].copy()

    # Get set of hyperparameters
    param = {
        "objective": trial.suggest_categorical(
            "objective", ["reg:squarederror", "count:poisson"]
        ),
        "booster": trial.suggest_categorical("booster", ["gbtree", "dart", "gblinear"]),
        "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
        "eta": trial.suggest_float("eta", 1e-2, 1.0, log=True),
    }

    # define the model with updated parameters and train it
    model = make_pipeline(
        create_custom_features,
        ct,
        TransformedTargetRegressor(
            regressor=XGBRegressor(**param),
            func=apply_log1p,
            inverse_func=inverse_apply_log1p,
        ),
    )

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    # Compute and return a metric of interest
    r2 = r2_score(y_test, y_pred)

    return r2

In [25]:
X.isna().sum()

(trip_count         0
 passenger_count    0
 total_amount       0
 dtype: int64,
 0)

In [26]:
y.isna().sum()

0

### Dask-compatible Optuna storage class errors out

In [18]:
# # Create Dask-compatible Optuna storage class
# note this: https://github.com/jrbourbeau/dask-optuna/issues/22
# DON'T USE THIS WHEN STORING LOCALLY
storage = DaskStorage()

In [23]:
# This erred out for Sultan, so use local storage instead
study = create_study(direction="maximize", storage=storage)

In [None]:
# ---------------------------------------------------------------------------
# AttributeError                            Traceback (most recent call last)
# Input In [41], in <cell line: 2>()
#       1 # Run 500 optimizations trial on our cluster
# ----> 2 study = create_study(direction="maximize", storage=storage)

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/optuna/study.py:720, in create_study(storage, sampler, pruner, study_name, direction, load_if_exists)
#     717 else:
#     718     raise ValueError("Please set either 'minimize' or 'maximize' to direction.")
# --> 720 study._storage.set_study_direction(study_id, _direction)
#     722 return study

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/dask_optuna/storage.py:388, in DaskStorage.set_study_direction(self, study_id, direction)
#     384 @use_basestorage_doc
#     385 def set_study_direction(
#     386     self, study_id: int, direction: study.StudyDirection
#     387 ) -> None:
# --> 388     return self.client.sync(
#     389         self.client.scheduler.optuna_set_study_direction,
#     390         study_id=study_id,
#     391         direction=direction.name,
#     392         storage_name=self.name,
#     393     )

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/distributed/utils.py:310, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
#     308     return future
#     309 else:
# --> 310     return sync(
#     311         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
#     312     )

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/distributed/utils.py:364, in sync(loop, func, callback_timeout, *args, **kwargs)
#     362 if error[0]:
#     363     typ, exc, tb = error[0]
# --> 364     raise exc.with_traceback(tb)
#     365 else:
#     366     return result[0]

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/distributed/utils.py:349, in sync.<locals>.f()
#     347     if callback_timeout is not None:
#     348         future = asyncio.wait_for(future, callback_timeout)
# --> 349     result[0] = yield future
#     350 except Exception:
#     351     error[0] = sys.exc_info()

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
#     759 exc_info = None
#     761 try:
# --> 762     value = future.result()
#     763 except Exception:
#     764     exc_info = sys.exc_info()

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/distributed/core.py:900, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
#     898 prev_name, comm.name = comm.name, "ConnectionPool." + key
#     899 try:
# --> 900     return await send_recv(comm=comm, op=key, **kwargs)
#     901 finally:
#     902     self.pool.reuse(self.addr, comm)

# File ~/miniconda3/envs/coiled_taxi/lib/python3.9/site-packages/distributed/core.py:693, in send_recv(comm, reply, serializers, deserializers, **kwargs)
#     691 if comm.deserialize:
#     692     typ, exc, tb = clean_exception(**response)
# --> 693     raise exc.with_traceback(tb)
#     694 else:
#     695     raise Exception(response["exception_text"])

# File /opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py:516, in handle_comm()

# File /opt/conda/envs/coiled/lib/python3.9/site-packages/dask_optuna/storage.py:102, in set_study_direction()

# AttributeError: 'InMemoryStorage' object has no attribute 'set_study_direction'

### So Work Locally Instead

In [21]:
study = create_study(direction="maximize")

[32m[I 2022-06-08 15:27:06,113][0m A new study created in memory with name: no-name-c3ee0b4d-4d8b-42ca-b4a0-40069dca78a2[0m


In [24]:
%%time
with parallel_backend("dask"):
    study.optimize(objective, n_trials=5)

[33m[W 2022-06-08 15:27:37,747][0m Trial 0 failed because of the following error: KilledWorker("('read-parquet-1fb47b1256515f163c79cb7f23639dfe', 0)", <WorkerState 'tls://10.4.14.148:40641', name: ml-with-dask-worker-2e9d89cfc3, status: closed, memory: 0, processing: 14>)[0m
Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-packages/optuna/_optimize.py", line 189, in _run_trial
    value = func(trial)
  File "/var/folders/ky/bqjn_gxn1xv0cn_8q5xvp3q40000gn/T/ipykernel_2391/256758797.py", line 4, in objective
    df = ddf.compute()
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-packages/dask/base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-pa

KilledWorker: ("('read-parquet-1fb47b1256515f163c79cb7f23639dfe', 0)", <WorkerState 'tls://10.4.14.148:40641', name: ml-with-dask-worker-2e9d89cfc3, status: closed, memory: 0, processing: 14>)

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-packages/distributed/comm/tcp.py", line 409, in connect
    stream = await self.client.connect(
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/coiled_taxi/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following

In [23]:
best_params = study.best_params
print(best_params)

{'objective': 'reg:squarederror', 'booster': 'gbtree', 'lambda': 2.1566103715315003e-06, 'alpha': 0.0009330665005152326, 'eta': 0.8743753776465254}


In [24]:
best_model = make_pipeline(
    create_custom_features,
    ct,
    TransformedTargetRegressor(
        regressor=XGBRegressor(**best_params),
        func=apply_log1p,
        inverse_func=inverse_apply_log1p,
    ),
)

In [25]:
best_model.fit(X_train, y_train)

Pipeline(steps=[('functiontransformer',
                 FunctionTransformer(func=<function create_custom_features at 0x16a3be430>)),
                ('columntransformer',
                 ColumnTransformer(transformers=[('simpleimputer',
                                                  SimpleImputer(fill_value=0,
                                                                strategy='constant'),
                                                  ['prev_trip_count',
                                                   'prev_passenger_count',
                                                   'prev_total_amount']),
                                                 ('onehotencoder',
                                                  OneHotEncoder(categories=[range(0, 266),...
                                                                   interaction_constraints=None,
                                                                   lambda=2.1566103715315003e-06,
                      

In [26]:
y_pred = best_model.predict(X_test)

print(model_performance(y_test, y_pred))
#    Median AE         MAE        RMSE        R2
# 0   6.770683  190.153663  583.813663  0.974528
# THIS MODEL IS PERFORMING WORSE
#    Median AE         MAE        RMSE        R2
# 0   7.165927  187.004637  590.276149  0.971451

   Median AE         MAE       RMSE        R2
0   7.057741  187.265314  583.53562  0.972099
