In [25]:
import pyspark
from pyspark.sql import SparkSession

spark= SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
        


In [None]:
! wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet ./data/hw


In [26]:
df_yellow = spark.read.parquet('data/hw/*.parquet')

In [27]:
spark.version

'3.3.2'

In [28]:
df_yellow.rdd.getNumPartitions() # by default as many as the existing CPU cores in local machine

8

In [29]:
df_yellow = df_yellow.repartition(4)

In [30]:
df_yellow.write.parquet('data/tmp/', mode = "overwrite")

                                                                                

In [31]:
df_yellow.rdd.getNumPartitions()



4

In [32]:
! cd data/tmp && ls -lh

total 97M
-rw-r--r-- 1 manos manos   0 Mar  1 17:01 _SUCCESS
-rw-r--r-- 1 manos manos 25M Mar  1 17:01 part-00000-083252c8-10c0-4e8f-9d0e-2e060a42f247-c000.snappy.parquet
-rw-r--r-- 1 manos manos 25M Mar  1 17:01 part-00001-083252c8-10c0-4e8f-9d0e-2e060a42f247-c000.snappy.parquet
-rw-r--r-- 1 manos manos 25M Mar  1 17:01 part-00002-083252c8-10c0-4e8f-9d0e-2e060a42f247-c000.snappy.parquet
-rw-r--r-- 1 manos manos 25M Mar  1 17:01 part-00003-083252c8-10c0-4e8f-9d0e-2e060a42f247-c000.snappy.parquet


In [33]:
df_yellow.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [34]:
from datetime import datetime
from pyspark.sql.functions import lit

filt_date_start = datetime.strptime("2024-10-15" , "%Y-%m-%d")
filt_date_end = datetime.strptime("2024-10-16" , "%Y-%m-%d")

In [35]:
df_yellow.filter((df_yellow.tpep_pickup_datetime>= lit(filt_date_start)) & (df_yellow.tpep_pickup_datetime<lit(filt_date_end))).count()

                                                                                

122561

In [36]:
df_yellow.createOrReplaceTempView('trips_oct_2024')

In [38]:
spark.sql("""
SELECT
    MAX((unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600) AS longest_trip_hours
FROM
    trips_oct_2024
""").show()



+------------------+
|longest_trip_hours|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

In [39]:
! wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv  -P ./data/hw



--2025-03-01 17:03:54--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.160.226.111, 3.160.226.161, 3.160.226.85, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.160.226.111|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘./data/hw/taxi_zone_lookup.csv.1’


2025-03-01 17:03:54 (190 MB/s) - ‘./data/hw/taxi_zone_lookup.csv.1’ saved [12331/12331]



In [40]:
lookup = spark.read \
    .option("header", "true") \
    .csv('data/hw/taxi_zone_lookup.csv')

In [42]:
lookup.show(4)

+----------+---------+--------------------+------------+
|LocationID|  Borough|                Zone|service_zone|
+----------+---------+--------------------+------------+
|         1|      EWR|      Newark Airport|         EWR|
|         2|   Queens|         Jamaica Bay|   Boro Zone|
|         3|    Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|Manhattan|       Alphabet City| Yellow Zone|
+----------+---------+--------------------+------------+
only showing top 4 rows



In [41]:
from pyspark.sql.functions import broadcast 
        
broadcast_dataframe = broadcast(lookup) 

In [46]:
broadcasted_join_table = df_yellow.join(broadcast_dataframe, df_yellow.PULocationID == broadcast_dataframe.LocationID )


In [48]:
broadcasted_join_table.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [PULocationID#95], [cast(LocationID#206 as int)], Inner, BuildRight, false
   :- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=551]
   :  +- Filter isnotnull(PULocationID#95)
   :     +- FileScan parquet [VendorID#88,tpep_pickup_datetime#89,tpep_dropoff_datetime#90,passenger_count#91L,trip_distance#92,RatecodeID#93L,store_and_fwd_flag#94,PULocationID#95,DOLocationID#96,payment_type#97L,fare_amount#98,extra#99,mta_tax#100,tip_amount#101,tolls_amount#102,improvement_surcharge#103,total_amount#104,congestion_surcharge#105,Airport_fee#106] Batched: true, DataFilters: [isnotnull(PULocationID#95)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/manos/data-engineering-zoomcamp/05-batch/code/data/hw/yello..., PartitionFilters: [], PushedFilters: [IsNotNull(PULocationID)], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:timestamp,tpep_dropoff_datetime:timestamp,passenger_coun..

In [51]:
broadcasted_join_table.createOrReplaceTempView('trips_oct_2024_with_zones')

In [73]:
spark.sql("""
WITH pu_locations_freqs AS (
    SELECT
        Zone,
        COUNT(PULocationID) as pu_location_freq
    FROM
        trips_oct_2024_with_zones
    GROUP BY Zone
)
,
min_freq AS (
    SELECT 
        MIN(pu_location_freq) AS min_freq 
    FROM pu_locations_freqs
)

SELECT 
    p.Zone,
    p.pu_location_freq
FROM pu_locations_freqs p
JOIN min_freq m 
    ON p.pu_location_freq = m.min_freq



""").show(40,truncate=False)

                                                                                

+---------------------------------------------+----------------+
|Zone                                         |pu_location_freq|
+---------------------------------------------+----------------+
|Governor's Island/Ellis Island/Liberty Island|1               |
+---------------------------------------------+----------------+

