## Further preprocessing 

 ### 1. Importing libraries and files
 + Importing the cleaned taxi data produced by the first preprocessing step

In [36]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [37]:
# Spark settings for this notebook, applied to the session builder in order.
# Eager evaluation lets DataFrames render inline; the session time zone is
# pinned to UTC.
SPARK_SETTINGS = {
    "spark.driver.memory": "4g",
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}

spark_builder = SparkSession.builder.appName("MAST30034 Project 1")
for setting_key, setting_value in SPARK_SETTINGS.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()

In [38]:
# Load the cleaned trips written out by the first preprocessing notebook.
TAXI_CLEANED_PATH = '../project-1-individual-phtangtr/data/processed_data/nyc_taxi_cleaned.parquet/'
sdf_taxi = spark.read.parquet(TAXI_CLEANED_PATH)
n_rows = sdf_taxi.count()
n_cols = len(sdf_taxi.columns)
print("Taxi shape:", (n_rows, n_cols))

Taxi shape: (33087357, 19)


### 2. Dropping unwanted features
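+ Dropping `airport_fee` and `improvement_surcharge`
+ Dropping `store_and_fwd_flag` and `mta_tax` (these, along with the other unused columns, are excluded by the column selection further below)
+ Removing the `cbd_congestion_fee` (the extra toll charged to taxis entering the congestion zone south of 60th Street) so it does not inflate the tolls; note that this column does not appear in the loaded schema shown below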

In [39]:
# Drop fee columns that are not used in the income analysis, and keep only
# trips picked up in the study window [STUDY_START, STUDY_END).
# Re-running this cell is harmless: dropping absent columns by name is a no-op
# in PySpark, and the date filter is idempotent.
STUDY_START = "2024-09-01"  # inclusive
STUDY_END = "2025-05-01"    # exclusive
sdf_taxi = (
    sdf_taxi
    .drop("airport_fee", "improvement_surcharge")
    .filter(
        (F.col("tpep_pickup_datetime") >= STUDY_START)
        & (F.col("tpep_pickup_datetime") < STUDY_END)
    )
)

# printSchema() prints directly and returns None, so call it on its own
# rather than inside the shape tuple (which used to print a stray "None").
sdf_taxi.printSchema()
print("Taxi shape:", (sdf_taxi.count(), len(sdf_taxi.columns)))

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

Taxi shape: (28864486, 17, None)


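+ There were no trips with a negative passenger count, but a large number of trips have zero passengers; these are meant to be removed so they do not distort the data. (NOTE: none of the cells below filter on `passenger_count` — TODO confirm whether this was already handled in the first preprocessing step.)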

#### Keeping only income-relevant columns (the improvement surcharge is removed as it is irrelevant to income)

In [40]:
from pyspark.sql.functions import col

# Keep only the columns needed for the income analysis. The two trip
# timestamps lose their "tpep_" prefix; the remaining columns keep their names.
RENAMED_COLUMNS = {
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_dropoff_datetime": "dropoff_datetime",
}
KEPT_COLUMNS = ["fare_amount", "tip_amount", "payment_type", "DOLocationID", "PULocationID"]

taxi_filtered = sdf_taxi.select(
    *[col(source).alias(target) for source, target in RENAMED_COLUMNS.items()],
    *[col(name) for name in KEPT_COLUMNS],
)
print("Taxi after filter shape:", (taxi_filtered.count(), len(taxi_filtered.columns)))

Taxi after filter shape: (28864486, 7)


### 3. Fixing invalid values

In [41]:
# Number of trips whose pickup timestamp is later than their dropoff timestamp.
pickup_later_than_dropoff = F.col("pickup_datetime") > F.col("dropoff_datetime")
taxi_filtered.filter(pickup_later_than_dropoff).count()

                                                                                

1129

In [42]:
# Where pickup is later than dropoff, swap the two timestamps. Both corrected
# values are written to temporary columns first so that each one is computed
# from the original (unswapped) pair. Rows where either timestamp is null make
# the comparison null, so they fall through to `otherwise` and stay unchanged.
is_inverted = F.col("pickup_datetime") > F.col("dropoff_datetime")
fixed_pickup = F.when(is_inverted, F.col("dropoff_datetime")).otherwise(F.col("pickup_datetime"))
fixed_dropoff = F.when(is_inverted, F.col("pickup_datetime")).otherwise(F.col("dropoff_datetime"))

taxi_swapped = (
    taxi_filtered
    .withColumn("pickup_temp", fixed_pickup)
    .withColumn("dropoff_temp", fixed_dropoff)
    .drop("pickup_datetime", "dropoff_datetime")
    .withColumnRenamed("pickup_temp", "pickup_datetime")
    .withColumnRenamed("dropoff_temp", "dropoff_datetime")
)

swapped_shape = (taxi_swapped.count(), len(taxi_swapped.columns))
print("After correcting datetime order shape:", swapped_shape)

After correcting datetime order shape: (28864486, 7)


In [44]:
# Keep only trips with a realistic duration. Both bounds are strict:
# - longer than 1 minute
# - shorter than 10 hours
# NOTE(review): an earlier comment here stated a 15-hour maximum while the
# filter used 10 hours; the output below reflects 10 hours — confirm intent.
MIN_TRIP_DURATION = F.expr("INTERVAL 1 MINUTE")
MAX_TRIP_DURATION = F.expr("INTERVAL 10 HOURS")

# Timestamp difference, compared against the interval bounds above.
trip_duration = F.col("dropoff_datetime") - F.col("pickup_datetime")
taxi_df = taxi_swapped.filter(
    (trip_duration > MIN_TRIP_DURATION) & (trip_duration < MAX_TRIP_DURATION)
)
print("Shape for removing unrealistic trip duration time:", 
      (taxi_df.count(), len(taxi_df.columns)))

                                                                                

Shape for removing unrealistic trip duration time: (28508957, 7)


In [45]:
# Persist the cleaned trips for later analysis, replacing any previous run's output.
FINAL_OUTPUT_PATH = "data/processed_data/nyc_taxi_final.parquet"
(
    taxi_df.write
    .mode("overwrite")
    .parquet(FINAL_OUTPUT_PATH)
)

                                                                                