In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# One local Spark session for the whole notebook; getOrCreate() hands back the
# existing session when this cell is re-run instead of starting a second one.
spark = (
    SparkSession.builder
    .master('local')
    .appName('test')
    .getOrCreate()
)

In [3]:
# Echo the session object as the cell's last expression so the notebook renders its summary.
spark

In [4]:
# Yellow taxi trips for October 2024. Parquet stores its schema in the file, so
# CSV-style reader options (header / inferSchema) do not apply here and are omitted.
df_yellow = spark.read.parquet(
    'D:/data-engineering-zoomcamp/Week_5_Spark/homework/yellow_tripdata_2024-10.parquet'
)

In [5]:
# Peek at the first few trips to sanity-check the loaded columns.
df_yellow.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [6]:
# Shuffle the trips into exactly four partitions (one output file each when written).
df_yellow_partitions = df_yellow.repartition(numPartitions=4)

In [7]:
# Write the 4-partition DataFrame as-is so the output directory holds one Parquet
# file per partition. Coalescing to 1 first would merge everything into a single
# file and defeat the repartition done in the previous cell.
df_yellow_partitions.write.parquet('data/report/partition/', mode='overwrite')

In [8]:
from pyspark.sql.functions import date_format, col, max, datediff


In [9]:

# Number of trips that started on 2024-10-15. A half-open timestamp range selects
# the same rows as formatting each pickup time to 'yyyy-MM-dd', but Spark can push
# it down to the Parquet scan instead of string-formatting every row.
df_yellow_partitions.filter(
    (col("tpep_pickup_datetime") >= "2024-10-15")
    & (col("tpep_pickup_datetime") < "2024-10-16")
).count()


128893

In [20]:
# Register the repartitioned trips as a SQL view (later cells query it too).
df_yellow_partitions.createOrReplaceTempView("df_yellow_partitions")

# Longest trip in hours: (dropoff - pickup) in seconds, divided by 3600.
spark.sql("""
    SELECT MAX(
        (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600
    ) AS longest_trip_hours
    FROM df_yellow_partitions
""").show()


+--------------------------------------------------------------------------------------------------------------------------------------+
|max(((unix_timestamp(tpep_dropoff_datetime, yyyy-MM-dd HH:mm:ss) - unix_timestamp(tpep_pickup_datetime, yyyy-MM-dd HH:mm:ss)) / 3600))|
+--------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                    162.61777777777777|
+--------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# Spark UI is served on port 4040 by default: http://localhost:4040

In [28]:
# Taxi zone lookup (provides LocationID and Zone, used in the joins below).
df_zones = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('D:/data-engineering-zoomcamp/Week_5_Spark/homework/taxi_zone_lookup.csv')
)

In [33]:
# Expose the zone lookup to Spark SQL under the same name as the DataFrame.
df_zones.createOrReplaceTempView(name="df_zones")


In [35]:
spark.sql("""
    SELECT 
        yp.VendorID,
        yp.tpep_pickup_datetime,
        yp.tpep_dropoff_datetime,
        yp.passenger_count,
        yp.trip_distance,
        yp.RatecodeID,
        yp.store_and_fwd_flag,
        pz.Zone AS PickupLocation,
        dz.Zone AS DropoffLocation
    FROM df_yellow_partitions AS yp
    JOIN df_zones AS pz ON yp.PULocationID = pz.LocationID
    JOIN df_zones AS dz ON yp.DOLocationID = dz.LocationID
""").createOrReplaceTempView("df_yellow_partitions_with_zones")


In [42]:
spark.sql("""
    Select count(PickupLocation) AS PickupLocationCount, PickupLocation
    FROM df_yellow_partitions_with_zones Group By PickupLocation Order By PickupLocationCount ASC
          """).show()

+-------------------+--------------------+
|PickupLocationCount|      PickupLocation|
+-------------------+--------------------+
|                  1|Governor's Island...|
|                  2|       Rikers Island|
|                  2|       Arden Heights|
|                  3| Green-Wood Cemetery|
|                  3|         Jamaica Bay|
|                  4|   Rossville/Woodrow|
|                  4|       West Brighton|
|                  4|       Port Richmond|
|                  4|Eltingville/Annad...|
|                  4|Charleston/Totten...|
|                  6|         Great Kills|
|                  6|        Crotona Park|
|                  7|Heartland Village...|
|                  7|     Mariners Harbor|
|                  9|Saint George/New ...|
|                  9|             Oakwood|
|                 10|       Broad Channel|
|                 10|New Dorp/Midland ...|
|                 12|         Westerleigh|
|                 12|     Pelham Bay Park|
+----------