# Pre-processing_final
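This notebook merges three preprocessed datasets — TLC taxi trips, hourly NYC weather, and NYC permitted events — then applies final cleaning and exports the result to the curated data folder.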

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sbs
import geopandas as gpd
import folium 

In [2]:
# Build (or reuse) the Spark session for this notebook.
# NOTE(review): spark.driver.memory only takes effect if no JVM/session is running yet, and
# spark.executor.memory has little effect when running in local mode — confirm the setup.
spark = (
    SparkSession.builder
    .appName("ADS Project1")
    # memory: 8 GB for both driver and executor
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    # SQL behaviour: UTC session time zone, cached parquet metadata, eager notebook display
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

24/08/28 18:16:49 WARN Utils: Your hostname, Hanshis-Laptop.local resolves to a loopback address: 127.0.0.1; using 172.16.119.15 instead (on interface en0)
24/08/28 18:16:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/28 18:16:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Compress every parquet file written by this session with gzip.
spark.conf.set(key="spark.sql.parquet.compression.codec", value="gzip")

## Datasets 

In [4]:
# Load the preprocessed TLC trip records.
# NOTE(review): on the last run this path did not exist (AnalysisException) — confirm where the
# preprocessed TLC file is written before re-running.
tdf = spark.read.format("parquet").load("../data/raw/tlc_df.parquet")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/hanshitang/Documents/ADS/Proj1/project-1-individual-HanshiTang/data/raw/tlc_df.parquet.

In [None]:
# Load the hourly weather table; the first row holds the header and Spark infers column types.
wdf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/raw/NYC_weather_raw.csv")
)


In [None]:
# Load the preprocessed NYC permitted-event records.
edf = spark.read.format("parquet").load("../data/raw/NYC_Permitted_Event_Information_Historical.parquet")

In [None]:
# Peek at the first five TLC trip rows.
tdf.show(n=5)

+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+-------------+-----------+------------+-----------+------------+
|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|ehail_fee|trip_duration|pickup_hour|dropoff_hour|pickup_date|dropoff_date|
+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+-------------+-----------+------------+-----------+------------+
|       2|              1|         1.75|         1|             false|          68|          50|           2|       12.1|  1.0|   

In [None]:
# Print the schema of the TLC data (column names, types and nullability).
tdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)



In [None]:
# Peek at the first five hourly weather rows.
wdf.show(n=5)

+----------+----+-------+---+------+---+----+------+
|      DATE|HOUR|    CIG|WND|   VIS|TMP| DEW|   SLP|
+----------+----+-------+---+------+---+----+------+
|2023-12-01|   0|22000.0|4.1|1609.3|9.4|-2.8|1020.1|
|2023-12-01|   1|22000.0|2.1|1609.3|8.9|-2.2|1020.0|
|2023-12-01|   2|22000.0|3.1|1609.3|8.9|-2.2|1020.4|
|2023-12-01|   3|22000.0|3.1|1609.3|8.3|-1.7|1020.7|
|2023-12-01|   4|22000.0|3.1|1609.3|7.8|-1.7|1020.8|
+----------+----+-------+---+------+---+----+------+
only showing top 5 rows



In [None]:
# Peek at the first five event rows.
edf.show(n=5)

+--------+----------+----------+----------+--------+-------------+-------------+--------------------+
|Event ID|Start Date|Start Hour|  End Date|End Hour|   Event Type|Event Borough|      Event Location|
+--------+----------+----------+----------+--------+-------------+-------------+--------------------+
|  684438|2023-12-09|         7|2023-12-09|      10|Special Event|     Brooklyn|Prospect Park: Pi...|
|  693693|2023-12-03|        12|2023-12-03|      14|Special Event|    Manhattan|Central Park: Wag...|
|  686564|2023-12-03|        17|2023-12-03|      18|Special Event|    Manhattan|Carl Schurz Park:...|
|  684416|2023-12-09|         9|2023-12-09|      11|Special Event|    Manhattan|Washington Square...|
|  687023|2023-12-01|        11|2023-12-01|      12|Special Event|    Manhattan|Central Park: Lad...|
+--------+----------+----------+----------+--------+-------------+-------------+--------------------+
only showing top 5 rows



## Aggregation

### TLC aggregation: hourly and daily trip counts

In [None]:
# Attach the hourly and daily trip counts (over all trips) to every trip row.
# Grouping directly on (pickup_date, pickup_hour) avoids building "date hour" strings and
# parsing them with to_timestamp() without a format. The merge step below uses an explicit
# "yyyy-MM-dd H" pattern for the same conversion, so the old approach was inconsistent with it.
# NOTE(review): these counts are computed before the date-range and RatecodeID filters further
# down, so they include trips those filters later remove — confirm this is intended.
hourly_trip_counts = (
    tdf.groupBy('pickup_date', 'pickup_hour')
    .agg(count('*').alias('hourly_trip_count'))
)
daily_trip_counts = (
    tdf.groupBy('pickup_date')
    .agg(count('*').alias('daily_trip_count'))
)

# Left joins keep every trip; each key is unique in its count table, so no rows are duplicated.
tdf = (
    tdf.join(hourly_trip_counts, on=['pickup_date', 'pickup_hour'], how='left')
    .join(daily_trip_counts, on='pickup_date', how='left')
)

### Map taxi zones to boroughs 

In [None]:
# Load the taxi zone table (LocationID -> borough) used to label pickup/drop-off boroughs.
zones = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/landing/external/taxi_zones.csv")
)

In [None]:
# Label each trip with its pickup (PUBorough) and drop-off (DOBorough) borough.
# Keep one (LocationID, borough) row per zone: a repeated LocationID would make the left joins
# below duplicate trips.
# NOTE(review): if the CSV can repeat a LocationID with *different* boroughs, dropDuplicates
# keeps an arbitrary one — confirm the lookup's contents.
zone_borough = zones.select('LocationID', 'borough').dropDuplicates(['LocationID'])

# Renaming the lookup key to match tdf lets each join be done by column name. No stray
# LocationID column is left behind, and the two joins do not share an ambiguous
# `zones['LocationID']` reference.
tdf = tdf.join(
    zone_borough.withColumnRenamed('LocationID', 'PULocationID')
                .withColumnRenamed('borough', 'PUBorough'),
    on='PULocationID',
    how='left',
)
tdf = tdf.join(
    zone_borough.withColumnRenamed('LocationID', 'DOLocationID')
                .withColumnRenamed('borough', 'DOBorough'),
    on='DOLocationID',
    how='left',
)


### Aggregation: number of events per hour and borough

In [None]:
# Count permitted events per start date, start hour and borough.
# The previous 'Start Time' helper column was discarded by the groupBy, so it is not built here.
# No orderBy: edf is only consumed by joins, and a global sort would cost an extra full shuffle.
# NOTE(review): each event is attributed to its start hour only; later hours up to 'End Hour'
# do not count it — confirm this is intended.
edf = (
    edf.groupBy('Start Date', 'Start Hour', 'Event Borough')
    .agg(count('Event ID').alias('Number of Events'))
)

## Merge datasets 

In [None]:
# Step 1: build the same hourly timestamp key in every dataset, using one explicit
# "yyyy-MM-dd H" pattern so the keys line up exactly.
tdf = tdf.withColumn("datetime", to_timestamp(concat_ws(" ", col("pickup_date"), col("pickup_hour")), "yyyy-MM-dd H"))
edf = edf.withColumn("datetime", to_timestamp(concat_ws(" ", col("Start Date"), col("Start Hour")), "yyyy-MM-dd H"))
wdf = wdf.withColumn("datetime", to_timestamp(concat_ws(" ", col("DATE"), col("HOUR")), "yyyy-MM-dd H"))

# Step 2: attach event counts by hour and pickup borough. Renaming 'Event Borough' to
# PUBorough lets both keys be joined by name. The result then has a single datetime and a
# single PUBorough column, so no duplicate-column cleanup (the old double drop of
# edf["datetime"]) is needed.
tdf_edf = tdf.join(
    edf.withColumnRenamed("Event Borough", "PUBorough"),
    on=["datetime", "PUBorough"],
    how="left",
)

# Step 3: attach weather by hour only (the weather data comes from one station).
# NOTE(review): assumes wdf has at most one row per hour; a repeated hour would duplicate trips.
final_df = tdf_edf.join(wdf, on="datetime", how="left")


In [None]:
# Drop the join keys and the per-source date/hour columns left over from the merge; trip timing
# is kept in pickup_date/pickup_hour. (The former 'weather_datetime'/'event_datetime' names were
# never created, so they are no longer listed; drop() ignores any name that is absent.)
final_df = final_df.drop('datetime', 'DATE', 'HOUR', 'Start Date', 'Start Hour', 'Event Borough')

In [None]:
# Print the schema of the merged dataframe to check which columns survived the joins and drops.
final_df.printSchema()

root
 |-- pickup_date: date (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- dropoff_date: date (nullable = true)
 |-- hourly_trip_count: long 

In [None]:
# Fix the final column set and order: timing, trip attributes, locations, fares,
# trip-count aggregates, weather, then the event count.
final_df = final_df.select([
    # timing
    'pickup_date', 'pickup_hour', 'dropoff_date', 'dropoff_hour',
    # trip attributes
    'VendorID', 'passenger_count', 'trip_distance', 'trip_duration',
    'RatecodeID', 'store_and_fwd_flag',
    # locations
    'PULocationID', 'PUBorough', 'DOLocationID', 'DOBorough',
    # payment and fare components
    'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
    'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'ehail_fee',
    # trip-count aggregates
    'hourly_trip_count', 'daily_trip_count',
    # weather
    'CIG', 'WND', 'VIS', 'TMP', 'DEW', 'SLP',
    # events
    'Number of Events',
])


## Preprocessing

### Handling missing data
We assume that no events occurred in a given hour and borough when `Number of Events` is null (i.e. no event row matched the trip).


In [None]:
# A null event count means no event matched that hour/borough, so treat it as zero events.
final_df = final_df.na.fill({'Number of Events': 0})

In [None]:
def _fill_borough_from_mode(df, location_col, borough_col):
    """Fill null `borough_col` values with the most frequent non-null borough of the same location.

    Null boroughs are excluded before taking the mode, so a null can never be chosen as the
    fill value. Ties are broken alphabetically so the result is deterministic.
    """
    mode_col = f"{borough_col}_mode"
    ranking = Window.partitionBy(location_col).orderBy(col("count").desc(), col(borough_col))
    borough_mode = (
        df.filter(col(borough_col).isNotNull())
        .groupBy(location_col, borough_col)
        .agg(count("*").alias("count"))
        .withColumn("row", row_number().over(ranking))
        .filter(col("row") == 1)
        .select(location_col, col(borough_col).alias(mode_col))
    )
    return (
        df.join(borough_mode, location_col, "left")
        .withColumn(borough_col, coalesce(col(borough_col), col(mode_col)))
        .drop(mode_col)
    )


# NOTE(review): PUBorough/DOBorough come from a lookup keyed on the location ID, so all rows of
# one location normally share the same borough (or the same null). In that case this imputation
# fills nothing — confirm whether it is still needed.
final_df = _fill_borough_from_mode(final_df, "PULocationID", "PUBorough")
final_df = _fill_borough_from_mode(final_df, "DOLocationID", "DOBorough")


In [None]:
# Forward-fill gaps in the weather columns: each null takes the last non-null value seen earlier
# on the same pickup date (ordered by pickup hour).
# Window.unboundedPreceding replaces the old -sys.maxsize. `sys` is never imported in this
# notebook, so that version only worked if the wildcard pyspark import happened to leak it.
# NOTE(review): partitioning by pickup_date means the fill never carries across midnight, so
# nulls in a day's earliest hours stay null.
weather_cols = ["CIG", "WND", "VIS", "TMP", "DEW", "SLP"]
window_spec = (
    Window.partitionBy("pickup_date")
    .orderBy("pickup_hour")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

for weather_col in weather_cols:
    final_df = final_df.withColumn(
        weather_col, last(col(weather_col), ignorenulls=True).over(window_spec)
    )

### Handling error values

In [None]:
# Keep only trips whose pickup date is within the study window, 2023-12-01 to 2024-05-31
# inclusive. The bounds are zero-padded ISO dates; the old "2024-5-31" depended on Spark's
# lenient string-to-date cast accepting a single-digit month.
final_df = final_df.filter(col("pickup_date").between("2023-12-01", "2024-05-31"))

In [None]:
# RatecodeID values above 6 are treated as errors and removed.
# Note: a null RatecodeID also fails this comparison, so those rows are dropped too.
final_df = final_df.where(col('RatecodeID') <= 6)

## Export file to curated folder

In [24]:
# Save the merged, cleaned data to the curated folder.
# mode("overwrite") makes the cell re-runnable: the default save mode raises an error when the
# output directory already exists (e.g. from a previous partial run).
# NOTE(review): the last run failed at job commit on a `.DS_Store` inside `_temporary`; such files
# are usually created by macOS Finder — avoid browsing the output folder while the write runs.
final_df.write.mode("overwrite").parquet("../data/curated/tlc_data/first_cleaned.parquet")

24/08/28 18:11:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


CodeCache: size=131072Kb used=35038Kb max_used=35038Kb free=96034Kb
 bounds [0x000000010b1e8000, 0x000000010d488000, 0x00000001131e8000]
 total_blobs=12632 nmethods=11634 adapters=908
 compilation: disabled (not enough contiguous free space left)


24/08/28 18:15:20 ERROR FileFormatWriter: Aborting job fe485e4f-e8fa-4c54-8122-95352c72c813.
java.io.FileNotFoundException: File file:/Users/hanshitang/Documents/ADS/Proj1/project-1-individual-HanshiTang/data/curated/tlc_data/first_cleaned.parquet/_temporary/0/.DS_Store does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:390)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:505)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:700)
	at org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem.rename(ProxyLocalFileSystem.java:34)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.

Py4JJavaError: An error occurred while calling o361.parquet.
: java.io.FileNotFoundException: File file:/Users/hanshitang/Documents/ADS/Proj1/project-1-individual-HanshiTang/data/curated/tlc_data/first_cleaned.parquet/_temporary/0/.DS_Store does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:390)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:505)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:700)
	at org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem.rename(ProxyLocalFileSystem.java:34)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:476)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:405)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
