In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
sc.install_pypi_package("pandas==1.0.5", "https://pypi.org/simple")
sc.install_pypi_package("scipy==1.4.1", "https://pypi.org/simple")
sc.install_pypi_package("matplotlib==3.2.1", "https://pypi.org/simple")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1747596292980_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==1.0.5
  Downloading https://files.pythonhosted.org/packages/af/f3/683bf2547a3eaeec15b39cef86f61e921b3b187f250fcd2b5c5fb4386369/pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (10.1MB)
Collecting python-dateutil>=2.6.1 (from pandas==1.0.5)
  Downloading https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting scipy==1.4.1
  Downloading https://files.pythonhosted.org/packages/dd/82/c1fe128f3526b128cfd185580ba40d01371c5d299fcf7f77968e22dfcc2e/scipy-1.4.1-cp37-cp37m-manylinux1_x86_64.whl (26.1MB)
Installing collected packages: scipy
Successfully installed scipy-1.4.1

Collecting matplotlib==3.2.1
  Downloading https://files.pythonhosted.org/packages/b2/c2/71fcf957710f3ba1f09088b35776a799ba7dd95f7c2b195ec800933b276b/matplotlib-3.2.1-cp37-cp

In [3]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    VectorAssembler, OneHotEncoder, StringIndexer,
    Bucketizer, PolynomialExpansion
)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

data = spark.read.parquet('s3://css-uchicago/nyc-tlc/trip_data/yellow_tripdata_2015*.parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
print('Total Columns: %d' % len(data.dtypes))
print('Total Rows: %d' % data.count())
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 19
Total Rows: 146039231
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)

First, I identify two sets of special zones—airport and midtown—by hard-coding their TLC LocationIDs (`[236, 237]` for JFK/LGA and `[161, 162, 163, 186]` for Midtown). Trips touching these areas tend to follow very different distance, surcharge, and tipping behaviors than the rest of the city, so it makes sense to mark them out explicitly.

Next, I build a new `corridor_id` string by concatenating the pickup and dropoff zone IDs (for example, `"142_236"`). This single categorical variable captures origin → destination pairs in one fell swoop, allowing the regression to learn separate coefficients for common flows—such as Uptown→JFK versus Midtown→JFK—rather than smearing all long trips together.

To further isolate airport behavior, I add a binary `is_airport_trip` flag that is set to 1 whenever either endpoint is in an airport zone. In a similar spirit, `is_midtown_trip` flags journeys that begin or end in the dense Midtown taxi network. Both of these broad-strokes indicators pick up on the distinct fare structures, demand surges, and tipping norms typical of airport shuttles and Midtown short hops.

Because `corridor_id` can take on hundreds (or thousands) of unique values, I convert it into a numeric form that my ML pipeline can digest: first with a `StringIndexer` (assigning each corridor a stable integer index), and then with a `OneHotEncoder` to turn that index into a sparse binary vector. This approach gives each high-volume OD pair its own parameter, while exploiting sparsity to keep computation tractable.

In [31]:

# 1) Define any special zone lists
airport_zones = [236, 237]               
midtown_zones = [161, 162, 163, 186]     

# 2) Build corridor_id and the two binary flags
data = (data
    # origin–destination corridor as “PU_DO”
    .withColumn("corridor_id",
        F.concat_ws("_", F.col("PULocationID"), F.col("DOLocationID"))
    )
    # airport‐trip flag
    .withColumn("is_airport_trip",
        F.when(
            F.col("PULocationID").isin(airport_zones) |
            F.col("DOLocationID").isin(airport_zones),
            1
        ).otherwise(0)
    )
    # midtown‐trip flag
    .withColumn("is_midtown_trip",
        F.when(
            F.col("PULocationID").isin(midtown_zones) |
            F.col("DOLocationID").isin(midtown_zones),
            1
        ).otherwise(0)
    )
)

# 3) Categorical → index → one-hot for that corridor_id
corridor_indexer = StringIndexer(
    inputCol  = "corridor_id",
    outputCol = "corridor_idx",
    handleInvalid="keep"
)

