In [0]:
# List the Workspace folder to confirm the raw monthly Parquet files are present.
workspace_listing = dbutils.fs.ls("dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/")
display(workspace_listing)

path,name,size,modificationTime
dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/Week11 Assignment.ipynb,Week11 Assignment.ipynb,1007,1762655254020
dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/fhvhv_tripdata_2025-01.parquet,fhvhv_tripdata_2025-01.parquet,491076642,1762655026826
dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/fhvhv_tripdata_2025-02.parquet,fhvhv_tripdata_2025-02.parquet,461632254,1762655024958
dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/fhvhv_tripdata_2025-03.parquet,fhvhv_tripdata_2025-03.parquet,501783767,1762655027681


In [0]:
%sql
-- Create the Unity Catalog container objects for the raw trip files.
-- IF NOT EXISTS makes each statement a no-op when the object already exists,
-- so this cell can be re-run safely.
CREATE CATALOG IF NOT EXISTS main;
CREATE SCHEMA  IF NOT EXISTS main.default;
-- Volume that the next cell copies the raw Parquet files into (/Volumes/main/default/nyc_fhvhv).
CREATE VOLUME  IF NOT EXISTS main.default.nyc_fhvhv;

In [0]:
# Copy the raw monthly Parquet files from the Workspace folder listed above
# into the Unity Catalog Volume created in the previous cell.
src_base = "dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/"
dst_base = "/Volumes/main/default/nyc_fhvhv"

# Ensure the destination directory exists before copying.
dbutils.fs.mkdirs(dst_base)

file_names = [f"fhvhv_tripdata_2025-{month}.parquet" for month in ("01", "02", "03")]
for file_name in file_names:
    # Each input is a single Parquet file (not a directory), so no recursion.
    dbutils.fs.cp(src_base + file_name, f"{dst_base}/{file_name}", recurse=False)

# Show the Volume contents to verify the copies.
display(dbutils.fs.ls(dst_base))

path,name,size,modificationTime
dbfs:/Volumes/main/default/nyc_fhvhv/fhvhv_tripdata_2025-01.parquet,fhvhv_tripdata_2025-01.parquet,491076642,1762655925000
dbfs:/Volumes/main/default/nyc_fhvhv/fhvhv_tripdata_2025-02.parquet,fhvhv_tripdata_2025-02.parquet,461632254,1762655943000
dbfs:/Volumes/main/default/nyc_fhvhv/fhvhv_tripdata_2025-03.parquet,fhvhv_tripdata_2025-03.parquet,501783767,1762655959000


In [0]:
from pyspark.sql import functions as F

# Load the raw monthly trips here instead of relying on the later
# "sanity checks" load cell. On Restart & Run All that cell runs *after* this
# one, so `fhvhv` would otherwise be undefined (NameError) at this point.
# Same path/glob as the load cell, so re-defining it there is harmless.
src_df_path = "/Volumes/main/default/nyc_fhvhv"
fhvhv = spark.read.parquet(f"{src_df_path}/fhvhv_tripdata_2025-*.parquet")

