/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz

In [0]:
# Load dataset

df = spark.read.csv(
    "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz",
    header=True,
    inferSchema=True
)

df.printSchema()
df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+---

In [0]:
# Data Exploration

df.count()
df.columns
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+---------------------+-----------------+--------------------+
|summary|          VendorID|   passenger_count|     trip_distance|        RatecodeID|store_and_fwd_flag|     PULocationID|      DOLocationID|      payment_type|       fare_amount|              extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|     total_amount|congestion_surcharge|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+---------------------+-----------------+--------------------+
|  count|           7667792|           7

In [0]:
# Data Cleaning

clean_df = df.filter(
    (df.trip_distance > 0) &
    (df.fare_amount > 0) &
    (df.passenger_count > 0)
)

clean_df.count()

7490845

In [0]:
# Select & Create Columns

from pyspark.sql.functions import col

selected_df = clean_df.select(
    "tpep_pickup_datetime",
    "passenger_count",
    "trip_distance",
    "fare_amount"
)

selected_df.show(5)

+--------------------+---------------+-------------+-----------+
|tpep_pickup_datetime|passenger_count|trip_distance|fare_amount|
+--------------------+---------------+-------------+-----------+
| 2019-01-01 00:46:40|              1|          1.5|        7.0|
| 2019-01-01 00:59:47|              1|          2.6|       14.0|
| 2019-01-01 00:21:28|              1|          1.3|        6.5|
| 2019-01-01 00:32:01|              1|          3.7|       13.5|
| 2019-01-01 00:57:32|              2|          2.1|       10.0|
+--------------------+---------------+-------------+-----------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import expr

enhanced_df = selected_df.withColumn(
    "fare_per_km",
    col("fare_amount") / col("trip_distance")
)

enhanced_df.show(5)

+--------------------+---------------+-------------+-----------+------------------+
|tpep_pickup_datetime|passenger_count|trip_distance|fare_amount|       fare_per_km|
+--------------------+---------------+-------------+-----------+------------------+
| 2019-01-01 00:46:40|              1|          1.5|        7.0| 4.666666666666667|
| 2019-01-01 00:59:47|              1|          2.6|       14.0| 5.384615384615384|
| 2019-01-01 00:21:28|              1|          1.3|        6.5|               5.0|
| 2019-01-01 00:32:01|              1|          3.7|       13.5|3.6486486486486487|
| 2019-01-01 00:57:32|              2|          2.1|       10.0| 4.761904761904762|
+--------------------+---------------+-------------+-----------+------------------+
only showing top 5 rows


In [0]:
# Aggregations
clean_df.groupBy("passenger_count") \
    .avg("fare_amount") \
    .orderBy("passenger_count") \
    .show()

+---------------+------------------+
|passenger_count|  avg(fare_amount)|
+---------------+------------------+
|              1| 12.24320433581906|
|              2|12.436470654684639|
|              3|12.273248458935294|
|              4|12.345534719789526|
|              5| 12.19543464975819|
|              6|12.150177360302502|
|              7|              52.0|
|              8|             69.72|
|              9|              90.0|
+---------------+------------------+



In [0]:
#Top 10 Longest Trips

clean_df.orderBy(col("trip_distance").desc()).show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2019-01-25 21:56:39|  2019-01-25 22:06:08|              1|        831.8|         1|                 N|         140|         239|           1|        8.5|  0.5|    0.5|      1.96|         0.0|                  0.3

In [0]:
# Time-Based Analysis

from pyspark.sql.functions import hour

hourly_df = clean_df.withColumn(
    "pickup_hour",
    hour("tpep_pickup_datetime")
)

hourly_df.groupBy("pickup_hour") \
    .count() \
    .orderBy("pickup_hour") \
    .show()

+-----------+------+
|pickup_hour| count|
+-----------+------+
|          0|202991|
|          1|145842|
|          2|106690|
|          3| 75955|
|          4| 59090|
|          5| 72143|
|          6|171911|
|          7|296088|
|          8|364009|
|          9|356647|
|         10|351986|
|         11|365730|
|         12|390152|
|         13|392727|
|         14|420493|
|         15|439019|
|         16|407247|
|         17|455052|
|         18|502459|
|         19|464183|
+-----------+------+
only showing top 20 rows


In [0]:
# Window Functions

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("passenger_count") \
                    .orderBy(col("fare_amount").desc())

