In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import pandas as pd
import os

# Starts a Spark session
spark = (
    SparkSession.builder
        .appName("Taxi vs Rideshare Profitability")
        .config("spark.sql.repl.eagerEval.enabled", False)   
        .config("spark.sql.parquet.cacheMetadata", "true")
        .config("spark.sql.session.timeZone", "Etc/UTC")
        .config("spark.sql.shuffle.partitions", "320")
        .config("spark.sql.adaptive.enabled", "true") 
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")       
        .config("spark.driver.memory", "6g")                
        .config("spark.executor.memory", "6g")
        .getOrCreate()
)

# Define months
months = ["2024-01","2024-02","2024-03","2024-04","2024-05","2024-06"]

# Load in data files 
yellow_files = [f"data/yellow/yellow_tripdata_{m}.parquet" for m in months]
fhvhv_files  = [f"data/fhvhv/fhvhv_tripdata_{m}.parquet"   for m in months]

df_yellow = (
    spark.read.parquet(*yellow_files)
         .withColumn("service_type", lit("yellow"))
)
df_fhvhv = (
    spark.read.parquet(*fhvhv_files)
         .withColumn("service_type", lit("hv_fhv"))
)

# Merge
df = df_yellow.unionByName(df_fhvhv, allowMissingColumns=True)

# External tables 
electricity = spark.read.csv("data/external/electricity.csv", header=True, inferSchema=True)
fuel = spark.read.csv("data/external/fuel.csv", header=True, inferSchema=True)



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/30 19:57:21 WARN Utils: Your hostname, LAPTOP-E04ANIN1, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/30 19:57:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/30 19:57:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [None]:
# Preprocess data
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_timestamp, coalesce, unix_timestamp, when, lit, date_format, hour, dayofweek, broadcast
from pyspark.sql.types import IntegerType, DoubleType
from pyspark import StorageLevel
from pyspark.sql import Window
import geopandas as gpd

# Print size of raw data
#print(f"Raw data count: {df.count():,}")
#_prev = df.count()  # baseline for drop tracking

# Reduce memory usage
_needed = [
    "service_type",
    "tpep_pickup_datetime","tpep_dropoff_datetime",
    "pickup_datetime","dropoff_datetime",
    "trip_distance","trip_miles","trip_time",
    "PULocationID","DOLocationID",
    "passenger_count","payment_type",
    "fare_amount","extra","tip_amount",
    "driver_pay","tips"
]
df = df.select([c for c in _needed if c in df.columns])

# Standardise timestamps
df = (
    df.withColumn("pickup_ts",  to_timestamp(coalesce(col("tpep_pickup_datetime"),  col("pickup_datetime"))))
      .withColumn("dropoff_ts", to_timestamp(coalesce(col("tpep_dropoff_datetime"), col("dropoff_datetime"))))
)

# Remove rows with null pickup or dropoff timestamps
df = df.filter(col("pickup_ts").isNotNull() & col("dropoff_ts").isNotNull())
#print(f"[Null pickup/dropoff removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Standardise location IDs
df = df.withColumn("month", date_format(col("pickup_ts"), "yyyy-MM"))

# Standardise distance and keep positive distances only and not null
df = (
    df.withColumn("distance_mi", coalesce(col("trip_distance"), col("trip_miles")).cast(DoubleType()))
      .filter(col("distance_mi").isNotNull() & (col("distance_mi") > 0))
)
#print(f"[Invalid distance (<=0 or null)] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Standardise trip time and not null
df = df.withColumn(
    "trip_time_s",
    when(col("trip_time").isNotNull(), col("trip_time").cast("double"))
    .otherwise((unix_timestamp(col("dropoff_ts")) - unix_timestamp(col("pickup_ts"))).cast("double"))
)
df = df.filter(col("trip_time_s").isNotNull() & (col("trip_time_s") > 0))
#print(f"[Invalid trip_time_s (<=0 or null)] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Deduplicate rows 
dedupe_key = [c for c in ["service_type","pickup_ts","dropoff_ts","PULocationID","DOLocationID","distance_mi","trip_time_s"] if c in df.columns]

# If dedupe_key is empty, we won't deduplicate
if dedupe_key:
    w = Window.partitionBy(["month"] + dedupe_key).orderBy(F.lit(1))
    df = df.withColumn("__rn", F.row_number().over(w)).filter(col("__rn") == 1).drop("__rn")
    #print(f"[Exact duplicates removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Fixed Parameters
