In [39]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql.functions import col, count, to_date, lit, max as max_, unix_timestamp

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
spark.version

'3.3.2'

In [55]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-02-26 20:09:52--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.191.151, 65.8.191.179, 65.8.191.163, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.191.151|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: 'yellow_tripdata_2024-10.parquet'

     0K .......... .......... .......... .......... ..........  0% 2.95M 21s
    50K .......... .......... .......... .......... ..........  0% 2.44M 23s
   100K .......... .......... .......... .......... ..........  0% 2.15M 25s
   150K .......... .......... .......... .......... ..........  0% 14.5M 20s
   200K .......... .......... .......... .......... ..........  0% 1.86M 22s
   250K .......... .......... .......... .......... ..........  0% 7.50M 20s
   300K .......... .......... .......... .......... ..........  0%

In [5]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.LongType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.LongType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.LongType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [6]:
file_name = "yellow_tripdata_2024-10.parquet"

In [7]:
df_yellow = spark.read \
        .schema(yellow_schema) \
        .parquet(file_name)

In [8]:
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2024-09-30 20:30:44|  2024-09-30 20:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.5|         0.0|                  1.0

In [9]:
df_yellow = df_yellow.repartition(4)

In [10]:
df_yellow.write.parquet('yellow/2024/10/')

In [11]:
df_yellow = spark.read.parquet('yellow/2024/10/')

In [21]:
df_yellow = df_yellow.withColumn("pickup_date", to_date(col("tpep_pickup_datetime")))

In [23]:
df_yellow.filter(df_yellow["pickup_date"] == to_date(lit("2024-10-15"))).count()


128909

In [24]:
df_yellow.createOrReplaceTempView('yellow')

In [25]:
spark.sql(
    """select count(*) FROM
    yellow WHERE
    pickup_date = '2024-10-15' """ 
).show()

+--------+
|count(1)|
+--------+
|  128909|
+--------+



Even if I add tpep_dropoff_datetime it is still in 128k range for me

In [None]:
spark.sql(
    """select count(*) FROM
    yellow WHERE
    tpep_pickup_datetime >= '2024-10-15' and tpep_pickup_datetime < '2024-10-16' and 
    tpep_dropoff_datetime >= '2024-10-15' and tpep_dropoff_datetime < '2024-10-16'""" 
).show()

+--------+
|count(1)|
+--------+
|  128788|
+--------+



In [19]:
df_yellow.dtypes