ranked_df = clean_df.withColumn(
    "rank",
    row_number().over(window_spec)
)

ranked_df.filter(col("rank") == 1).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|rank|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----+
|       1| 2019-01-11 19:33:15|  2019-01-11 19:53:09|              1|          2.4|         1|                 N|         237|          90|           3|  623259.86|  1.0|    0.5|       0.0|         0.0|      

### Spark SQL

In [0]:
# Temp View

clean_df.createOrReplaceTempView("taxi_trips")

In [0]:
# SQL Query 1 – Average Fare

spark.sql("""
SELECT passenger_count,
       AVG(fare_amount) AS avg_fare
FROM taxi_trips
GROUP BY passenger_count
ORDER BY passenger_count
""").show()

+---------------+------------------+
|passenger_count|          avg_fare|
+---------------+------------------+
|              1| 12.24320433581906|
|              2|12.436470654684639|
|              3|12.273248458935294|
|              4|12.345534719789526|
|              5| 12.19543464975819|
|              6|12.150177360302502|
|              7|              52.0|
|              8|             69.72|
|              9|              90.0|
+---------------+------------------+



In [0]:
# SQL Query 2 – Peak Hour

spark.sql("""
SELECT HOUR(tpep_pickup_datetime) AS hour,
       COUNT(*) AS trip_count
FROM taxi_trips
GROUP BY hour
ORDER BY trip_count DESC
LIMIT 5
""").show()

+----+----------+
|hour|trip_count|
+----+----------+
|  18|    502459|
|  19|    464183|
|  17|    455052|
|  15|    439019|
|  14|    420493|
+----+----------+



### Performance Improvement

In [0]:
# # Caching 

# clean_df.cache()
# clean_df.count()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4586035784541920>, line 3[0m
[1;32m      1[0m [38;5;66;03m# Caching [39;00m
[0;32m----> 3[0m clean_df[38;5;241m.[39mcache()
[1;32m      4[0m clean_df[38;5;241m.[39mcount()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:2126[0m, in [0;36mDataFrame.cache[0;34m(self)[0m
[1;32m   2125[0m [38;5;28;01mdef[39;00m [38;5;21mcache[39m([38;5;28mself[39m) [38;5;241m-[39m[38;5;241m>[39m ParentDataFrame:
[0;32m-> 2126[0m     [38;5;28;01mreturn[39;00m [38;5;28mself[39m[38;5;241m.[39mpersist()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:2133[0m, in [0;36mDataFrame.persist[0;34m(self, storageLevel)[0m
[1;32m   2128[0m [38;5;28;01mdef[39;00m [38;5;21mpersist[39m(
[1;32m

In [0]:
# # Repartitioning

# clean_df.rdd.getNumPartitions()

# repart_df = clean_df.repartition(8)
# repart_df.rdd.getNumPartitions()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkNotImplementedError[0m                Traceback (most recent call last)
File [0;32m<command-4586035784541922>, line 3[0m
[1;32m      1[0m [38;5;66;03m# Repartitioning[39;00m
[0;32m----> 3[0m clean_df[38;5;241m.[39mrdd[38;5;241m.[39mgetNumPartitions()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:2363[0m, in [0;36mDataFrame.rdd[0;34m(self)[0m
[1;32m   2361[0m [38;5;129m@property[39m
[1;32m   2362[0m [38;5;28;01mdef[39;00m [38;5;21mrdd[39m([38;5;28mself[39m) [38;5;241m-[39m[38;5;241m>[39m [38;5;124m"[39m[38;5;124mRDD[Row][39m[38;5;124m"[39m:
[0;32m-> 2363[0m     [38;5;28;01mraise[39;00m PySparkNotImplementedError(
[1;32m   2364[0m         errorClass[38;5;241m=[39m[38;5;124m"[39m[38;5;124mNOT_IMPLEMENTED[39m[38;5;124m"[39m,
[1;32m   2365[0m         messageParameters[38;5;241m=[39m{[38;

### Writing Data

In [0]:
# Writing Data to file
clean_df.write.mode("overwrite") \
    .parquet("/Volumes/big_data_workshop/gold_data/gold_data_volume")

In [0]:
# Writing Data to Table

clean_df.write.mode("overwrite").saveAsTable("big_data_workshop.gold_data.clean_df")