# Cleaned, trip-level frame consumed by the KPI, SQL, write and join cells.
fhvhv_tr = (fhvhv
  # timestamps are already timestamp_ntz; just reuse
  .withColumn("pickup_ts",  F.col("pickup_datetime"))
  .withColumn("dropoff_ts", F.col("dropoff_datetime"))
  # year/month come from the pickup time; they are the write partition keys later
  .withColumn("year",  F.year("pickup_ts"))
  .withColumn("month", F.month("pickup_ts"))
  .withColumn("trip_minutes", (F.col("trip_time")/60.0))

  # sanity speed (trip_time is seconds in TLC; adjust if your docs differ).
  # No .otherwise(): speed_mph is NULL when trip_minutes <= 0, and such rows
  # are dropped by the speed filter below (NULL comparisons are not true).
  .withColumn("speed_mph",
              F.when(F.col("trip_minutes") > 0,
                     F.col("trip_miles")/(F.col("trip_minutes")/60.0)))

  # normalize nulls to 0 for money fields
  .withColumn("fare",              F.coalesce(F.col("base_passenger_fare"), F.lit(0.0)))
  .withColumn("tolls_",            F.coalesce(F.col("tolls"), F.lit(0.0)))
  .withColumn("tips_",             F.coalesce(F.col("tips"), F.lit(0.0)))
  .withColumn("bcf_",              F.coalesce(F.col("bcf"), F.lit(0.0)))  # Black Car Fund
  .withColumn("sales_tax_",        F.coalesce(F.col("sales_tax"), F.lit(0.0)))
  .withColumn("congestion_",       F.coalesce(F.col("congestion_surcharge"), F.lit(0.0)))
  .withColumn("airport_fee_",      F.coalesce(F.col("airport_fee"), F.lit(0.0)))
  .withColumn("cbd_congestion_",   F.coalesce(F.col("cbd_congestion_fee"), F.lit(0.0)))

  # build revenue = fare + surcharges/fees + tips + tolls (driver_pay is not included)
  .withColumn("revenue",
              F.col("fare") + F.col("tolls_") + F.col("tips_") + F.col("bcf_")
              + F.col("sales_tax_") + F.col("congestion_") + F.col("airport_fee_")
              + F.col("cbd_congestion_"))

  # tip_rate is defined as 0.0 (not NULL) for trips with a non-positive fare,
  # so those trips still count toward avg(tip_rate).
  .withColumn("tip_rate",
              F.when(F.col("fare") > 0, F.col("tips_")/F.col("fare")).otherwise(F.lit(0.0)))

  # early filters (2+)
  .filter(F.col("PULocationID").isNotNull() & F.col("DOLocationID").isNotNull())
  .filter((F.col("trip_miles") > 0) & (F.col("trip_time") > 0))
  .filter((F.col("speed_mph") >= 1) & (F.col("speed_mph") <= 90))

  # column pruning: the *_-suffixed helpers other than tips_/tolls_ are dropped here
  .select("pickup_ts","dropoff_ts","year","month","PULocationID","DOLocationID",
          "trip_miles","trip_minutes","speed_mph","fare","tips_","tolls_","revenue",
          "tip_rate","hvfhs_license_num")
  .withColumnRenamed("tips_", "tips")
  .withColumnRenamed("tolls_","tolls")
)

In [0]:
# KPIs by month (DataFrame API)
kpi_month = (fhvhv_tr
  .groupBy("year","month")
  .agg(F.count("*").alias("trips"),
       F.avg("speed_mph").alias("avg_speed"),
       F.sum("revenue").alias("revenue_sum"),
       F.avg("tip_rate").alias("avg_tip_rate"))
  .orderBy("year","month"))

kpi_month.show()

# Two SQL queries.
# NOTE: the temp view is named "fhvhv", but it exposes the *transformed* frame
# fhvhv_tr (cleaned and filtered), not the raw `fhvhv` DataFrame.
fhvhv_tr.createOrReplaceTempView("fhvhv")

# sql_a mirrors kpi_month without tip rate. Its speed filter repeats the one
# already applied in fhvhv_tr, so it removes no additional rows.
sql_a = spark.sql("""
  SELECT year, month,
         COUNT(*) AS trips,
         AVG(speed_mph) AS avg_speed,
         SUM(revenue) AS revenue_sum
  FROM fhvhv
  WHERE speed_mph BETWEEN 1 AND 90
  GROUP BY year, month
  ORDER BY year, month
""")

# sql_b: top-20 pickup locations by trip count. PULocationID is a secondary
# sort key so rows with equal counts are ordered deterministically and the
# LIMIT always keeps the same 20 rows.
sql_b = spark.sql("""
  SELECT PULocationID, COUNT(*) AS trips, SUM(revenue) AS revenue_sum
  FROM fhvhv
  WHERE year = 2025 AND month IN (1,2,3)
  GROUP BY PULocationID
  ORDER BY trips DESC, PULocationID
  LIMIT 20
""")

+----+-----+--------+------------------+-------------------+--------------------+
|year|month|   trips|         avg_speed|        revenue_sum|        avg_tip_rate|
+----+-----+--------+------------------+-------------------+--------------------+
|2025|    1|20400288|13.939180662952221|6.264227268287927E8|0.038985726740600884|
|2025|    2|19334052| 13.69140516314002|6.092364401687396E8| 0.03697393015432927|
|2025|    3|20531303| 13.80096165959246|7.091356421488773E8| 0.03561225178814893|
+----+-----+--------+------------------+-------------------+--------------------+



