In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.functions import when
from pyspark.sql.functions import col, to_timestamp, year
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as _sum, when

In [0]:
# Set the storage account
spark.conf.set(
    "fs.azure.account.key.taxidatalake2025.dfs.core.windows.net",
    "eUGvWRhpg9Hu20GqBnedmb4+jafMNcOdC2+9ykuUqhEjJ5lvm8W5FIy/FISMls9bVHolgKqk/zIE+AStgpuUCA=="
)

file_path = "abfss://taxidata@taxidatalake2025.dfs.core.windows.net/yellow-tripdata-2018-01.csv"
df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)
df.show(5)
df.printSchema()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2018-01-01 00:21:05|  2018-01-01 00:24:23|              1|          0.5|         1|                 N|          41|          24|           2|        4.5|  0.5|    0.5|       0.

In [0]:
columns_to_cast = [
    'fare_amount', 'extra', 'mta_tax', 'tip_amount', 
    'tolls_amount', 'improvement_surcharge', 'total_amount'
]

for col_name in columns_to_cast:
    df = df.withColumn(col_name, when(col(col_name) == "\\N", None).otherwise(col(col_name).cast("double")))

In [0]:
df = df.withColumn(
    "Revenue", 
    (col("fare_amount") + col("extra") + col("mta_tax") + 
     col("improvement_surcharge") + col("tip_amount") + 
     col("tolls_amount") + col("total_amount"))
)
df.select("VendorID", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "Revenue").show(100)

+--------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+
|VendorID|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|           Revenue|
+--------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+
|       1|        4.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         5.8|              11.6|
|       1|       14.0|  0.5|    0.5|       0.0|         0.0|                  0.3|        15.3|              30.6|
|       1|        6.0|  0.5|    0.5|       1.0|         0.0|                  0.3|         8.3|              16.6|
|       1|       33.5|  0.5|    0.5|       0.0|         0.0|                  0.3|        34.8|              69.6|
|       1|       12.5|  0.5|    0.5|      2.75|         0.0|                  0.3|       16.55|              33.1|
|       1|        4.5|  0.5|    0.5|       0.0|         0.0|                  0.

**Query 2**