[('VendorID', 'int'),
 ('tpep_pickup_datetime', 'timestamp'),
 ('tpep_dropoff_datetime', 'timestamp'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'bigint'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('payment_type', 'bigint'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'double')]

In [20]:
df_yellow = df_yellow.withColumn("duration", (df_yellow["tpep_dropoff_datetime"].cast("long") - df_yellow["tpep_pickup_datetime"].cast("long")) )

In [35]:
grouped_by_date_df = df_yellow.groupBy("pickup_date").agg(max_("duration").alias("max_duration"))

In [36]:
grouped_by_date_df = grouped_by_date_df.orderBy(col("max_duration").desc(), col("pickup_date").asc())

In [37]:
grouped_by_date_df.show()

+-----------+------------+
|pickup_date|max_duration|
+-----------+------------+
| 2024-10-16|      585424|
| 2024-10-03|      515970|
| 2024-10-22|      495938|
| 2024-10-18|      413405|
| 2024-10-20|      323634|
| 2024-10-12|      243264|
| 2024-10-17|      237840|
| 2024-10-21|      167121|
| 2024-10-24|      138507|
| 2024-10-23|      122224|
| 2024-10-02|       94675|
| 2024-10-14|       91049|
| 2024-10-15|       90858|
| 2024-10-10|       86367|
| 2024-10-05|       86359|
| 2024-10-26|       86350|
| 2024-10-28|       86350|
| 2024-10-11|       86348|
| 2024-10-01|       86335|
| 2024-10-27|       86334|
+-----------+------------+
only showing top 20 rows



In [41]:
df_yellow = df_yellow.withColumn("duration_seconds", unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime")))

In [43]:
df_yellow.selectExpr("duration", "duration_seconds").limit(5).show()

+--------+----------------+
|duration|duration_seconds|
+--------+----------------+
|     443|             443|
|     549|             549|
|     111|             111|
|     146|             146|
|     349|             349|
+--------+----------------+



In [None]:
df_yellow = df_yellow.drop("duration_seconds")

In [46]:
df_yellow = df_yellow.withColumn("duration_hours", col("duration")/3600)

In [47]:
grouped_by_date_df = df_yellow.groupBy("pickup_date").agg(max_("duration_hours").alias("max_duration_hours"))

In [48]:
grouped_by_date_df = grouped_by_date_df.orderBy(col("max_duration_hours").desc(), col("pickup_date").asc())

In [49]:
grouped_by_date_df.show()

+-----------+------------------+
|pickup_date|max_duration_hours|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-20| 89.89833333333333|
| 2024-10-12| 67.57333333333334|
| 2024-10-17| 66.06666666666666|
| 2024-10-21|           46.4225|
| 2024-10-24| 38.47416666666667|
| 2024-10-23| 33.95111111111111|
| 2024-10-02| 26.29861111111111|
| 2024-10-14| 25.29138888888889|
| 2024-10-15|25.238333333333333|
| 2024-10-10|23.990833333333335|
| 2024-10-05|23.988611111111112|
| 2024-10-26| 23.98611111111111|
| 2024-10-28| 23.98611111111111|
| 2024-10-11|23.985555555555557|
| 2024-10-01|23.981944444444444|
| 2024-10-27|23.981666666666666|
+-----------+------------------+
only showing top 20 rows



In [53]:
df_yellow.createOrReplaceTempView('yellow')

In [55]:
spark.sql("""select pickup_date, max(duration_hours) as max_duration_hours from yellow group by pickup_date order by max_duration_hours desc, pickup_date asc""").show()

+-----------+------------------+
|pickup_date|max_duration_hours|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-20| 89.89833333333333|
| 2024-10-12| 67.57333333333334|
| 2024-10-17| 66.06666666666666|
| 2024-10-21|           46.4225|
| 2024-10-24| 38.47416666666667|
| 2024-10-23| 33.95111111111111|
| 2024-10-02| 26.29861111111111|
| 2024-10-14| 25.29138888888889|
| 2024-10-15|25.238333333333333|
| 2024-10-10|23.990833333333335|
| 2024-10-05|23.988611111111112|
| 2024-10-26| 23.98611111111111|
| 2024-10-28| 23.98611111111111|
| 2024-10-11|23.985555555555557|
| 2024-10-01|23.981944444444444|
| 2024-10-27|23.981666666666666|
+-----------+------------------+
only showing top 20 rows



In [56]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-02-27 17:19:52--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.94.118, 65.9.94.147, 65.9.94.155, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.94.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: 'taxi_zone_lookup.csv.1'

     0K .......... ..                                         100%  811M=0s

2025-02-27 17:19:54 (811 MB/s) - 'taxi_zone_lookup.csv.1' saved [12331/12331]



In [59]:
schema = types.StructType([
      types.StructField("LocationID", types.IntegerType(), True),
      types.StructField("Borough", types.StringType(), True),
      types.StructField("Zone", types.StringType(), True),
      types.StructField("service_zone", types.StringType(), True),
])

In [60]:
df_zones = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("C:\\Users\\fahmi\\projects\\dataeng_bootcamp\\week5\\taxi_zone_lookup.csv")

In [61]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [64]:
df_zones.write.parquet("zones")

In [65]:
df_zones = spark.read.parquet("zones")

In [66]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [68]:
df_zones.createOrReplaceTempView("zones")

In [79]:
spark.sql(
    """
   select concat(coalesce(puzones.Zone,'Unknown')) , count(*) 
   from yellow as taxi
   left join zones as puzones
    on taxi.PULocationID = puzones.LocationID
    group by 1
    order by 2 asc
    """
).show(20, False)

+---------------------------------------------+--------+
|concat(coalesce(Zone, Unknown))              |count(1)|
+---------------------------------------------+--------+
|Governor's Island/Ellis Island/Liberty Island|1       |
|Rikers Island                                |2       |
|Arden Heights                                |2       |
|Green-Wood Cemetery                          |3       |
|Jamaica Bay                                  |3       |
|Charleston/Tottenville                       |4       |
|Eltingville/Annadale/Prince's Bay            |4       |
|Rossville/Woodrow                            |4       |
|Port Richmond                                |4       |
|West Brighton                                |4       |
|Crotona Park                                 |6       |
|Great Kills                                  |6       |
|Mariners Harbor                              |7       |
|Heartland Village/Todt Hill                  |7       |
|Oakwood                       