# NYC FHV Rideshare Fare Prediction - Deep & Cross Network v2

This notebook implements an end-to-end workflow for training a Deep & Cross Network v2 model on the NYC FHV (For-Hire Vehicle) dataset to predict ride fares. The model ingests trip data, processes features, and predicts the total fare amount presented to riders.

# 0 Environment & TPU setup

In [None]:
# Colab Pro: activate TPU
try:
    import jax  # quick TPU test
except Exception:
    %tensorflow_version 2.x
    import os, json, tensorflow as tf

# Mixed precision for TPU
tf.keras.mixed_precision.set_global_policy('mixed_bfloat16')

# Install helper libs once per new VM
!pip install polars==0.20.19 gcsfs==2024.4.1 --quiet

In [None]:
# Mount the GCS bucket that already holds the 20.5 GB Parquet
BUCKET = "nyc-taxi-fhv-460946772036"
!gcsfuse --implicit-dirs $BUCKET /mnt/fhv
PARQUET = "/mnt/fhv/fhvhv_all_years.zstd.parquet"

# 1 Data access (GCS → Polars DataFrame)

In [None]:
import polars as pl, datetime as dt
# Read the whole Parquet file into an in-memory Polars DataFrame.
df = pl.read_parquet(source=PARQUET, low_memory=False)
# (rows, columns) of the loaded frame.
print((df.height, df.width))      # 745 M rows × 24 cols

# 2 Cleaning & target creation

In [None]:
# Inclusive [lo, hi] ranges per column; rows with a value outside its range
# are removed at the end of this cell.
NUMERIC_OUTLIER_RULES = {
    "trip_miles":  (0.1, 200),            # keep 0.1 – 200 mi
    "trip_time":   (60, 4*3600),          # 1 min – 4 h
}


def clip_interval(col, lo, hi):
    """Return an expression that keeps `col` where lo <= value <= hi, else null.

    Args:
        col: name of the column to filter.
        lo: lower bound passed to `is_between`.
        hi: upper bound passed to `is_between`.

    Returns:
        A Polars expression named `col` whose out-of-range entries are null.
    """
    in_range = pl.col(col).is_between(lo, hi)
    return pl.when(in_range).then(pl.col(col)).otherwise(None).alias(col)


# Null out implausible values for every rule in a single pass.
df = df.with_columns(
    [clip_interval(c, lo, hi) for c, (lo, hi) in NUMERIC_OUTLIER_RULES.items()]
)

money_cols = ["base_passenger_fare","tolls","bcf","sales_tax",
              "congestion_surcharge","airport_fee"]
# Negative amounts are raised to 0 (not dropped).
df = df.with_columns([pl.col(c).clip(0) for c in money_cols])

# NOTE(review): a null in any money column makes the sum null, so that row is
# dropped below — confirm this is intended for sparsely populated fee columns.
df = df.with_columns([
    ( sum(pl.col(c) for c in money_cols) ).alias("target_amount"),
    # trip_time is in [60, 14400] here (or null), so the divisor is never 0.
    (pl.col("trip_miles") / (pl.col("trip_time")/3600)).alias("mph")
]).drop_nulls(
    # Also drop the rows whose miles/time were nulled above. Keeping them would
    # put nulls into the float features that are later normalised and written
    # to TFRecords as fixed-length float32 values.
    ["target_amount", *NUMERIC_OUTLIER_RULES, "mph"]
)

# 3 Feature engineering

In [None]:
# 3.1  Temporal splits
# Calendar components of the pickup timestamp, used later for sin/cos encoding.
df = df.with_columns([
    pl.col("pickup_datetime").dt.hour().alias("pickup_hour"),
    pl.col("pickup_datetime").dt.weekday().alias("pickup_wday"),
    pl.col("pickup_datetime").dt.month().alias("pickup_month"),
])

# 3.2  Categorical cleanup (fill UNK)
high_card = ["dispatching_base_num","PULocationID","DOLocationID"]
# Cast to string before filling: "UNK" is a string sentinel and these columns
# are parsed as tf.string downstream, so this makes the dtype explicit even if
# an ID column is stored as an integer. One with_columns call replaces the
# previous per-column loop.
df = df.with_columns([
    pl.col(col).cast(pl.Utf8).fill_null("UNK")
    for col in high_card + ["hvfhs_license_num"]
])

# 4 Train / validation split

In [None]:
# Time-based split: last month of 2022 → validation
cutoff = dt.datetime(year=2022, month=12, day=1)