CREDIT_CARD_FEE = 0.025 
MAINTENANCE_COST_PER_MILE = 0.15
MAINTENANCE_COST_PER_MILE_HV = 0.15

# Payment type exists
if "payment_type" not in df.columns:
    df = df.withColumn("payment_type", lit(None).cast(IntegerType()))

# Time features 
df = (df
    .withColumn("pickup_hour", hour(col("pickup_ts")))
    .withColumn("pickup_dow", dayofweek(col("pickup_ts")))  
    .withColumn("is_weekend", (col("pickup_dow").isin([1,7])).cast("boolean"))
)

# Add revenue 
rev_yellow = coalesce(col("fare_amount"), lit(0.0)) + coalesce(col("extra"), lit(0.0)) + coalesce(col("tip_amount"), lit(0.0))
rev_hv     = coalesce(col("driver_pay"), lit(0.0)) + coalesce(col("tips"), lit(0.0))
df = df.withColumn(
    "revenue",
    when(col("service_type") == "yellow", rev_yellow).otherwise(rev_hv).cast(DoubleType())
)

# Add costs
maint_rate = when(col("service_type") == "yellow",
                  lit(MAINTENANCE_COST_PER_MILE)
              ).otherwise(
                  lit(MAINTENANCE_COST_PER_MILE_HV)
              )
df = df.withColumn("expense_maintenance", (col("distance_mi") * maint_rate).cast(DoubleType()))

# Credit card fee 
df = df.withColumn(
     "expense_cc_processing",
    when((col("service_type") == "yellow") & (col("payment_type") == 1),
         (lit(CREDIT_CARD_FEE) * col("revenue")).cast(DoubleType()))
    .otherwise(lit(0.0))
)

# Expenses pre-fuel 
df = df.withColumn(
    "expenses_nonfuel",
    (col("expense_maintenance") + col("expense_cc_processing")).cast(DoubleType())
)

# Keep distance
df = df.filter(col("distance_mi") >= 0.1)
#print(f"[Distance < 0.1 mi removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Make sure within month range
df = df.filter( (col("month") >= "2024-01") & (col("month") <= "2024-06") )
#print(f"[Outside Jan–Jun 2024 removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Keep duration >= 60 seconds
df = df.filter(col("trip_time_s") >= 60)
#print(f"[Trip time < 60 s removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Keep positive passenger count 
if "passenger_count" in df.columns:
    df = df.filter(
        when(col("service_type") == "yellow", col("passenger_count") > 0)
        .otherwise(True)
    )
    #print(f"[Non-positive passenger_count (Yellow) removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

TAXI_ZONES_PATH = "data/taxi_zones/taxi_zones.shp"
zones_gdf = gpd.read_file(TAXI_ZONES_PATH)[["LocationID"]]

valid_ids = (
    zones_gdf["LocationID"]
      .dropna()
      .astype("int64")
      .unique()
      .tolist()
)
# Valid TLC zone IDs 
for c in ["PULocationID", "DOLocationID"]:
    if c in df.columns:
        df = df.filter(col(c).isin(valid_ids))
        #print(f"[Invalid {c} removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Non-negative money fields 
money_ok = (
    (coalesce(col("fare_amount"), lit(0.0))  >= 0) &
    (coalesce(col("extra"),       lit(0.0))  >= 0) &
    (coalesce(col("tip_amount"),  lit(0.0))  >= 0) &
    (coalesce(col("driver_pay"),  lit(0.0))  >= 0) &
    (coalesce(col("tips"),        lit(0.0))  >= 0)
)
df = df.filter(money_ok)
#print(f"[Negative money fields removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Minimum initial fare for Yellow 
df = df.filter(
    when(col("service_type") == "yellow", coalesce(col("fare_amount"), lit(0.0)) >= 2.50)
    .otherwise(True)
)
#print(f"[Yellow fare < $2.50 removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Pre-fuel profitability
df = (df
    .withColumn("active_hours", (col("trip_time_s") / 3600.0).cast(DoubleType()))
    .withColumn("net_before_fuel", (col("revenue") - col("expenses_nonfuel")).cast(DoubleType()))
    .withColumn("net_per_hr_before_fuel", (col("net_before_fuel") / col("active_hours")).cast(DoubleType()))
    .withColumn("mph", (col("distance_mi") / (col("trip_time_s")/3600.0)).cast(DoubleType()))
)

