In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Obtain a local SparkSession using every available core ("local[*]").
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('module-5')
    .getOrCreate()
)

25/02/25 23:09:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the raw yellow-taxi trip records from the local parquet file.
df0 = spark.read.format("parquet").load('yellow_tripdata_2024-10.parquet')

                                                                                                                                                                                                                             

In [None]:
# Print the column names and types of the raw trip DataFrame.
df0.printSchema()

In [4]:
# Redistribute the trip records across four partitions.
df1 = df0.repartition(numPartitions=4)

In [None]:
# Persist the repartitioned DataFrame as parquet files under output_path.
# mode="overwrite" keeps this cell re-runnable: with the default save mode the
# write raises once the directory exists (e.g. on a second run or Run All).
# NOTE: overwrite replaces whatever is already stored at output_path.
output_path = './yellow-10-24/'
df1.write.parquet(output_path, mode="overwrite")

In [7]:
# Number of rows in df0 (loaded from yellow_tripdata_2024-10.parquet).
df0.count()

3833771

In [6]:
# Show the schema of the repartitioned DataFrame df1.
df1.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [15]:
import pyspark.sql.functions as F

# Count the trips picked up on 15 Oct 2024: truncate each pickup timestamp to
# the start of its day and keep the rows that land on that single midnight.
(
    df0
    .where(
        F.date_trunc("day", F.col("tpep_pickup_datetime")) == "2024-10-15 00:00:00"
    )
    .count()
)

128892

In [17]:
# Register df0 as the temporary view yellow_10_24 so it can be queried via spark.sql.
df0.createOrReplaceTempView('yellow_10_24')

In [21]:
# Count pickups on 2024-10-15 via SQL, using the half-open timestamp range
# [2024-10-15, 2024-10-16) on tpep_pickup_datetime.
spark.sql("""
select count(1) as pickup_count
from yellow_10_24
where tpep_pickup_datetime >= '2024-10-15'
    and tpep_pickup_datetime < '2024-10-16'
"""
).show()

+------------+
|pickup_count|
+------------+
|      128892|
+------------+



                                                                                                                                                                                                                             

In [28]:
from pyspark.sql.functions import expr

# Longest trips first: hour_diff is the pickup-to-dropoff gap expressed in
# HOUR units by TIMESTAMPDIFF; show() prints the top rows of the ranking.
(
    df0
    .select(
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        expr("TIMESTAMPDIFF(HOUR, tpep_pickup_datetime, tpep_dropoff_datetime)").alias("hour_diff"),
    )
    .orderBy("hour_diff", ascending=False)
    .show()
)

+--------------------+---------------------+---------+
|tpep_pickup_datetime|tpep_dropoff_datetime|hour_diff|
+--------------------+---------------------+---------+
| 2024-10-16 08:03:49|  2024-10-23 02:40:53|      162|
| 2024-10-03 13:47:25|  2024-10-09 13:06:55|      143|
| 2024-10-22 11:00:55|  2024-10-28 04:46:33|      137|
| 2024-10-18 04:53:32|  2024-10-22 23:43:37|      114|
| 2024-10-20 19:36:24|  2024-10-24 13:30:18|       89|
| 2024-10-20 08:30:52|  2024-10-24 01:57:38|       89|
| 2024-10-22 11:04:52|  2024-10-25 09:22:49|       70|
| 2024-10-12 14:32:51|  2024-10-15 10:07:15|       67|
| 2024-10-17 12:58:18|  2024-10-20 07:02:18|       66|
| 2024-10-21 09:28:21|  2024-10-23 07:53:42|       46|
| 2024-10-20 13:58:28|  2024-10-22 08:17:00|       42|
| 2024-10-24 15:29:58|  2024-10-26 05:58:25|       38|
| 2024-10-23 18:52:02|  2024-10-25 04:49:06|       33|
| 2024-10-02 05:36:50|  2024-10-03 07:54:45|       26|
| 2024-10-15 06:49:15|  2024-10-16 08:03:33|       25|
| 2024-10-

In [None]:
## Taxi zone lookup table (LocationID -> Borough / Zone / service_zone)

In [33]:
# Read the zone lookup CSV, taking column names from its header row.
# No schema is inferred or supplied, so every column is read as a string.
df_tz = spark.read.csv('taxi_zone_lookup.csv', header=True)

In [34]:
# Preview the lookup table (show() with no arguments prints the first 20 rows).
df_tz.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [35]:
# Inspect the columns produced by attaching zone details to each trip,
# matching the trip's pickup location id against the lookup's LocationID.
df0.join(df_tz, df0.PULocationID == df_tz.LocationID, "inner").printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone

In [46]:
# Pickups per zone, least-frequent zones first.
(
    df0
    .join(df_tz, df0.PULocationID == df_tz.LocationID)
    .select("Zone", "tpep_pickup_datetime")
    .groupBy("Zone")
    .agg(F.count("*").alias("zone_count"))
    .orderBy(F.col("zone_count").asc())
    .show(truncate=False)
)

+---------------------------------------------+----------+
|Zone                                         |zone_count|
+---------------------------------------------+----------+
|Governor's Island/Ellis Island/Liberty Island|1         |
|Arden Heights                                |2         |
|Rikers Island                                |2         |
|Jamaica Bay                                  |3         |
|Green-Wood Cemetery                          |3         |
|Charleston/Tottenville                       |4         |
|Port Richmond                                |4         |
|Rossville/Woodrow                            |4         |
|Eltingville/Annadale/Prince's Bay            |4         |
|West Brighton                                |4         |
|Crotona Park                                 |6         |
|Great Kills                                  |6         |
|Heartland Village/Todt Hill                  |7         |
|Mariners Harbor                              |7        