# Rows strictly before the cutoff train the model; rows at or after it validate.
train_df, valid_df = (
    df.filter(condition)
    for condition in (
        pl.col("pickup_datetime") < cutoff,
        pl.col("pickup_datetime") >= cutoff,
    )
)

print(train_df.shape, valid_df.shape)

# 5 Write TFRecord shards

In [None]:
import tensorflow as tf, math, os, itertools, json, typing as T
from tqdm import tqdm

def df_to_tfr_iter(table: pl.DataFrame, batch=200_000):
    """Yield `table` in consecutive row chunks as column-name -> list dicts.

    Args:
        table: frame to iterate over.
        batch: maximum number of rows per yielded chunk.

    Yields:
        dict mapping each column name to a Python list of that chunk's values
        (converted via Arrow). Progress is reported with tqdm, one tick per chunk.
    """
    total_rows = table.height
    chunk_starts = range(0, total_rows, batch)
    for start in tqdm(chunk_starts):
        rows = table.slice(start, batch)
        columns = rows.to_arrow().to_pydict()  # col->list
        yield dict(columns)

# Columns that the tf.data reader parses as float32 / int64 scalars. They are
# encoded by name so the on-disk type always matches the reader's spec,
# whatever dtype the source column has.
_TFR_FLOAT_COLS = ("trip_miles", "trip_time", "mph", "base_passenger_fare",
                   "tolls", "bcf", "sales_tax", "congestion_surcharge",
                   "airport_fee", "target_amount")
_TFR_INT_COLS = ("pickup_hour", "pickup_wday", "pickup_month")


def _row_to_features(names, row, float_cols, int_cols):
    """Encode one row as name -> tf.train.Feature; None if a numeric value is missing."""
    feat = {}
    for k, v in zip(names, row):
        if k in float_cols or k in int_cols:
            if v is None:
                # A fixed-length numeric feature cannot represent a missing value.
                return None
            if k in float_cols:
                feat[k] = tf.train.Feature(
                    float_list=tf.train.FloatList(value=[float(v)]))
            else:
                feat[k] = tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[int(v)]))
        elif isinstance(v, float):
            # Columns outside the schema keep the original type-based rule.
            feat[k] = tf.train.Feature(float_list=tf.train.FloatList(value=[v]))
        else:
            feat[k] = tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[str(v).encode()]))
    return feat


def write_tfr(split, table, float_cols=_TFR_FLOAT_COLS, int_cols=_TFR_INT_COLS):
    """Write `table` as GZIP-compressed TFRecord shards under /content/tfr/<split>.

    One shard file is written per chunk yielded by `df_to_tfr_iter`.

    Args:
        split: split name; used for the output directory and file prefix.
        table: frame whose rows become tf.train.Example records.
        float_cols: column names stored as float features.
        int_cols: column names stored as int64 features.

    Rows with a missing value in any `float_cols`/`int_cols` column are
    skipped, and the number of skipped rows is printed at the end.
    """
    OUTDIR = f"/content/tfr/{split}"
    os.makedirs(OUTDIR, exist_ok=True)
    float_cols = frozenset(float_cols)
    int_cols = frozenset(int_cols)
    # TFRecordWriter takes compression through `options`; it has no
    # `compression_type` keyword.
    gzip_options = tf.io.TFRecordOptions(compression_type="GZIP")
    skipped = 0
    for shard_id, records in enumerate(df_to_tfr_iter(table)):
        fn = f"{OUTDIR}/{split}-{shard_id:05d}.tfr"
        names = list(records)
        with tf.io.TFRecordWriter(fn, options=gzip_options) as w:
            # Walk the column lists in lockstep: one tuple per row.
            for row in zip(*(records[k] for k in names)):
                feat = _row_to_features(names, row, float_cols, int_cols)
                if feat is None:
                    skipped += 1
                    continue
                example = tf.train.Example(features=tf.train.Features(feature=feat))
                w.write(example.SerializeToString())
    if skipped:
        print(f"{split}: skipped {skipped} rows with missing numeric values")

In [None]:
# Write training TFRecords (shards go to /content/tfr/train)
write_tfr('train', train_df)

In [None]:
# Write validation TFRecords (shards go to /content/tfr/valid)
write_tfr('valid', valid_df)

# 6 Build tf.data input pipeline