# Single-pass outlier trim using 99.9% quantile
stacked = (
    df.select("service_type", F.lit("distance_mi").alias("metric"), col("distance_mi").alias("value"))
      .unionByName(df.select("service_type", F.lit("trip_time_s").alias("metric"), col("trip_time_s").alias("value")))
      .unionByName(df.select("service_type", F.lit("revenue").alias("metric"),     col("revenue").alias("value")))
)
bounds = (
    stacked.groupBy("service_type", "metric")
           .agg(F.expr("percentile_approx(value, 0.999, 10000)").alias("p999"))
)
df = (
    df.alias("t")
      .join(bounds.alias("b1").filter(col("b1.metric") == "distance_mi")
                 .select(col("service_type").alias("s1"), col("p999").alias("p_d")),
            on=[col("t.service_type") == col("s1")], how="left")
      .join(bounds.alias("b2").filter(col("b2.metric") == "trip_time_s")
                 .select(col("service_type").alias("s2"), col("p999").alias("p_t")),
            on=[col("t.service_type") == col("s2")], how="left")
      .join(bounds.alias("b3").filter(col("b3.metric") == "revenue")
                 .select(col("service_type").alias("s3"), col("p999").alias("p_r")),
            on=[col("t.service_type") == col("s3")], how="left")
      .filter( (col("distance_mi") <= F.coalesce(col("p_d"), lit(float("inf")))) &
               (col("trip_time_s") <= F.coalesce(col("p_t"), lit(float("inf")))) &
               (col("revenue")     <= F.coalesce(col("p_r"), lit(float("inf")))) )
      .drop("s1","s2","s3","p_d","p_t","p_r")
)
#print(f"[Outliers (p99.9) removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

df = df.repartition(64, "service_type", "month")

# Cap impossible speeds
df = df.filter((col("mph") >= 0) & (col("mph") <= 120.0))
#print(f"[Impossible speeds (mph < 0 or > 120) removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Fuel and energy costs
fuel = fuel.select("month", "price_per_gallon").dropna()
electricity = electricity.select("month", "price_usd_per_kwh").dropna()

# Join fuel and electricity prices
df = (df.join(broadcast(fuel), on="month", how="left")
        .join(broadcast(electricity), on="month", how="left")
)
# Remove if fuel or electricity price is missing
df = df.filter(col("price_per_gallon").isNotNull() & col("price_usd_per_kwh").isNotNull())
#print(f"[Missing fuel/electricity price removed] dropped {_prev - (_after := df.count()):,} rows (from {_prev:,} to {_after:,})"); _prev = _after

# Energy assumptions according to EPA and AFDC 
MPG_FHV  = 27.0  
MPG_TAXI = 16.0  

KWH_YELLOW = 0.30
KWH_FHV    = 0.30

YELLOW_EV_PERCENT = 0.00  # Assuming no EVs in Yellow Taxi fleet 
FHV_EV_PERCENT    = 0.10  # Example share for HVFHV

# Per-service parameters as columns 
df = (df
    .withColumn(
        "mpg",
        when(col("service_type") == "yellow", lit(MPG_TAXI))
        .otherwise(lit(MPG_FHV)).cast(DoubleType())
    )
    .withColumn(
        "kwh_per_mile",
        when(col("service_type") == "yellow", lit(KWH_YELLOW))
        .otherwise(lit(KWH_FHV)).cast(DoubleType())
    )
    .withColumn(
        "ev_share",
        when(col("service_type") == "yellow", lit(YELLOW_EV_PERCENT))
        .otherwise(lit(FHV_EV_PERCENT)).cast(DoubleType())
    )
)

# Cost per mile (blend gas vs EV by ev_share)
gas_cpm = (col("price_per_gallon") / col("mpg")).cast(DoubleType())
ev_cpm  = (col("price_usd_per_kwh") * col("kwh_per_mile")).cast(DoubleType())

df = (df
    .withColumn(
        "energy_cost_per_mile",
        ((lit(1.0) - coalesce(col("ev_share"), lit(0.0))) * coalesce(gas_cpm, lit(0.0))) +
        (coalesce(col("ev_share"), lit(0.0)) * coalesce(ev_cpm, lit(0.0)))
    )
    .withColumn("expense_fuel", (col("distance_mi") * col("energy_cost_per_mile")).cast(DoubleType()))
    .withColumn("net_after_fuel", (col("revenue") - col("expenses_nonfuel") - col("expense_fuel")).cast(DoubleType()))
    .withColumn("net_per_hr_after_fuel", (col("net_after_fuel") / col("active_hours")).cast(DoubleType()))
)