In [0]:
%sql
-- Separate Volume for the results written later (/Volumes/main/default/nyc_fhvhv_out).
-- IF NOT EXISTS keeps this cell re-runnable.
CREATE VOLUME IF NOT EXISTS main.default.nyc_fhvhv_out;

In [0]:
%sql
-- Verify that both the input (nyc_fhvhv) and output (nyc_fhvhv_out) Volumes exist.
SHOW VOLUMES IN main.default;

database,volume_name
default,nyc_fhvhv
default,nyc_fhvhv_out


In [0]:
# Persist the cleaned trips and the three small result sets to the output Volume.
out_dir = "/Volumes/main/default/nyc_fhvhv_out"

# Trip-level data: one sub-directory per (year, month) value.
trips_writer = fhvhv_tr.write.mode("overwrite").partitionBy("year", "month")
trips_writer.parquet(f"{out_dir}/fhvhv_tr_partitioned")

# kpi_month has only a handful of rows, so collapse it to a single file.
kpi_month.coalesce(1).write.mode("overwrite").parquet(f"{out_dir}/kpi_month")

# The SQL results are written as-is, each under its own name.
for result_name, result_df in (("sql_a", sql_a), ("sql_b", sql_b)):
    result_df.write.mode("overwrite").parquet(f"{out_dir}/{result_name}")

In [0]:
# Print the formatted physical plan of each result, in its own labelled section.
for plan_label, plan_df in (("kpi_month", kpi_month), ("sql_a", sql_a)):
    print(f"\n==== EXPLAIN: {plan_label} ====\n")
    plan_df.explain(mode="formatted")


==== EXPLAIN: kpi_month ====

== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Initial Plan ==
   ColumnarToRow (15)
   +- PhotonResultStage (14)
      +- PhotonSort (13)
         +- PhotonShuffleExchangeSource (12)
            +- PhotonShuffleMapStage (11)
               +- PhotonShuffleExchangeSink (10)
                  +- PhotonGroupingAgg (9)
                     +- PhotonShuffleExchangeSource (8)
                        +- PhotonShuffleMapStage (7)
                           +- PhotonShuffleExchangeSink (6)
                              +- PhotonGroupingAgg (5)
                                 +- PhotonProject (4)
                                    +- PhotonProject (3)
                                       +- PhotonProject (2)
                                          +- PhotonScan parquet  (1)