In [None]:
# Parsing spec for the serialized examples: every entry is a scalar
# FixedLenFeature, grouped below by the dtype it is decoded to.
FEATURE_DESCRIPTION = {
    name: tf.io.FixedLenFeature([], dtype)
    for dtype, names in (
        # floats
        (tf.float32, ["trip_miles","trip_time","mph","base_passenger_fare","tolls",
                      "bcf","sales_tax","congestion_surcharge","airport_fee"]),
        # ints
        (tf.int64, ["pickup_hour","pickup_wday","pickup_month"]),
        # strings
        (tf.string, ["hvfhs_license_num","dispatching_base_num","PULocationID","DOLocationID",
                     "shared_request_flag","shared_match_flag","wav_request_flag","access_a_ride_flag"]),
        # label
        (tf.float32, ["target_amount"]),
    )
    for name in names
}

def parse_fn(example_proto):
    """Decode one serialized tf.train.Example into a (features, label) pair.

    Args:
        example_proto: scalar string tensor holding a serialized Example.

    Returns:
        Tuple `(features, label)`: `features` is the dict parsed with
        FEATURE_DESCRIPTION minus the label, and `label` is the
        "target_amount" tensor. Keras `fit`/`evaluate` need the target as a
        separate element, not a key inside the feature dict.
    """
    features = tf.io.parse_single_example(example_proto, FEATURE_DESCRIPTION)
    label = features.pop("target_amount")
    return features, label

def make_dataset(split, batch, shuffle=False):
    """Build the batched tf.data pipeline for one split.

    Args:
        split: split name; shards are read from /content/tfr/<split>/*.tfr.
        batch: batch size; a trailing partial batch is dropped.
        shuffle: when true, shuffle parsed examples with a 1,000,000-element buffer.

    Returns:
        A prefetched tf.data.Dataset of batches of `parse_fn` outputs.
    """
    shard_paths = tf.io.gfile.glob(f"/content/tfr/{split}/*.tfr")
    raw_records = tf.data.TFRecordDataset(
        shard_paths,
        compression_type="GZIP",
        num_parallel_reads=tf.data.AUTOTUNE,
    )
    examples = raw_records.map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)
    if shuffle:
        examples = examples.shuffle(1_000_000)
    batches = examples.batch(batch, drop_remainder=True)
    return batches.prefetch(tf.data.AUTOTUNE)

# 7 Define Deep & Cross Network v2

In [None]:
from tensorflow.keras import layers as L, Model