# Keep only relevant columns
cols_keep = [
    # Metadata
    "service_type", "month", "pickup_ts", "dropoff_ts",
    "pickup_hour", "pickup_dow", "is_weekend",
    # Location IDs
    "PULocationID", "DOLocationID",
    # engineered trip metrics
    "distance_mi", "trip_time_s", "mph", "active_hours",
    # Feature engineering
    "revenue", "expenses_nonfuel", "expense_fuel",
    "net_before_fuel", "net_after_fuel",
    "net_per_hr_before_fuel", "net_per_hr_after_fuel",
    # Parameters
    "price_per_gallon", "price_usd_per_kwh",
    "energy_cost_per_mile", "ev_share", "mpg", "kwh_per_mile",
]

# Filter columns to keep only those that exist in the DataFrame
cols_keep = [c for c in cols_keep if c in df.columns]

# Print size after cleaning
df = df.select(cols_keep)
#print(f"Cleaned data count: {df.count():,}")


In [None]:
# Analysis
from pyspark.sql.functions import col, sum as ssum, count, round as sround, when, lit, to_date
from pyspark.sql import functions as F
from pyspark import StorageLevel

# Testing
FAST_MODE = False
df_base = df.stat.sampleBy("service_type", {"yellow":0.02,"hv_fhv":0.02}, seed=7) if FAST_MODE else df
df_base = df_base.repartition(320, "service_type", "month")

# Aggregation helpers
sum_net   = ssum("net_after_fuel").alias("sum_net")
sum_hours = ssum("active_hours").alias("sum_hours")

def safe_net_per_hr(df_agg):
    return df_agg.withColumn("net_per_hr_TW", sround(F.when(col("sum_hours") > 0, col("sum_net")/col("sum_hours")), 2))

# Overall summary
overall = safe_net_per_hr(
    df_base.groupBy("service_type").agg(sum_net, sum_hours, count("*").alias("trips"))
)

# Figure 1 Composition 

# Earnings per trip
comp = (
    df_base.groupBy("service_type")
           .agg(
               ssum("revenue").alias("sum_rev"),
               ssum("expenses_nonfuel").alias("sum_nonfuel"),
               ssum("expense_fuel").alias("sum_fuel"),
               count("*").alias("n_trips")
           )
           .withColumn("rev_per_trip",     sround(col("sum_rev")/col("n_trips"), 2))
           .withColumn("nonfuel_per_trip", sround(col("sum_nonfuel")/col("n_trips"), 2))
           .withColumn("fuel_per_trip",    sround(col("sum_fuel")/col("n_trips"), 2))
           .select("service_type","n_trips","rev_per_trip","nonfuel_per_trip","fuel_per_trip")
)

# Figure 2
# Zone level relative advanatge 
zone_stats = (
    df_base.groupBy("service_type","PULocationID")
           .agg(
               ssum("net_after_fuel").alias("sum_net"),
               ssum("active_hours").alias("sum_hours"),
               count("*").alias("trips")
           )
           .withColumn("net_per_hr_TW", col("sum_net")/col("sum_hours"))
)
zone_y = zone_stats.filter(col("service_type")=="yellow") \
                   .select("PULocationID",
                           col("net_per_hr_TW").alias("y_hr"),
                           col("trips").alias("y_trips"))
zone_h = zone_stats.filter(col("service_type")=="hv_fhv") \
                   .select("PULocationID",
                           col("net_per_hr_TW").alias("h_hr"),
                           col("trips").alias("h_trips"))
zone_rel = (zone_y.join(zone_h, "PULocationID", "outer")
                 .withColumn("total_trips", F.coalesce(col("y_trips"), F.lit(0)) + F.coalesce(col("h_trips"), F.lit(0)))
                 .withColumn("denom", F.coalesce(col("y_hr"), F.lit(0)) + F.coalesce(col("h_hr"), F.lit(0)))
                 .withColumn("share_diff",
                             F.when(col("denom") > 0,
                                    (F.coalesce(col("y_hr"), F.lit(0)) - F.coalesce(col("h_hr"), F.lit(0))) / col("denom"))
                              .otherwise(F.lit(None)))
)


In [None]:
# Plots
import os, numpy as np, pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter, MultipleLocator
from shapely.geometry import LineString
from matplotlib.colors import TwoSlopeNorm
import geopandas as gpd

