In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Create a Spark session in local mode ("local[*]" master), or reuse the
# session that already exists in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/20 22:22:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Set the log level on the session's SparkContext to WARN.
spark.sparkContext.setLogLevel("WARN")

In [11]:
# Load the taxi-zone lookup table, taking column names from the CSV header
# row. No schema is supplied or inferred here, so Spark reads every column
# (including LocationID) as a string.
zone = spark.read.csv("notebooks/taxi_zone_lookup.csv", header=True)
zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


In [13]:
# Load the green-taxi trip records from the 2025-11 parquet file and preview
# the first five rows.
df = spark.read.parquet("notebooks/green_tripdata_2025-11.parquet")
df.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|cbd_congestion_fee|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|       2| 2025-11-01 00:34:48|  2025-11-01 00:41:39|                 N|         1|          74|          

Q1: number of trips with trip_distance <= 1

In [17]:
# Trips whose recorded distance is at most 1.
# NOTE(review): no pickup-date filter is applied, so any rows in the file
# dated outside the intended period are included too — confirm that is wanted.
q1 = df.where(col("trip_distance") <= 1)

In [18]:
# Number of rows in q1; as the cell's last expression it is displayed as output.
q1.count()

8009

Q2: longest trips among those with trip_distance < 100

In [42]:
# Trips with a distance below 100, ordered from longest to shortest;
# preview the five longest.
q2 = (
    df.where(col("trip_distance") < 100)
    .orderBy(desc("trip_distance"))
)
q2.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|cbd_congestion_fee|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|       2| 2025-11-14 15:36:27|  2025-11-14 18:40:48|                 N|         4|         130|         2

Q3: summed total_amount per pickup zone on 2025-11-18

In [26]:
# Keep only the columns the Q3 aggregation needs: pickup time, pickup
# location id and the trip's total amount.
q3 = df.select(
    col("lpep_pickup_datetime"),
    col("PULocationID"),
    col("total_amount"),
)

In [53]:
# Sum total_amount per pickup zone for trips picked up on 2025-11-18 and show
# the ten zones with the largest sums.
# `sum` here is pyspark.sql.functions.sum (brought in by the star import at the
# top of the notebook), not Python's builtin sum.
(
    q3.withColumn("date", col("lpep_pickup_datetime").cast("date"))
    .filter(col("date") == "2025-11-18")
    # No per-trip sort before aggregating: groupby discards row order, so
    # sorting individual trips here was extra work with no effect on the
    # grouped result.
    .join(zone, col("PULocationID") == col("LocationID"))
    .groupby("Zone")
    .agg(sum("total_amount").alias("summm"))
    .sort(col("summm").desc())
    .show(10)
)

+--------------------+------------------+
|                Zone|             summm|
+--------------------+------------------+
|   East Harlem North| 9281.919999999996|
|   East Harlem South| 6696.130000000003|
|        Central Park|2378.7899999999995|
|Washington Height...|           2139.05|
| Morningside Heights|2100.5900000000006|
|             Jamaica|1998.1100000000001|
|         Fort Greene|1780.4099999999999|
|Downtown Brooklyn...|           1499.02|
|        Forest Hills|1423.7500000000005|
|            Elmhurst|1251.8199999999997|
+--------------------+------------------+
only showing top 10 rows


Q4: largest tip per drop-off zone for November pickups in East Harlem North

In [75]:
# For trips picked up in the "East Harlem North" zone during November 2025,
# compute the largest tip per drop-off zone, ordered from highest to lowest.
# `max` here is pyspark.sql.functions.max (brought in by the star import at the
# top of the notebook), not Python's builtin max.
q4 = (
    df.select("lpep_pickup_datetime", "PULocationID", "DOLocationID", "tip_amount")
    # Match the year as well as the month so a record dated in November of
    # another year cannot pass the filter.
    # NOTE(review): 2025 is taken from the input file name — confirm.
    .filter(
        (year(col("lpep_pickup_datetime")) == 2025)
        & (month(col("lpep_pickup_datetime")) == 11)
    )
    # First lookup: resolve the pickup location id to its zone name.
    .join(zone, col("PULocationID") == col("LocationID"))
    .filter(col("Zone") == "East Harlem North")
    # Drop the lookup columns so that "LocationID" and "Zone" refer only to
    # the drop-off lookup in the second join.
    .select("tip_amount", "DOLocationID")
    # Second lookup: resolve the drop-off location id to its zone name.
    .join(zone, col("DOLocationID") == col("LocationID"))
    .select("tip_amount", col("Zone").alias("DOZone"))
    .groupby("DOZone")
    .agg(max("tip_amount").alias("maxx"))
    .sort(desc("maxx"))
)
# Display separately: show() returns None, so chaining it into the assignment
# would leave q4 bound to None instead of the result DataFrame.
q4.show()

+--------------------+-----+
|              DOZone| maxx|
+--------------------+-----+
|      Yorkville West|81.89|
|   LaGuardia Airport| 50.0|
|   East Harlem North| 45.0|
|Long Island City/...|34.25|
|                  NA| 28.9|
|         JFK Airport|23.53|
|      Newark Airport| 20.0|
|   East Harlem South| 20.0|
|        Clinton East| 20.0|
| Morningside Heights| 20.0|
|      Queens Village| 18.0|
|    Sunset Park West|17.55|
|     Windsor Terrace|17.04|
|       Alphabet City| 15.9|
|  Stuyvesant Heights|15.45|
|      Central Harlem| 15.0|
|Washington Height...| 15.0|
|Briarwood/Jamaica...|14.06|
|        East Chelsea|13.53|
|Financial Distric...| 13.2|
+--------------------+-----+
only showing top 20 rows
