# Week 2 New York Taxi ETL


In [0]:
from pyspark.sql.functions import to_date, lit, to_timestamp, col, desc, isnan, isnull, countDistinct, sum as _sum, when, count, unix_timestamp, year, month, dayofmonth, greatest, hour, dayofweek, coalesce, round, avg, rank, concat_ws, corr
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType

## Initial Data Profiling and Understanding

In [0]:
# Read the data
raw_taxi_df = spark.read.parquet("/FileStore/tables/yellow_tripdata_2024_01.parquet")
display(raw_taxi_df.limit(10))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01T00:57:55.000,2024-01-01T01:17:43.000,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01T00:03:00.000,2024-01-01T00:09:36.000,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01T00:17:06.000,2024-01-01T00:35:01.000,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
1,2024-01-01T00:36:38.000,2024-01-01T00:44:56.000,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
1,2024-01-01T00:46:51.000,2024-01-01T00:52:57.000,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
1,2024-01-01T00:54:08.000,2024-01-01T01:26:31.000,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0
2,2024-01-01T00:49:44.000,2024-01-01T01:15:47.000,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75
1,2024-01-01T00:30:40.000,2024-01-01T00:58:40.000,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0
2,2024-01-01T00:26:01.000,2024-01-01T00:54:12.000,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0
2,2024-01-01T00:28:08.000,2024-01-01T00:29:16.000,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0


In [0]:
# Print the schema of the raw_taxi_df DataFrame
raw_taxi_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [0]:
# Print the number of rows in the raw_taxi_df DataFrame
raw_taxi_df.count()

2964624

In [0]:
# Display summary statistics for the raw_taxi_df DataFrame
display(raw_taxi_df.describe())

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
count,2964624.0,2824462.0,2964624.0,2824462.0,2824462,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2824462.0,2824462.0
mean,1.7542042431013174,1.3392808966805003,3.6521691789577058,2.069359403666964,,166.01788354948215,165.11671227110082,1.1612707041432573,18.175061916786696,1.4515984320439976,0.4833823108765226,3.3358700158961274,0.5270212040355687,0.9756318507844316,26.801504770889355,2.2561220508542865,0.141161130863152
stddev,0.4325902017035976,0.850281692480088,225.46257238219965,9.823218952795497,,63.6239144874133,69.31534978524881,0.5808685566109416,18.949547705905296,1.8041024767539,0.1177600301537983,3.896550599806768,2.1283096763989,0.2183644577274297,23.385577429672534,0.8232746699398359,0.4876238872392771
min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-899.0,-7.5,-0.5,-80.0,-80.0,-1.0,-900.0,-2.5,-1.75
max,6.0,9.0,312722.3,99.0,Y,265.0,265.0,4.0,5000.0,14.25,4.0,428.0,115.92,1.0,5000.0,2.5,1.75


In the previous cell, I performed an initial exploratory data analysis on the New York Taxi dataset. I used the `describe()` function to get summary statistics of the dataset. During this analysis, I noticed that several columns had negative values, which are not expected in this context. 

To address this, I will check the following columns for negative values: "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "congestion_surcharge", and "airport_fee". I will count the number of negative values in each column and calculate the percentage of negative values relative to the total number of entries in the dataset.

**Negative Columns**

In [0]:
# Define the columns we want to check
money_cols = [
    "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge", "airport_fee"
]

# Compute negative-value counts for each column in one pass
neg_counts_row = (
    raw_taxi_df
    .select([
        _sum(when(col(c) < 0, 1).otherwise(0)).alias(c)
        for c in money_cols
    ])
    .first()
    .asDict()
)

display(neg_counts_row)

{'fare_amount': 37448,
 'extra': 17548,
 'mta_tax': 34434,
 'tip_amount': 102,
 'tolls_amount': 2035,
 'improvement_surcharge': 35502,
 'total_amount': 35504,
 'congestion_surcharge': 28825,
 'airport_fee': 4921}

In [0]:
# Calculate and display the percentage of negative values for each specified column
total_rows = raw_taxi_df.count()

for col_name, neg_count in neg_counts_row.items():
    pct = neg_count / total_rows
    print(f"{col_name:25} : {neg_count:7,} negatives ({pct:.2%} of all rows)")

fare_amount               :  37,448 negatives (1.26% of all rows)
extra                     :  17,548 negatives (0.59% of all rows)
mta_tax                   :  34,434 negatives (1.16% of all rows)
tip_amount                :     102 negatives (0.00% of all rows)
tolls_amount              :   2,035 negatives (0.07% of all rows)
improvement_surcharge     :  35,502 negatives (1.20% of all rows)
total_amount              :  35,504 negatives (1.20% of all rows)
congestion_surcharge      :  28,825 negatives (0.97% of all rows)
airport_fee               :   4,921 negatives (0.17% of all rows)


**Null Elements**

In [0]:
# Compute null-value counts for each column in one pass
null_counts_rows = (
    raw_taxi_df.
    select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in raw_taxi_df.columns
]).
    first()
    .asDict()

)
display(null_counts_rows)

{'VendorID': 0,
 'tpep_pickup_datetime': 0,
 'tpep_dropoff_datetime': 0,
 'passenger_count': 140162,
 'trip_distance': 0,
 'RatecodeID': 140162,
 'store_and_fwd_flag': 140162,
 'PULocationID': 0,
 'DOLocationID': 0,
 'payment_type': 0,
 'fare_amount': 0,
 'extra': 0,
 'mta_tax': 0,
 'tip_amount': 0,
 'tolls_amount': 0,
 'improvement_surcharge': 0,
 'total_amount': 0,
 'congestion_surcharge': 140162,
 'Airport_fee': 140162}

In [0]:
# Calculate and display the percentage of null values for each column with non-zero null count
for col_name, null_count in null_counts_rows.items():
    pct = null_count / raw_taxi_df.count()
    if pct > 0:
        print(f"{col_name:25} : {null_count:7,} nulls ({pct:.2%} of all rows)")

passenger_count           : 140,162 nulls (4.73% of all rows)
RatecodeID                : 140,162 nulls (4.73% of all rows)
store_and_fwd_flag        : 140,162 nulls (4.73% of all rows)
congestion_surcharge      : 140,162 nulls (4.73% of all rows)
Airport_fee               : 140,162 nulls (4.73% of all rows)


In [0]:
# Check for correlation where a null value in one column implies null values in other specified columns
# I took column "store_and_fwd_flag" as an example
null_store = raw_taxi_df.filter(col("store_and_fwd_flag").isNull())

null_store.select([
    count(when(col(c).isNull(), 1)).alias(c)
    for c in ["passenger_count", "RatecodeID", "congestion_surcharge", "Airport_fee"]
]).show()

+---------------+----------+--------------------+-----------+
|passenger_count|RatecodeID|congestion_surcharge|Airport_fee|
+---------------+----------+--------------------+-----------+
|         140162|    140162|              140162|     140162|
+---------------+----------+--------------------+-----------+



**Disinct**

In [0]:
# Calculate the distinct count of each column in the raw_taxi_df DataFrame
distinct_count_row = (
    raw_taxi_df.select([
        countDistinct(col(c)).alias(c)
        for c in raw_taxi_df.columns
    ])
    .first()
    .asDict()
)
display(distinct_count_row)

{'VendorID': 3,
 'tpep_pickup_datetime': 1575706,
 'tpep_dropoff_datetime': 1574780,
 'passenger_count': 10,
 'trip_distance': 4489,
 'RatecodeID': 7,
 'store_and_fwd_flag': 2,
 'PULocationID': 260,
 'DOLocationID': 261,
 'payment_type': 5,
 'fare_amount': 8970,
 'extra': 48,
 'mta_tax': 8,
 'tip_amount': 4192,
 'tolls_amount': 1127,
 'improvement_surcharge': 5,
 'total_amount': 19241,
 'congestion_surcharge': 6,
 'Airport_fee': 3}

## Data Cleaning, Validation, and Transformation 

### Invalid Trip Distances/Durations

In [0]:
# Display summary statistics for the "trip_distance" column in the raw_taxi_df DataFrame
display(raw_taxi_df.select(col("trip_distance")).describe())

