In [0]:
// Load every yellow-taxi trip Parquet file found under the directory tree.
// `recursiveFileLookup` makes Spark descend into nested sub-directories.
// No `header` option is set: that is a CSV-only setting and has no effect on a Parquet read.
val rawDf = spark.read
    .option("recursiveFileLookup", "true")
    .parquet("/user/qz2166_nyu_edu/tlc_trip_data/yellow_taxi_trip")

In [1]:
// Keep only trips whose pickup AND dropoff calendar dates both fall inside the
// inclusive window 2024-01-01 .. 2024-09-30, then order the rows by pickup timestamp.
val sortedDf = {
    val firstDay = "2024-01-01"
    val lastDay = "2024-09-30"
    val pickupDate = to_date($"tpep_pickup_datetime")
    val dropoffDate = to_date($"tpep_dropoff_datetime")

    rawDf
        .where(dropoffDate.between(firstDay, lastDay) && pickupDate.between(firstDay, lastDay))
        .orderBy($"tpep_pickup_datetime".asc)
}

In [2]:
// Data-quality filters applied to the date-bounded trips. A row is kept only when:
//   - store_and_fwd_flag is present (non-null)
//   - at least one passenger was recorded
//   - total_amount is positive and fare_amount is above 3
//   - trip_distance is positive
//   - neither the pickup nor the dropoff zone is LocationID 264
//     (NOTE(review): presumably the "Unknown" zone in taxi_zone_lookup.csv — confirm)
// Inequality is written with `=!=`; the older `!==` operator is deprecated in Spark
// because it does not share the precedence of `===`.
val cleanDf = sortedDf
    .filter($"store_and_fwd_flag".isNotNull)
    .filter($"passenger_count" >= 1)
    .filter($"total_amount" > 0)
    .filter($"fare_amount" > 3)
    .filter($"trip_distance" > 0)
    .filter($"PULocationID" =!= 264)
    .filter($"DOLocationID" =!= 264)
    
// cleanDf.write.mode("overwrite").parquet("/user/qz2166_nyu_edu/tlc_trip_data/yellow_taxi_clean.parquet")

In [3]:
// Taxi-zone lookup table reduced to two columns:
//   location_id - the zone id taken from the CSV's LocationID column
//   Location    - a "Zone,Borough" label built by joining the two CSV columns with a comma
// NOTE(review): no schema is supplied or inferred for the CSV, so location_id keeps the
// reader's default column type — confirm it compares as intended with the trip zone ids.
val locationDf = {
    val zoneLookup = spark.read
        .option("header", "true")
        .csv("/user/qz2166_nyu_edu/taxi_zone_lookup.csv")

    val zoneId = $"LocationID".as("location_id")
    val zoneLabel = concat_ws(",", $"Zone", $"Borough").as("Location")

    zoneLookup.select(zoneId, zoneLabel)
}

In [4]:
// Swap the pickup/dropoff zone ids for their "Zone,Borough" labels.
// The lookup table is joined twice (once per endpoint); after each join the label column is
// renamed and the id columns are dropped. Both joins use the default (inner) join type, so a
// trip whose zone id has no match in the lookup is not carried forward.
val cleanLocDf = {
    val pickupMatches = cleanDf("PULocationID") === locationDf("location_id")
    val withPickupLabel = cleanDf
        .join(locationDf, pickupMatches)
        .withColumnRenamed("Location", "PULocation")
        .drop("location_id", "PULocationID")

    val dropoffMatches = cleanDf("DOLocationID") === locationDf("location_id")
    withPickupLabel
        .join(locationDf, dropoffMatches)
        .withColumnRenamed("Location", "DOLocation")
        .drop("location_id", "DOLocationID")
}

// Persist the labelled trips, replacing any previous output at this path.
cleanLocDf
    .write
    .mode("overwrite")
    .parquet("/user/qz2166_nyu_edu/tlc_trip_data/yellow_taxi_clean_w_loc.parquet")

In [5]:
// Build the tip-prediction dataset from the cleaned trips.
//
// Additional row filters (outlier removal):
//   - drop trips that start or end at LocationID 265, i.e. trips into or out of the city
//   - keep only payment_type 1 (credit-card transactions): those tips are populated
//     automatically, whereas cash tips are not recorded
//   - drop trips whose total_amount is 250 or more
//
// Zone ids are replaced by "Zone,Borough" labels through two joins against locationDf.
//
// Derived columns:
//   - starting_time: pickup time of day, in seconds since midnight
//   - trip_duration: dropoff minus pickup, in seconds
//   - no_tip_amount: total_amount with tip_amount subtracted
//
// Inequality is written with `=!=`; the older `!==` operator is deprecated in Spark.
val mlDf = cleanDf
    .filter($"PULocationID" =!= 265)
    .filter($"DOLocationID" =!= 265)
    .filter($"payment_type" === 1)
    .filter($"total_amount" < 250)
    .join(locationDf, cleanDf("PULocationID") === locationDf("location_id"))
    .withColumnRenamed("Location", "PULocation")
    .drop("location_id", "PULocationID")
    .join(locationDf, cleanDf("DOLocationID") === locationDf("location_id"))
    .withColumnRenamed("Location", "DOLocation")
    .drop("location_id", "DOLocationID")
    .withColumn("starting_time", hour($"tpep_pickup_datetime")*3600 + minute($"tpep_pickup_datetime")*60 + second($"tpep_pickup_datetime"))
    .withColumn("trip_duration", unix_timestamp($"tpep_dropoff_datetime") - unix_timestamp($"tpep_pickup_datetime"))
    .withColumn("no_tip_amount", $"total_amount" - $"tip_amount")
    .select("tip_amount", "no_tip_amount", "starting_time", "trip_duration", "trip_distance", "PULocation", "DOLocation", "passenger_count")

// Persist the model-ready table, replacing any previous output at this path.
mlDf.write.mode("overwrite").parquet("/user/qz2166_nyu_edu/tlc_trip_data/yellow_taxi_ml_clean.parquet")