def dcn_v2(inputs):
    """Build the Deep & Cross v2 graph and return the fare prediction tensor.

    Args:
        inputs: dict mapping feature name -> Keras Input. The calling cell
            declares every input with shape=(), so each tensor is (batch,).

    Returns:
        Keras tensor of shape (batch, 1) from the float32 'fare' Dense layer.

    Reads the notebook-global `train_df` for the categorical vocabularies and
    the normalisation statistics.
    """
    def as_column(t):
        # (batch,) -> (batch, 1), so scalar features concatenate feature-wise
        # rather than along the batch axis.
        return tf.expand_dims(t, axis=-1)

    # --- Embeddings ---
    emb_dims = {"hvfhs_license_num":2, "dispatching_base_num":16,
                "PULocationID":16, "DOLocationID":16}
    embed_out = []
    for feat, dim in emb_dims.items():
        # The lookup needs an explicit vocabulary: without one every token maps
        # to the single OOV index. Values are cast to string to match the
        # string-typed inputs.
        vocab = (train_df.get_column(feat).cast(pl.Utf8)
                 .drop_nulls().unique().sort().to_list())
        lookup = L.StringLookup(vocabulary=vocab, output_mode='int',
                                num_oov_indices=1)
        ids = lookup(inputs[feat])
        # vocabulary_size() belongs to the layer (not its output tensor) and
        # already counts the OOV slot.
        embed_out.append(
            L.Embedding(input_dim=lookup.vocabulary_size(), output_dim=dim)(ids)
        )  # (batch,) ids -> (batch, dim)

    # --- Flags 0/1 ---
    flags = ["shared_request_flag","shared_match_flag",
             "wav_request_flag","access_a_ride_flag"]
    # Strings cannot be cast to float directly, so compare against the
    # positive marker. Assumes flags are stored as 'Y'/'N' — TODO confirm.
    flag_out = [as_column(tf.cast(tf.equal(inputs[f], "Y"), tf.float32))
                for f in flags]

    # --- Numeric normalised ---
    # NOTE(review): the six money columns below are exactly the terms summed
    # into target_amount in the cleaning cell, so the label is a linear
    # function of these inputs — confirm they are meant to be features.
    num_cols = ["trip_miles","trip_time","mph","base_passenger_fare",
                "tolls","bcf","sales_tax","congestion_surcharge","airport_fee"]
    # Compute the statistics with Polars and pass them in, rather than
    # materialising every training row as a dense NumPy array for adapt().
    # ddof=0 gives the population variance that Normalization expects.
    means = train_df.select(
        [pl.col(c).cast(pl.Float64).mean() for c in num_cols]).row(0)
    variances = train_df.select(
        [pl.col(c).cast(pl.Float64).var(ddof=0) for c in num_cols]).row(0)
    # float32 so large raw values are not rounded to bfloat16 before scaling.
    norm = L.Normalization(axis=-1, mean=list(means), variance=list(variances),
                           dtype='float32')
    num_out = norm(L.Concatenate()([as_column(inputs[c]) for c in num_cols]))

    # --- Temporal (sin/cos) ---
    def cyclical(name, period):
        # Map a periodic integer feature onto the unit circle.
        angle = as_column(tf.cast(inputs[name], tf.float32)) * (2.0 * math.pi / period)
        return [tf.sin(angle), tf.cos(angle)]

    temporal_out = (cyclical("pickup_hour", 24)
                    + cyclical("pickup_wday", 7)
                    + cyclical("pickup_month", 12))

    concat = L.Concatenate()(embed_out + flag_out + [num_out] + temporal_out)

    # ---- DCN-v2 Cross Stack ----
    # x_{l+1} = x_0 * (W x_l + b) + x_l, with a full-rank W as a Dense layer.
    width = concat.shape[-1]
    cross = concat
    for _ in range(3):
        projected = L.Dense(width)(cross)
        cross = L.Add()([L.Multiply()([concat, projected]), cross])

    # ---- Deep Tower ----
    deep = concat
    for units, drop in [(512,0.2),(256,0.2),(128,0.1),(64,0)]:
        deep = L.Dense(units, activation='gelu', kernel_regularizer='l2')(deep)
        deep = L.BatchNormalization()(deep)
        if drop: deep = L.Dropout(drop)(deep)

    fused = L.Concatenate()([cross, deep])
    out = L.Dense(64, activation='gelu')(fused)
    # Keep the regression output in float32 under the mixed-precision policy.
    out = L.Dense(1, name='fare', dtype='float32')(out)
    return out

# One scalar Keras Input per parsed feature (label excluded). Flag, *_num and
# *ID features are declared as strings; everything else as float32.
inputs = {
    name: L.Input(
        shape=(),
        name=name,
        dtype=(tf.string
               if 'flag' in name or name.endswith('_num') or 'ID' in name
               else tf.float32),
    )
    for name in FEATURE_DESCRIPTION
    if name != 'target_amount'
}

model = Model(inputs=inputs, outputs=dcn_v2(inputs))

# AdamW with global-norm gradient clipping, Huber loss, MAE/MAPE tracking.
model.compile(
    optimizer=tf.keras.optimizers.AdamW(1e-3, weight_decay=1e-5, global_clipnorm=1.0),
    loss=tf.keras.losses.Huber(delta=5.0),
    metrics=[
        tf.keras.metrics.MeanAbsoluteError(name='MAE'),
        tf.keras.metrics.MeanAbsolutePercentageError(name='MAPE'),
    ],
)
model.summary()

# 8 Train, monitor & early-stop

In [None]:
BATCH = 16_384
train_ds = make_dataset('train', BATCH, shuffle=True)
valid_ds = make_dataset('valid', BATCH)

callbacks = [
    # Stop after 3 epochs without val_MAE improvement and roll back to the best weights.
    tf.keras.callbacks.EarlyStopping(monitor='val_MAE', patience=3, restore_best_weights=True),
    # Keep a checkpoint only when val_MAE improves.
    tf.keras.callbacks.ModelCheckpoint('checkpoints/dcnv2_{epoch:02d}.keras',
                                       save_best_only=True, monitor='val_MAE'),
    tf.keras.callbacks.TensorBoard('logs')
]

