In [0]:
df_jan = spark.read.parquet("dbfs:/FileStore/tables/yellow_tripdata_2025_01.parquet")
df_feb = spark.read.parquet("dbfs:/FileStore/tables/yellow_tripdata_2025_02.parquet")


In [0]:
df_jan.printSchema()
df_feb.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: ti

In [0]:
display(df_jan.count())
display(df_feb.count())

3475226

3577543

In [0]:
df_jan.show(5)
df_feb.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [0]:
df = df_jan.unionByName(df_feb)

In [0]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [0]:
df.count()

7052769

In [0]:
from pyspark.sql.functions import col, avg, count, to_date, sum, window, max

### Add Revenue Column

In [0]:
df = df.withColumn("Revenue", col("fare_amount") + col("extra") + col("mta_tax") + col("improvement_surcharge") + col("tip_amount") + col("tolls_amount") + col("total_amount"))

### Total Passengers by Area

In [0]:
df.groupBy("PULocationID").sum("passenger_count").show()

+------------+--------------------+
|PULocationID|sum(passenger_count)|
+------------+--------------------+
|         148|               78356|
|         243|                1063|
|          31|                  24|
|         137|               74978|
|          85|                 624|
|         251|                   3|
|          65|                3112|
|         255|                1865|
|          53|                 202|
|         133|                 393|
|          78|                 397|
|         155|                 646|
|         108|                 341|
|         211|               60579|
|          34|                 122|
|         193|                3095|
|         101|                 115|
|         126|                 194|
|         115|                   8|
|          81|                 266|
+------------+--------------------+
only showing top 20 rows


### Real-time Avg Fare / Total Amount by Vendor

In [0]:
df.groupBy("VendorID").agg(avg("fare_amount").alias("avg_fare"), avg("total_amount").alias("avg_total_earning")).show()

+--------+------------------+------------------+
|VendorID|          avg_fare| avg_total_earning|
+--------+------------------+------------------+
|       1| 17.95817555434934|26.257572032419343|
|       6| 2.972588522588525| 30.02638583638582|
|       7|14.008691788126564| 22.76513686455739|
|       2| 16.63325596916356| 25.06227858034035|
+--------+------------------+------------------+



### Moving Count of Payments by Mode

In [0]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("payment_type").orderBy("tpep_pickup_datetime").rowsBetween(-10, 0)
df.withColumn("moving_payment_count", count("payment_type").over(window_spec)).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|            Revenue|moving_payment_count|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+--------------------+
|      

### Top 2 Vendors by Revenue on a Given Date

In [0]:
date_filtered = df.withColumn("date", to_date("tpep_pickup_datetime"))
(date_filtered.groupBy("date", "VendorID")
    .agg(
        sum("Revenue").alias("total_revenue"),
        sum("passenger_count").alias("total_passenger"),
        sum("trip_distance").alias("total_distance")
    )
    .orderBy("total_revenue", ascending=False)
    .show(2)
)

+----------+--------+-----------------+---------------+------------------+
|      date|VendorID|    total_revenue|total_passenger|    total_distance|
+----------+--------+-----------------+---------------+------------------+
|2025-02-14|       2| 5936830.82999978|         112270| 839522.6200000233|
|2025-02-28|       2|5676362.939999949|         113845|1100449.3199999998|
+----------+--------+-----------------+---------------+------------------+
only showing top 2 rows


### Most Passengers Between Two Locations

In [0]:
df.groupBy("PULocationID", "DOLocationID").sum("passenger_count").orderBy("sum(passenger_count)", ascending=False).show(1)

+------------+------------+--------------------+
|PULocationID|DOLocationID|sum(passenger_count)|
+------------+------------+--------------------+
|         237|         236|               57465|
+------------+------------+--------------------+
only showing top 1 row


### Top Pickup Locations in Last Day

In [0]:
max_timestamp = df.select(max("tpep_pickup_datetime")).first()[0]

In [0]:
from pyspark.sql.types import TimestampType
from datetime import timedelta
cutoff_timestamp = max_timestamp - timedelta(days=1)
last_day_df = df.filter(col("tpep_pickup_datetime") >= cutoff_timestamp)

In [0]:
last_day_df.groupBy("PULocationID").sum("passenger_count").orderBy("sum(passenger_count)", ascending=False).show()

+------------+--------------------+
|PULocationID|sum(passenger_count)|
+------------+--------------------+
|         132|                7098|
|         237|                7033|
|         236|                6858|
|         161|                6530|
|         186|                5390|
|         230|                5174|
|         138|                4908|
|         162|                4708|
|         142|                4443|
|         163|                4157|
|         234|                4023|
|          68|                3947|
|         170|                3898|
|          48|                3768|
|         239|                3753|
|         249|                3457|
|          79|                3343|
|         141|                3332|
|         164|                3205|
|         238|                2829|
+------------+--------------------+
only showing top 20 rows
