# Data Preprocessing

In this section, we will attempt to clean the following datasets:
   - TLC Yellow Taxi Trips Record 2021/10-2022/04
   - TLC Green Taxi Trips Record 2021/10-2022/04

<div class="alert alert-block alert-warning"><b>Note:</b>  There may be slight difference in the code and output between this notebook and the preprocessing.py. This notebook serves solely as an exploratory notebook to discover any parts in the data that need to be fixed or discarded. For the final preprocessing steps, please refer to the code in preprocessing.py or the output in EDA.ipynb.</div>

***

## Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
import shapefile as shp
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

%matplotlib inline
sns.set(style='whitegrid', palette='pastel', color_codes=True)
sns.mpl.rc('figure', figsize=(10,6))


## Inspect the dataset

In [2]:
# Starting a Spark session
spk = (
    SparkSession.builder.appName('Playground')
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

# Read the parquet dataset
green_df = spk.read.parquet('../data/raw/tlc_data/green')
yellow_df = spk.read.parquet('../data/raw/tlc_data/yellow')

In [7]:
total_instances = green_df.count() + yellow_df.count()
print(f"Number of instances\n  Green cab    : {green_df.count()}")
print(f"  Yellow cab   : {yellow_df.count()}")
print(f"Total instances: {total_instances}\n")
print(f"Number of features\n   Green cab    : {len(green_df.columns)}")
print(f"   Yellow cab   : {len(yellow_df.columns)}")

Number of instances
  Green cab    : 605648
  Yellow cab   : 22821986
Total instances: 23427634

Number of features
   Green cab    : 20
   Yellow cab   : 19


In [9]:
# Make all variable names consistent
column_name = {'VendorID': 'vendor_id', 
               'RatecodeID': 'rate_code_id', 
               'PULocationID': 'pu_location_id',
               'DOLocationID': 'do_location_id'}

for key, value in column_name.items():
    green_df = green_df.withColumnRenamed(key,value)
    yellow_df = yellow_df.withColumnRenamed(key,value)

In [10]:
# Check difference in features between the two data sets
print(f"green_df - yellow_df : {set(green_df.columns) - set(yellow_df.columns)}")
print(f"yellow_df - green_df : {set(yellow_df.columns) - set(green_df.columns)}")

green_df - yellow_df : {'lpep_pickup_datetime', 'trip_type', 'lpep_dropoff_datetime', 'ehail_fee'}
yellow_df - green_df : {'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'airport_fee'}


We can safely remove the `ehail_fee`, `airport_fees` and `trip_type` features because we would not have these features available when we are predicting hourly demand in a region so they are not useful in training our model.

In [11]:
# Drop the excess features
green_df = green_df.drop("trip_type", "ehail_fee")
yellow_df = yellow_df.drop("airport_fee")

# Rename the datetime columns to match each other
green_df = (green_df.withColumnRenamed('lpep_dropoff_datetime', 'tpep_dropoff_datetime')
                    .withColumnRenamed('lpep_pickup_datetime', 'tpep_pickup_datetime'))


In [13]:
# Concatenate both data sets
df = yellow_df.union(green_df)

## Check for outliers in the data based on each feature

In [14]:
# Extract trip duration in minutes
df = df.withColumn('trip_duration_min',
                   (func.col('tpep_dropoff_datetime').cast('long') - 
                    func.col('tpep_pickup_datetime').cast('long')) / 60)

# Extract the date as a standalone feature
df = df.withColumn("pickup_date", 
                   func.to_date(func.col("tpep_pickup_datetime")))

df = df.withColumn("dropoff_date",
                   func.to_date(func.col("tpep_dropoff_datetime")))


In [16]:
# Check earliest and latest of datetime variables
print("tpep_pickup_datetime:\n\tMax:", df.agg({"tpep_pickup_datetime": "max"}).collect()[0][0],
     "\n\tMin:", df.agg({"tpep_pickup_datetime": "min"}).collect()[0][0])

print("\ntpep_dropoff_datetime:\n\tMax:", df.agg({"tpep_dropoff_datetime": "max"}).collect()[0][0],
     "\n\tMin:", df.agg({"tpep_dropoff_datetime": "min"}).collect()[0][0], '\n')

# Check other variables
df.select(['passenger_count',
           'trip_distance',
           'trip_duration_min',
           'fare_amount',
           'extra',
           'mta_tax',
           'tip_amount',
           'tolls_amount',
           'improvement_surcharge',
           'total_amount',
           'congestion_surcharge']).describe().show(vertical=True)


tpep_pickup_datetime:
	Max: 2028-12-07 15:48:15 
	Min: 2003-01-01 11:10:06

tpep_dropoff_datetime:
	Max: 2028-12-07 16:29:13 
	Min: 2003-01-01 11:38:46 

-RECORD 0------------------------------------
 summary               | count               
 passenger_count       | 22525003            
 trip_distance         | 23301155            
 trip_duration_min     | 23427634            
 fare_amount           | 23427634            
 extra                 | 23427634            
 mta_tax               | 23427634            
 tip_amount            | 23427634            
 tolls_amount          | 23427634            
 improvement_surcharge | 23427634            
 total_amount          | 23301155            
 congestion_surcharge  | 22525003            
-RECORD 1------------------------------------
 summary               | mean                
 passenger_count       | 1.410832132728569   
 trip_distance         | 6.272660329069305   
 trip_duration_min     | 16.879362420892072  
 fare_amount      

Despite having retrieved the data from 2021-10 to 2022-04, the dataset still contains records from previous years and future years (??). The numerical features such as `trip_distance`, `fare_amount`, `tolls_amount`, `total_amount`, `trip_duration` also contain unrealistic values which need to be removed.

## Data Cleaning

To clean the code, we need to first consider the minimum possible and logical value for each column:
- `extra`, `mta_tax`, `tip_amount`, `tolls_amount`, `improvement_surcharge`, `congestion_surcharge` and `airport_fee` can take value greater or equal to 0.
- `passenger_count` must take value greater or equal to 1.
- We assume that the minimum `trip_distance` and `trip_duration_min` for a regular ride is atleast 0.5 miles and 1 min respectively.
- Based on the TLC website, the initial fare of each ride is \$2.5 so we assume that minimum `fare_amount` is \$2.5
- We also restrict `pu_location_id` to within the specified zones (1-263)

In [18]:
# Filter based on the minimum possible values for numerical features
df1 = df.where((func.col('passenger_count') > 0) &
               (func.col('trip_distance') > 0.5) &
               (func.col('trip_duration_min') > 1) &
               (func.col('fare_amount') >= 2.50) &
               (func.col('extra') >= 0) &
               (func.col('mta_tax') >= 0) &
               (func.col('tip_amount') >= 0) &
               (func.col('tolls_amount') >= 0) &
               (func.col('improvement_surcharge') >= 0) &
               (func.col('total_amount') >= 0) &
               (func.col('congestion_surcharge') >= 0) &
               (func.col('pu_location_id') >= 1))


Since the dataset is too large, it is not feasible to check the outlier individually for each numerical column. So instead, we looked at the 99.99th percentile of the data and decide if the value is logical and possible.

We also capped the `trip_duration_min` to atmost 5 hours (300 mins).

In [19]:
# Filter the columns to cover 99.99 percentile of the data for numerical features
df1 = df1.where((func.col('fare_amount') <= df1.selectExpr('percentile(fare_amount, 0.9999)').collect()[0][0]) &
                (func.col('trip_distance') <= df1.selectExpr('percentile(trip_distance, 0.9999)').collect()[0][0]) &
                (func.col('tip_amount') <= df1.selectExpr('percentile(tip_amount, 0.9999)').collect()[0][0]) &
                (func.col('total_amount') <= df1.selectExpr('percentile(total_amount, 0.9999)').collect()[0][0]) &
                (func.col('tolls_amount') <= df1.selectExpr('percentile(tolls_amount, 0.9999)').collect()[0][0]) &
                (func.col('trip_duration_min') <= 300) &
                (func.col('pu_location_id') <= 263))


In [20]:
# Filter the period to between 2021-10 to 2022-04
df1 = df1.where((func.col("pickup_date") >= '2021-10-01') & 
                (func.col("dropoff_date") >= '2021-10-01') &
                (func.col("pickup_date") <= '2022-04-30') & 
                (func.col("dropoff_date") <= '2022-04-30'))


In [22]:
# Check the descriptive statistics of all numerical columns
df1.select(['passenger_count',
           'trip_distance',
           'trip_duration_min',
           'fare_amount',
           'extra',
           'mta_tax',
           'tip_amount',
           'tolls_amount',
           'improvement_surcharge',
           'total_amount',
           'congestion_surcharge',
           'pickup_date']).describe()


summary,passenger_count,trip_distance,trip_duration_min,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
count,20131927.0,20131927.0,20131927.0,20131927.0,20131927.0,20131927.0,20131927.0,20131927.0,20131927,20131927.0,20131927.0
mean,1.4474582090427806,3.372818759970744,15.128057352218038,13.816403430730835,1.0308856847136385,0.4978774863429622,2.6037678300741995,0.4541050566115406,0.29998997117181014,20.448662433051897,2.3495472539712665
stddev,0.9885175522956112,4.142501840354908,11.470636373695918,11.63274126021313,1.2378554933189074,0.0332379381415083,2.774715266330236,1.7920122765500728,0.001734518214881...,14.82007017174797,0.5945534599851244
min,1.0,0.51,1.0166666666666666,2.5,0.0,0.0,0.0,0.0,0.0,3.0,0.0
max,9.0,57.3,299.9166666666667,200.0,33.5,17.1,50.0,25.8,0.3,231.85,2.75


In [23]:
# Check period of data
pu_earliest, pu_latest = df1.select(func.min("pickup_date"), func.max("pickup_date")).first()
do_earliest, do_latest = df1.select(func.min("dropoff_date"), func.max("dropoff_date")).first()

print(f"pickup_date: {pu_earliest} - {pu_latest}")
print(f"dropoff_date: {do_earliest} - {do_latest}")


pickup_date: 2021-10-01 - 2022-04-30
dropoff_date: 2021-10-01 - 2022-04-30


The dataset looks much cleaner now and easier to work with. In the following parts, we will begin feature engineering to facilitate our analysis later on.

## Feature Engineering

## Extract hourly pickup and dropoff demand in each location id

We first create columns to record date and hour. Then we group by `pu_location_id`/`do_location_id`, `date` and `hour` before counting the number of instances.

In [24]:
# Create columns for date and hour
df1 = (df1
  .withColumn("pickup_date", func.col("tpep_pickup_datetime").cast("date"))
  .withColumn("pickup_hour", func.hour(func.col("tpep_pickup_datetime")))
  .withColumn("dropoff_date", func.col("tpep_dropoff_datetime").cast("date"))
  .withColumn("dropoff_hour", func.hour(func.col("tpep_dropoff_datetime")))
)


In [25]:
# Aggregate and count number of hourly instances in each location id
pickup_hourly_demand = df1.groupBy("pu_location_id", "pickup_date", "pickup_hour").count()
dropoff_hourly_demand = df1.groupBy("do_location_id", "dropoff_date", "dropoff_hour").count()


In [26]:
# Aggregate and count number of daily instances in each location id
pickup_daily_demand = df1.groupBy("pu_location_id", "pickup_date").count()
dropoff_daily_demand = df1.groupBy("do_location_id", "dropoff_date").count()


In [31]:
pickup_hourly_demand

pu_location_id,pickup_date,pickup_hour,count
48.0,2022-03-02,22,208
138.0,2022-03-06,23,244
50.0,2022-03-07,13,43
13.0,2022-03-11,11,35
263.0,2022-03-13,7,36
132.0,2022-03-29,4,37
162.0,2022-03-01,7,139
97.0,2022-03-01,7,1
177.0,2022-03-01,8,1
95.0,2022-03-01,10,1


In [27]:
pickup_daily_demand.describe()


summary,pu_location_id,count
count,40666.0,40666.0
mean,134.81751340185906,495.055500909851
stddev,75.9882553142365,1027.335342503522
min,1.0,1.0
max,263.0,6907.0


In [28]:
dropoff_daily_demand.describe()


summary,do_location_id,count
count,52317.0,52317.0
mean,133.81677083930654,384.8066020605157
stddev,76.92941804905843,800.8124201636705
min,1.0,1.0
max,265.0,5969.0


It appears that the average daily pickup demand is about 494 with maximum number of pickup per hour at 6907. However, since demand is strongly based on `LocationID` which explains the large standard deviation at 1023.1, we need to examine the demand statistics based on individual location to make a legitimate inference. This will be done in EDA.ipynb.