# Settings
os.makedirs("plots/img", exist_ok=True)
plt.rcParams.update({
    "figure.figsize": (6.5, 4.2),
    "font.size": 10,
    "axes.titlesize": 12,
    "axes.labelsize": 11,
    "legend.fontsize": 9,
})
# Color settings
COLORS = {"yellow": "#FFC000", "hv_fhv": "#2C7FB8"}
COST_NONFUEL = "#8C8C8C"
COST_FUEL = "#2CA25F"
NET_MARKER = "#1a1a1a"
usd0 = FuncFormatter(lambda x, pos: f"${x:,.0f}")

# Figure 1: Hour-of-day with 95% CI + overall means
daily = (
    df_base.withColumn("pickup_date", to_date(col("pickup_ts")))
           .groupBy("service_type", "pickup_date", "pickup_hour")
           .agg(ssum("net_after_fuel").alias("sum_net"),
                ssum("active_hours").alias("sum_hours"))
           .withColumn("net_per_hr", col("sum_net")/col("sum_hours"))
)
daily_pd = daily.select("service_type","pickup_hour","net_per_hr").toPandas()
g = daily_pd.groupby(["service_type","pickup_hour"])["net_per_hr"]
hour_stats = (g.mean().rename("mean").reset_index()
              .merge(g.std(ddof=1).rename("std").reset_index(), on=["service_type","pickup_hour"])
              .merge(g.count().rename("n").reset_index(), on=["service_type","pickup_hour"]))
hour_stats["sem"]  = hour_stats["std"] / np.sqrt(hour_stats["n"].clip(lower=1))
hour_stats["ci95"] = 1.96 * hour_stats["sem"]

overall_hr = (overall.select("service_type","net_per_hr_TW")
                     .toPandas().set_index("service_type")["net_per_hr_TW"])

# Plotting for Fig 1
fig, ax = plt.subplots()
for svc in ["yellow","hv_fhv"]:
    sub = hour_stats[hour_stats["service_type"]==svc].sort_values("pickup_hour")
    x = sub["pickup_hour"].values
    m = sub["mean"].values
    c = COLORS[svc]
    ax.plot(x, m, marker="o", linewidth=2, markersize=3, color=c, label=("Yellow" if svc=="yellow" else "HVFHV"))
    ax.fill_between(x, m - sub["ci95"].values, m + sub["ci95"].values, color=c, alpha=0.18, linewidth=0)
for svc in ["yellow","hv_fhv"]:
    if svc in overall_hr.index:
        ax.axhline(overall_hr[svc], color=COLORS[svc], linestyle="--", linewidth=1)

ax.set_title("Time-weighted Net Profit by Hour of Day (Jan–Jun 2024)")
ax.set_xlabel("Pickup time")
ax.set_ylabel("Net profit per active hour")
ax.yaxis.set_major_formatter(usd0)
ax.xaxis.set_major_locator(MultipleLocator(2))
ax.set_xticklabels([f"{int(t):02d}:00" for t in ax.get_xticks()])
ax.grid(True, axis="y", alpha=0.25)
ax.legend(frameon=False, ncols=2, loc="upper center", bbox_to_anchor=(0.5, -0.12))
plt.tight_layout()
plt.savefig("plots/img/hour_of_day_TW_ci.png", dpi=200, bbox_inches="tight")
plt.close()

# Figure 2 Compisition Bar graph
comp_pd = comp.toPandas().set_index("service_type")[["rev_per_trip","nonfuel_per_trip","fuel_per_trip"]]

ax = comp_pd.plot(kind="bar", stacked=True, title="Per-trip Revenue vs Costs")
ax.set_xlabel("Service type")
ax.set_ylabel("USD per trip")
ax.yaxis.set_major_formatter(FuncFormatter(lambda v, : f"${v:,.0f}")) 

ax.set_ylim(bottom=0)
ax.axhline(0, linewidth=1) 
ax.grid(True, axis="y", alpha=0.3)
plt.xticks(rotation=0)

plt.tight_layout()
plt.savefig("plots/img/composition.png", dpi=150)
plt.close()

# Print overall summary
print("\nOVERALL (time-weighted net/hr):")
print(overall.select("service_type","sum_net","sum_hours","trips","net_per_hr_TW").toPandas().to_string(index=False))


In [None]:
# GEOSPATIAL
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
from matplotlib.colors import TwoSlopeNorm

# Load taxi zones shapefile
TAXI_ZONES_PATH = "data/taxi_zones/taxi_zones.shp"
zones = gpd.read_file(TAXI_ZONES_PATH)[["LocationID","geometry","zone","borough"]]
zone_rel_pd = zone_rel.select("PULocationID","share_diff","total_trips").toPandas() \
                      .rename(columns={"PULocationID":"LocationID"})
