This notebook consists of the preprocessing of the raw yellow and green taxi data of New York City (NYC) during January 2019-June 2019, the two parquet files obtained would be the hourly data and the daily data of the taxi rides

In [48]:
import os

# from the current `tute_1` directory, go back two levels to the `MAST30034 project` directory
output_relative_dir = '../../mast30034-project-1-janggani/data/raw/'

# check if it exists as it makedir will raise an error if it does exist
if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)
    
# now, for each type of data set we will need, we will create the paths
for target_dir in ('yellow', 'green'): # should already exist
    if not os.path.exists(output_relative_dir + target_dir):
        os.makedirs(output_relative_dir + target_dir)

In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 P1 PT") # P1 PT stands for project 1 preprocess taxi
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC") # ensure time zone isn't changed to Australian
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)


In [50]:
# get the yellow taxi data for jan 2019-june 2019
yellow = spark.read.parquet('../../mast30034-project-1-janggani/data/raw/yellow')
green = spark.read.parquet('../../mast30034-project-1-janggani/data/raw/green')

In [51]:
# remove null values in passenger count

yellow_t = yellow.select("tpep_pickup_datetime", "tpep_dropoff_datetime", 
                            "passenger_count", "trip_distance")

green_t = green.select("lpep_pickup_datetime", "lpep_dropoff_datetime", 
                            "passenger_count", "trip_distance")

# remove rows with null values
yellow_t = yellow_t.dropna()
green_t = green_t.dropna()

# make sure pickup time ranges from 2019 only
yellow_t = yellow_t.filter(F.col('tpep_pickup_datetime') > "2018-12-31 23:59:59")
green_t = green_t.filter(F.col('lpep_pickup_datetime') > "2018-12-31 23:59:59")

# remove 0 passengers and 0 trip distance as they may be errors or bugs
from pyspark.sql import functions as F
yellow_t = yellow_t.filter((F.col('passenger_count') >0) & (F.col('trip_distance') > 0))
green_t = green_t.filter((F.col('passenger_count')>0) & (F.col('trip_distance') > 0))

yellow_t.count(), green_t.count()

                                                                                

(43358435, 3231991)

In [52]:
# insert new column with trip duration in minutes and round it to 2 decimal points
yellow_new = yellow_t.withColumn('trip_duration_min',(col("tpep_dropoff_datetime").cast("long") - col('tpep_pickup_datetime').cast("long"))/60)
yellow_new = yellow_new.withColumn("trip_duration_min", F.round(yellow_new["trip_duration_min"], 2))

green_new = green_t.withColumn('trip_duration_min',(col("lpep_dropoff_datetime").cast("long") - col('lpep_pickup_datetime').cast("long"))/60)
green_new = green_new.withColumn("trip_duration_min", F.round(green_new["trip_duration_min"], 2))



In [53]:
# insert new column with pickup hour 
yellow_curated = yellow_new.withColumn("hour", hour(col("tpep_pickup_datetime")))
green_curated = green_new.withColumn("hour", hour(col("lpep_pickup_datetime")))

# new column with pickup date
yellow_curated = yellow_curated.withColumn("date", yellow_curated.tpep_pickup_datetime	.substr(1,10))
green_curated = green_curated.withColumn("date", green_curated.lpep_pickup_datetime	.substr(1,10))

# get the necessary columns
yellow_curated = yellow_curated.select("date", "hour", "trip_distance", "trip_duration_min")
green_curated = green_curated.select("date", "hour", "trip_distance", "trip_duration_min")


In [54]:
# get mean values for all columns per day and hour
yellow_mean = (yellow_curated
            .groupBy(F.col("hour"), F.col("date"))
            .agg(round(mean("trip_distance"), 2).alias("distance"), 
                 round(mean("trip_duration_min"), 2).alias("duration"))
            .orderBy("date","hour")
           )

green_mean = (green_curated
            .groupBy(F.col("hour"), F.col("date"))
            .agg(round(mean("trip_distance"), 2).alias("distance"),
                round(mean("trip_duration_min"), 2).alias("duration"))
            .orderBy("date","hour")
           )

In [55]:
# combine hour and date columns into one new column
green_dh = green_mean.select(concat_ws(' ', green_mean.date, green_mean.hour).alias("date hour"), "distance", "duration")
yellow_dh = yellow_mean.select(concat_ws(' ', yellow_mean.date, yellow_mean.hour).alias("date hour"), "distance", "duration")

In [56]:
yellow_curated.write.mode("overwrite").parquet("../../mast30034-project-1-janggani/data/curated/yellow/yellow_c.parquet")
yellow_dh.write.mode("overwrite").parquet("../../mast30034-project-1-janggani/data/curated/yellow/yellow_dh.parquet")\

green_curated.write.mode("overwrite").parquet("../../mast30034-project-1-janggani/data/curated/green/green_c.parquet")
green_dh.write.mode("overwrite").parquet("../../mast30034-project-1-janggani/data/curated/green/green_dh.parquet")

                                                                                

22/08/24 18:39:41 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 895735 ms exceeds timeout 120000 ms
22/08/24 18:39:41 WARN SparkContext: Killing executors is not supported by current scheduler.
