## Preprocessing

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [None]:
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "3g")
    .config("spark.executer.memory", "4g")
    .getOrCreate()
)

In [None]:
# read the downloaded data 
sdf_all = spark.read.parquet('../data/raw/tlc_data/')
sdf_all.count()

In [None]:
sdf_all.printSchema()

### Feature Filtering
Irrelevant attributes are removed from the dataset. The following attributes are retained:
- pickup_datetime
- PULocationID
- DOLocationID
- trip_miles
- congestion_surcharge
- trip_time (response)


In [None]:
# only keep relevant features
sdf_all = sdf_all.select('pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_miles', 'congestion_surcharge', 'trip_time')
sdf_all.show(1, vertical=True, truncate=100)

### Outliers Removal
- Trips that have taken more than 5 hours or less than 2 minutes are removed from dataset.
- There are a large number of trips that have zero 'trip_miles', indicating that the vehicles has not moved at all. These records are removed.
- Trips with 'Unknown' pickup or dropoff location (LocationID is either 264 or 265) are removed.


In [None]:
# check the number of outliers
print("Number of trips has trip duration greater than 5 hours:", sdf_all.filter(F.col('trip_time') > 5*60*60).count())
print("Number of trips has trip duration less than 2 minutes:", sdf_all.filter(F.col('trip_time') < 2*60).count())
print("Number of trips with 0 distance", sdf_all.filter(F.col('trip_miles') <= 0).count())
print("Number of trips with unknown pickup/drop-off location:", sdf_all.filter((F.col('PULocationID') == 265) |
     (F.col('PULocationID') == 264) | (F.col('DOLocationID') == 265) | (F.col('DOLocationID') == 264)).count())

In [None]:
# filter out all outliers
sdf_all = sdf_all.filter((F.col('trip_time') <= 5*60*60) & (F.col('trip_time') > 2*60) & (F.col('trip_miles') > 0) &
                          (F.col('PULocationID') <= 263) & (F.col('DOLocationID') <= 263))

### Missing Values

In [None]:
# check if there are missing values for each attributes
features = ['pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_miles', 'congestion_surcharge', 'trip_time']
for attr in features:
    num_missing = sdf_all.filter(F.col(attr).isNull()).count()
    print(f"There are {num_missing} missing values for feature {attr}.")

Condsidering the large size of the dataset, it is reasonable to discard trips where 'congestion_surcharge' equals to null.

In [None]:
# remove records with missing values
sdf_all = sdf_all.filter(~ F.col("congestion_surcharge").isNull())
sdf_all.count()

### Preliminary Feature Engineering
- 'day of week', 'hour of day' and 'month,day' are extracted from 'pickup_datetime'.

- Convert columns "PULocationID" and "DOLocationID" to integers.

- Since the amount of surcharge only depends on the type of vehicle and is considered to have little correlation with trip time, it is replaced by an binary feature 'congestion_zone' which suggests whether a trip passes through the Congestion Zone. 

- Combine the dataset with external weather data based on date.

In [None]:
# extract hour of day and day of week from pickup datetime
sdf_all = sdf_all.withColumn("day_of_week", F.date_format('pickup_datetime', 'EEEE'))\
                 .withColumn("hour_of_day", F.hour(F.col('pickup_datetime')))\
                 .withColumn("month-day", F.date_format('pickup_datetime','MM,dd'))
    
# convert type of data entries from long to int
for field in ('PU', 'DO'):
    field = f'{field}LocationID'
    sdf_all = sdf_all.withColumn(
        field,
        F.col(field).cast('INT')
    )

# replace continous attribute "congestion_surcharge" with binary attribute "congestion_zone" 
sdf_all = sdf_all.withColumn('congestion_zone', (F.col('congestion_surcharge') > 0).cast('BOOLEAN'))
sdf_all = sdf_all.drop("congestion_surcharge")

In [None]:
# read the weather dataset
weather_sdf = spark.read.option("header", "true").csv("../data/raw/weather_data/weather.csv")
weather_sdf.limit(5)

In [None]:
weather_sdf.printSchema()

In [None]:
# convert all columns except "Month,Day" to float
cols = ["Temperature (F)", "Dew Point (F)", "Humidity (%)", "Wind Speed (mph)", "Pressure (in)", "Precipitation (in)"]
for col in cols:
    weather_sdf = weather_sdf.withColumn(col, weather_sdf[col].cast('float'))

weather_sdf.printSchema()

In [None]:
# join the weather dataset with for-hire vehicles trip data
sdf_all = sdf_all.join(weather_sdf, sdf_all["month-day"] == weather_sdf["Month,Day"], 'left')

# example of record after feature engineering
sdf_all.show(1, vertical=True, truncate=100)

### Train Test Split
To avoid data leakage, the dataset is divided into Train (Feb 2019 - May 2019 inclusive) and Test (Jun 2019 - Jul 2019 inclusive) sets.

In [None]:
# train/test split
train_sdf = sdf_all.filter(F.col("pickup_datetime").between('2019-02-01 00:00:00', '2019-5-31 23:59:59'))
test_sdf = sdf_all.filter(F.col("pickup_datetime").between('2019-06-01 00:00:00', '2019-07-31 23:59:59'))

In [None]:
# remove attributes "pickup_datetime" and "month-day" from the both dataframes
train_sdf = train_sdf.drop("pickup_datetime", "month-day", "Month,Day")
test_sdf = test_sdf.drop("pickup_datetime", "month-day", "Month,Day")

print("Number of instances in training set:", train_sdf.count())
print("Number of instances in test set:", test_sdf.count())

In [None]:
# save train/test sets
train_sdf.write.mode('overwrite').parquet('../data/curated/train')
test_sdf.write.mode('overwrite').parquet('../data/curated/test')

### Sampling Data for Visualisations

In [None]:
# sampling data for future visualisations
SAMPLE_SIZE = 0.005
sample_sdf = train_sdf.sample(SAMPLE_SIZE, seed=0)
print("Number of instances in sample dataset:", sample_sdf.count())

In [None]:
# save sample data
sample_sdf.write.mode('overwrite').parquet("../data/curated/sample_data")