gmap = zones.merge(zone_rel_pd, on="LocationID", how="left")

# Mask zones with very few trips
LOW_TRIPS = 100
gmap["masked"] = gmap["total_trips"].fillna(0) < LOW_TRIPS
plot_col = gmap["share_diff"].where(~gmap["masked"], np.nan)

# Plotting for Fig 3
fig, ax = plt.subplots(figsize=(6.5, 6.5))
norm = TwoSlopeNorm(vmin=-1, vcenter=0, vmax=1)
gmap.assign(plot_val=plot_col).plot(
    column="plot_val", cmap="RdBu_r", norm=norm,
    linewidth=0.2, edgecolor="#aaaaaa", ax=ax,
    legend=True, legend_kwds={"label": "Share difference (Y−H)/(Y+H)", "shrink": 0.6}
)
gmap[gmap["masked"]].plot(ax=ax, color="#DDDDDD", linewidth=0.2, edgecolor="#aaaaaa", label=f"< {LOW_TRIPS} trips")
ax.set_axis_off()
ax.set_title("Relative Advantage by Pickup Zone (Jan–Jun 2024)")
from matplotlib.patches import Patch
handles, labels = ax.get_legend_handles_labels()
if f"< {LOW_TRIPS} trips" not in labels:
    handles.append(Patch(facecolor="#DDDDDD", edgecolor="#aaaaaa", label=f"< {LOW_TRIPS} trips"))
ax.legend(handles=handles, frameon=False, loc="lower left")
plt.tight_layout()
plt.savefig("plots/img/map_share_diff.png", dpi=220, bbox_inches="tight")
plt.close()



In [None]:
# Machine Learning Model 1
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as ssum, count, to_date
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Aggregate to (service, zone, date, hour)
_ts = next((c for c in ["pickup_ts","pickup_datetime","tpep_pickup_datetime","lpep_pickup_datetime"]
            if c in df.columns), None)
assert _ts is not None, "No pickup timestamp column found."

by_shz = (
    df.select("service_type","PULocationID","pickup_hour",
              "net_after_fuel","active_hours","price_per_gallon","price_usd_per_kwh", _ts)
      .withColumn("pickup_date", to_date(col(_ts)))
      .groupBy("service_type","PULocationID","pickup_date","pickup_hour")
      .agg(
          ssum("net_after_fuel").alias("sum_net"),
          ssum("active_hours").alias("sum_hours"),
          count("*").alias("trips"),
          F.first("price_per_gallon").alias("price_per_gallon"),
          F.first("price_usd_per_kwh").alias("price_usd_per_kwh")
      )
      .filter(col("sum_hours") > 0)
      .withColumn("net_hr",  col("sum_net")/col("sum_hours"))
      .withColumn("trips_hr", col("trips")/col("sum_hours"))
      .withColumn("dow", F.dayofweek("pickup_date"))
      .withColumn("is_weekend", F.col("dow").isin(1,7).cast("int"))
      .withColumn("month_ix", F.month("pickup_date"))
      .withColumn("ym", F.date_format("pickup_date","yyyy-MM"))
)

# Pair services on the same zone–date–hour and build Δ & weights
wide = (
    by_shz.groupBy("PULocationID","pickup_date","pickup_hour","dow","is_weekend","month_ix","ym",
                   "price_per_gallon","price_usd_per_kwh")
          .pivot("service_type", ["yellow","hv_fhv"])
          .agg(F.first("net_hr").alias("net_hr"),
               F.first("trips_hr").alias("trips_hr"),
               F.first("sum_hours").alias("sum_hours"))
          .fillna(0.0)
)

# Rename columns for clarity
for old, new in [("yellow_net_hr","y_net_hr"), ("hv_fhv_net_hr","h_net_hr"),
                 ("yellow_trips_hr","y_trips_hr"), ("hv_fhv_trips_hr","h_trips_hr"),
                 ("yellow_sum_hours","y_hours"), ("hv_fhv_sum_hours","h_hours")]:
    if old in wide.columns:
        wide = wide.withColumnRenamed(old, new)

feat = (wide
        .withColumn("delta", col("y_net_hr") - col("h_net_hr"))
        .withColumn("w_hours", col("y_hours") + col("h_hours"))  
)

# Time-aware split
train = feat.filter(col("ym") <= "2024-05").cache()
test  = feat.filter(col("ym") == "2024-06").cache()

