In [0]:

catalog = 'taxi_eda_db'


schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = '/Volumes/' + catalog + "/" + schema + '/' + volume
path_table =  catalog + "." + schema
download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet'

In [0]:

spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

DataFrame[]

In [0]:

dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")

True

In [0]:

df_trips = spark.read.parquet(f"{path_volume}/{file_name}",
  header=True,
  inferSchema=True,
  sep=",")

In [0]:

df_trips.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|            1.0|          1.5|       1.0|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.6

Part 1 :

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

df_trips = df_trips.withColumn("trip_id", monotonically_increasing_id())
df_trips.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_id|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|            1.0|          1.5|       1.0|                 N|         151|         239|           1|        7.0

In [0]:
from pyspark.sql.functions import col

df_trips.orderBy(col("passenger_count").desc()).select("trip_id", "passenger_count").show(1)


+-------+---------------+
|trip_id|passenger_count|
+-------+---------------+
| 949956|            9.0|
+-------+---------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import avg

df_trips.select(avg("passenger_count").alias("avg_passenger_count")).show()


+-------------------+
|avg_passenger_count|
+-------------------+
| 1.5670317144945614|
+-------------------+



In [0]:
# Plus court
df_trips.orderBy(col("trip_distance").asc()).select("trip_id", "trip_distance").show(1)

# Plus long
df_trips.orderBy(col("trip_distance").desc()).select("trip_id", "trip_distance").show(1)


+-------+-------------+
|trip_id|trip_distance|
+-------+-------------+
|      2|          0.0|
+-------+-------------+
only showing top 1 row
+-------+-------------+
|trip_id|trip_distance|
+-------+-------------+
|6074091|        831.8|
+-------+-------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import unix_timestamp

df_trips = df_trips.withColumn("trip_duration",
                               (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")))

# Plus court
df_trips.orderBy(col("trip_duration").asc()).select("trip_id", "trip_duration").show(1)

# Plus long
df_trips.orderBy(col("trip_duration").desc()).select("trip_id", "trip_duration").show(1)


+-------+-------------+
|trip_id|trip_duration|
+-------+-------------+
|1203184|     -5056830|
+-------+-------------+
only showing top 1 row
+-------+-------------+
|trip_id|trip_duration|
+-------+-------------+
|  68267|      2618881|
+-------+-------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import to_date, count

df_trips = df_trips.withColumn("pickup_date", to_date("tpep_pickup_datetime"))

df_trips.groupBy("pickup_date").agg(count("*").alias("trips_per_day")) \
    .orderBy(col("trips_per_day").desc()).show(1)

df_trips.groupBy("pickup_date").agg(count("*").alias("trips_per_day")) \
    .orderBy(col("trips_per_day").asc()).show(1)


+-----------+-------------+
|pickup_date|trips_per_day|
+-----------+-------------+
| 2019-01-25|       292499|
+-----------+-------------+
only showing top 1 row
+-----------+-------------+
|pickup_date|trips_per_day|
+-----------+-------------+
| 2018-12-21|            1|
+-----------+-------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import hour

df_trips = df_trips.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

df_trips.groupBy("pickup_hour").agg(count("*").alias("trips_per_hour")) \
    .orderBy(col("trips_per_hour").desc()).show(1)

df_trips.groupBy("pickup_hour").agg(count("*").alias("trips_per_hour")) \
    .orderBy(col("trips_per_hour").asc()).show(1)


+-----------+--------------+
|pickup_hour|trips_per_hour|
+-----------+--------------+
|         18|        515390|
+-----------+--------------+
only showing top 1 row
+-----------+--------------+
|pickup_hour|trips_per_hour|
+-----------+--------------+
|          4|         61424|
+-----------+--------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import dayofweek

df_trips = df_trips.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))

df_trips.groupBy("day_of_week").agg(count("*").alias("trips_per_day_of_week")) \
    .orderBy(col("trips_per_day_of_week").desc()).show(1)

df_trips.groupBy("day_of_week").agg(count("*").alias("trips_per_day_of_week")) \
    .orderBy(col("trips_per_day_of_week").asc()).show(1)