(1) PhotonScan parquet 
Output [13]: [pickup_datetime#11315, PULocationID#11317, DOLocationID#11318, trip_miles#11319, trip_time#11320L, base_passenger_fare#11321, tolls#11322, bcf#11

In [0]:
# Read all three monthly FHVHV files from the Volume in one pass (glob over the month suffix).
src_df_path = "/Volumes/main/default/nyc_fhvhv"
fhvhv = spark.read.format("parquet").load(src_df_path + "/fhvhv_tripdata_2025-*.parquet")

# Sanity checks: inspect the inferred schema and a few sample rows.
fhvhv.printSchema()
display(fhvhv.limit(5))

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag,cbd_congestion_fee
HV0003,B03404,B03404,2025-01-01T00:28:07.000,2025-01-01T00:31:17.000,2025-01-01T00:33:25.000,2025-01-01T00:54:24.000,148,211,1.32,1259,18.21,0.0,0.46,1.5,2.75,0.0,0.0,15.71,N,N,N,N,N,0.0
HV0005,B03406,,2025-01-01T00:18:33.000,,2025-01-01T00:29:49.000,2025-01-01T00:41:52.000,127,244,3.313,723,16.97,0.0,0.47,1.51,0.0,0.0,0.0,23.57,N,N,N,N,N,0.0
HV0003,B03404,B03404,2025-01-01T00:28:22.000,2025-01-01T00:31:52.000,2025-01-01T00:32:39.000,2025-01-01T01:20:33.000,132,181,13.43,2874,40.36,0.0,1.18,3.8,0.0,2.5,0.0,47.75,N,N,N,N,N,0.0
HV0003,B03404,B03404,2025-01-01T00:27:13.000,2025-01-01T00:33:58.000,2025-01-01T00:34:55.000,2025-01-01T00:39:19.000,76,76,0.82,264,12.24,0.0,0.34,1.08,0.0,0.0,0.0,12.2,N,N,N,N,N,0.0
HV0003,B03404,B03404,2025-01-01T00:33:29.000,2025-01-01T00:45:46.000,2025-01-01T00:46:19.000,2025-01-01T00:53:56.000,76,76,1.61,457,19.76,0.0,0.54,1.76,0.0,0.0,0.0,17.67,N,N,N,N,N,0.0


In [0]:
from pyspark.sql.functions import broadcast

# Copy the taxi-zone lookup CSV next to the outputs.
# NOTE(review): assumes taxi_zone_lookup.csv was uploaded next to the notebook;
# it is not in the Workspace folder listing at the top of this notebook.
zone_src = "dbfs:/Workspace/Users/freddyplati@gmail.com/Week 11 PySpark Data Processing/taxi_zone_lookup.csv"
zone_dst_dir = "/Volumes/main/default/nyc_fhvhv_out/lookup"
zone_csv = f"{zone_dst_dir}/taxi_zone_lookup.csv"
dbutils.fs.mkdirs(zone_dst_dir)
dbutils.fs.cp(zone_src, zone_csv, recurse=False)

# Small dimension table: LocationID (cast to int to match the trip IDs), Borough, Zone.
zones = (
    spark.read.option("header", True).csv(zone_csv)
    .select(
        F.col("LocationID").cast("int").alias("LocationID"),
        "Borough",
        "Zone",
    )
)
zones_b = broadcast(zones)


def _join_zone(trips, key_col, prefix):
    """Left-join the broadcast zone names onto `trips` on `key_col`.

    Adds three columns: `<prefix>` (the lookup LocationID),
    `<prefix>_Borough` and `<prefix>_Zone`.
    """
    lookup = zones_b.withColumnRenamed("LocationID", prefix)
    return (trips
            .join(lookup, F.col(key_col) == F.col(prefix), "left")
            .withColumnRenamed("Borough", f"{prefix}_Borough")
            .withColumnRenamed("Zone", f"{prefix}_Zone"))


# Attach pickup names first, then dropoff names.
with_zones = _join_zone(_join_zone(fhvhv_tr, "PULocationID", "PU"), "DOLocationID", "DO")

# Example aggregation using joined columns: revenue and trips per pickup borough.
top_borough = (
    with_zones
    .groupBy("PU_Borough")
    .agg(F.sum("revenue").alias("rev"), F.count("*").alias("trips"))
    .orderBy(F.col("rev").desc())
)

# Write the single-file result and show its plan.
out_dir = "/Volumes/main/default/nyc_fhvhv_out"
top_borough.coalesce(1).write.mode("overwrite").parquet(f"{out_dir}/top_borough")

print("\n==== EXPLAIN: top_borough (with broadcast joins) ====\n")
top_borough.explain(mode="formatted")


==== EXPLAIN: top_borough (with broadcast joins) ====

== Physical Plan ==
AdaptiveSparkPlan (32)
+- == Initial Plan ==
   ColumnarToRow (31)
   +- PhotonResultStage (30)
      +- PhotonSort (29)
         +- PhotonShuffleExchangeSource (28)
            +- PhotonShuffleMapStage (27)
               +- PhotonShuffleExchangeSink (26)
                  +- PhotonGroupingAgg (25)
                     +- PhotonShuffleExchangeSource (24)
                        +- PhotonShuffleMapStage (23)
                           +- PhotonShuffleExchangeSink (22)
                              +- PhotonGroupingAgg (21)
                                 +- PhotonProject (20)
                                    +- PhotonBroadcastHashJoin LeftOuter (19)
                                       :- PhotonProject (11)
                                       :  +- PhotonBroadcastHashJoin LeftOuter (10)
                                       :     :- PhotonProject (2)
                                       :     :  +- 

In [0]:
# Transformations are lazy: these lines only build up a query plan.
jan_2025 = (F.col("year") == 2025) & (F.col("month") == 1)
lazy_df = with_zones.where(jan_2025).select("pickup_ts", "revenue")

# count() is an action, so this call is what actually runs the job.
n_rows = lazy_df.count()
print("Actions vs Transformations — count():", n_rows)

Actions vs Transformations — count(): 20400288