# Encode zone and assemble features
idx  = StringIndexer(inputCol="PULocationID", outputCol="zone_idx", handleInvalid="keep")
ohe  = OneHotEncoder(inputCols=["zone_idx"], outputCols=["zone_ohe"], dropLast=False)

# Features
va = VectorAssembler(
    inputCols=[
        "pickup_hour","dow","is_weekend","month_ix",
        "y_trips_hr","h_trips_hr",
        "price_per_gallon","price_usd_per_kwh",
        "zone_ohe"
    ],
    outputCol="features"
)

# Weighted ridge regression on Δ
lin = LinearRegression(
    labelCol="delta",
    featuresCol="features",
    weightCol="w_hours",  
    elasticNetParam=0.0,   
    regParam=0.2,
    maxIter=200
) 

# Pipeline
pipe = Pipeline(stages=[idx, ohe, va, lin]).fit(train)
pred = pipe.transform(test)

# Evaluate
pred_eval = pred.withColumn(
    "abs_err", F.abs(col("delta") - col("prediction"))
).withColumn(
    "sq_err", (col("delta") - col("prediction"))**2
)

# Weighted MAE
wmae = (
    pred_eval.withColumn("w_abs_err", col("w_hours") * col("abs_err"))
             .agg((F.sum("w_abs_err") / F.sum("w_hours")).alias("wmae"))
             .collect()[0]["wmae"]
)

# Weighted RMSE 
wrmse = (
    pred_eval.withColumn("w_sq_err", col("w_hours") * col("sq_err"))
             .agg((F.sqrt(F.sum("w_sq_err") / F.sum("w_hours"))).alias("wrmse"))
             .collect()[0]["wrmse"]
)

print(f"Model 1 (Ridge Δ) — June: wMAE={wmae:.2f}, wRMSE={wrmse:.2f}")

In [None]:
# Machine learning plot 1 
import pandas as pd, numpy as np, matplotlib.pyplot as plt, os
from matplotlib.ticker import FuncFormatter

os.makedirs("plots/img", exist_ok=True)

m1 = pred.select("delta","prediction").toPandas().dropna()

# Bin by predicted difference
m1["bin"] = pd.qcut(m1["prediction"], 10, duplicates="drop")
g = m1.groupby("bin", observed=True)
cal = pd.DataFrame({
    "pred_mean": g["prediction"].mean().values,
    "act_mean":  g["delta"].mean().values,
    "n":         g.size().values
}).sort_values("pred_mean")

# Plot
plt.figure(figsize=(6.8,4.2))
plt.plot(cal["pred_mean"], cal["act_mean"], marker="o", linewidth=2, label="Model 1 — Ridge")
lim = float(np.ceil(max(np.abs(cal[["pred_mean","act_mean"]]).to_numpy().ravel())/5)*5) or 10.0
plt.plot([-lim, lim], [-lim, lim], "k--", linewidth=1, label="Perfect calibration (y=x)")
plt.xlabel("Predicted Δ  (bin mean)  [USD/hr]")
plt.ylabel("Actual Δ     (bin mean)  [USD/hr]")
plt.title("Calibration by prediction decile — Model 1 (Ridge), June")
plt.grid(True, axis="y", alpha=0.25)
plt.legend(frameon=False, loc="upper left")
plt.tight_layout()
plt.savefig("plots/img/model1_calibration_deciles.png", dpi=200)
plt.close()

In [None]:
# Machine learning model 2

from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.regression import GBTRegressor

# Using the same train/test split and features as Model 1, but with GBT
idx = StringIndexer(inputCol="PULocationID", outputCol="zone_idx", handleInvalid="keep")

va = VectorAssembler(
    inputCols=[
        "pickup_hour","dow","is_weekend","month_ix",
        "price_per_gallon","price_usd_per_kwh",
        "zone_idx"             
    ],
    outputCol="features_raw"
)

# Index categorical features
vx = VectorIndexer(inputCol="features_raw", outputCol="features", maxCategories=300, handleInvalid="keep")

# GBT regressor on Δ
gbt = GBTRegressor(
    labelCol="delta",
    featuresCol="features",
    maxDepth=6,          
    maxBins=512,
    maxIter=120,        
    stepSize=0.1,       
    subsamplingRate=0.8, 
    seed=7
)

# Pipeline
pipe_gbt = Pipeline(stages=[idx, va, vx, gbt]).fit(train)
pred_gbt = pipe_gbt.transform(test).cache()

