In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a Spark session running locally, with "local[*]" as the
# master URL and 'test' as the application name.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/24 11:13:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the taxi zone lookup CSV, using its first row as the column names.
df = spark.read.csv('taxi_zone_lookup.csv', header=True)

In [5]:
# Print the first rows of the zone lookup DataFrame as a text table.
df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [7]:
# Display the version of the installed PySpark package.
pyspark.__version__

'3.5.5'

In [8]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-24 11:32:42--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.164.82.160, 3.164.82.197, 3.164.82.40, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.164.82.160|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-24 11:32:47 (14.9 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [12]:
# Load the October 2024 yellow-taxi trips. Parquet files carry their own
# schema, so the CSV-style "header" option is not needed here.
df_yellow_oct = spark.read.parquet("yellow_tripdata_2024-10.parquet")

In [18]:
# Repartition the trips into 4 partitions and write them out as Parquet.
# mode("overwrite") replaces any previous output; with the default save mode
# the write raises once "tmp/yellow_oct" exists, so the cell could not be
# re-run.
(
    df_yellow_oct
    .repartition(4)
    .write
    .mode("overwrite")
    .parquet("tmp/yellow_oct")
)

                                                                                

In [132]:
# List the column names of the trips DataFrame.
df_yellow_oct.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [172]:
# Register the two temp views this query reads, so the cell works when the
# notebook is run top to bottom on a fresh kernel. createOrReplaceTempView
# replaces any existing view with the same name, so repeating it is harmless.
df_yellow_oct.createOrReplaceTempView('oct_data')
df.createOrReplaceTempView('lookup_zones')

# Count trips per pickup zone (PULocationID joined to the zone lookup) and
# list the zones with the fewest trips first.
spark.sql("""
SELECT
    lookup_zones.Zone,
    COUNT(1) as trip_num

FROM
    oct_data

INNER JOIN lookup_zones ON oct_data.PULocationID=lookup_zones.LocationID

GROUP BY Zone
ORDER BY trip_num ASC

""").show()

[Stage 173:>                                                        (0 + 4) / 4]

+--------------------+--------+
|                Zone|trip_num|
+--------------------+--------+
|Governor's Island...|       1|
|       Rikers Island|       2|
|       Arden Heights|       2|
|         Jamaica Bay|       3|
| Green-Wood Cemetery|       3|
|Charleston/Totten...|       4|
|   Rossville/Woodrow|       4|
|       West Brighton|       4|
|Eltingville/Annad...|       4|
|       Port Richmond|       4|
|         Great Kills|       6|
|        Crotona Park|       6|
|Heartland Village...|       7|
|     Mariners Harbor|       7|
|Saint George/New ...|       9|
|             Oakwood|       9|
|       Broad Channel|      10|
|New Dorp/Midland ...|      10|
|         Westerleigh|      12|
|     Pelham Bay Park|      12|
+--------------------+--------+
only showing top 20 rows



                                                                                

In [20]:
# Register the trips DataFrame as the temp view 'oct_data' so it can be
# queried with spark.sql.
df_yellow_oct.createOrReplaceTempView('oct_data')

In [131]:
# Compute each trip's duration in hours (minutes between pickup and drop-off,
# divided by 60) and show the longest trips first.
spark.sql("""
SELECT
    VendorID,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    DATEDIFF(minute, tpep_pickup_datetime, tpep_dropoff_datetime)/60 AS hour_diff,
    trip_distance
    
FROM
    oct_data
order by hour_diff DESC
""").show()



+--------+--------------------+---------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|         hour_diff|trip_distance|
+--------+--------------------+---------------------+------------------+-------------+
|       2| 2024-10-16 13:03:49|  2024-10-23 07:40:53|162.61666666666667|        32.37|
|       2| 2024-10-03 18:47:25|  2024-10-09 18:06:55|143.31666666666666|          0.0|
|       2| 2024-10-22 16:00:55|  2024-10-28 09:46:33|            137.75|          0.0|
|       2| 2024-10-18 09:53:32|  2024-10-23 04:43:37|114.83333333333333|        12.37|
|       2| 2024-10-21 00:36:24|  2024-10-24 18:30:18| 89.88333333333334|         0.03|
|       2| 2024-10-20 13:30:52|  2024-10-24 06:57:38| 89.43333333333334|         1.88|
|       2| 2024-10-22 16:04:52|  2024-10-25 14:22:49| 70.28333333333333|         0.38|
|       2| 2024-10-12 19:32:51|  2024-10-15 15:07:15| 67.56666666666666|         0.53|
|       2| 2024-10-17 17:58:18|  2024-10-20

                                                                                

In [146]:
# Write the zone lookup table to Parquet. mode("ignore") turns the write into
# a no-op when "lookup_zones" already exists, so re-running this cell neither
# fails (the default save mode raises if the path exists) nor replaces the
# existing files. Delete the directory first if the source data changes.
df.write.mode("ignore").parquet("lookup_zones")

In [147]:
# Read the zone lookup table back from Parquet. Pointing at the output
# directory instead of a single auto-named part file keeps this working on any
# machine and after any rewrite, because part-file names are generated anew
# on each write.
df = spark.read.parquet("lookup_zones")

In [148]:
# Print the first rows of df, which now holds the zone lookup data read back
# from Parquet.
df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [149]:
# Register the zone lookup DataFrame as the temp view 'lookup_zones' so it
# can be referenced from spark.sql queries.
df.createOrReplaceTempView('lookup_zones')