In [0]:
df = df.withColumn("pickup_time", to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("passenger_count", when(col("passenger_count").isNull(), 0).otherwise(col("passenger_count").cast("int")))
window_spec = Window.partitionBy("PULocationID").orderBy("pickup_time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_passenger_count", _sum("passenger_count").over(window_spec))
df.select("pickup_time", "PULocationID", "passenger_count", "cumulative_passenger_count").orderBy("pickup_time").show(100)


+-------------------+------------+---------------+--------------------------+
|        pickup_time|PULocationID|passenger_count|cumulative_passenger_count|
+-------------------+------------+---------------+--------------------------+
|2018-01-01 00:00:00|         229|              1|                         1|
|2018-01-01 00:00:02|          68|              1|                         1|
|2018-01-01 00:00:03|         255|              1|                         1|
|2018-01-01 00:00:03|         236|              3|                         3|
|2018-01-01 00:00:04|          37|              1|                         1|
|2018-01-01 00:00:04|         141|              1|                         1|
|2018-01-01 00:00:06|         162|              1|                         1|
|2018-01-01 00:00:11|         238|              1|                         1|
|2018-01-01 00:00:13|         144|              1|                         1|
|2018-01-01 00:00:14|         170|              1|              

**Query 3**

In [0]:
df.select("VendorID").distinct().show()

+--------+
|VendorID|
+--------+
|       1|
|       2|
+--------+



In [0]:
from pyspark.sql.functions import col, avg, to_timestamp, year
from pyspark.sql.window import Window
df = df.withColumn("pickup_time", to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
window_spec = Window.partitionBy("VendorID").orderBy("pickup_time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_avg_fare", avg("fare_amount").over(window_spec))
df = df.withColumn("cumulative_avg_total", avg("total_amount").over(window_spec))
df.select("pickup_time", "VendorID", "fare_amount", "total_amount", "cumulative_avg_fare", "cumulative_avg_total").orderBy("pickup_time").show(10)


+-------------------+--------+-----------+------------+-------------------+--------------------+
|        pickup_time|VendorID|fare_amount|total_amount|cumulative_avg_fare|cumulative_avg_total|
+-------------------+--------+-----------+------------+-------------------+--------------------+
|2018-01-01 00:00:00|       2|       27.0|        27.8|               27.0|                27.8|
|2018-01-01 00:00:02|       2|        7.5|         8.8|              17.25|                18.3|
|2018-01-01 00:00:03|       2|        5.5|         6.8| 13.333333333333334|  14.466666666666667|
|2018-01-01 00:00:03|       1|       20.5|        21.8|               20.5|                21.8|
|2018-01-01 00:00:04|       1|       13.5|        14.8|               17.0|                18.3|
|2018-01-01 00:00:04|       2|        8.0|         9.3|               12.0|              13.175|
|2018-01-01 00:00:06|       1|       23.5|       29.75| 19.166666666666668|  22.116666666666664|
|2018-01-01 00:00:11|       1|

In [0]:
vendor_stats = df.groupBy("VendorID").agg(
    avg("fare_amount").alias("AverageFare"),
    _sum("Revenue").alias("TotalRevenue")
)

vendor_stats.show()

+--------+------------------+--------------------+
|VendorID|       AverageFare|        TotalRevenue|
+--------+------------------+--------------------+
|       1|11.959975333672704|1.1636389410079427E8|
|       2|  12.4669989040122|1.5502233030173886E8|
+--------+------------------+--------------------+



**Query 4**

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count

window_spec = Window.partitionBy("payment_type").orderBy("tpep_pickup_datetime") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_moving_count = df.withColumn(
    "payment_type_moving_count",
    count("*").over(window_spec)
).select(
    "tpep_pickup_datetime", "payment_type", "payment_type_moving_count"
).orderBy("tpep_pickup_datetime")

df_moving_count.show(100, truncate=False)

+--------------------+------------+-------------------------+
|tpep_pickup_datetime|payment_type|payment_type_moving_count|
+--------------------+------------+-------------------------+
|2018-01-01 00:00:00 |2           |1                        |
|2018-01-01 00:00:02 |2           |2                        |
|2018-01-01 00:00:03 |2           |3                        |
|2018-01-01 00:00:03 |2           |4                        |
|2018-01-01 00:00:04 |2           |5                        |
|2018-01-01 00:00:04 |2           |6                        |
|2018-01-01 00:00:06 |1           |1                        |
|2018-01-01 00:00:11 |1           |2                        |
|2018-01-01 00:00:13 |1           |3                        |
|2018-01-01 00:00:14 |2           |7                        |
|2018-01-01 00:00:14 |1           |4                        |
|2018-01-01 00:00:15 |1           |5                        |
|2018-01-01 00:00:15 |2           |8                        |
|2018-01

**Query 5**

In [0]:
from pyspark.sql.functions import sum, desc, col, lit, to_date

target_date = "2018-01-19"
df_date_filtered = df.filter(to_date(col("tpep_pickup_datetime")) == lit(target_date))

# Add Revenue column
df_date_filtered = df_date_filtered.withColumn(
    "Revenue",
    col("fare_amount") + col("extra") + col("mta_tax") + col("improvement_surcharge") + col("tip_amount") + col("tolls_amount") + col("total_amount")
)

df_agg = df_date_filtered.groupBy("VendorID").agg(
    sum("Revenue").alias("total_revenue"),
    sum("passenger_count").alias("total_passengers"),
    sum("trip_distance").alias("total_distance")
)

# We will get top 2 vendors
top2_vendors = df_agg.limit(2)

top2_vendors.show(truncate=False)


+--------+-----------------+----------------+-----------------+
|VendorID|total_revenue    |total_passengers|total_distance   |
+--------+-----------------+----------------+-----------------+
|1       |4462849.16000325 |176404          |378842.5000000025|
|2       |5708885.429996097|341985          |500606.6799999969|
+--------+-----------------+----------------+-----------------+



**Query 6**

In [0]:
from pyspark.sql.functions import sum, desc

route_passenger_counts = df.groupBy("PULocationID", "DOLocationID").agg(
    sum("passenger_count").alias("total_passengers")
)
# We will get the route with the highest number of passengers
top_route = route_passenger_counts.orderBy(desc("total_passengers")).limit(1)

top_route.show(truncate=False)
# Show top 5 most frequently used routes
route_passenger_counts.orderBy(desc("total_passengers")).show(5, truncate=False)


+------------+------------+----------------+
|PULocationID|DOLocationID|total_passengers|
+------------+------------+----------------+
|264         |264         |186698          |
+------------+------------+----------------+

+------------+------------+----------------+
|PULocationID|DOLocationID|total_passengers|
+------------+------------+----------------+
|264         |264         |186698          |
|237         |236         |86732           |
|236         |236         |78691           |
|236         |237         |71751           |
|237         |237         |64492           |
+------------+------------+----------------+
only showing top 5 rows


**Query 7**

In [0]:
from pyspark.sql.functions import max as _max, sum as _sum, col, expr, desc

df_input = df
latest_ts = df_input.agg(_max("tpep_pickup_datetime")).first()[0]

window_seconds = 10
window_start = expr(f"timestamp('{latest_ts}') - INTERVAL {window_seconds} seconds")

recent_rides = df_input.filter(
    (col("tpep_pickup_datetime") > window_start) &
    (col("tpep_pickup_datetime") <= latest_ts)
)
top_pickup_locations = recent_rides.groupBy("PULocationID").agg(
    _sum("passenger_count").alias("total_passengers")
).orderBy(desc("total_passengers"))

top_pickup_locations.show(truncate=False)


+------------+----------------+
|PULocationID|total_passengers|
+------------+----------------+
|48          |2               |
+------------+----------------+



**Saving to Parquet**

In [0]:
from pyspark.sql.functions import col, sum as _sum

top_dropoff_locations = df.groupBy("DOLocationID") \
    .agg(_sum("passenger_count").alias("total_passengers")) \
    .orderBy("total_passengers", ascending=False)


In [0]:

from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import col

df = df.withColumn("Revenue", col("fare_amount") + col("extra") + col("mta_tax") + col("tip_amount") + col("tolls_amount") + col("improvement_surcharge") + col("congestion_surcharge"))

vendor_revenue = df.groupBy("VendorID") \
    .agg(_sum("Revenue").alias("total_revenue")) \
    .orderBy("total_revenue", ascending=False)

In [0]:
storage_account_name = "taxidatalake2025"
container_name = "taxidata"
abfss_path = f"abfss://taxidata@taxidatalake2025.dfs.core.windows.net"

top_pickup_locations.write.mode("overwrite").parquet(f"{abfss_path}/query1_top_pickup_locations")
top_dropoff_locations.write.mode("overwrite").parquet(f"{abfss_path}/query2_top_dropoff_locations")
vendor_revenue.write.mode("overwrite").parquet(f"{abfss_path}/query3_vendor_revenue")
df_moving_count.write.mode("overwrite").parquet(f"{abfss_path}/query4_moving_count")
top2_vendors.write.mode("overwrite").parquet(f"{abfss_path}/query5_top2_vendors")
top_route.write.mode("overwrite").parquet(f"{abfss_path}/query6_top_route")
top_pickup_locations.write.mode("overwrite").parquet(f"{abfss_path}/query7_top_pickup_locations")