summary,trip_distance
count,2964624.0
mean,3.6521691789577058
stddev,225.46257238219965
min,0.0
max,312722.3


In [0]:
# Display rows where the trip distance is exactly 312722.3(the highest value)
display(raw_taxi_df.filter(col('trip_distance') == 312722.3))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-30T06:37:00,2024-01-30T06:50:00,,312722.3,,,151,162,0,14.46,0.0,0.5,3.69,0.0,1.0,22.15,,


In [0]:
# Calculate the 99.9th percentile of the "trip_distance" column with no approximation error
raw_taxi_df.approxQuantile("trip_distance", [0.999], 0.0)[0]

[29.51]

In [0]:
# Filter rows where the trip distance is greater than 30 and display the count and the filtered DataFrame
print(raw_taxi_df.filter(col("trip_distance") > 30).count())
display(raw_taxi_df.filter(col("trip_distance") > 30))

2695


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,time_take_min
2,2024-01-01T00:35:44,2024-01-01T01:19:49,2.0,36.75,4.0,N,138,265,1,220.7,6.0,0.5,45.64,0.0,1.0,275.59,0.0,1.75,44.083333333333336
2,2024-01-01T00:35:57,2024-01-01T01:22:32,1.0,41.2,4.0,N,132,265,2,-201.8,-1.0,-0.5,0.0,-6.94,-1.0,-212.99,0.0,-1.75,46.583333333333336
2,2024-01-01T00:35:57,2024-01-01T01:22:32,1.0,41.2,4.0,N,132,265,2,201.8,1.0,0.5,0.0,6.94,1.0,212.99,0.0,1.75,46.583333333333336
2,2024-01-01T01:18:30,2024-01-01T02:48:00,1.0,54.94,4.0,N,132,265,2,243.1,1.0,0.0,0.0,13.88,1.0,260.73,0.0,1.75,89.5
2,2024-01-01T01:26:14,2024-01-01T02:14:32,1.0,33.11,5.0,N,132,265,1,167.0,0.0,0.0,5.0,12.75,1.0,187.5,0.0,1.75,48.3
2,2024-01-01T01:41:53,2024-01-01T02:44:20,1.0,38.06,4.0,N,142,265,1,248.0,1.0,0.0,65.69,12.75,1.0,328.44,0.0,0.0,62.45
1,2024-01-01T01:40:58,2024-01-01T03:02:12,2.0,34.0,1.0,N,234,45,2,136.0,3.5,0.5,0.0,0.0,1.0,141.0,2.5,0.0,81.23333333333333
2,2024-01-01T02:43:55,2024-01-01T04:01:45,1.0,39.87,5.0,N,226,265,2,270.0,0.0,0.0,0.0,12.75,1.0,283.75,0.0,0.0,77.83333333333333
2,2024-01-01T02:43:55,2024-01-01T03:57:59,4.0,44.76,4.0,N,170,265,1,303.3,1.0,0.0,20.0,20.0,1.0,345.3,0.0,0.0,74.06666666666666
1,2024-01-01T02:52:04,2024-01-01T03:40:33,3.0,31.8,4.0,N,50,265,1,140.9,3.5,0.5,1.0,6.94,1.0,153.84,2.5,0.0,48.48333333333333