+-----------+---------------------+
|day_of_week|trips_per_day_of_week|
+-----------+---------------------+
|          5|              1357043|
+-----------+---------------------+
only showing top 1 row
+-----------+---------------------+
|day_of_week|trips_per_day_of_week|
+-----------+---------------------+
|          1|               859905|
+-----------+---------------------+
only showing top 1 row


In [0]:
from pyspark.sql.functions import corr

df_trips.select(corr("trip_distance", "tip_amount").alias("corr_distance_tip"),
                corr("passenger_count", "tip_amount").alias("corr_passenger_tip")).show()


+------------------+--------------------+
| corr_distance_tip|  corr_passenger_tip|
+------------------+--------------------+
|0.5269200663652669|0.001084223312167...|
+------------------+--------------------+



In [0]:
df_trips.orderBy(col("extra").desc()).select("trip_id", "extra").show(1)


+-------+------+
|trip_id| extra|
+-------+------+
|5323483|535.38|
+-------+------+
only showing top 1 row


In [0]:
df_trips.filter((col("trip_duration") < 0) | (col("trip_distance") == 0) | (col("tip_amount") > 1000)).show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+-------------+-----------+-----------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_id|trip_duration|pickup_date|pickup_hour|day_of_week|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+-------------+-----------+-----------+-----------+
|       2| 20

In [0]:
download_url_zones = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
path_zones = f"{path_volume}/taxi_zone_lookup.csv"

dbutils.fs.cp(download_url_zones, path_zones)

df_zones = spark.read.csv(path_zones, header=True, inferSchema=True)
df_zones.show(5)


+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


In [0]:
df_trips_zones = df_trips.join(df_zones, df_trips.PULocationID == df_zones.LocationID, "left") \
                         .withColumnRenamed("Borough", "pickup_borough")

df_trips_zones = df_trips_zones.join(df_zones, df_trips.DOLocationID == df_zones.LocationID, "left") \
                               .withColumnRenamed("Borough", "dropoff_borough")

df_trips_zones.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+-------------+-----------+-----------+-----------+----------+--------------+--------------------+------------+----------+---------------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_id|trip_duration|pickup_date|pickup_hour|day_of_week|LocationID|pickup_borough|                Zone|service_zone|LocationID|dropoff_borough|                Zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------

In [0]:
df_trips_zones.groupBy("pickup_borough").agg(count("*").alias("pickup_count")) \
    .orderBy(col("pickup_count").desc()).show(1)

df_trips_zones.groupBy("dropoff_borough").agg(count("*").alias("dropoff_count")) \
    .orderBy(col("dropoff_count").desc()).show(1)


+--------------+------------+
|pickup_borough|pickup_count|
+--------------+------------+
|     Manhattan|     6950965|
+--------------+------------+
only showing top 1 row
+---------------+-------------+
|dropoff_borough|dropoff_count|
+---------------+-------------+
|      Manhattan|      6817355|
+---------------+-------------+
only showing top 1 row


In [0]:
df_trips_zones.groupBy("pickup_borough", "pickup_hour").agg(count("*").alias("trips_count")) \
    .orderBy(col("trips_count").desc()).show(5)


+--------------+-----------+-----------+
|pickup_borough|pickup_hour|trips_count|
+--------------+-----------+-----------+
|     Manhattan|         18|     471539|
|     Manhattan|         19|     432836|
|     Manhattan|         17|     426498|
|     Manhattan|         15|     409119|
|     Manhattan|         14|     391823|
+--------------+-----------+-----------+
only showing top 5 rows


In [0]:
df_trips_zones.groupBy("pickup_borough", "day_of_week").agg(count("*").alias("trips_count")) \
    .orderBy(col("trips_count").desc()).show(5)


+--------------+-----------+-----------+
|pickup_borough|day_of_week|trips_count|
+--------------+-----------+-----------+
|     Manhattan|          5|    1229554|
|     Manhattan|          4|    1144782|
|     Manhattan|          3|    1086202|
|     Manhattan|          6|     984950|
|     Manhattan|          7|     927504|
+--------------+-----------+-----------+
only showing top 5 rows


