In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [7]:
pyspark.__version__

'3.5.5'

In [8]:
spark.version

'3.5.5'

In [10]:
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [12]:
df.show(3)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [16]:
df \
.repartition(4) \
.write.parquet('data/yellow/2024/10', mode='overwrite')

                                                                                

In [17]:
! ls -lh data/yellow/2024/10

total 90M
-rw-r--r-- 1 maxkaizo maxkaizo   0 Mar  7 15:19 _SUCCESS
-rw-r--r-- 1 maxkaizo maxkaizo 23M Mar  7 15:19 part-00000-ed178c3b-2821-4ad4-8454-93ba2f6b80da-c000.snappy.parquet
-rw-r--r-- 1 maxkaizo maxkaizo 23M Mar  7 15:19 part-00001-ed178c3b-2821-4ad4-8454-93ba2f6b80da-c000.snappy.parquet
-rw-r--r-- 1 maxkaizo maxkaizo 23M Mar  7 15:19 part-00002-ed178c3b-2821-4ad4-8454-93ba2f6b80da-c000.snappy.parquet
-rw-r--r-- 1 maxkaizo maxkaizo 23M Mar  7 15:19 part-00003-ed178c3b-2821-4ad4-8454-93ba2f6b80da-c000.snappy.parquet


In [19]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [20]:
df.groupBy('VendorID').count().show()

+--------+-------+
|VendorID|  count|
+--------+-------+
|       1| 892962|
|       2|2940734|
|       6|     75|
+--------+-------+



In [21]:
df.registerTempTable('trips_data')



In [23]:
spark.sql("""
SELECT
    count(1)
FROM
    trips_data
WHERE
    CAST(tpep_pickup_datetime AS DATE) = '2024-10-15'
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [41]:
spark.sql("""
SELECT 
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600 AS trip_duration_hours
FROM trips_data
ORDER BY trip_duration_hours DESC
LIMIT 1
""").show()


+--------------------+---------------------+-------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_duration_hours|
+--------------------+---------------------+-------------------+
| 2024-10-16 13:03:49|  2024-10-23 07:40:53| 162.61777777777777|
+--------------------+---------------------+-------------------+



In [44]:
spark.sql("""
SELECT 
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    timestampdiff(HOUR, tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_duration_hours
FROM trips_data
ORDER BY trip_duration_hours DESC
LIMIT 1
""").show()


+--------------------+---------------------+-------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_duration_hours|
+--------------------+---------------------+-------------------+
| 2024-10-16 13:03:49|  2024-10-23 07:40:53|                162|
+--------------------+---------------------+-------------------+



                                                                                

In [63]:
spark.sql("""
SELECT 
    MAX(timestampdiff(HOUR, tpep_pickup_datetime, tpep_dropoff_datetime)) AS max_trip_duration_hours
FROM trips_data
""").show()



+-----------------------+
|max_trip_duration_hours|
+-----------------------+
|                    162|
+-----------------------+



                                                                                

In [93]:
df_zones = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('taxi_zone_lookup.csv')

In [96]:
df_least_frequent = spark.sql("""
SELECT 
    PULocationID, count(PULocationID) as qty
FROM
    trips_data
GROUP BY
    PULocationID
ORDER BY 
    qty ASC
LIMIT 10
""")

In [97]:
df_least_frequent.show()

+------------+---+
|PULocationID|qty|
+------------+---+
|         105|  1|
|           5|  2|
|         199|  2|
|           2|  3|
|         111|  3|
|         204|  4|
|          44|  4|
|          84|  4|
|         245|  4|
|         187|  4|
+------------+---+



In [99]:
df_zones = df_zones \
    .withColumnRenamed('LocationID', 'PULocationID') 

In [101]:
df_join = df_least_frequent.join(df_zones, on=['PULocationID'], how='inner')

In [102]:
from pyspark.sql.functions import asc

In [104]:
df_join.sort(asc("qty")).show(truncate=False)

+------------+---+-------------+---------------------------------------------+------------+
|PULocationID|qty|Borough      |Zone                                         |service_zone|
+------------+---+-------------+---------------------------------------------+------------+
|105         |1  |Manhattan    |Governor's Island/Ellis Island/Liberty Island|Yellow Zone |
|5           |2  |Staten Island|Arden Heights                                |Boro Zone   |
|199         |2  |Bronx        |Rikers Island                                |Boro Zone   |
|2           |3  |Queens       |Jamaica Bay                                  |Boro Zone   |
|111         |3  |Brooklyn     |Green-Wood Cemetery                          |Boro Zone   |
|44          |4  |Staten Island|Charleston/Tottenville                       |Boro Zone   |
|84          |4  |Staten Island|Eltingville/Annadale/Prince's Bay            |Boro Zone   |
|187         |4  |Staten Island|Port Richmond                                |Bo