# Final Preprocessing Step
Read in the raw data, and perform the following steps:
1. Add columns for hour of day and day of week
2. Aggregate the data to plot the inter-borough movement of taxis (used in movement_plot.ipynb)
3. Combine with the Legally Operating Business data and weather data
4. Save the final data to a parquet file in the combined_data zone

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, month, date_trunc, col

In [2]:
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Tutorial 1")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

24/08/24 15:35:03 WARN Utils: Your hostname, coldbrew.local resolves to a loopback address: 127.0.0.1; using 172.16.119.16 instead (on interface en0)
24/08/24 15:35:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 15:35:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
tlc_all = spark.read.parquet('../data/raw/tlc_data/')

                                                                                

## Feature Engineering

In [4]:
# Add column for hour of day and day of week
tlc_all = tlc_all.withColumn("hourly_timestamp", date_trunc("hour",col("pickup_datetime")))
tlc_all = tlc_all.withColumn("pickup_hour_of_day", hour("pickup_datetime"))
tlc_all = tlc_all.withColumn("pickup_day_of_week", dayofweek("pickup_datetime"))
tlc_all = tlc_all.withColumn("pickup_month", month("pickup_datetime"))
tlc_all.show(5)

24/08/24 15:35:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----------------+--------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+-------------------+------------------+------------------+------------+
|hvfhs_license_num|dispatching_base_num|   request_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|   hourly_timestamp|pickup_hour_of_day|pickup_day_of_week|pickup_month|
+-----------------+--------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+----------------

In [5]:
zones = spark.read.csv("../data/taxi_zones/taxi+_zone_lookup.csv", header=True)
zones.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+


### Aggregating the movement data, to use in movement_plot.ipynb

In [6]:
# Merge the data with the shapefile
tlc_all = tlc_all.alias("tlc") \
    .join(zones.alias("zone"), tlc_all.PULocationID == zones.LocationID, how='left') \
    .select("tlc.*", "zone.Borough", "zone.Zone") \
    .withColumnRenamed("Borough", "pickup_borough")
tlc_all = tlc_all.alias("tlc") \
    .join(zones.alias("zone"), tlc_all.DOLocationID == zones.LocationID, how='left') \
    .select("tlc.*", "zone.Borough") \
    .withColumnRenamed("Borough", "dropoff_borough")

In [7]:
# Add a column for whether the pickup was at an airport, since they are often outliers
tlc_all = tlc_all.withColumn("pickup_at_airport", col("Zone").contains("Airport"))

In [8]:
movement_aggregates = tlc_all.groupBy('pickup_borough', 'dropoff_borough', 'pickup_hour_of_day').agg({
    '*': 'count',
})
movement_aggregates = movement_aggregates.withColumnRenamed('count(1)', 'num_trips')

In [9]:
movement_aggregates.write.mode('overwrite').parquet('../data/movement_aggregates')

                                                                                

In [10]:
demand_aggregate = tlc_all.groupBy('PULocationID', 'hourly_timestamp', 'pickup_hour_of_day', 'pickup_day_of_week', 'pickup_month', 'pickup_borough', "pickup_at_airport")\
    .agg({'*': 'count'})\
    .withColumnRenamed('count(1)', 'num_trips')\
    .orderBy('hourly_timestamp')

### Combining with LOB data

In [11]:
subway_df = spark.read.parquet("../data/raw/lob_data/")
subway_df.show(5)

+-----+----------+
|count|LocationID|
+-----+----------+
| 5001|         3|
| 3140|         4|
| 1758|         5|
| 2794|         6|
|22048|         7|
+-----+----------+


In [12]:
combined_df = demand_aggregate.alias("tlc").join(
    subway_df.withColumnsRenamed({"count": "pickup_num_businesses", "LocationID": "PULocationID"}),
    on="PULocationID",
    how='left'
)

### Combining with weather data

In [13]:
weather_df = spark.read.csv("../data/landing/weather/hourly_weather.csv", header=True)
combined_df = combined_df.alias("tlc").join(
    weather_df.withColumnsRenamed({"date": "hourly_timestamp"}),
    on="hourly_timestamp",
    how='left'
)

In [14]:
combined_df = combined_df.withColumn("temperature_2m", col("temperature_2m").cast("double"))
combined_df = combined_df.withColumn("relative_humidity_2m", col("relative_humidity_2m").cast("double"))
combined_df = combined_df.withColumn("rain", col("rain").cast("double"))
combined_df = combined_df.withColumn("snowfall", col("snowfall").cast("double"))
combined_df = combined_df.withColumn("wind_speed_10m", col("wind_speed_10m").cast("double"))

In [15]:
combined_df.printSchema()

root
 |-- hourly_timestamp: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- pickup_hour_of_day: integer (nullable = true)
 |-- pickup_day_of_week: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- pickup_at_airport: boolean (nullable = true)
 |-- num_trips: long (nullable = false)
 |-- pickup_num_businesses: long (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- relative_humidity_2m: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- snowfall: double (nullable = true)
 |-- wind_speed_10m: double (nullable = true)


In [16]:
combined_df.show(5)



+-------------------+------------+------------------+------------------+------------+--------------+-----------------+---------+---------------------+--------------+--------------------+----+--------+--------------+
|   hourly_timestamp|PULocationID|pickup_hour_of_day|pickup_day_of_week|pickup_month|pickup_borough|pickup_at_airport|num_trips|pickup_num_businesses|temperature_2m|relative_humidity_2m|rain|snowfall|wind_speed_10m|
+-------------------+------------+------------------+------------------+------------+--------------+-----------------+---------+---------------------+--------------+--------------------+----+--------+--------------+
|2022-05-01 00:00:00|         239|                 0|                 1|           5|     Manhattan|            false|      230|                 8034|       10.6285|           54.505844| 0.0|     0.0|     14.044615|
|2022-05-01 00:00:00|          37|                 0|                 1|           5|      Brooklyn|            false|      656|        

                                                                                

In [17]:
combined_df = combined_df.dropna()

In [18]:
combined_df.write.mode('overwrite').parquet('../data/curated')

                                                                                