# Evaluate
den = pred_gbt.agg(F.sum("w_hours").alias("W")).first()["W"]

w_mae = (pred_gbt
         .select((F.col("w_hours")*F.abs(F.col("delta")-F.col("prediction"))).alias("w_ae"))
         .agg(F.sum("w_ae")).first()[0] / den)

w_rmse = ((pred_gbt
           .select((F.col("w_hours")*F.pow(F.col("delta")-F.col("prediction"),2)).alias("w_se"))
           .agg(F.sum("w_se")).first()[0] / den) ** 0.5)

print(f"Model 2 (GBT Δ) — June: wMAE={w_mae:.2f}, wRMSE={w_rmse:.2f}")

# Simple baseline: mean by hour of day
base = (train.groupBy("pickup_hour")
              .agg(F.avg("delta").alias("dbar")))
pred_base = (test.join(base, "pickup_hour", "left")
                 .fillna({"dbar": float(train.agg(F.avg('delta')).first()[0])})
                 .withColumn("pred_base", F.col("dbar")))
b_mae = (pred_base
         .select((F.col("w_hours")*F.abs(F.col("delta")-F.col("pred_base"))).alias("w_ae"))
         .agg(F.sum("w_ae")).first()[0] / den)
b_rmse = ((pred_base
           .select((F.col("w_hours")*F.pow(F.col("delta")-F.col("pred_base"),2)).alias("w_se"))
           .agg(F.sum("w_se")).first()[0] / den) ** 0.5)
print(f"Baseline (hour mean) — June: wMAE={b_mae:.2f}, wRMSE={b_rmse:.2f}")

In [None]:
# Machine learning plot 2
import os, numpy as np, pandas as pd, matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from pyspark.sql import functions as F, Window

os.makedirs("plots/img", exist_ok=True)
usd0 = FuncFormatter(lambda x, pos: f"${x:,.0f}")

# Calibration plot by decile
w = Window.orderBy(F.col("prediction"))
pred_bins = pred_gbt.withColumn("bin", F.ntile(10).over(w))

# Making the bin stats table
bin_stats = (pred_bins.groupBy("bin")
    .agg(
        F.sum("w_hours").alias("w"),
        (F.sum(F.col("w_hours")*F.col("prediction"))/F.sum("w_hours")).alias("pred_mean"),
        (F.sum(F.col("w_hours")*F.col("delta"))/F.sum("w_hours")).alias("actual_mean"),
        F.min("prediction").alias("pred_min"),
        F.max("prediction").alias("pred_max")
    )
    .orderBy("bin")
    .toPandas()
)

fig, ax = plt.subplots(figsize=(7.6, 4.6))
ax.plot(bin_stats["pred_mean"], bin_stats["actual_mean"],
        marker="o", linewidth=2.2, label="Model 2 — GBT")
# y=x line
lims = [min(bin_stats["pred_mean"].min(), bin_stats["actual_mean"].min()),
        max(bin_stats["pred_mean"].max(), bin_stats["actual_mean"].max())]
pad = (lims[1]-lims[0])*0.05
ax.plot([lims[0]-pad, lims[1]+pad], [lims[0]-pad, lims[1]+pad],
        linestyle="--", color="k", linewidth=1.2, label="Perfect calibration (y=x)")

ax.set_title("Calibration by prediction decile — Model 2 (GBT Δ), June")
ax.set_xlabel("Predicted Δ  (bin mean)  [USD/hr]")
ax.set_ylabel("Actual Δ  (bin mean)  [USD/hr]")
ax.yaxis.set_major_formatter(usd0)
ax.xaxis.set_major_formatter(usd0)
ax.grid(True, alpha=0.25)
ax.legend(frameon=False, loc="upper left")
plt.tight_layout()
plt.savefig("plots/img/gbt_calibration.png", dpi=200)
plt.close()


# Feature importance plot
gbt_model = pipe_gbt.stages[-1]  
feat_names = ["pickup_hour","dow","is_weekend","month_ix",
              "price_per_gallon","price_usd_per_kwh","zone_idx"]
imp = np.array(gbt_model.featureImportances.toArray())
order = np.argsort(imp)
fig, ax = plt.subplots(figsize=(7.2, 4.6))
ax.barh(np.array(feat_names)[order], imp[order])
ax.set_title("Feature importances")
ax.set_xlabel("Importance (gain)")
plt.tight_layout()
plt.savefig("plots/img/gbt_feature_importance.png", dpi=200)
plt.close()