In [0]:
# Add a new column "time_take_min" representing the trip duration in minutes
raw_taxi_df = raw_taxi_df.withColumn("time_take_min", (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0)

**Filtering car speeds less than 50**

In [0]:
# Count the number of trips with an average speed greater than 50 miles per hour
raw_taxi_df.filter(
  (col("trip_distance") / (col("time_take_min")/60) ) > 50
).count()

2527

In [0]:
# Filter trips with an average speed of 50 miles per hour or less
df_valid_speed = raw_taxi_df.filter(
    (col("trip_distance") / (col("time_take_min")/60)) <= 50
)

In [0]:
# Display summary statistics for trips with an average speed of 50 miles per hour or less
display(df_valid_speed.describe())

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,time_take_min
count,2961283.0,2821266.0,2961283.0,2821266.0,2821266,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2961283.0,2821266.0,2821266.0,2961283.0
mean,1.7544091530596704,1.3392966136479154,3.220015530429975,2.066472285846141,,166.02850723824776,165.0870190387072,1.16065502689206,18.12799562554201,1.451638293266805,0.4835556412541456,3.3331176317852944,0.5257548603064196,0.9757606753559028,26.7522982537959,2.2578546298009474,0.1408142656523702,15.619943951096609
stddev,0.4321204915337476,0.8500095307717548,4.344117538293714,9.816386264933268,,63.62159571558487,69.29599287399928,0.5800503340738477,18.05829946831211,1.803837555799984,0.1173655351986522,3.880255205993646,2.121239966615446,0.2179703784306085,22.64158980316624,0.8212191355222047,0.486997992739193,34.86451709445386
min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-800.0,-7.5,-0.5,-80.0,-80.0,-1.0,-801.0,-2.5,-1.75,-13.566666666666666
max,6.0,9.0,153.2,99.0,Y,265.0,265.0,4.0,2221.3,14.25,4.0,428.0,115.92,1.0,2225.3,2.5,1.75,9455.4


**Filtering trip distances less than 50**

In [0]:
# Filter trips with a trip distance of 50 miles or less
df_valid_distance = df_valid_speed.filter(col("trip_distance") <= 50)

**Save the cleaned `trip_distance` data to Parquet.**

In [0]:
# Save the filtered DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/1_trip_distance"
df_valid_distance.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/1_trip_distance


### Timestamp Issues
**Filtering out rows that aren't in 2024**

In [0]:
# Count trips that did not occur in the year 2024
df_valid_distance.filter(
    (year(col("tpep_pickup_datetime")) != 2024) |
    (year(col("tpep_dropoff_datetime")) != 2024)
).count()

15

In [0]:
# Filter trips that occurred in the year 2024
df_2024 = df_valid_distance.filter(
    (year(col("tpep_pickup_datetime")) == 2024) &
    (year(col("tpep_dropoff_datetime")) == 2024)
)

**Save the cleaned pickup and dropoff time data to Parquet.**

In [0]:
# Save the filtered DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/2_valid_year"
df_2024.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/2_valid_year


### Timetaken

**Fixing rows with negative timetaken**

In [0]:
# Display trips with negative duration
display(df_2024.filter
        (col("time_take_min") < 0).
        select(
            "tpep_pickup_datetime", 
            "tpep_dropoff_datetime",
            "trip_distance",
            "time_take_min"
            ))

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,time_take_min
2024-01-02T10:00:00,2024-01-02T09:53:56,7.2,-6.066666666666666
2024-01-04T12:30:00,2024-01-04T12:18:23,4.8,-11.616666666666667
2024-01-04T14:00:00,2024-01-04T13:55:50,5.6,-4.166666666666667
2024-01-05T11:00:00,2024-01-05T10:57:16,0.6,-2.7333333333333334
2024-01-17T14:25:00,2024-01-17T14:11:26,5.4,-13.566666666666666
2024-01-18T11:00:00,2024-01-18T10:57:07,1.7,-2.8833333333333333
2024-01-26T11:30:00,2024-01-26T11:28:34,2.5,-1.4333333333333331
2024-01-27T11:00:00,2024-01-27T10:49:43,12.5,-10.283333333333331
2024-01-01T02:01:50,2024-01-01T02:01:39,7.46,-0.1833333333333333
2024-01-01T03:01:46,2024-01-01T03:01:39,7.65,-0.1166666666666666


In [0]:
# Create two columns: corrected_pickup and corrected_dropoff
df_time_corrected = df_2024.withColumn(
    "corrected_pickup",
    when(col("time_take_min") < 0, col("tpep_dropoff_datetime")) \
    .otherwise(col("tpep_pickup_datetime"))
).withColumn(
    "corrected_dropoff",
    when(col("time_take_min") < 0, col("tpep_pickup_datetime")) \
    .otherwise(col("tpep_dropoff_datetime"))
)

# Recompute time_take_min_corrected
df_time_corrected = df_time_corrected.withColumn(
    "time_take_min_corrected",
    (unix_timestamp("corrected_dropoff")
     - unix_timestamp("corrected_pickup")) / 60.0
)


# Drop the old duration and timestamp columns, if you want.
df_time_corrected = df_time_corrected.drop("tpep_pickup_datetime", "tpep_dropoff_datetime", "time_take_min")

# Rename corrected_pickup/dropoff back to original names
df_time_corrected = df_time_corrected.withColumnRenamed("corrected_pickup", "tpep_pickup_datetime") \
                   .withColumnRenamed("corrected_dropoff", "tpep_dropoff_datetime") \
                    .withColumnRenamed("time_take_min_corrected", "time_take_min")


**Dropping unrealistic time taken**

I chose to drop car trip that took more than 5 hours.

In [0]:
# Filter trips with a duration of less than 300 minutes
df_valid_durations = df_time_corrected.filter(col("time_take_min") < 300)

In [0]:
df_valid_durations.count()

2959170

In [0]:
# Display summary statistics for the valid durations DataFrame
display(df_valid_durations.describe())

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,time_take_min
count,2959170.0,2819157.0,2959170.0,2819157.0,2819157,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2959170.0,2819157.0,2819157.0,2959170.0
mean,1.7542510230909345,1.3389718983369852,3.2135130965772154,2.0667227827325685,,166.03504766539265,165.08105617453543,1.1605318383195289,18.10001434523593,1.4519849484821772,0.4835869855398642,3.332169966580524,0.5242195784604455,0.9757592500599674,26.72192650300477,2.258070763707023,0.1406942749197721,14.836718775873443
stddev,0.4322147504989964,0.8493116052773758,4.300215878604852,9.819145542366032,,63.62166557239008,69.29164826305004,0.5800417574806398,17.750213998646185,1.8039826712280749,0.1173136598564047,3.8677699328732,2.1099519196710195,0.2179800121069608,22.353659608196125,0.8209930765337757,0.4867983933823221,11.92690317162292
min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-800.0,-7.5,-0.5,-80.0,-80.0,-1.0,-801.0,-2.5,-1.75,0.0166666666666666
max,6.0,9.0,49.98,99.0,Y,265.0,265.0,4.0,820.0,14.25,4.0,428.0,115.92,1.0,821.0,2.5,1.75,299.28333333333336


**Save the cleaned duration data to Parquet.**


In [0]:
# Save the filtered DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/3_valid_duration"
df_valid_durations.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/3_valid_duration


### Fare/Total Amounts

**Clean trip distance with 0 values**

In [0]:
# Classify trips into 'keep' or 'drop' categories based on specific conditions
df_clean = df_valid_distance.withColumn(
    "keep_or_drop",
    when(
        # 1) Flag-drop rides → KEEP
        (col("trip_distance") == 0) &
        (col("time_take_min") < 2) &
        (col("fare_amount") >= 4.50) &
        (col("payment_type").isin(1, 2)),
        lit("keep")
    ).when(
        # 2) Rounding-artifact (short crawl) → KEEP
        (col("trip_distance") == 0) &
        (col("time_take_min") >= 2) &
        (col("time_take_min") < 10) &
        (col("fare_amount") >= 4.50) &
        (col("payment_type").isin(1, 2)),
        lit("keep")
    ).when(
        # 3) Canceled / No‐Charge / Voided → DROP
        (col("trip_distance") == 0) &
        (col("payment_type").isin(3, 4, 6)),
        lit("drop")
    ).when(
        # 4) Meter‐glitch (≥10 min on meter, fare = 0) → DROP
        (col("trip_distance") == 0) &
        (col("time_take_min") >= 10) &
        (col("fare_amount") == 0),
        lit("drop")
    ).when(
        # 5) High‐fare, short‐time anomaly → DROP
        (col("trip_distance") == 0) &
        (col("time_take_min") < 5) &
        (col("fare_amount") > 20),
        lit("drop")
    ).otherwise(
        # 6) Everything else (including trip_distance > 0) → KEEP
        lit("keep")
    )
)

# Count and display the number of rows to be dropped
drop_count = df_clean.filter(col("keep_or_drop") == "drop").count()
print(f"We are going to drop {drop_count} rows of data.")

# Filter out rows marked as 'drop' and remove the 'keep_or_drop' column
df_clean = df_clean.filter(col("keep_or_drop") == "keep").drop("keep_or_drop")

We are going to drop 8713 rows of data.


****

In [0]:
# Filter out rows with negative fare amounts for specific payment types and short trip distances
# df_valid_fare_first = df_clean.filter(
#    ~ (
#     (col("fare_amount") < 0) &
#     (col("payment_type").isin(0, 1, 2)) &
#     (col("trip_distance") < 1)
#     )
# )

In [0]:
# df_valid_fare_first.filter(
#     (col("fare_amount") < 0) &
#     (col("payment_type").isin(0, 1, 2))
# ).count()

6678

In [0]:
# display(df_valid_fare_first.filter(
#     (col("fare_amount") < 0) &
#     (col("payment_type").isin(0, 1, 2))
# ).limit(20))

VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,tpep_pickup_datetime,tpep_dropoff_datetime,time_take_min
2,1,5.48,1,N,107,61,2,-33.1,-1.0,-0.5,0.0,0.0,-1.0,-38.1,-2.5,0.0,2024-01-01T00:42:02,2024-01-01T01:14:33,32.516666666666666
2,4,1.72,1,N,231,66,2,-11.4,-1.0,-0.5,0.0,0.0,-1.0,-16.4,-2.5,0.0,2024-01-01T00:23:02,2024-01-01T00:31:32,8.5
2,2,1.03,1,N,74,75,2,-6.5,-1.0,-0.5,0.0,0.0,-1.0,-11.5,-2.5,0.0,2024-01-01T00:46:39,2024-01-01T00:49:46,3.1166666666666667
2,2,3.15,1,N,239,166,2,-18.4,-1.0,-0.5,0.0,0.0,-1.0,-23.4,-2.5,0.0,2024-01-01T00:29:35,2024-01-01T00:46:18,16.716666666666665
2,2,11.72,1,N,239,95,2,-49.9,-1.0,-0.5,5.0,-6.94,-1.0,-56.84,-2.5,0.0,2024-01-01T00:58:11,2024-01-01T01:29:31,31.33333333333333
2,2,1.07,1,N,163,142,2,-12.8,-1.0,-0.5,0.0,0.0,-1.0,-17.8,-2.5,0.0,2024-01-01T00:56:06,2024-01-01T01:09:26,13.333333333333334
2,1,1.44,1,N,143,239,2,-8.6,-1.0,-0.5,0.0,0.0,-1.0,-13.6,-2.5,0.0,2024-01-01T00:40:34,2024-01-01T00:45:06,4.533333333333333
2,3,2.91,1,N,132,216,2,-13.5,-1.0,-0.5,0.0,0.0,-1.0,-17.75,0.0,-1.75,2024-01-01T00:13:57,2024-01-01T00:19:46,5.816666666666666
2,2,1.99,1,N,211,68,2,-10.7,-1.0,-0.5,0.0,0.0,-1.0,-15.7,-2.5,0.0,2024-01-01T00:36:15,2024-01-01T00:43:55,7.666666666666667
2,1,1.21,1,N,249,231,2,-10.7,-1.0,-0.5,0.0,0.0,-1.0,-15.7,-2.5,0.0,2024-01-01T00:23:46,2024-01-01T00:33:04,9.3


**Fixing Negative values**

In [0]:
df_valid_fare1 = df_clean.withColumn(
    # Correct negative fare amounts for customer that paid(no refund)
    "fare_amount",
    when(
        (col("fare_amount") < 0) &
        (col("payment_type").isin(0, 1, 2)),
        -col("fare_amount")
    ).otherwise(col("fare_amount"))
).withColumn(
    # Correct negative extra charges for customer that paid(no refund)
    "extra",
    when(
        (col("extra") < 0) &
        (col("payment_type").isin(0, 1, 2)),
        -col("extra")
    ).otherwise(col("extra"))
).withColumn(
    # Correct negative MTA tax for customer that paid(no refund)
    "mta_tax",
    when(
        (col("mta_tax") < 0) &
        (col("payment_type").isin(0, 1, 2)),
        -col("mta_tax")
    ).otherwise(col("mta_tax"))
).withColumn(
    # Correct negative tip amounts for customer that paid(no refund)
    "tip_amount",
    when(
        (col("tip_amount") < 0) & 
        (col("payment_type").isin(0, 1, 2)),
        -col("tip_amount")
    ).otherwise(col("tip_amount"))
).withColumn(
    # Correct negative tolls amounts for customer that paid(no refund)
    "tolls_amount",
    when(
        (col("tolls_amount") < 0) &
        (col("payment_type").isin(0, 1, 2)), 
        -col("tolls_amount")
    ).otherwise(col("tolls_amount"))
).withColumn(
    # Correct negative improvement surcharge for customer that paid(no refund)
    "improvement_surcharge",
    when(
        (col("improvement_surcharge") < 0) &
        (col("payment_type").isin(0, 1, 2)), 
        -col("improvement_surcharge")
    ).otherwise(col("improvement_surcharge"))
).withColumn(
    # Correct negative total amounts for customer that paid(no refund)
    "total_amount",
    when(
        (col("total_amount") < 0) &
        (col("payment_type").isin(0, 1, 2)), 
        -col("total_amount")
    ).otherwise(col("total_amount"))
).withColumn(
    # Correct negative congestion surcharge for customer that paid(no refund)
    "congestion_surcharge",
    when(
        (col("congestion_surcharge") < 0) &
        (col("payment_type").isin(0, 1, 2)), 
        -col("congestion_surcharge")
    ).otherwise(col("congestion_surcharge"))
).withColumn(
    # Correct negative airport fee for customer that paid(no refund)
    "airport_fee",
    when(
        (col("airport_fee") < 0) &
        (col("payment_type").isin(0, 1, 2)), 
        -col("airport_fee")
    ).otherwise(col("airport_fee"))
)

**Fixing Fare amounts inconsistent with distance/duration**

In [0]:
# Display summary statistics for the 'fare_amount' column
display(df_valid_fare1.select("fare_amount").describe())

summary,fare_amount
count,2952282.0
mean,18.26544340953581
stddev,17.53264163402026
min,-700.0
max,2221.3


In [0]:
df = df_valid_fare1

# Calculate fare based on distance and time
df = df.withColumn("distance_fare", col("trip_distance") * lit(3.50))
df = df.withColumn("time_fare", col("time_take_min") * lit(0.70))

# Choose the larger fare between distance and time for each trip
df = df.withColumn("meter_increment", greatest(col("distance_fare"), col("time_fare")))

# Compute expected fare based on RatecodeID
df = df.withColumn(
    "expected_meter_fare",
    when(
        col("RatecodeID") == 1,        # Standard metered rate
        lit(3.00) + col("meter_increment")
    ).when(
        col("RatecodeID") == 2,        # JFK flat fare
        lit(70.00)                     # The meter simply reads $70.00
    ).when(
        col("RatecodeID") == 3,        # Newark trips use standard meter + $20 surcharge
        lit(3.00) + col("meter_increment") + lit(20.00)
    ).when(
        col("RatecodeID") == 4,        # Nassau/Westchester: 1.5× standard meter
        lit(3.00) + (col("meter_increment") * lit(1.5))
    ).otherwise(lit(None))
)

# Filter out trips where the fare amount is not within the expected range
df_valid_fare2 = df.filter(
    ~((col("fare_amount") > col("expected_meter_fare") + lit(10.00)) |
      (col("fare_amount") < col("expected_meter_fare") - lit(1.00)))
)

# Drop intermediate columns used for fare calculation
df_valid_fare2 = df_valid_fare2.drop("expected_meter_fare", "meter_increment", "distance_fare", "time_fare")

**Save the cleaned fare data for checkpoint**

In [0]:
# Save the cleaned DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/4_valid_fare"
df_valid_fare2.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/4_valid_fare


### Invalid Location IDs

In [0]:
# Display the count of trips where both pickup and dropoff locations are outside the valid range (1-266)
display(df_valid_fare2.filter(
    ~(col("PULocationID").between(1,266) |
     col("DOLocationID").between(1,266))
).count())

0

In [0]:
# Display summary statistics for pickup and dropoff location IDs
display(df_valid_fare2.select("PULocationID", "DOLocationID").describe())

summary,PULocationID,DOLocationID
count,2673653.0,2673653.0
mean,167.05211147445087,165.99854767989714
stddev,62.984571922821296,68.85652778788052
min,1.0,1.0
max,265.0,265.0


### Passenger Count Anomalies

In [0]:
# Display summary statistics for passenger count
display(df_valid_fare2.select("passenger_count").describe())

summary,passenger_count
count,2673653.0
mean,1.3429195935299008
stddev,0.8544193941239983
min,0.0
max,9.0


In [0]:
# Count the number of trips with zero passengers
df_valid_fare2.filter(col("passenger_count") == 0).count()

29505

In [0]:
# Display summary statistics for trips with zero passengers, focusing on trip distance, fare amount, time taken in minutes, and total amount
display(df_valid_fare2.filter(col("passenger_count") == 0).select(
    "trip_distance", "fare_amount", "time_take_min", "total_amount"
).describe())

summary,trip_distance,fare_amount,time_take_min,total_amount
count,29505.0,29505.0,29505.0,29505.0
mean,2.731383494323002,16.077661413319802,13.278930689713492,24.341803423147777
stddev,3.69457593337489,14.036950876927744,10.15345261739759,18.05655471357471
min,0.0,3.0,0.0333333333333333,4.0
max,38.7,182.9,130.96666666666667,284.5


**Filtering passenger count greater than 0**

In [0]:
# Filter trips to include only those with more than zero passengers
df_valid_passenger = df_valid_fare2.filter(col("passenger_count") > 0)

**Save the cleaned ``passenger_count`` for checkpoint.**

In [0]:
# Save the cleaned DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/5_valid_passenger"
df_valid_passenger.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/5_valid_passenger


### Inconsistent Categorical Values/Codes

In [0]:
# Display distinct values in the 'RatecodeID' column
display(df_valid_passenger.select("RatecodeID").distinct())

RatecodeID
2
1
3
4


In [0]:
# Display distinct values in the 'store_and_fwd_flag' column
display(df_valid_passenger.select("store_and_fwd_flag").distinct())

store_and_fwd_flag
Y
N


In [0]:
# Display distinct values in the 'payment_type' column
display(df_valid_passenger.select("payment_type").distinct())

payment_type
2
1
3
4


### Handling Nulls

In [0]:
# Count the number of null values in each column of the df_valid_passenger DataFrame
df_valid_passenger.select([
    count(when(col(c).isNull(), 1)).alias(c)
    for c in df_valid_passenger.columns
]).first().asDict()

{'VendorID': 0,
 'tpep_pickup_datetime': 0,
 'tpep_dropoff_datetime': 0,
 'passenger_count': 0,
 'trip_distance': 0,
 'RatecodeID': 0,
 'store_and_fwd_flag': 0,
 'PULocationID': 0,
 'DOLocationID': 0,
 'payment_type': 0,
 'fare_amount': 0,
 'extra': 0,
 'mta_tax': 0,
 'tip_amount': 0,
 'tolls_amount': 0,
 'improvement_surcharge': 0,
 'total_amount': 0,
 'congestion_surcharge': 0,
 'airport_fee': 0,
 'time_take_min': 0}

### Implement Data Cleaning and Type Casting

In [0]:
# Display the schema of the df_valid_passenger DataFrame
display(df_valid_passenger.printSchema())

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)