In [0]:
df_trips_zones.groupBy("pickup_borough").agg(avg("trip_distance").alias("avg_distance"),
                                             avg("fare_amount").alias("avg_fare")).show()


+--------------+------------------+------------------+
|pickup_borough|      avg_distance|          avg_fare|
+--------------+------------------+------------------+
|      Brooklyn| 4.787677275447492|18.649132800172286|
|         Bronx| 7.233194552098303| 26.26890543682963|
|     Manhattan|2.2286693358402596|10.792468572351568|
|        Queens|11.283218499361993| 35.14462651722029|
|           N/A| 3.193850899742941|  59.5731593830335|
|       Unknown| 2.415464130400774|14.944423051653523|
| Staten Island|12.503601108033246|45.289861495844896|
|           EWR| 2.641098654708519| 76.24024663677126|
+--------------+------------------+------------------+



In [0]:
df_trips_zones.orderBy(col("fare_amount").desc()).select("trip_id", "fare_amount", "pickup_borough").show(1)
df_trips_zones.orderBy(col("fare_amount").asc()).select("trip_id", "fare_amount", "pickup_borough").show(1)


+-------+-----------+--------------+
|trip_id|fare_amount|pickup_borough|
+-------+-----------+--------------+
|2499655|  623259.86|     Manhattan|
+-------+-----------+--------------+
only showing top 1 row
+-------+-----------+--------------+
|trip_id|fare_amount|pickup_borough|
+-------+-----------+--------------+
|4890649|     -362.0|        Queens|
+-------+-----------+--------------+
only showing top 1 row


In [0]:
file_name_2025 = 'yellow_tripdata_2025-01.parquet'
download_url_2025 = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet'

dbutils.fs.cp(download_url_2025, f"{path_volume}/{file_name_2025}")
df_trips_2025 = spark.read.parquet(f"{path_volume}/{file_name_2025}", header=True, inferSchema=True)

df_trips_2025.select(avg("trip_distance").alias("avg_distance_2025"),
                     avg("fare_amount").alias("avg_fare_2025"),
                     avg("tip_amount").alias("avg_tip_2025")).show()


+-----------------+------------------+------------------+
|avg_distance_2025|     avg_fare_2025|      avg_tip_2025|
+-----------------+------------------+------------------+
|5.855126178844192|17.081802760453556|2.9598127862763484|
+-----------------+------------------+------------------+



Part 3: 

In [0]:
df_trips_zones.createOrReplaceTempView("trips_zones")


In [0]:
%sql
SELECT trip_id, passenger_count
FROM trips_zones
ORDER BY passenger_count DESC
LIMIT 1;


trip_id,passenger_count
949956,9.0


In [0]:
%sql
SELECT pickup_borough, COUNT(*) as pickup_count
FROM trips_zones
GROUP BY pickup_borough
ORDER BY pickup_count DESC
LIMIT 1;


pickup_borough,pickup_count
Manhattan,6950965


In [0]:
%sql
SELECT CORR(trip_distance, tip_amount) as corr_distance_tip
FROM trips_zones;


corr_distance_tip
0.5269200663652669


Part 4 :

In [0]:

df_trips.groupBy("pickup_hour").count().orderBy("pickup_hour").display()


df_trips_zones.groupBy("pickup_borough").count().display()


from pyspark.sql.functions import round
df_trips.groupBy("pickup_hour").agg(round(avg("tip_amount"),2).alias("avg_tip")).orderBy("pickup_hour").display()


pickup_hour,count
0,207842
1,149254
2,109421
3,78086
4,61424
5,75533
6,178598
7,304858
8,373742
9,365935


pickup_borough,count
Brooklyn,91905
Bronx,18062
Manhattan,6950965
Queens,471173
,3890
Unknown,159815
Staten Island,361
EWR,446


pickup_hour,avg_tip
0,1.95
1,1.8
2,1.68
3,1.63
4,1.74
5,1.96
6,1.65
7,1.72
8,1.78
9,1.76
