In [64]:
import pyspark
from pyspark.sql import SparkSession

import pandas as pd

In [65]:
# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [66]:
# Display the version string of the running Spark session.
spark.version

'3.5.5'

In [67]:
# Load the October 2024 yellow-taxi trip records.
# Parquet files carry their own schema, so the CSV-only "header" and
# "inferSchema" reader options have no effect here and are omitted.
df = spark.read.parquet("../data/yellow_tripdata_2024-10.parquet")

In [68]:
# Print the first rows of the raw trips DataFrame as a text table.
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [69]:
# Shuffle the trips into 4 partitions.
df_part = df.repartition(4)

# Write the repartitioned data as Parquet, replacing any earlier output
# at this path.
df_part.write.mode("overwrite").parquet("../data/part")

                                                                                

In [70]:
# List the column names of the raw trips DataFrame.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [71]:
from pyspark.sql import functions as F

In [72]:
# Derive calendar-date, time-of-day and trip-duration columns from the
# pickup and drop-off timestamps.
df_edit = (
    df
    # Date part of each timestamp.
    .withColumn("pickup_date", F.to_date("tpep_pickup_datetime"))
    .withColumn("dropoff_date", F.to_date("tpep_dropoff_datetime"))
    # Time of day rendered as an "HH:mm:ss" string.
    .withColumn("pickup_time", F.date_format(F.col("tpep_pickup_datetime"), "HH:mm:ss"))
    .withColumn("dropoff_time", F.date_format(F.col("tpep_dropoff_datetime"), "HH:mm:ss"))
    # Drop-off minus pickup.
    .withColumn("time_diff", F.col("tpep_dropoff_datetime") - F.col("tpep_pickup_datetime"))
)
        

In [73]:
# List the columns of df_edit to confirm the derived columns were appended.
df_edit.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'pickup_date',
 'dropoff_date',
 'pickup_time',
 'dropoff_time',
 'time_diff']

In [74]:
# Expose df_edit to Spark SQL under the name "temp_data".
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0 and emits a warning on every call.
df_edit.createOrReplaceTempView('temp_data')



In [75]:
# Count the trips whose pickup falls on 2024-10-15.
# The filter compares the derived DATE column against a typed DATE literal
# instead of LIKE-matching the timestamp against a double-quoted string,
# which relied on an implicit timestamp-to-string cast.
spark.sql("""
        SELECT
            count(1)
        FROM
            temp_data
        WHERE
            pickup_date = DATE '2024-10-15'
        """).show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [61]:
# Longest trip duration across all rows, collected to the driver.
df_edit.agg(F.max("time_diff")).collect()

[Row(max(time_diff)=datetime.timedelta(days=6, seconds=67024))]

In [78]:
# Download the taxi zone lookup CSV into ../data/zone_lookup
# (-P sets the directory the file is saved under).
!wget -P ../data/zone_lookup https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-08 02:12:56--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.65, 52.85.39.153, 52.85.39.117, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘../data/zone_lookup/taxi_zone_lookup.csv’


2025-03-08 02:12:57 (864 KB/s) - ‘../data/zone_lookup/taxi_zone_lookup.csv’ saved [12331/12331]



In [79]:
# Read the zone lookup table; the first row holds the column names and
# column types are inferred from the data.
df_zone = spark.read.csv(
    "../data/zone_lookup/taxi_zone_lookup.csv",
    header="true",
    inferSchema="true",
)

In [82]:
# Print the first rows of the zone lookup table.
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [81]:
# Peek at the pickup location IDs in the repartitioned trips.
df_part.select(F.col("PULocationID")).show()

+------------+
|PULocationID|
+------------+
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
|         196|
+------------+
only showing top 20 rows



In [90]:
# Rename the lookup key so it matches the trips' pickup-location column.
df_zone_tmp = df_zone.withColumnRenamed('LocationID', 'PULocationID')

In [92]:
# Attach zone details to every trip by its pickup location ID.
# A left join keeps trips even when the ID has no match in the lookup.
df_join = df_part.join(
    df_zone_tmp,
    on='PULocationID',
    how='left',
)

In [93]:
# Print the first rows of the joined frame (trip columns plus Borough,
# Zone and service_zone from the lookup table).
df_join.show()



+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------+--------------------+------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|  Borough|                Zone|service_zone|
+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------+--------------------+------------+
|          48|       1| 2024-10-03 03:40:19|  2024-10-0

                                                                                

In [97]:
# Number of trips per pickup zone, least-used zones first.
df_join.groupBy("Zone").count().orderBy(F.col("count").asc()).show()

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Rikers Island|    2|
|       Arden Heights|    2|
|         Jamaica Bay|    3|
| Green-Wood Cemetery|    3|
|Charleston/Totten...|    4|
|   Rossville/Woodrow|    4|
|       Port Richmond|    4|
|Eltingville/Annad...|    4|
|       West Brighton|    4|
|         Great Kills|    6|
|        Crotona Park|    6|
|Heartland Village...|    7|
|     Mariners Harbor|    7|
|Saint George/New ...|    9|
|             Oakwood|    9|
|       Broad Channel|   10|
|New Dorp/Midland ...|   10|
|         Westerleigh|   12|
|     Pelham Bay Park|   12|
+--------------------+-----+
only showing top 20 rows