In [0]:
# Cast specific columns to integer type
df_valid_schema1 = (df_valid_passenger.withColumn("passenger_count", col("passenger_count").cast("int"))
    .withColumn("RatecodeID", col("RatecodeID").cast("int"))
    .withColumn("payment_type", col("payment_type").cast("int"))
)

In [0]:
# Convert datetime columns to timestamp type 
df_valid_schema2 = (
    df_valid_schema1
    .withColumn("tpep_pickup_datetime",  to_timestamp(col("tpep_pickup_datetime")))
    .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
)

In [0]:
df_valid_schema2.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)



**Save the cleaned schema `data` for checkpoint.**

In [0]:
# Save the cleaned DataFrame to a Parquet file
output_path_parquet = "/FileStore/tables/innovateretail/clean/6_clean_schema"
df_valid_schema2.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/6_clean_schema


### Feature Engineering

In [0]:
# Calculate the average speed by dividing trip distance by time taken in minutes
df_speed = df_valid_schema2.withColumn("average_speed", (col("trip_distance") / col("time_take_min")))

In [0]:
# Extract the hour of the day and the day of the week from the pickup datetime
df_date = df_speed.withColumn(
    "pickup_hour_of_day",
    hour(col("tpep_pickup_datetime"))
).withColumn(
    "pickup_day_of_week",
    dayofweek(col("tpep_pickup_datetime"))
)

