In [0]:
import pandas as pd
import time
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when

# Verify Spark is ready (spark is pre-created in Databricks)
print(f"✅ Spark version: {spark.version}")
print(f"✅ Ready to go!")



✅ Spark version: 4.0.0
✅ Ready to go!


### Operation 1: Load the Data

In [0]:

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_200[6-9]-*.csv.gz,"
         "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2010-*.csv.gz")
)

### Operation 2: Apply transformations


In [0]:
from pyspark.sql import functions as F

df_clean = (
    df
    .withColumn("pickup_ts", F.to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("dropoff_ts", F.to_timestamp("dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))
)


## Transformation

In [0]:
df_feat = (
    df_clean
    .withColumn("trip_minutes", (F.col("dropoff_ts").cast("long") - F.col("pickup_ts").cast("long")) / 60.0)
    .withColumn("year", F.year("pickup_ts"))
    .withColumn("month", F.month("pickup_ts"))
    .withColumn("hour", F.hour("pickup_ts"))
)


##Filter

In [0]:
df_f = (
    df_feat
    .filter(F.col("pickup_datetime").startswith("2010-"))
    .filter(F.col("trip_distance") > 0)
    .filter(F.col("fare_amount") >= 0)
    .filter(F.col("pickup_ts").isNotNull() & F.col("dropoff_ts").isNotNull())
)


In [0]:
agg_pay_month = (
    df_f.groupBy("month", "payment_type")
        .agg(
            F.count("*").alias("n_trips"),
            F.avg("fare_amount").alias("avg_fare"),
            F.avg("tip_amount").alias("avg_tip"),
            F.avg("trip_distance").alias("avg_distance"),
            F.avg("trip_minutes").alias("avg_duration")
        )
        .orderBy( "month")
)


## SQL Queries

In [0]:
df_f.createOrReplaceTempView("df_f")
# 99 percentile tip by month
sql_99_percentile_tip_by_month = spark.sql("""
SELECT  month,
       percentile_approx(tip_amount, 0.99) AS 99_tip_amount
FROM df_f
GROUP BY month
ORDER BY month
""")

# Average trips per day by day of week
sql_avg_trips_by_dow = spark.sql("""
SELECT
  date_format(pickup_ts, 'E') AS dow,
  COUNT(*) / COUNT(DISTINCT date(pickup_ts)) AS avg_trips_per_day
FROM df_f
WHERE pickup_ts IS NOT NULL
GROUP BY date_format(pickup_ts, 'E')
ORDER BY avg_trips_per_day DESC
""")


In [0]:
#show results
agg_pay_month.show()
sql_99_percentile_tip_by_month.show()
sql_avg_trips_by_dow.show()


+-----+------------+-------+------------------+--------------------+------------------+------------------+
|month|payment_type|n_trips|          avg_fare|             avg_tip|      avg_distance|      avg_duration|
+-----+------------+-------+------------------+--------------------+------------------+------------------+
|    1|         Dis|   5931|12.137599055808462|0.026538526386781326|3.6906086663294517|12.408028438149804|
|    1|         No |  27588|10.375210236334613| 0.01422502537335073|2.9222560533565365| 11.28149799429708|
|    1|         CRE| 241539|11.338242561242701|  2.1906308712049785| 3.398023921602689|14.455690799415436|
|    1|         CAS|5600983| 8.890324505547309|1.493487839545308...| 2.460610835633479|10.428857163704018|
|    1|         Cas|4403194| 8.640615823428492|0.002560668460213...| 2.345022317889869| 9.884501886584824|
|    1|         Cre|4500656| 10.54006692580258|   2.049957021820885| 3.118497241290966|12.144309488808913|
|    2|         CRE|2074020| 10.37784

### Write to Parquet (the code is in "coment" format because Databricks permission)

In [0]:
#agg_pay_month.write.parquet("output.parquet")
#sql_99_percentile_tip_by_month.write.parquet("output.parquet")
#sql_avg_trips_by_dow.write.parquet("output.parquet")


## Execution Plans

In [0]:
agg_pay_month.explain("formatted")


== Physical Plan ==
AdaptiveSparkPlan (17)
+- == Initial Plan ==
   ColumnarToRow (16)
   +- PhotonResultStage (15)
      +- PhotonSort (14)
         +- PhotonShuffleExchangeSource (13)
            +- PhotonShuffleMapStage (12)
               +- PhotonShuffleExchangeSink (11)
                  +- PhotonGroupingAgg (10)
                     +- PhotonShuffleExchangeSource (9)
                        +- PhotonShuffleMapStage (8)
                           +- PhotonShuffleExchangeSink (7)
                              +- PhotonGroupingAgg (6)
                                 +- PhotonProject (5)
                                    +- PhotonProject (4)
                                       +- PhotonFilter (3)
                                          +- PhotonRowToColumnar (2)
                                             +- Scan csv  (1)


(1) Scan csv 
Output [6]: [pickup_datetime#12512, dropoff_datetime#12513, trip_distance#12515, payment_type#12522, fare_amount#12523, tip_amount#12526]


In [0]:
sql_99_percentile_tip_by_month.explain("formatted")


== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Initial Plan ==
   ColumnarToRow (15)
   +- PhotonResultStage (14)
      +- PhotonSort (13)
         +- PhotonShuffleExchangeSource (12)
            +- PhotonShuffleMapStage (11)
               +- PhotonShuffleExchangeSink (10)
                  +- PhotonGroupingAgg (9)
                     +- PhotonShuffleExchangeSource (8)
                        +- PhotonShuffleMapStage (7)
                           +- PhotonShuffleExchangeSink (6)
                              +- PhotonGroupingAgg (5)
                                 +- PhotonProject (4)
                                    +- PhotonFilter (3)
                                       +- PhotonRowToColumnar (2)
                                          +- Scan csv  (1)


(1) Scan csv 
Output [5]: [pickup_datetime#12512, dropoff_datetime#12513, trip_distance#12515, fare_amount#12523, tip_amount#12526]
Batched: false
Location: InMemoryFileIndex [dbfs:/databricks-datasets/nyctaxi/tripdata/

In [0]:
sql_avg_trips_by_dow.explain("formatted")


== Physical Plan ==
AdaptiveSparkPlan (21)
+- == Initial Plan ==
   ColumnarToRow (20)
   +- PhotonResultStage (19)
      +- PhotonSort (18)
         +- PhotonShuffleExchangeSource (17)
            +- PhotonShuffleMapStage (16)
               +- PhotonShuffleExchangeSink (15)
                  +- PhotonGroupingAgg (14)
                     +- PhotonShuffleExchangeSource (13)
                        +- PhotonShuffleMapStage (12)
                           +- PhotonShuffleExchangeSink (11)
                              +- PhotonGroupingAgg (10)
                                 +- PhotonGroupingAgg (9)
                                    +- PhotonShuffleExchangeSource (8)
                                       +- PhotonShuffleMapStage (7)
                                          +- PhotonShuffleExchangeSink (6)
                                             +- PhotonGroupingAgg (5)
                                                +- PhotonProject (4)
                                        