EPOCHS = 20
# No steps_per_epoch / validation_steps: both datasets are finite and
# non-repeating, so Keras runs each epoch until the dataset is exhausted.
# The previous ceil(rows / BATCH) step counts asked for more batches than the
# pipeline yields (it drops the partial last batch), which exhausts the input
# and interrupts training.
history = model.fit(train_ds,
                    validation_data=valid_ds,
                    epochs=EPOCHS,
                    callbacks=callbacks)

# 9 Evaluate & error analysis

In [None]:
import matplotlib.pyplot as plt, numpy as np

val_pred = model.predict(valid_ds, verbose=0).flatten()

# Take the labels from the same pipeline that produced the predictions.
# valid_df is not comparable row-for-row: the pipeline drops the last partial
# batch and reads shards in parallel, so both length and order can differ.
# The label is either the second tuple element or the "target_amount" key,
# depending on what the dataset's parse function returns.
val_true = np.concatenate([
    (batch[1] if isinstance(batch, tuple) else batch["target_amount"]).numpy()
    for batch in valid_ds
])

print("MAE $", np.mean(np.abs(val_pred-val_true)))
plt.hist(val_true-val_pred, bins=100)
plt.title("Prediction residuals ($)")
plt.show()

# 9 Hyper-parameter sweep (optional)

In [None]:
!pip install keras-tuner --quiet
import keras_tuner as kt

def model_builder(hp):
    """Build and compile one DCN-v2 candidate for a Keras Tuner trial.

    Args:
        hp: Keras Tuner hyper-parameter container. Tuned values: embedding
            width, number of cross layers, first two deep-layer widths and
            dropouts, and the learning rate.

    Returns:
        A compiled Keras Model taking the same feature dict as the main model.

    Reads the notebook-globals `FEATURE_DESCRIPTION` and `train_df`.
    """
    # Input layer setup from previous model
    inputs = {k: L.Input(shape=(), name=k, dtype=tf.string if 'flag' in k or k.endswith('_num') or 'ID' in k else tf.float32)
              for k in FEATURE_DESCRIPTION if k!='target_amount'}

    def as_column(t):
        # (batch,) -> (batch, 1), so scalar features concatenate feature-wise
        # rather than along the batch axis.
        return tf.expand_dims(t, axis=-1)

    # --- Embeddings ---
    emb_base_dim = hp.Int("emb_base_dim", min_value=8, max_value=32, step=8)
    emb_dims = {"hvfhs_license_num": 2,  # Small cardinality
                "dispatching_base_num": emb_base_dim,
                "PULocationID": emb_base_dim,
                "DOLocationID": emb_base_dim}
    embed_out = []
    for feat, dim in emb_dims.items():
        # The lookup needs an explicit vocabulary: without one every token maps
        # to the single OOV index.
        vocab = (train_df.get_column(feat).cast(pl.Utf8)
                 .drop_nulls().unique().sort().to_list())
        lookup = L.StringLookup(vocabulary=vocab, output_mode='int',
                                num_oov_indices=1)
        ids = lookup(inputs[feat])
        # vocabulary_size() belongs to the layer and already counts the OOV slot.
        embed_out.append(
            L.Embedding(input_dim=lookup.vocabulary_size(), output_dim=dim)(ids)
        )

    # --- Flags 0/1 ---
    flags = ["shared_request_flag","shared_match_flag","wav_request_flag","access_a_ride_flag"]
    # Strings cannot be cast to float directly, so compare against the
    # positive marker. Assumes flags are stored as 'Y'/'N' — TODO confirm.
    flag_out = [as_column(tf.cast(tf.equal(inputs[f], "Y"), tf.float32))
                for f in flags]

    # --- Numeric normalized ---
    num_cols = ["trip_miles","trip_time","mph","base_passenger_fare",
                "tolls","bcf","sales_tax","congestion_surcharge","airport_fee"]
    # Statistics come from Polars, so no trial has to materialise the training
    # set as a NumPy array. ddof=0 gives the population variance.
    means = train_df.select(
        [pl.col(c).cast(pl.Float64).mean() for c in num_cols]).row(0)
    variances = train_df.select(
        [pl.col(c).cast(pl.Float64).var(ddof=0) for c in num_cols]).row(0)
    norm = L.Normalization(axis=-1, mean=list(means), variance=list(variances),
                           dtype='float32')
    num_out = norm(L.Concatenate()([as_column(inputs[c]) for c in num_cols]))

    # --- Temporal (sin/cos) ---
    def cyclical(name, period):
        # Map a periodic integer feature onto the unit circle.
        angle = as_column(tf.cast(inputs[name], tf.float32)) * (2.0 * math.pi / period)
        return [tf.sin(angle), tf.cos(angle)]

    temporal_out = (cyclical("pickup_hour", 24)
                    + cyclical("pickup_wday", 7)
                    + cyclical("pickup_month", 12))

    concat = L.Concatenate()(embed_out + flag_out + [num_out] + temporal_out)

    # ---- DCN-v2 Cross Stack ----
    cross_layers = hp.Int("cross_layers", 2, 4, step=1)
    # x_{l+1} = x_0 * (W x_l + b) + x_l, with a full-rank W as a Dense layer.
    width = concat.shape[-1]
    cross = concat
    for _ in range(cross_layers):
        projected = L.Dense(width)(cross)
        cross = L.Add()([L.Multiply()([concat, projected]), cross])

    # ---- Deep Tower ----
    deep = concat
    hp_units1 = hp.Int("units1", min_value=256, max_value=768, step=256)
    hp_units2 = hp.Int("units2", min_value=128, max_value=384, step=128)
    hp_drop1 = hp.Float("dropout1", 0.1, 0.3, step=0.1)
    hp_drop2 = hp.Float("dropout2", 0.0, 0.2, step=0.1)

    # The first two layers are tuned; the last two are fixed.
    layer_config = [(hp_units1, hp_drop1), (hp_units2, hp_drop2), (128, 0.1), (64, 0)]

    for units, drop in layer_config:
        deep = L.Dense(units, activation='gelu', kernel_regularizer='l2')(deep)
        deep = L.BatchNormalization()(deep)
        if drop > 0: deep = L.Dropout(drop)(deep)

    fused = L.Concatenate()([cross, deep])
    out = L.Dense(64, activation='gelu')(fused)
    # Keep the regression output in float32 under the mixed-precision policy.
    out = L.Dense(1, name='fare', dtype='float32')(out)

    model = Model(inputs, out)

    hp_learning_rate = hp.Choice("learning_rate", [1e-4, 3e-4, 1e-3])
    model.compile(
        optimizer=tf.keras.optimizers.AdamW(hp_learning_rate, weight_decay=1e-5, global_clipnorm=1.0),
        loss=tf.keras.losses.Huber(delta=5.0),
        metrics=[tf.keras.metrics.MeanAbsoluteError(name='MAE'),
                 tf.keras.metrics.MeanAbsolutePercentageError(name='MAPE')]
    )

    return model

