In [22]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
#!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

In [3]:
# Local-mode Spark session ("local[*]" = use all available cores).
# getOrCreate() hands back the already-running session when this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [4]:
# Display the Spark version the outputs in this notebook were produced with.
spark.version

'3.5.5'

In [5]:
# Raw October 2024 yellow-taxi trips, read from the local parquet file.
df_yellow_2024_10 = (
    spark.read
    .parquet('yellow_tripdata_2024-10.parquet')
)

In [6]:
# Destination for the repartitioned copy of the October 2024 trips
# (Spark writes parquet output as a directory of part-files).
output_path = 'yellow_tripdata_2024-10_partitioned.parquet'

In [29]:
# Rewrite the trips as 4 partitions under output_path; 'overwrite' replaces
# whatever a previous run left there, so the cell can be re-executed.
(
    df_yellow_2024_10
    .repartition(4)
    .write
    .mode('overwrite')
    .parquet(output_path)
)

In [8]:
# Expose the trips to Spark SQL as the view `yellow_2024_10_data`.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0)
# and, like it, silently replaces an existing view of the same name on re-run.
df_yellow_2024_10.createOrReplaceTempView('yellow_2024_10_data')



In [13]:
# Count trips whose pickup AND dropoff both fall on 2024-10-15.
# Comparing full calendar dates (not day() alone) avoids also counting the 15th
# of any other month/year that may appear as outliers in the monthly file.
# Drop the dropoff condition to count every trip that *started* that day.
count_records_query = """
SELECT
    COUNT(1) AS count_records
FROM yellow_2024_10_data
WHERE to_date(tpep_pickup_datetime) = DATE '2024-10-15'
  AND to_date(tpep_dropoff_datetime) = DATE '2024-10-15'
"""

In [14]:
# Execute count_records_query and print its single-row result table.
(
    spark.sql(count_records_query)
    .show()
)

+-------------+
|count_records|
+-------------+
|       127993|
+-------------+



In [33]:
# Longest trip duration in hours: the largest pickup-to-dropoff gap measured
# in seconds via date_diff(SECOND, ...), divided by 3600.0.
# Rows whose dropoff is not strictly after the pickup are excluded.
longest_trip_query =  """
SELECT
    MAX(date_diff(SECOND, tpep_pickup_datetime, tpep_dropoff_datetime)) / 3600.0 AS longest_trip_hours    
FROM yellow_2024_10_data
WHERE tpep_pickup_datetime < tpep_dropoff_datetime
"""

In [34]:
# Execute longest_trip_query and print the single longest_trip_hours value.
(
    spark.sql(longest_trip_query)
    .show()
)

+------------------+
|longest_trip_hours|
+------------------+
|        162.617778|
+------------------+



In [37]:
 df_zone = spark.read \
     .option("header", "true") \
     .csv('taxi_zone_lookup.csv')

 df_zone \
     .repartition(4) \
     .write.parquet('zones')

In [38]:
# Expose the zone lookup to Spark SQL as the view `zone_data`.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df_zone.createOrReplaceTempView('zone_data')

In [42]:
# Pickup zone with the fewest trips.
# - The inner join only sees zones with at least one pickup; trips whose
#   PULocationID has no lookup row are dropped.
# - Grouping is by zone *name*, so LocationIDs sharing a name are merged.
# - z.Zone breaks ties in pu_count so LIMIT 1 returns the same row every run.
least_pu_zone_query = """
SELECT
    COUNT(y.PULocationID) AS pu_count,
    z.Zone
FROM yellow_2024_10_data y
JOIN zone_data z
    ON y.PULocationID = z.LocationID
GROUP BY z.Zone
ORDER BY pu_count ASC, z.Zone ASC
LIMIT 1
"""

In [43]:
# Execute least_pu_zone_query and print the result.
# truncate=False: show() otherwise cuts strings at 20 characters, which hides
# the full zone name (the earlier output reads "Governor's Island...").
spark\
    .sql(least_pu_zone_query)\
    .show(truncate=False)

+--------+--------------------+
|pu_count|                Zone|
+--------+--------------------+
|       1|Governor's Island...|
+--------+--------------------+