corridor_encoder = OneHotEncoder(
    inputCols  = ["corridor_idx"],
    outputCols = ["corridor_ohe"]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

To capture the strong temporal rhythms in tipping behavior, I begin by extracting relevant time-based features from the original `tpep_pickup_datetime` column. I rename this timestamp to `pickup_ts` for clarity and ease of use in subsequent transformations.

From this timestamp, I derive the hour of day using the `hour()` function and cast it to an integer. This `hour` feature is useful for identifying daily cycles, such as morning or evening rush periods when tipping may spike due to higher demand or driver scarcity.

I also extract the day of week using `dayofweek()`, which returns values ranging from 1 (Sunday) to 7 (Saturday). However, this format is not aligned with the ISO standard, which defines Monday as 1 and Sunday as 7. To resolve this, I re-index the day values using modular arithmetic to produce a `weekday_iso` column that conforms to the ISO standard. This lets me consistently identify weekdays and weekends in downstream analyses.

Building on this, I define an `is_weekend` binary feature that is set to 1 if the trip occurred on a Saturday or Sunday. Since tipping norms and rider behavior often shift between weekends and weekdays, this indicator helps the model adjust expectations accordingly.

Finally, I introduce a `rush_hour` flag for trips that occur during the late afternoon peak (4–7 PM). This feature signals times of higher congestion, limited driver supply, and potentially longer trip durations—all of which may influence fare size and tipping propensity. These temporal features allow the model to capture predictable, time-linked variation in tipping behavior without needing to memorize individual timestamps.

In [32]:
data = (
    data
    .withColumn("pickup_ts", F.col("tpep_pickup_datetime"))
    .withColumn("hour",       F.hour("pickup_ts").cast("int"))
    .withColumn("dow_sun1",   F.dayofweek("pickup_ts").cast("int"))
    .withColumn(
        "weekday_iso",
        ((F.col("dow_sun1") + F.lit(5)) % F.lit(7) + F.lit(1)).cast("int")
    )
    .withColumn("is_weekend", (F.col("weekday_iso") >= 6).cast("int"))
    .withColumn("rush_hour",  F.when(F.col("hour").between(16,19), 1).otherwise(0))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
data = data.withColumn(
    "passenger_count_dbl",
    F.col("passenger_count").cast("double")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

I introduce non‐linear representations of trip distance to allow the model to learn different tipping patterns for short, mid‐range, long, and very‐long trips. First, I define `distance_splits` to separate the continuous `trip_distance` into four intuitive buckets—short (≤2 mi), mid (2–8 mi), long (8–15 mi), and very‐long (>15 mi)—and apply Spark’s `Bucketizer` to generate a new categorical feature `dist_bin`. This bucketization captures the “sweet spot” in the mid‐range where riders often tip more generously, without forcing a purely linear assumption.

Next, I compute a `fare_per_mile` feature by dividing the total fare by the trip distance (when distance is positive) and defaulting to zero otherwise. This ratio conveys how expensive each mile of the trip was, helping the model distinguish, for example, short rides with high per‐mile fares (like airport trips) from long trips with lower unit costs. Because fare structure and rider generosity can vary markedly by trip economy, this normalized feature adds valuable granularity.

Finally, I create an interaction term `dist_times_pax` by multiplying the raw trip distance by `passenger_count`. This captures how group size compounds distance effects—for instance, multi‐passenger rides may behave differently over the same distance than solo trips. By including this floating‐point interaction, I ensure the model can adjust its distance sensitivity based on party size rather than treating every mile as “worth” the same tipping potential regardless of how many people are on board.

In [34]:
# 3) FLOATING-POINT  ──────
#    nonlinear distance effects + fare / distance ratio
# -------------------------------------------------------------
# Bucketize trip_distance (non-linear bins) → categorical buckets
distance_splits = [-float("inf"), 2, 8, 15, float("inf")]   # short, mid, long, very-long
bucketizer = Bucketizer(
    splits=distance_splits,
    inputCol="trip_distance",
    outputCol="dist_bin")                                  

# fare per mile
data = data.withColumn(
    "fare_per_mile",
    F.when(F.col("trip_distance") > 0,
           F.col("fare_amount") / F.col("trip_distance")
    ).otherwise(0.0)
)

# interaction: distance × passenger_count  (integer × float)
data = data.withColumn(
    "dist_times_pax",
    F.col("trip_distance") * F.col("passenger_count")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

I convert the continuous `passenger_count_dbl` into a categorical representation so that the model can learn distinct tipping behaviors across different party sizes. First, I apply Spark’s `StringIndexer` to map each unique passenger count (as a double) to an integer index in the new column `pax_idx`, ensuring that any unexpected or missing values are handled gracefully by keeping them as a separate category. Then, I use `OneHotEncoder` on `pax_idx` to produce a sparse vector `pax_ohe`, which allows the linear regression to assign an independent coefficient to each party‐size category rather than treating passenger count as a simple numeric input. This one‐hot encoding captures non‐linear shifts in tipping patterns between single riders, couples, and larger groups without imposing an arbitrary ordering.  

In [35]:
pax_indexer = StringIndexer(
    inputCol="passenger_count_dbl",
    outputCol="pax_idx",
    handleInvalid="keep" 
)
pax_encoder = OneHotEncoder(
    inputCols=["pax_idx"],
    outputCols=["pax_ohe"]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In this step, I bring together all of the engineered columns into a single feature vector that can be consumed by the regression algorithm. I list out the untransformed numeric and binary flags—such as the cyclic hour (`hour`), weekend indicator (`is_weekend`), and rush‐hour flag (`rush_hour`)—alongside the continuous variables that capture distance and fare dynamics (`fare_per_mile`, `dist_times_pax`). I also include the spatial indicator (`is_airport_trip`), the one‐hot encoded passenger‐count vector (`pax_ohe`), and the bucketized distance category (`dist_bin`) to allow the model to learn non‐linear distance effects. Finally, I apply `VectorAssembler` to these columns, producing a new column called `features` that contains a single vector of all predictors. This consolidated vector simplifies downstream model training by presenting every engineered input in a uniform format.  

In [36]:
feature_cols = [
    # temporal / binary
    "hour", "is_weekend", "rush_hour",

    # floating-point interactions
    "fare_per_mile", "dist_times_pax",

    # spatial flags
    "is_airport_trip", "is_midtown_trip",

    # nonlinear distance bucket
    "dist_bin",

    # high-cardinality OD pairs
    "corridor_ohe",

    # party-size one-hot
    "pax_ohe",
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

I instantiate my core learner: a regularized linear regression that predicts tip_amount from the assembled feature vector. I set a modest number of iterations and apply both L1 and L2 penalties (elasticNetParam=0.2) to help control overfitting.

With the learner defined, I assemble my full modeling pipeline. This chains together all of my feature transformers—indexing and encoding the passenger counts, bucketizing distances, vectorizing features—followed by the regression stage. Now I have one object that encapsulates every step from raw DataFrame to trained model.

Before I even tune anything, I split the data once into an 80/20 train/test hold-out. This ensures that after cross-validation and model selection on the training partition, I still have an untouched subset on which to report a final, unbiased performance estimate.

Finally, I configure a simple evaluator that knows to compare the "prediction" column against my true "tip_amount". Later on I’ll ask it for RMSE and any other regression metrics I choose.

In [37]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

# 1) Define the regression learner
lr = LinearRegression(
    featuresCol="features",
    labelCol="tip_amount",
    maxIter=50,
    regParam=0.1,
    elasticNetParam=0.2
)

# 2) Build the full pipeline
pipeline = Pipeline(stages=[
    corridor_indexer, corridor_encoder,
    pax_indexer, pax_encoder,
    bucketizer,
    assembler,
    lr
])

# 3) Split into train / test once
sampled_data = data.sample(fraction=0.01, seed=42)

train_df, test_df = sampled_data.randomSplit([0.8, 0.2], seed=42)

# 4) Define evaluator
evaluator = RegressionEvaluator(
    labelCol="tip_amount",
    predictionCol="prediction"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

I instantiate a CrossValidator, pointing it at my full pipeline (which already bundles all of my feature‐engineering steps and the linear regression estimator) and the evaluator that knows how to compute RMSE.  By setting numFolds=3, I ask Spark to perform three‐fold cross validation, and with parallelism=2 I enable two models to be trained in parallel, speeding up the search.

Next, I call crossval.fit(train_df), which kicks off the cross‐validation process.  Under the hood, Spark will split train_df into three folds, train on two of them and validate on the third, rotating through all combinations, and will track which combination of hyperparameters (if I’d provided any) yields the lowest validation RMSE.  The result, cvModel, contains both the full history of models and the single best pipeline.

Finally, I extract cvModel.bestModel—the pipeline instance that achieved the lowest cross‐validation error—and apply it to my held‐out test_df.  After transforming the test set, I use the same evaluator to compute the RMSE on that hold‐out data, giving me an unbiased estimate of how well my tuned pipeline will perform in production.

In [38]:
import numpy as np
paramGrid = (ParamGridBuilder()
    .addGrid(lr.regParam,      list(np.arange(0.0, 0.1, 0.01)))
    .addGrid(lr.elasticNetParam, [0.0, 1.0])
    .build()
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol="tip_amount",
        predictionCol="prediction",
        metricName="rmse"       # default, but made explicit
    ),
    numFolds=5,
    parallelism=8              # one task per core node
)

# 6) Run CV (this fits the pipeline on each fold & hyperparam combo)
cvModel = crossval.fit(train_df)

bestModel = cvModel.bestModel
preds     = bestModel.transform(test_df)
evaluator = RegressionEvaluator(labelCol="tip_amount",
                                predictionCol="prediction",
                                metricName="rmse")
test_rmse = evaluator.evaluate(preds)

print(f"Best regParam      : {bestModel.stages[-1]._java_obj.getRegParam():.2f}")
print(f"Best elasticNetParam: {bestModel.stages[-1]._java_obj.getElasticNetParam():.2f}")
print(f"Test-set RMSE       : {test_rmse:.2f}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best regParam      : 0.00
Best elasticNetParam: 1.00
Test-set RMSE       : 2.11

In [40]:
# 10) Show top 10 features by absolute coefficient
feature_names = assembler.getInputCols()
coeffs        = bestModel.stages[-1].coefficients.toArray()
feat_coeff    = list(zip(feature_names, coeffs))
top_feats     = sorted(feat_coeff, key=lambda x: abs(x[1]), reverse=True)[:10]

print("\nTop 10 features by |coefficient|:")
for feat, c in top_feats:
    print(f"  • {feat:20s} →  {c:+.4f}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Top 10 features by |coefficient|:
  • dist_bin             →  +0.6165
  • pax_ohe              →  -0.4696
  • is_weekend           →  -0.2001
  • is_airport_trip      →  -0.1792
  • is_midtown_trip      →  -0.0724
  • rush_hour            →  +0.0581
  • corridor_ohe         →  +0.0165
  • hour                 →  +0.0048
  • fare_per_mile        →  +0.0008
  • dist_times_pax       →  -0.0000