# Bayesian search over model_builder's hyper-parameters, scored on validation
# MAE; any earlier results in ./ktuner are overwritten.
tuner = kt.BayesianOptimization(
    hypermodel=model_builder,
    objective="val_MAE",
    max_trials=20,
    directory="ktuner",
    overwrite=True,
)

# Each trial trains for 5 epochs on the same datasets as the main run.
tuner.search(train_ds, epochs=5, validation_data=valid_ds)

# Top-ranked model from the search.
best_model = tuner.get_best_models(num_models=1)[0]

# 10 Save model & notebook wrap-up

In [None]:
# Create directory for saved model
!mkdir -p dcnv2_nyc_fhv_savedmodel

# Save the model in SavedModel format
# NOTE(review): this saves `model` (trained in section 8), not `best_model`
# from the optional sweep — confirm which one should be exported.
model.save("dcnv2_nyc_fhv_savedmodel")

# Zip the model for easy distribution
!zip -r model.zip dcnv2_nyc_fhv_savedmodel

## Project Summary

| Topic | Key points |
|-------|------------|
| **Business objective** | Accurate upfront fare estimate; aids riders, TNCs & city planners. |
| **Raw data** | 745 M FHV trips (2019-22) → 24 columns. |
| **Cleaning rules** | Remove out-of-range miles/time values, clip negative money amounts to zero, handle sparse cols. |
| **Target** | Sum of base fare + taxes/fees (tips excluded). |
| **Feature groups** | 9 numeric, 4 binary flags, 4 categoricals (3 high-cardinality plus the low-cardinality license number), 3 temporal. |
| **Model choice** | Deep & Cross Network v2 (explicit feature crosses + deep tower). |
| **Hardware** | Colab Pro TPU v2-8, batch 16 384 → 3.5 h/epoch full data. |
| **Loss / metrics** | Huber δ=5, track MAE, MAPE, RMSE. |
| **Training hygiene** | Mixed-precision, early stop, AdamW + warm-up cosine LR, weight decay. |