In [0]:
# Categorize the pickup time into time of day slots
df_date = df_date.withColumn(
    "time_of_day_slot",
    when(col("pickup_hour_of_day").between(0, 5),    lit("Night"))
    .when(col("pickup_hour_of_day").between(6, 11),   lit("Morning"))
    .when(col("pickup_hour_of_day").between(12, 16),  lit("Afternoon"))
    .when(col("pickup_hour_of_day").between(17, 20),  lit("Evening"))
    .otherwise(lit("LateNight"))
)

In [0]:
display(df_date.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot
2,2024-01-01T00:57:55.000Z,2024-01-01T01:17:43.000Z,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8,0.0868686868686868,0,2,Night
1,2024-01-01T00:03:00.000Z,2024-01-01T00:09:36.000Z,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6,0.2727272727272727,0,2,Night
1,2024-01-01T00:17:06.000Z,2024-01-01T00:35:01.000Z,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916666666666668,0.2623255813953488,0,2,Night
1,2024-01-01T00:36:38.000Z,2024-01-01T00:44:56.000Z,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0,8.3,0.1686746987951807,0,2,Night
1,2024-01-01T00:46:51.000Z,2024-01-01T00:52:57.000Z,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0,6.1,0.1311475409836065,0,2,Night


**Save the dataframe with new columns for checkpoint.**

In [0]:
output_path_parquet = "/FileStore/tables/innovateretail/clean/5_feature_eng"
df_date.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/5_feature_eng


### Data Validation Post-Transformation

In [0]:
# Display the schema of the DataFrame
df_date.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)
 |-- average_speed: double (nullable = true)
 |-- pickup_hour_of_day: integer (n

In [0]:
# Display summary statistics of the DataFrame
display(df_date.describe())

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot
count,2644148.0,2644148.0,2644148.0,2644148.0,2644148,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148.0,2644148
mean,1.781070499835864,1.3579047012497032,3.1113047000391583,1.0428633344275735,,167.03551314071677,165.9750917876004,1.1879630035837632,17.616676267744875,1.516216996930582,0.4983195343074594,3.417220915774524,0.5055190859188867,0.9999133558333332,26.508934503653645,2.355136512782189,0.139265464716801,14.202813874260094,0.1901254980687076,14.312204536205993,4.079645693055003,
stddev,0.4135207839914704,0.8472486598875809,4.142579316162382,0.221698993464133,,62.976698076883686,68.84818842817774,0.4563650517035763,15.437997537449489,1.7709323490063271,0.0292225761816001,3.661203573745163,2.001008633146639,0.0079102604739585,19.948396124519963,0.584099548505086,0.4736240897884445,12.44216893000307,0.1107345445100457,5.651809513648375,1.9253902160179297,
min,1.0,1.0,0.0,1.0,N,1.0,1.0,1.0,2.5,0.0,0.0,0.0,0.0,0.0,4.0,0.0,0.0,0.0166666666666666,0.0,0.0,1.0,Afternoon
max,2.0,9.0,49.92,4.0,Y,265.0,265.0,4.0,273.9,14.25,4.0,422.7,101.69,1.0,453.55,2.5,1.75,2133.333333333333,0.8333333333333334,23.0,7.0,Night


**Saving the final cleaned data**

In [0]:
output_path_parquet = "/FileStore/tables/innovateretail/clean/final_cleaned_data"
df_date.write.mode("overwrite").parquet(output_path_parquet)
print(f"DataFrame saved to Parquet at: {output_path_parquet}")

DataFrame saved to Parquet at: /FileStore/tables/innovateretail/clean/final_cleaned_data


In [0]:
# Checking all the saved files
files = dbutils.fs.ls("dbfs:/FileStore/tables/innovateretail/clean")
display(files)

path,name,size,modificationTime
dbfs:/FileStore/tables/innovateretail/clean/1_trip_distance/,1_trip_distance/,0,1748778012000
dbfs:/FileStore/tables/innovateretail/clean/2_valid_year/,2_valid_year/,0,1748780649000
dbfs:/FileStore/tables/innovateretail/clean/3_valid_duration/,3_valid_duration/,0,1748778377000
dbfs:/FileStore/tables/innovateretail/clean/4_valid_fare/,4_valid_fare/,0,1748778456000
dbfs:/FileStore/tables/innovateretail/clean/5_feature_eng/,5_feature_eng/,0,1749378193000
dbfs:/FileStore/tables/innovateretail/clean/5_valid_passenger/,5_valid_passenger/,0,1748779276000
dbfs:/FileStore/tables/innovateretail/clean/6_clean_schema/,6_clean_schema/,0,1748779834000
dbfs:/FileStore/tables/innovateretail/clean/final_cleaned_data/,final_cleaned_data/,0,1748780538000


## Advanced Transformations and Business Logic

**Average Tip percentage for trips originating from airport locations compared to non-airport locations**

In [0]:
# Reading the cleaned data
input_path = "dbfs:/FileStore/tables/innovateretail/clean/final_cleaned_data"
df = spark.read.parquet(input_path)
display(df.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot
2,2024-01-01T00:57:55.000Z,2024-01-01T01:17:43.000Z,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8,0.0868686868686868,0,2,Night
1,2024-01-01T00:03:00.000Z,2024-01-01T00:09:36.000Z,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6,0.2727272727272727,0,2,Night
1,2024-01-01T00:17:06.000Z,2024-01-01T00:35:01.000Z,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916666666666668,0.2623255813953488,0,2,Night
1,2024-01-01T00:36:38.000Z,2024-01-01T00:44:56.000Z,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0,8.3,0.1686746987951807,0,2,Night
1,2024-01-01T00:46:51.000Z,2024-01-01T00:52:57.000Z,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0,6.1,0.1311475409836065,0,2,Night


In [0]:
# Calculate the percentage of trips that either start or end at one of the major airports (Newark, JFK, LaGuardia)
airport_ids = [1, 132, 138]

(df.filter(col("PULocationID").isin(airport_ids)).count() / df.count()) * 100

7.802437685031246

In [0]:
# Calculate the tip percentage and round it to 2 decimal places
df = df.withColumn(
    "tip_percent",
    round((col("tip_amount") / col("total_amount")) * 100 , 2)
)

In [0]:
df.count()

2644148

In [0]:
# Add a new column 'is_airport_pickup' to indicate if the pickup location is an airport
df = df.withColumn(
    "is_airport_pickup",
    col("PULocationID").isin(airport_ids)
)

In [0]:
# Display the average tip percentage grouped by whether the pickup location is an airport
display(df.groupBy("is_airport_pickup").agg(avg("tip_percent").alias("average_tip_percentage")))

is_airport_pickup,average_tip_percentage
True,11.12789188979563
False,12.5880558488907


Databricks visualization. Run in Databricks to view.

In [0]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'time_take_min',
 'average_speed',
 'pickup_hour_of_day',
 'pickup_day_of_week',
 'time_of_day_slot',
 'tip_percent',
 'is_airport_pickup']

In [0]:
# Display the average time taken for trips grouped by pickup and dropoff locations
display(df.groupBy("PULocationID", "DOLocationID").agg(avg("time_take_min")))

PULocationID,DOLocationID,avg(time_take_min)
224,236,21.913131313131316
263,263,4.015986055776895
66,25,10.144444444444444
152,116,5.796153846153847
80,75,34.166666666666664
13,181,19.21615384615385
162,181,32.550409836065576
209,229,14.254074074074072
137,93,13.075
163,195,31.7875


In [0]:
# Count the number of unique pickup and dropoff location pairs
df.groupBy("PULocationID", "DOLocationID").agg(avg("time_take_min")).count()

15199

In [0]:
# Display the count of trips for each payment type
display(df.groupBy("payment_type").agg(count("RatecodeID")))

payment_type,count(RatecodeID)
1,2199159
3,8929
4,21542
2,414518


**Ranking Payment Types by Usage Across Rate Codes**

In [0]:
# Group by RatecodeID and payment_type, and count the number of trips for each group
payment_group = df.groupBy("RatecodeID", "payment_type").count()

# Define a window specification to rank payment types within each RatecodeID group by count in descending order
windowspec = Window.partitionBy("RatecodeID").orderBy(desc("count"))

# Add a rank column to the payment_group DataFrame based on the window specification
ranked_payment = payment_group.withColumn("rank", rank().over(windowspec))

# Replace numeric RatecodeID values with descriptive labels
ranked_payment = ranked_payment.withColumn(
    "RatecodeID",
    when(col("RatecodeID") == 1, "Standard rate")
    .when(col("RatecodeID") == 2, "JFK")
    .when(col("RatecodeID") == 3, "Newark")
    .when(col("RatecodeID") == 4, "Nassau or Westchester")
)

# Replace numeric payment_type values with descriptive labels
ranked_payment = ranked_payment.withColumn(
    "payment_type",
    when(col("payment_type") == 1, "Credit card")
    .when(col("payment_type") == 2, "Cash")
    .when(col("payment_type") == 3, "No charge")
    .when(col("payment_type") == 4, "Dispute")
)

# Display the ranked_payment DataFrame
display(ranked_payment)

RatecodeID,payment_type,count,rank
Standard rate,Credit card,2114540,1
Standard rate,Cash,396786,2
Standard rate,Dispute,20493,3
Standard rate,No charge,8442,4
JFK,Credit card,78504,1
JFK,Cash,15953,2
JFK,Dispute,900,3
JFK,No charge,371,4
Newark,Credit card,5188,1
Newark,Cash,1465,2


Databricks visualization. Run in Databricks to view.

**Traffic Congestion**

In [0]:
# Display the average trip duration for each time of day slot
display(df.groupBy("time_of_day_slot").agg(avg("time_take_min")))

time_of_day_slot,avg(time_take_min)
Morning,13.862060007178703
Afternoon,15.249178506976577
LateNight,13.5339922142427
Night,12.688902973878466
Evening,14.043813289354285


In [0]:
# Group by time of day slot and calculate average speed, average duration, and trip count
traffic_congestion = (df
    .groupBy("time_of_day_slot")
    .agg(
        avg("average_speed").alias("avg_speed_mph"),  # Calculate average speed in mph
        avg("time_take_min").alias("avg_duration_min"),  # Calculate average trip duration in minutes
        count("*").alias("trip_count")  # Count the number of trips
    )
    .orderBy("time_of_day_slot"))  # Order by time of day slot

# Display the traffic congestion DataFrame
display(traffic_congestion)

time_of_day_slot,avg_speed_mph,avg_duration_min,trip_count
Afternoon,0.16811065793553,15.249178506976577,808467
Evening,0.1829786224518464,14.043813289354285,694265
LateNight,0.2271292161569624,13.5339922142427,372475
Morning,0.1870885636679543,13.862060007178703,577642
Night,0.2462230120330254,12.688902973878466,191299


Databricks visualization. Run in Databricks to view.

## Loading Data into Delta Lake and Basic Optimization

In [0]:
# Path where your “final_cleaned_data” Parquet files currently live
parquet_path = "dbfs:/FileStore/tables/innovateretail/clean/final_cleaned_data/"

# Read the Parquet data into Spark
final_df = spark.read.parquet(parquet_path)

# (Optional) Inspect schema
final_df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)
 |-- average_speed: double (nullable = true)
 |-- pickup_hour_of_day: integer (n

In [0]:
# Add year, month, and day columns extracted from pickup datetime
final_with_parts = (
    final_df
    .withColumn("pickup_year",  year("tpep_pickup_datetime"))
    .withColumn("pickup_month", month("tpep_pickup_datetime"))
    .withColumn("pickup_day",   dayofmonth("tpep_pickup_datetime"))
)

In [0]:
# Define Delta table target path
delta_target_path = "dbfs:/FileStore/delta/innovateretail/taxi_final_delta"

# Write DataFrame to Delta format, partitioned by year, month, and day
(final_with_parts
 .write
 .format("delta")
 .mode("overwrite")
 .partitionBy("pickup_year", "pickup_month", "pickup_day")
 .save(delta_target_path)
)

In [0]:
# List the top-level of your Delta folder
display(dbutils.fs.ls("dbfs:/FileStore/delta/innovateretail/taxi_final_delta"))

path,name,size,modificationTime
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/_delta_log/,_delta_log/,0,1749027966000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/,pickup_year=2024/,0,1749027973000


In [0]:
# Define the Delta table path
delta_path = "dbfs:/FileStore/delta/innovateretail/taxi_final_delta"

# Read data from Delta table into a DataFrame
df_again = spark.read.format("delta").load(delta_path)

# Display the schema of the DataFrame
df_again.printSchema()

# Show the first 5 rows of the DataFrame
display(df_again.limit(5))

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)
 |-- average_speed: double (nullable = true)
 |-- pickup_hour_of_day: integer (n

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-12T00:00:58.000Z,2024-01-12T00:27:22.000Z,5,11.16,1,N,132,61,2,45.0,1.0,0.5,0.0,0.0,1.0,49.25,0.0,1.75,26.4,0.4227272727272727,0,6,Night,2024,1,12
2,2024-01-12T00:03:42.000Z,2024-01-12T00:22:34.000Z,6,3.3,1,N,249,142,1,19.8,1.0,0.5,4.96,0.0,1.0,29.76,2.5,0.0,18.866666666666667,0.1749116607773851,0,6,Night,2024,1,12
1,2024-01-12T00:43:08.000Z,2024-01-12T00:55:08.000Z,1,1.6,1,N,140,142,1,11.4,3.5,0.5,4.1,0.0,1.0,20.5,2.5,0.0,12.0,0.1333333333333333,0,6,Night,2024,1,12
1,2024-01-12T00:08:23.000Z,2024-01-12T00:11:30.000Z,2,0.3,1,N,107,107,1,4.4,3.5,0.5,2.0,0.0,1.0,11.4,2.5,0.0,3.1166666666666667,0.0962566844919786,0,6,Night,2024,1,12
2,2024-01-12T00:51:18.000Z,2024-01-12T01:04:02.000Z,1,2.51,1,N,79,13,1,14.9,1.0,0.5,2.98,0.0,1.0,22.88,2.5,0.0,12.733333333333333,0.1971204188481675,0,6,Night,2024,1,12


In [0]:
%sql
-- Create database and table for taxi data using Delta format
CREATE DATABASE IF NOT EXISTS innovateretail_db;
CREATE TABLE IF NOT EXISTS innovateretail_db.taxi_delta
  USING DELTA
  LOCATION 'dbfs:/FileStore/delta/innovateretail/taxi_final_delta';

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8465057304688005>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m"[39m[38;5;124mCREATE DATABASE IF NOT EXISTS innovateretail_db;[39m[38;5;130;01m\n[39;00m[38;5;124mCREATE TABLE IF NOT EXISTS innovateretail_db.taxi_delta[39m[38;5;130;01m\n[39;00m[38;5;124m  USING DELTA[39m[38;5;130;01m\n[39;00m[38;5;124m  LOCATION [39m[38;5;124m'[39m[38;5;124mdbfs:/FileStore/delta/innovateretail/taxi_final_delta[39m[38;5;124m'[39m[38;5;124m;[39m[38;5;130;01m\n[39;00m[38;5;124m"[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, 

In [0]:
# List the top-level of your Delta folder
display(dbutils.fs.ls("dbfs:/FileStore/delta/innovateretail/taxi_final_delta"))

# Drill into one year folder
display(dbutils.fs.ls("dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024"))

# Drill one level deeper into a month folder
display(dbutils.fs.ls("dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1"))


path,name,size,modificationTime
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/_delta_log/,_delta_log/,0,1749027966000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/,pickup_year=2024/,0,1749027973000


path,name,size,modificationTime
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/,pickup_month=1/,0,1749027973000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=2/,pickup_month=2/,0,1749027980000


path,name,size,modificationTime
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=1/,pickup_day=1/,0,1749027973000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=10/,pickup_day=10/,0,1749027981000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=11/,pickup_day=11/,0,1749027981000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=12/,pickup_day=12/,0,1749027978000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=13/,pickup_day=13/,0,1749027976000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=14/,pickup_day=14/,0,1749027983000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=15/,pickup_day=15/,0,1749027980000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=16/,pickup_day=16/,0,1749027981000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=17/,pickup_day=17/,0,1749027982000
dbfs:/FileStore/delta/innovateretail/taxi_final_delta/pickup_year=2024/pickup_month=1/pickup_day=18/,pickup_day=18/,0,1749027982000


In [0]:
%sql
-- Create database and table for taxi data in Hive catalog
USE CATALOG hive_metastore;
CREATE DATABASE IF NOT EXISTS innovateretail_hive;
CREATE TABLE IF NOT EXISTS innovateretail_hive.taxi_delta
USING DELTA
LOCATION 'dbfs:/FileStore/delta/innovateretail/taxi_final_delta';

In [0]:
%sql
-- Show detailed metadata information about the taxi_delta table
USE CATALOG hive_metastore;
DESCRIBE DETAIL innovateretail_hive.taxi_delta;

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,c421a320-2872-4c2d-b092-abc2fe56457d,hive_metastore.innovateretail_hive.taxi_delta,,dbfs:/FileStore/delta/innovateretail/taxi_final_delta,2025-06-04T09:06:06.125Z,2025-06-04T09:06:27Z,"List(pickup_year, pickup_month, pickup_day)",List(),35,79961889,Map(delta.enableDeletionVectors -> true),3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


In [0]:
%sql
-- List all partitions of the taxi_delta table
USE CATALOG hive_metastore;
SHOW PARTITIONS innovateretail_hive.taxi_delta;

pickup_year,pickup_month,pickup_day
2024,2,1
2002,12,31
2024,1,3
2024,1,16
2024,1,24
2024,1,15
2024,1,20
2024,1,31
2009,1,1
2024,1,27


In [0]:
%sql
-- Retrieve all records from February 2024
SELECT *
FROM innovateretail_hive.taxi_delta
WHERE pickup_year = 2024 AND pickup_month = 2

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-02-01T00:01:15Z,2024-02-01T00:06:30Z,1,1.1,1,N,161,234,1,7.9,1.0,0.5,2.58,0.0,1.0,15.48,2.5,0.0,5.25,0.2095238095238095,0,5,Night,2024,2,1
2,2024-02-01T00:00:17Z,2024-02-01T00:20:13Z,1,8.93,1,N,138,152,2,36.6,6.0,0.5,0.0,6.94,1.0,52.79,0.0,1.75,19.933333333333334,0.4479933110367893,0,5,Night,2024,2,1
2,2024-02-01T00:00:39Z,2024-02-01T00:12:08Z,5,2.22,1,N,186,79,1,13.5,1.0,0.5,3.7,0.0,1.0,22.2,2.5,0.0,11.483333333333333,0.1933236574746009,0,5,Night,2024,2,1


In [0]:
delta_path = "dbfs:/FileStore/delta/innovateretail/taxi_final_delta"

df_again = spark.read.format("delta").load(delta_path)

df_again.printSchema()
display(df_again.limit(5))

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- time_take_min: double (nullable = true)
 |-- average_speed: double (nullable = true)
 |-- pickup_hour_of_day: integer (n

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-16T23:45:34.000Z,2024-01-16T23:53:08.000Z,1,3.09,1,N,132,10,1,14.2,1.0,0.5,0.1,0.0,1.0,18.55,0.0,1.75,7.566666666666666,0.4083700440528634,23,3,LateNight,2024,1,16
2,2024-01-16T23:59:51.000Z,2024-01-17T00:01:22.000Z,1,0.28,1,N,263,263,2,4.4,1.0,0.5,0.0,0.0,1.0,9.4,2.5,0.0,1.5166666666666666,0.1846153846153846,23,3,LateNight,2024,1,16
2,2024-01-16T23:59:42.000Z,2024-01-17T00:30:37.000Z,1,6.63,1,N,114,257,1,33.8,1.0,0.5,9.7,0.0,1.0,48.5,2.5,0.0,30.916666666666668,0.2144474393530997,23,3,LateNight,2024,1,16
2,2024-01-16T23:59:03.000Z,2024-01-17T00:05:13.000Z,1,1.09,1,N,249,79,1,8.6,1.0,0.5,2.0,0.0,1.0,15.6,2.5,0.0,6.166666666666667,0.1767567567567567,23,3,LateNight,2024,1,16
2,2024-01-16T23:59:53.000Z,2024-01-17T00:00:26.000Z,1,0.02,1,N,70,70,2,3.0,1.0,0.5,0.0,0.0,1.0,5.5,0.0,0.0,0.55,0.0363636363636363,23,3,LateNight,2024,1,16


**Optimization and Z-order**

In [0]:
# Load the Delta table from the specified path
from delta.tables import DeltaTable
delta_path = "dbfs:/FileStore/delta/innovateretail/taxi_final_delta"
delta_table = DeltaTable.forPath(spark, delta_path)

In [0]:
# Optimize the Delta table to compact small files and improve query performance
delta_table.optimize()

<delta.connect.tables.DeltaOptimizeBuilder at 0x7f9b37064350>

In [0]:
# Compact and Z-order by two high-cardinality columns
delta_table.optimize().executeZOrderBy("PULocationID", "DOLocationID")

DataFrame[path: string, metrics: struct<autoCompactParallelismStats:void,clusteringMetrics:void,clusteringStats:void,deletionVectorStats:struct<numDeletionVectorRowsRemoved:bigint,numDeletionVectorsRemoved:bigint>,endTimeMs:bigint,filesAdded:struct<avg:double,max:void,min:void,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<avg:double,max:void,min:void,totalFiles:bigint,totalSize:bigint>,numBatches:bigint,numBins:bigint,numBytesSkippedToReduceWriteAmplification:bigint,numFilesAdded:bigint,numFilesRemoved:bigint,numFilesSkippedToReduceWriteAmplification:bigint,numTableColumns:bigint,numTableColumnsWithStats:bigint,partitionsOptimized:bigint,preserveInsertionOrder:boolean,skippedArchivedFiles:bigint,startTimeMs:bigint,totalClusterParallelism:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,totalScheduledTasks:bigint,totalTaskExecutionTimeMs:bigint,zOrderStats:struct<inputCubeFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,inputOtherFiles:struct<num:bigint,siz

## Data Analysis and Insights from Delta Table

In [0]:
# Load the Delta table into a DataFrame
delta_path = "dbfs:/FileStore/delta/innovateretail/taxi_final_delta"

df = spark.read.format("delta").load(delta_path)

In [0]:
df.count()

2644134

In [0]:
display(df.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,time_take_min,average_speed,pickup_hour_of_day,pickup_day_of_week,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-12T00:00:58.000Z,2024-01-12T00:27:22.000Z,5,11.16,1,N,132,61,2,45.0,1.0,0.5,0.0,0.0,1.0,49.25,0.0,1.75,26.4,0.4227272727272727,0,6,Night,2024,1,12
2,2024-01-12T00:03:42.000Z,2024-01-12T00:22:34.000Z,6,3.3,1,N,249,142,1,19.8,1.0,0.5,4.96,0.0,1.0,29.76,2.5,0.0,18.866666666666667,0.1749116607773851,0,6,Night,2024,1,12
1,2024-01-12T00:43:08.000Z,2024-01-12T00:55:08.000Z,1,1.6,1,N,140,142,1,11.4,3.5,0.5,4.1,0.0,1.0,20.5,2.5,0.0,12.0,0.1333333333333333,0,6,Night,2024,1,12
1,2024-01-12T00:08:23.000Z,2024-01-12T00:11:30.000Z,2,0.3,1,N,107,107,1,4.4,3.5,0.5,2.0,0.0,1.0,11.4,2.5,0.0,3.1166666666666667,0.0962566844919786,0,6,Night,2024,1,12
2,2024-01-12T00:51:18.000Z,2024-01-12T01:04:02.000Z,1,2.51,1,N,79,13,1,14.9,1.0,0.5,2.98,0.0,1.0,22.88,2.5,0.0,12.733333333333333,0.1971204188481675,0,6,Night,2024,1,12


**Average fare amount vary by time_of_day_slot and pickup_day_of_week**

In [0]:
# Calculate and display the average fare and trip count grouped by time of day slot and day of the week
display(df
    .groupBy("time_of_day_slot", "pickup_day_of_week")
    .agg(
        round(avg("fare_amount"), 2).alias("avg_fare"),
        count("*").alias("trip_count")
    )
    .orderBy("pickup_day_of_week", "time_of_day_slot"))

time_of_day_slot,pickup_day_of_week,avg_fare,trip_count
Afternoon,1,17.92,99084
Evening,1,18.95,67129
LateNight,1,22.36,31809
Morning,1,17.06,51774
Night,1,15.66,51809
Afternoon,2,18.46,114169
Evening,2,18.02,92839
LateNight,2,21.59,38653
Morning,2,17.91,81830
Night,2,19.61,34057


Databricks visualization. Run in Databricks to view.

**Top 10 busiest routes (PULocationID to DOLocationID pairs) during peak hours**

In [0]:
# Display the top 10 most frequent trips during Morning and Evening time slots
display(df
    .filter(col("time_of_day_slot").isin("Morning", "Evening"))
    .groupBy("PULocationID", "DOLocationID")
    .agg(count("*").alias("trip_count"))
    .orderBy(col("trip_count").desc())
    .limit(10))

PULocationID,DOLocationID,trip_count
237,236,10072
236,237,9891
236,236,7733
237,237,6690
161,237,4776
237,161,4755
236,161,4599
239,142,4317
161,236,4078
142,239,4069


In [0]:
# Recalculate traffic congestion metrics grouped by time of day slot
traffic_congestion = (df
    .groupBy("time_of_day_slot")
    .agg(
        avg("average_speed").alias("avg_speed_mph"),
        avg("time_take_min").alias("avg_duration_min"),
        count("*").alias("trip_count")
    )
    .orderBy("time_of_day_slot"))

time_of_day_slot,avg_speed_mph,avg_duration_min,trip_count
Afternoon,0.1681106579355663,15.249178506976792,808467
Evening,0.1829786224518885,14.04381328935399,694265
LateNight,0.2271292161569906,13.533992214242629,372475
Morning,0.1870885636680011,13.862060007178604,577642
Night,0.2462230120330302,12.688902973878603,191299


In [0]:
# (c) By highest trip count (most frequent trips)
peak_by_volume = traffic_congestion.orderBy(col("trip_count").desc())
peak_by_volume.show(truncate=False)

+----------------+-------------------+------------------+----------+
|time_of_day_slot|avg_speed_mph      |avg_duration_min  |trip_count|
+----------------+-------------------+------------------+----------+
|Afternoon       |0.16811065793553004|15.249178506976577|808467    |
|Evening         |0.18297862245184646|14.043813289354283|694265    |
|Morning         |0.1870885636679543 |13.862060007178703|577642    |
|LateNight       |0.2271292161569624 |13.5339922142427  |372475    |
|Night           |0.24622301203302546|12.688902973878466|191299    |
+----------------+-------------------+------------------+----------+



In [0]:
# (a) By lowest average speed (worst congestion)
peak_by_speed = traffic_congestion.orderBy(col("avg_speed_mph").asc())
peak_by_speed.show(truncate=False)

# (b) By highest average duration
peak_by_duration = traffic_congestion.orderBy(col("avg_duration_min").desc())
peak_by_duration.show(truncate=False)

# (c) By highest trip count (most frequent trips)
peak_by_volume = traffic_congestion.orderBy(col("trip_count").desc())
peak_by_volume.show(truncate=False)

+----------------+-------------------+------------------+----------+
|time_of_day_slot|avg_speed_mph      |avg_duration_min  |trip_count|
+----------------+-------------------+------------------+----------+
|Afternoon       |0.16811065793553004|15.249178506976577|808467    |
|Evening         |0.18297862245184646|14.043813289354283|694265    |
|Morning         |0.1870885636679543 |13.862060007178703|577642    |
|LateNight       |0.2271292161569624 |13.5339922142427  |372475    |
|Night           |0.24622301203302546|12.688902973878466|191299    |
+----------------+-------------------+------------------+----------+

+----------------+-------------------+------------------+----------+
|time_of_day_slot|avg_speed_mph      |avg_duration_min  |trip_count|
+----------------+-------------------+------------------+----------+
|Afternoon       |0.16811065793553004|15.249178506976577|808467    |
|Evening         |0.18297862245184646|14.043813289354283|694265    |
|Morning         |0.1870885636679

In [0]:
# Define the time slots to filter
selected_peaks = ["Afternoon","Evening"]

# Filter the DataFrame for the selected time slots and group by pickup and dropoff locations
result = (df
          .filter(col("time_of_day_slot").isin(selected_peaks))
          .groupBy("PULocationID", "DOLocationID")
          .agg(count("*").alias("trip_count"))
          # Create a route column by concatenating pickup and dropoff locations
          .withColumn("route", concat_ws(" to ", "PULocationID", "DOLocationID"))
          # Order by trip count in descending order and limit to top 10
          .orderBy(col("trip_count").desc())
          .limit(10)
)

# Display the result
display(result)

PULocationID,DOLocationID,trip_count,route
237,236,13760,237 to 236
236,237,11701,236 to 237
236,236,9598,236 to 236
237,237,8842,237 to 237
161,237,7019,161 to 237
161,236,5956,161 to 236
142,239,5519,142 to 239
239,142,5413,239 to 142
239,238,5287,239 to 238
237,161,4617,237 to 161


Databricks visualization. Run in Databricks to view.

**Airport vs. Non-Airport Trips: Distance, Fare & Tip Comparison**

In [0]:
# Calculate correlation between trip duration and tip amount, along with average duration and tip amount, grouped by payment type
display(df
    .groupBy("payment_type")
    .agg(
        corr("time_take_min", "tip_amount").alias("corr_duration_tip"),
        round(avg("time_take_min"), 2).alias("avg_duration"),
        round(avg("tip_amount"), 2).alias("avg_tip")
    )
    .orderBy(col("corr_duration_tip").desc())
)

payment_type,corr_duration_tip,avg_duration,avg_tip
1,0.5993381839759536,14.25,4.11
2,0.009153671847001,14.08,0.0
3,0.0085427461014154,11.99,0.01
4,-0.0009332312314302628,13.07,0.0


In [0]:
# Define airport location IDs for tagging trips
airport_ids = [1, 132, 138]

# Tag trips as 'airport' or 'non_airport' and calculate tip percentage
tagged_df = (
    df
    .withColumn(
        "trip_type",
        when(
            col("PULocationID").isin(airport_ids) | col("DOLocationID").isin(airport_ids),
            "airport"
        ).otherwise("non_airport")
    )
    .withColumn(
        "tip_pct",
        when(col("fare_amount") > 0, round((col("tip_amount") / col("fare_amount")) * 100, 2))
        .otherwise(None)
    )
)

# Aggregate statistics by trip type
airport_stats_df = (
    tagged_df
    .groupBy("trip_type")
    .agg(
        round(avg("trip_distance"), 2).alias("avg_distance_miles"),
        round(avg("fare_amount"), 2).alias("avg_fare_usd"),
        round(avg("tip_pct"), 2).alias("avg_tip_percentage"),
        count("*").alias("total_trips")
    )
    .orderBy("trip_type")
)

# Display the aggregated statistics
display(airport_stats_df)

trip_type,avg_distance_miles,avg_fare_usd,avg_tip_percentage,total_trips
airport,13.43,53.93,17.5,252810
non_airport,2.02,13.78,22.07,2391324
