Ensuring the data is clean and extracting relevant attributes

In [None]:
#done using pyspark and spark functions
from pyspark.sql import SparkSession, functions as F

In [None]:
#starting spark session using configuration recommendations from tutorials
spark_settings = [
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.sql.session.timeZone", "America/New_York"),
    ("spark.driver.memory", "4g"),
    ("spark.executor.memory", "2g"),
]

#apply every setting to the builder, then create the session (or reuse a running one)
session_builder = SparkSession.builder.appName("Data Preprocessing")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/05 13:37:34 WARN Utils: Your hostname, Cassie-Laptop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/09/05 13:37:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/05 13:37:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
#read the raw TLC trip records (yellow taxi and HVFHV) from parquet
yellowDF = spark.read.parquet("../datasets/yellow_tlc_data/")
hvfhvDF = spark.read.parquet("../datasets/hvfhv_data/")

#sanity check: row count and column count of each raw dataset
for trips in (yellowDF, hvfhvDF):
    print(trips.count(), len(trips.columns))

#floor each pickup timestamp to the start of its hour
yellow_pickup_hour = F.date_trunc("hour", F.col("tpep_pickup_datetime"))
hvfhv_pickup_hour = F.date_trunc("hour", F.col("pickup_datetime"))

yellowDF = yellowDF.withColumn("pickupHour", yellow_pickup_hour)
hvfhvDF = hvfhvDF.withColumn("pickupHour", hvfhv_pickup_hour)

#number of trips per pickup hour (one row per hour in each result)
yellowCount = (
    yellowDF
    .groupBy("pickupHour")
    .count()
    .withColumnRenamed("count", "yellowCount")
)

hvfhvCount = (
    hvfhvDF
    .groupBy("pickupHour")
    .count()
    .withColumnRenamed("count", "hvfhvCount")
)

48559814 19
260628583 24


In [None]:
#build the final input dataframe

#put both hourly counts side by side; an hour that appears in only one
#dataset is kept by the outer join and its missing count becomes 0
hourly_counts = yellowCount.join(hvfhvCount, on="pickupHour", how="outer")
hourly_counts = hourly_counts.fillna(0)

#ratio of yellow trips to HVFHV trips (the +1 keeps the denominator non-zero)
ratio_expr = F.col("yellowCount") / (F.col("hvfhvCount") + F.lit(1))
hourly_counts = hourly_counts.withColumn("ratio", ratio_expr)

#load the weather data and confirm its shape
weatherDF = spark.read.parquet("../datasets/weather_data/")
print(weatherDF.count(), len(weatherDF.columns))

#the weather timestamp column must share the join key's name
weatherDF = weatherDF.withColumnRenamed("time", "pickupHour")

#keep only hours that have both trip counts and weather, in chronological order
FinalData = (
    hourly_counts
    .join(weatherDF, on="pickupHour", how="inner")
    .orderBy("pickupHour")
)

#confirm shape of the combined dataframe
print(FinalData.count(), len(FinalData.columns))

#the dataframe is not written to disk here; it is saved after the
#additional preprocessing further down the notebook

9528 3




9528 6


                                                                                

Additional preprocessing added after initial visualisations, for modelling and for visualisations

In [None]:
#column expressions for the engineered features
#NOTE(review): F.hour / F.dayofweek presumably follow the Spark session time zone
#set when the session is built, so a collected Python datetime may show a
#different wall-clock hour than the "hour" column - confirm this is intended
day_of_week = F.dayofweek("pickupHour")   # Spark numbering: 1 = Sunday ... 7 = Saturday
hour_of_day = F.hour("pickupHour")

#weekend means Sunday (1) or Saturday (7)
is_weekend = F.col("day").isin(1, 7)

#rush hour: weekday pickups in the 7-9 and 16-18 hour slots
is_rush_hour = (F.col("weekend") == 0) & hour_of_day.isin(7, 8, 9, 16, 17, 18)

#raining flag: threshold 0.1mm/hr, adjust if needed
rain_threshold = 0.1
is_raining = F.col("rain") > rain_threshold

#add the columns in a fixed order: day, weekend, rushhour, hour, raining
FinalData = (
    FinalData
    .withColumn("day", day_of_week)
    .withColumn("weekend", F.when(is_weekend, 1).otherwise(0))
    .withColumn("rushhour", F.when(is_rush_hour, 1).otherwise(0))
    .withColumn("hour", hour_of_day)
    .withColumn("raining", is_raining.cast("int"))
)

#save final dataframe, replacing any previous export at this path
output_path = "../datasets/processed.parquet"
FinalData.write.mode("overwrite").parquet(output_path)

                                                                                

Row(pickupHour=datetime.datetime(2024, 3, 1, 16, 0), yellowCount=3249, hvfhvCount=23182, ratio=0.14014579648880646, rain=0.0, temp=-6.8, day=6, weekend=0, rushhour=0, hour=0, raining=0)

Additional preprocessing, checking for null values and outliers

In [None]:
#checking for null values in the rain measurement and in the pickup hour
#
#a notebook cell only displays its last expression, so two separate .count()
#calls would show just the second result and silently drop the first; both
#null counts are gathered in a single aggregation and displayed together
null_counts = (
    FinalData
    .agg(
        # F.when without otherwise yields null for non-matching rows,
        # and F.count only counts non-null values
        F.count(F.when(F.col("rain").isNull(), 1)).alias("rain"),
        F.count(F.when(F.col("pickupHour").isNull(), 1)).alias("pickupHour"),
    )
    .first()
    .asDict()
)

#expected result: 0 nulls for every column
null_counts

                                                                                

0



In [None]:
#checking for outliers with the 1.5 * IQR rule
#
#a value counts as an outlier when it lies below q1 - 1.5*IQR or above q3 + 1.5*IQR

outlier_columns = ["ratio", "yellowCount", "hvfhvCount"]

#quartiles for all columns from one approxQuantile call (relative error 0.01);
#given a list of columns the result holds one [q1, q3] list per column, in order
column_quartiles = FinalData.approxQuantile(outlier_columns, [0.25, 0.75], 0.01)

checked_columns = []
outlier_counters = []
for column_name, quartiles in zip(outlier_columns, column_quartiles):
    if len(quartiles) != 2:
        #no quartiles came back for this column, so there are no fences to test against
        print(f"{column_name}: quartiles unavailable, skipped")
        continue

    q1, q3 = quartiles
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    outside_fences = (F.col(column_name) < lower) | (F.col(column_name) > upper)
    checked_columns.append(column_name)
    #count only the rows where the outlier condition is true
    outlier_counters.append(F.count(F.when(outside_fences, 1)).alias(column_name))

#a single aggregation returns the total row count and every outlier count,
#instead of launching a separate Spark job per column
summary = FinalData.agg(F.count("*").alias("total_rows"), *outlier_counters).first()
total_rows = summary["total_rows"]

for column_name in checked_columns:
    outliers = summary[column_name]
    #guard the percentage against an empty dataframe
    share = 100 * outliers / total_rows if total_rows else 0.0
    print(f"{column_name}: {outliers} outliers ({share:.2f}% of data)")


                                                                                

ratio: 0 outliers (0.00% of data)


                                                                                

yellowCount: 0 outliers (0.00% of data)




hvfhvCount: 6 outliers (0.06% of data)


                                                                                