In [1]:
from pathlib import Path
from google.colab import drive
drive.mount('/content/drive')

BASE_DIR = Path('/content/drive/MyDrive')
DATA_DIR = BASE_DIR
OUT_DIR  = BASE_DIR / 'tlc_outputs_full'
OUT_DIR.mkdir(parents=True, exist_ok=True)

print("DATA_DIR:", DATA_DIR)
print("OUT_DIR :", OUT_DIR)

Mounted at /content/drive
DATA_DIR: /content/drive/MyDrive
OUT_DIR : /content/drive/MyDrive/tlc_outputs_full


Install dependencies

In [2]:
!pip -q install duckdb==1.0.0 pandas pyarrow xgboost scikit-learn joblib

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.5/18.5 MB[0m [31m118.7 MB/s[0m eta [36m0:00:00[0m
[?25h

Imports & open DuckDB

In [3]:
import duckdb as ddb, pandas as pd, numpy as np, joblib
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import xgboost as xgb
from pathlib import Path

pd.set_option('display.max_columns', 200)

con = ddb.connect(database=':memory:')
con.execute("PRAGMA threads = 4;")

<duckdb.duckdb.DuckDBPyConnection at 0x799f8eb72170>

Point to your files

In [4]:
parquet_glob = str(DATA_DIR / 'yellow_tripdata_2023-0*.parquet')
zone_lookup_csv = str(DATA_DIR / 'taxi_zone_lookup.csv')

print("Expecting Parquet pattern:", parquet_glob)
print("Lookup CSV exists? ->", Path(zone_lookup_csv).exists())

con.execute(f"""
    CREATE VIEW trips_raw AS
    SELECT * FROM read_parquet('{parquet_glob}');
""")
con.execute("SELECT COUNT(*) AS rows FROM trips_raw").df()

Expecting Parquet pattern: /content/drive/MyDrive/yellow_tripdata_2023-0*.parquet
Lookup CSV exists? -> True


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,rows
0,19493620


Clean & add time features

In [5]:
con.execute("""
    CREATE OR REPLACE VIEW trips_clean AS
    WITH base AS (
        SELECT
            tpep_pickup_datetime,
            tpep_dropoff_datetime,
            passenger_count,
            trip_distance,
            RatecodeID    AS ratecodeid,
            PULocationID  AS pulocationid,
            DOLocationID  AS dolocationid,
            payment_type,
            fare_amount,
            extra,
            mta_tax,
            tip_amount,
            tolls_amount,
            improvement_surcharge,
            congestion_surcharge,
            total_amount,

            /* engineered calendar features */
            date_trunc('hour', tpep_pickup_datetime) AS pickup_hour,
            EXTRACT(hour  FROM tpep_pickup_datetime) AS hour,
            EXTRACT(dow   FROM tpep_pickup_datetime) AS dow,     -- 0=Sunday
            EXTRACT(month FROM tpep_pickup_datetime) AS month,
            date_diff('minute', tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_minutes
        FROM trips_raw
        WHERE trip_distance IS NOT NULL
          AND total_amount  IS NOT NULL
          AND fare_amount   IS NOT NULL
    ),
    filtered AS (
        SELECT *
        FROM base
        WHERE trip_distance BETWEEN 0.1 AND 60
          AND total_amount  BETWEEN 1 AND 500
          AND fare_amount   >= 0
          AND trip_minutes  BETWEEN 1 AND 180
    )
    SELECT
        *,
        CASE WHEN dow IN (0,6) THEN 1 ELSE 0 END AS is_weekend,
        CASE WHEN (hour BETWEEN 7 AND 10) OR (hour BETWEEN 16 AND 19) THEN 1 ELSE 0 END AS is_rush
    FROM filtered;
""")

con.execute("SELECT * FROM trips_clean LIMIT 5").df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,congestion_surcharge,total_amount,pickup_hour,hour,dow,month,trip_minutes,is_weekend,is_rush
0,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,2.5,14.3,2023-01-01,0,0,1,8,1,0
1,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,2.5,16.9,2023-01-01,0,0,1,6,1,0
2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,2.5,34.9,2023-01-01,0,0,1,12,1,0
3,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,0.0,20.85,2023-01-01,0,0,1,10,1,0
4,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,2.5,19.68,2023-01-01,0,0,1,11,1,0


Join Taxi Zone lookup

In [6]:
con.execute(f"""
    CREATE OR REPLACE VIEW zones AS
    SELECT
        CAST(LocationID AS INTEGER) AS locationid,
        Borough AS borough,
        Zone    AS zone
    FROM read_csv_auto('{zone_lookup_csv}');
""")

con.execute("""
    CREATE OR REPLACE VIEW trips_enriched AS
    SELECT
        t.*,
        zpu.borough AS pu_borough,
        zpu.zone    AS pu_zone,
        zdo.borough AS do_borough,
        zdo.zone    AS do_zone
    FROM trips_clean t
    LEFT JOIN zones zpu ON t.pulocationid = zpu.locationid
    LEFT JOIN zones zdo ON t.dolocationid = zdo.locationid;
""")

con.execute("SELECT pulocationid, pu_borough, dolocationid, do_borough FROM trips_enriched LIMIT 5").df()

Unnamed: 0,pulocationid,pu_borough,dolocationid,do_borough
0,161,Manhattan,141,Manhattan
1,138,Queens,7,Queens
2,161,Manhattan,137,Manhattan
3,164,Manhattan,236,Manhattan
4,141,Manhattan,107,Manhattan


Build full zone×hour demand & supply tables

In [7]:
# Demand = number of pickups per zone-hour (ALL trips)
con.execute("""
    CREATE OR REPLACE TABLE zone_hour AS
    SELECT pulocationid, pickup_hour,
           COUNT(*)::BIGINT AS demand
    FROM trips_enriched
    GROUP BY 1,2;
""")

# Supply proxy = number of dropoffs previous hour (drivers become free next hour)
con.execute("""
    CREATE OR REPLACE TABLE zone_hour_supply AS
    SELECT dolocationid AS pulocationid,
           pickup_hour + INTERVAL 1 HOUR AS pickup_hour,
           COUNT(*)::BIGINT AS supply
    FROM trips_enriched
    GROUP BY 1,2;
""")

# Join demand & supply into one table
con.execute("""
    CREATE OR REPLACE TABLE zone_hour_agg AS
    SELECT
        d.pulocationid,
        d.pickup_hour,
        d.demand,
        COALESCE(s.supply, 0) AS supply,
        d.demand - COALESCE(s.supply,0) AS gap
    FROM zone_hour d
    LEFT JOIN zone_hour_supply s
      ON d.pulocationid = s.pulocationid AND d.pickup_hour = s.pickup_hour;
""")

zone_hour_path = str(OUT_DIR / 'zone_hour_agg.parquet')
con.execute(f"COPY (SELECT * FROM zone_hour_agg) TO '{zone_hour_path}' (FORMAT PARQUET);")
print("Saved:", zone_hour_path)
con.execute("SELECT * FROM zone_hour_agg ORDER BY pickup_hour DESC LIMIT 5").df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Saved: /content/drive/MyDrive/tlc_outputs_full/zone_hour_agg.parquet


Unnamed: 0,pulocationid,pickup_hour,demand,supply,gap
0,170,2023-09-05 18:00:00,1,0,1
1,68,2023-08-15 10:00:00,2,0,2
2,43,2023-07-27 02:00:00,1,0,1
3,75,2023-07-27 01:00:00,1,0,1
4,107,2023-07-05 23:00:00,1,0,1


Create full zone×hour feature table

In [8]:
con.execute("""
    CREATE OR REPLACE TABLE zone_hour_features AS
    WITH base AS (
        SELECT
            a.pulocationid,
            a.pickup_hour,
            EXTRACT(hour  FROM a.pickup_hour)::INT AS hour,
            EXTRACT(dow   FROM a.pickup_hour)::INT AS dow,   -- 0=Sunday
            EXTRACT(month FROM a.pickup_hour)::INT AS month,
            CASE WHEN EXTRACT(dow FROM a.pickup_hour) IN (0,6) THEN 1 ELSE 0 END AS is_weekend,
            a.demand, a.supply, a.gap
        FROM zone_hour_agg a
    ),
    surge AS (
        SELECT
            *,
            /* pressure metric: higher when demand >> supply & during rush */
            (CAST(gap AS DOUBLE) / (CAST(supply AS DOUBLE) + 1.0))
              + 0.5 * CASE WHEN (hour BETWEEN 7 AND 10) OR (hour BETWEEN 16 AND 19)
                           THEN 1 ELSE 0 END AS x
        FROM base
    )
    SELECT
        *,
        1.0 + 0.8 * (1.0 / (1.0 + EXP(-x)))                                        AS surge_multiplier,
        CASE WHEN x > 0.5 THEN 2.0 + 3.0 * (1.0 / (1.0 + EXP(-x))) ELSE 0.0 END    AS driver_bonus
    FROM surge;
""")

zone_feat_path = str(OUT_DIR / 'zone_hour_features.parquet')
con.execute(f"COPY (SELECT * FROM zone_hour_features) TO '{zone_feat_path}' (FORMAT PARQUET);")
print("Saved:", zone_feat_path)
con.execute("SELECT * FROM zone_hour_features LIMIT 5").df()

Saved: /content/drive/MyDrive/tlc_outputs_full/zone_hour_features.parquet


Unnamed: 0,pulocationid,pickup_hour,hour,dow,month,is_weekend,demand,supply,gap,x,surge_multiplier,driver_bonus
0,261,2023-03-01,0,3,3,0,2,11,-9,-0.75,1.256657,0.0
1,107,2023-03-01,0,3,3,0,27,69,-42,-0.6,1.283475,0.0
2,114,2023-03-01,0,3,3,0,57,29,28,0.933333,1.574201,4.153253
3,231,2023-03-01,0,3,3,0,31,64,-33,-0.507692,1.300588,0.0
4,141,2023-03-01,0,3,3,0,29,95,-66,-0.6875,1.267672,0.0


Train

In [11]:
zf = pd.read_parquet(zone_feat_path)
print("Rows:", len(zf), "Cols:", zf.shape[1])

feature_cols = ['hour','dow','month','is_weekend','pulocationid','demand','supply','gap']
target_col   = 'surge_multiplier'

# Cast to compact dtypes
X = zf[feature_cols].astype({
    'hour':'int16','dow':'int16','month':'int16','is_weekend':'int8','pulocationid':'int32',
    'demand':'int32','supply':'int32','gap':'int32'
})
y = zf[target_col].astype(float)

from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.15, random_state=42)

xgb_zone = xgb.XGBRegressor(
    n_estimators=600,
    max_depth=8,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    tree_method='hist',
    random_state=42,
)
xgb_zone.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

from sklearn.metrics import mean_squared_error
mse  = mean_squared_error(y_val, xgb_zone.predict(X_val))
rmse = float(np.sqrt(mse))
print(f"RMSE on surge_multiplier: {rmse:.4f}")

model_path = OUT_DIR / 'zone_hour_surge_xgb.joblib'
joblib.dump({'model': xgb_zone, 'feature_cols': feature_cols, 'target': target_col}, model_path)
print("Saved:", model_path)

Rows: 410155 Cols: 12
RMSE on surge_multiplier: 0.0017
Saved: /content/drive/MyDrive/tlc_outputs_full/zone_hour_surge_xgb.joblib


Build a base-fare table per zone×hour

In [13]:
con.execute("""
    CREATE OR REPLACE TABLE zone_hour_basefare AS
    SELECT
        pulocationid,
        date_trunc('hour', tpep_pickup_datetime) AS pickup_hour,
        MEDIAN(fare_amount) AS base_fare
    FROM trips_clean
    GROUP BY 1,2;
""")

basefare_path = str(OUT_DIR / 'zone_hour_basefare.parquet')
con.execute(f"COPY (SELECT * FROM zone_hour_basefare) TO '{basefare_path}' (FORMAT PARQUET);")
print("Saved:", basefare_path)

# Join features + base_fare for preview of final recommendation
preview = con.execute("""
    SELECT
        f.pulocationid,
        f.pickup_hour AS pickup_hour,       -- disambiguated
        f.hour, f.demand, f.supply, f.gap,
        f.surge_multiplier,
        b.base_fare,
        (b.base_fare * f.surge_multiplier) AS recommended_price,
        f.driver_bonus
    FROM zone_hour_features AS f
    LEFT JOIN zone_hour_basefare AS b
      ON f.pulocationid = b.pulocationid
     AND f.pickup_hour  = b.pickup_hour
    ORDER BY f.pickup_hour DESC
    LIMIT 10;
""").df()

preview


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Saved: /content/drive/MyDrive/tlc_outputs_full/zone_hour_basefare.parquet


Unnamed: 0,pulocationid,pickup_hour,hour,demand,supply,gap,surge_multiplier,base_fare,recommended_price,driver_bonus
0,170,2023-09-05 18:00:00,18,1,0,1,1.65406,5.8,9.593546,4.452723
1,68,2023-08-15 10:00:00,10,2,0,2,1.739313,16.65,28.959569,4.772425
2,43,2023-07-27 02:00:00,2,1,0,1,1.584847,25.4,40.25511,4.193176
3,75,2023-07-27 01:00:00,1,1,0,1,1.584847,5.1,8.082719,4.193176
4,107,2023-07-05 23:00:00,23,1,0,1,1.584847,13.5,21.395433,4.193176
5,237,2023-07-05 18:00:00,18,1,0,1,1.65406,11.4,18.856279,4.452723
6,161,2023-07-05 17:00:00,17,1,0,1,1.65406,9.3,15.382754,4.452723
7,114,2023-07-02 01:00:00,1,1,0,1,1.584847,19.1,30.270575,4.193176
8,138,2023-07-01 00:00:00,0,1,3,-2,1.302033,42.2,54.945773,0.0
9,132,2023-07-01 00:00:00,0,2,2,0,1.4,61.35,85.89,0.0


A/B revenue simulation at zone×hour

In [14]:
sim = con.execute("""
    SELECT f.pulocationid, f.pickup_hour,
           f.surge_multiplier, f.demand, f.supply, f.gap,
           b.base_fare
    FROM zone_hour_features f
    LEFT JOIN zone_hour_basefare b
      ON f.pulocationid = b.pulocationid AND f.pickup_hour = b.pickup_hour
    WHERE b.base_fare IS NOT NULL
""").df()

# Predicted price under ML policy
sim['ml_price'] = sim['base_fare'] * sim['surge_multiplier']

def accept_prob(price, base):
    rel = (price - base) / (base + 1e-6)
    p = 0.95 - 0.7 * (1 / (1 + np.exp(-4*rel)))  # logistic decay with price ↑
    return np.clip(p, 0.05, 0.95)

sim['p_acc_base'] = accept_prob(sim['base_fare'], sim['base_fare'])
sim['p_acc_ml']   = accept_prob(sim['ml_price'],  sim['base_fare'])

# Expected revenue per *potential* request (zone-hour units)
sim['rev_base'] = sim['base_fare'] * sim['p_acc_base']
sim['rev_ml']   = sim['ml_price']  * sim['p_acc_ml']

uplift = (sim['rev_ml'].mean() - sim['rev_base'].mean()) / (sim['rev_base'].mean() + 1e-9)
print(f"Simulated revenue uplift (ML vs Baseline): {uplift*100:.2f}%")

sim.head()

Simulated revenue uplift (ML vs Baseline): -14.29%


Unnamed: 0,pulocationid,pickup_hour,surge_multiplier,demand,supply,gap,base_fare,ml_price,p_acc_base,p_acc_ml,rev_base,rev_ml
0,233,2023-06-17 21:00:00,1.324597,46,75,-29,12.8,16.95484,0.6,0.400106,7.68,6.783726
1,45,2023-06-17 21:00:00,1.352605,15,20,-5,21.2,28.675218,0.6,0.387318,12.72,11.106418
2,209,2023-06-17 21:00:00,1.390478,19,20,-1,13.5,18.771453,0.6,0.371361,8.1,6.970981
3,145,2023-06-17 21:00:00,1.24802,2,14,-12,16.3,20.342733,0.6,0.439351,9.78,8.937596
4,193,2023-06-17 21:00:00,1.32105,2,4,-2,17.0,22.457848,0.6,0.401785,10.2,9.023235


In [15]:
for p in sorted(OUT_DIR.glob('*')):
    print(" -", p.name)

 - zone_hour_agg.parquet
 - zone_hour_basefare.parquet
 - zone_hour_features.parquet
 - zone_hour_surge_xgb.joblib


In [16]:
from google.colab import files
from pathlib import Path

file_list = [
    OUT_DIR/'zone_hour_agg.parquet',
    OUT_DIR/'zone_hour_basefare.parquet',
    OUT_DIR/'zone_hour_features.parquet',
    OUT_DIR/'zone_hour_surge_xgb.joblib',
]

for f in file_list:
    if f.exists():
        print("Downloading:", f.name)
        files.download(str(f))
    else:
        print("Missing:", f)

Downloading: zone_hour_agg.parquet


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloading: zone_hour_basefare.parquet


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloading: zone_hour_features.parquet


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloading: zone_hour_surge_xgb.joblib


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>