This notebook preprocesses the following raw data for the period Dec 2019 to Feb 2020 and Dec 2021 to Feb 2022:
- NYC Yellow Taxi Data 
- NYC For Hire Vehicles (FHV) Data
- NYC COVID-19 Data (Dec 2021-Feb 2022 only)

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
pd.options.mode.chained_assignment = None

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

22/08/09 14:15:25 WARN Utils: Your hostname, kelman_HP_ENVY resolves to a loopback address: 127.0.1.1; using 172.26.243.108 instead (on interface eth0)
22/08/09 14:15:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/09 14:15:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
from urllib.request import urlretrieve
import os

output_relative_dir = '../data/raw/'

if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)

if not os.path.exists(output_relative_dir + 'NYC TLC Data'):
    os.makedirs(output_relative_dir + 'NYC TLC Data')

if not os.path.exists(output_relative_dir + "External Data"):
    os.makedirs(output_relative_dir + "External Data")

In [3]:
URL_TEMPLATE = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

def tlc_data_download(name, year, month):
    
    month = str(month).zfill(2)

    output_dir = f"{name}_tripdata_{year}-{month}.parquet"
    url =f'{URL_TEMPLATE}{output_dir}'

    urlretrieve(url, f"{output_relative_dir}NYC TLC Data/{output_dir}")
    print(f"Completed downloading {output_dir}")

for name in ('yellow', 'fhvhv'):
    tlc_data_download(name, 2019, 12)
    tlc_data_download(name, 2020, 1)
    tlc_data_download(name, 2020, 2)
    tlc_data_download(name, 2021, 12)
    tlc_data_download(name, 2022, 1)
    tlc_data_download(name, 2022, 2)

Completed downloading yellow_tripdata_2019-12.parquet
Completed downloading yellow_tripdata_2020-01.parquet
Completed downloading yellow_tripdata_2020-02.parquet
Completed downloading yellow_tripdata_2021-12.parquet
Completed downloading yellow_tripdata_2022-01.parquet
Completed downloading yellow_tripdata_2022-02.parquet
Completed downloading fhvhv_tripdata_2019-12.parquet
Completed downloading fhvhv_tripdata_2020-01.parquet
Completed downloading fhvhv_tripdata_2020-02.parquet
Completed downloading fhvhv_tripdata_2021-12.parquet
Completed downloading fhvhv_tripdata_2022-01.parquet
Completed downloading fhvhv_tripdata_2022-02.parquet


In [9]:
covid_url = 'https://data.cityofnewyork.us/api/views/rc75-m7u3/rows.csv?accessType=DOWNLOAD'
covid_output_dir = 'COVID-19_Daily_Counts_of_Cases__Hospitalizations__and_Deaths.csv'
urlretrieve(covid_url, f"{output_relative_dir}External Data/{covid_output_dir}")
print(f"Completed downloading {covid_output_dir}")

Completed downloading COVID-19_Daily_Counts_of_Cases__Hospitalizations__and_Deaths.csv


<h2><u>Preprocess TLC Data</u></h2>

In [2]:
# load yellow taxi data
yellow_2019_12_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2019-12.parquet', header=True)
yellow_2020_01_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2020-01.parquet', header=True)
yellow_2020_02_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2020-02.parquet', header=True)

yellow_2021_12_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2021-12.parquet', header=True)
yellow_2022_01_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2022-01.parquet', header=True)
yellow_2022_02_sdf = spark.read.parquet('../data/raw/NYC TLC Data/yellow_tripdata_2022-02.parquet', header=True)

# load FHV data
fhvhv_2019_12_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2019-12.parquet', header=True)
fhvhv_2020_01_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2020-01.parquet', header=True)
fhvhv_2020_02_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2020-02.parquet', header=True)

fhvhv_2021_12_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2021-12.parquet', header=True)
fhvhv_2022_01_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2022-01.parquet', header=True)
fhvhv_2022_02_sdf = spark.read.parquet('../data/raw/NYC TLC Data/fhvhv_tripdata_2022-02.parquet', header=True)

# merge datasets categorised by type and period
yellow_19_20_sdf = yellow_2019_12_sdf.unionAll(yellow_2020_01_sdf).unionAll(yellow_2020_02_sdf)
yellow_21_22_sdf = yellow_2021_12_sdf.unionAll(yellow_2022_01_sdf).unionAll(yellow_2022_02_sdf)

fhvhv_19_20_sdf = fhvhv_2019_12_sdf.unionAll(fhvhv_2020_01_sdf).unionAll(fhvhv_2020_02_sdf)
fhvhv_21_22_sdf = fhvhv_2021_12_sdf.unionAll(fhvhv_2022_01_sdf).unionAll(fhvhv_2022_02_sdf)

                                                                                

<h3>Feature engineering</h3>

**Yellow Taxis**

In [3]:
def yellow_taxi_feature_eng(data_sdf):
    # trip duration in minutes
    data_sdf_eng = data_sdf.withColumn(
        'trip_time',
        (F.col('tpep_dropoff_datetime').cast('long') - F.col('tpep_pickup_datetime').cast('long'))/60
    )

    # pickup day is weekend or not
    data_sdf_eng = data_sdf_eng.withColumn(
        'is_weekend',
        F.dayofweek(F.col('tpep_pickup_datetime')).isin([1, 7])
    )

    # tip percentage over total amount paid
    data_sdf_eng = data_sdf_eng.withColumn(
        'tip_percent',
        (F.col('tip_amount') / F.col('total_amount')) * 100
    )

    return data_sdf_eng

yellow_19_20_sdf_eng = yellow_taxi_feature_eng(yellow_19_20_sdf)
yellow_21_22_sdf_eng = yellow_taxi_feature_eng(yellow_21_22_sdf)


**FHV**

In [4]:
def fhvhv_feature_eng(data_sdf):
    # trip duration in mniutes
    data_sdf_eng = data_sdf.withColumn(
        'trip_time',
        F.col('trip_time')/60
    )

    # fill null values in airport_fee with 0
    data_sdf_eng = data_sdf_eng.na.fill(value=0, subset=['airport_fee'])

    # total amount paid by passenger
    data_sdf_eng = data_sdf_eng.withColumn(
        'total_amount',
        F.col('base_passenger_fare') + F.col('tolls') + F.col('sales_tax') + F.col('tips') + F.col('airport_fee')
    )

    # pickup day is weekend or not
    data_sdf_eng = data_sdf_eng.withColumn(
        'is_weekend',
        F.dayofweek(F.col('pickup_datetime')).isin([1, 7])
    )

    # tip percentage over total amount paid
    data_sdf_eng = data_sdf_eng.withColumn(
        'tip_percent',
        (F.col('tips') / F.col('total_amount')) * 100
    )

    return data_sdf_eng

fhvhv_19_20_sdf_eng = fhvhv_feature_eng(fhvhv_19_20_sdf)
fhvhv_21_22_sdf_eng = fhvhv_feature_eng(fhvhv_21_22_sdf)

<h3>Outlier Detection</h3>

In [5]:
trip_time_clean_mask = (F.col('trip_time') > 0) & (F.col('trip_time') < 300)
location_id_clean_mask = (((F.col('PULocationID') >= 1) & (F.col('PULocationID') <= 263))
                            | ((F.col('DOLocationID') >= 1) & (F.col('DOLocationID') <= 263)))

**Yellow Taxis**

In [14]:
print("Data sizes for Yellow Taxis 2019-2020")
print("Original data size: ", yellow_19_20_sdf_eng.count())
print("Trip time cleaned size: ", yellow_19_20_sdf_eng.where(trip_time_clean_mask).count())
print("Location ID cleaned size: ", yellow_19_20_sdf_eng.where(location_id_clean_mask).count())

Data sizes for Yellow Taxis 2019-2020
Original data size:  19600692


                                                                                

Trip time cleaned size:  19543652




Location ID cleaned size:  19506018


                                                                                

In [15]:
print("Data sizes for Yellow Taxis 2021-2022")
print("Original data size: ", yellow_21_22_sdf_eng.count())
print("Trip time cleaned size: ", yellow_21_22_sdf_eng.where(trip_time_clean_mask).count())
print("Location ID cleaned size: ", yellow_21_22_sdf_eng.where(location_id_clean_mask).count())

Data sizes for Yellow Taxis 2021-2022
Original data size:  8657731
Trip time cleaned size:  8638019
Location ID cleaned size:  8615868


In [6]:
yellow_19_20_sdf_cleaned = yellow_19_20_sdf_eng.where(trip_time_clean_mask)
yellow_19_20_sdf_cleaned = yellow_19_20_sdf_cleaned.where(location_id_clean_mask)

yellow_21_22_sdf_cleaned = yellow_21_22_sdf_eng.where(trip_time_clean_mask)
yellow_21_22_sdf_cleaned = yellow_21_22_sdf_cleaned.where(location_id_clean_mask)
                                                            
print("Cleaned data size for Yellow Taxi 2019-2020:", yellow_19_20_sdf_cleaned.count())   
print("Cleaned data size for Yellow Taxi 2021-2022:", yellow_21_22_sdf_cleaned.count())   

                                                                                

Cleaned data size for Yellow Taxi 2019-2020: 19450894
Cleaned data size for Yellow Taxi 2021-2022: 8596662


                                                                                

**FHV**

In [17]:
print("Data sizes for FHVHV Trips 2019-2020")
print("Original data size: ", fhvhv_19_20_sdf_eng.count())
print("Trip time cleaned size: ", fhvhv_19_20_sdf_eng.where(trip_time_clean_mask).count())
print("Location ID cleaned size: ", fhvhv_19_20_sdf_eng.where(location_id_clean_mask).count())

Data sizes for FHVHV Trips 2019-2020
Original data size:  64538369


                                                                                

Trip time cleaned size:  64536919




Location ID cleaned size:  64536983


                                                                                

In [119]:
print("Data sizes for FHVHV Trips 2021-2022")
print("Original data size: ", fhvhv_21_22_sdf_eng.count())
print("Trip time cleaned size: ", fhvhv_21_22_sdf_eng.where(trip_time_clean_mask).count())
print("Location ID cleaned size: ", fhvhv_21_22_sdf_eng.where(location_id_clean_mask).count())

Data sizes for FHVHV Trips 2021-2022
Original data size:  46825369


                                                                                

Trip time cleaned size:  46824989




Location ID cleaned size:  46824554


                                                                                

In [7]:
fhvhv_19_20_sdf_cleaned = fhvhv_19_20_sdf_eng.where(trip_time_clean_mask)
fhvhv_19_20_sdf_cleaned = fhvhv_19_20_sdf_cleaned.where(location_id_clean_mask)

fhvhv_21_22_sdf_cleaned = fhvhv_21_22_sdf_eng.where(trip_time_clean_mask)
fhvhv_21_22_sdf_cleaned = fhvhv_21_22_sdf_cleaned.where(location_id_clean_mask)
                                                            
print("Cleaned data size for FHVHV Trips 2019-2020:", yellow_19_20_sdf_cleaned.count())   
print("Cleaned data size for FHVHV Trips 2021-2022:", fhvhv_21_22_sdf_cleaned.count())   

                                                                                

Cleaned data size for FHVHV Trips 2019-2020: 19450894




Cleaned data size for FHVHV Trips 2021-2022: 46824174


                                                                                

<h3>Feature selection</h3>

In [12]:
drop_features_taxi = ['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 
                'store_and_fwd_flag', 'payment_type', 'fare_amount', 'extra',
                'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
                'congestion_surcharge', 'airport_fee', 'trip_time']

yellow_19_20_sdf_final = yellow_19_20_sdf_cleaned.drop(*drop_features_taxi)
yellow_21_22_sdf_final = fhvhv_21_22_sdf_cleaned.drop(*drop_features_taxi)

                                                                                

46824174

In [9]:
yellow_19_20_sdf_final.write.mode('overwrite').parquet('../data/curated/yellow_19-20_curated')

                                                                                

In [13]:
yellow_21_22_sdf_final.write.mode('overwrite').parquet('../data/curated/yellow_21-22_curated')

                                                                                

In [16]:
drop_features_fhvhv = ['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
                        'request_datetime', 'on_scene_datetime', 'trip_miles', 'trip_time',
                        'base_passenger_fare', 'tolls', 'bcf', 'sales_tax', 'congestion_surcharge',
                        'airport_fee', 'tips', 'driver_pay', 'shared_request_flag', 'shared_match_flag',
                        'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag']

fhvhv_19_20_sdf_final = fhvhv_19_20_sdf_cleaned.drop(*drop_features_fhvhv)
fhvhv_21_22_sdf_final = fhvhv_21_22_sdf_cleaned.drop(*drop_features_fhvhv)

                                                                                

46824174

In [17]:
fhvhv_19_20_sdf_final.write.mode('overwrite').parquet('../data/curated/fhvhv_19-20_curated')

                                                                                

In [18]:
fhvhv_21_22_sdf_final.write.mode('overwrite').parquet('../data/curated/fhvhv_21-22_curated')

                                                                                

<h2><u>Preprocess COVID Data</u></h2>

In [11]:
# load COVID-19 Daily Counts data
covid_df = pd.read_csv('../data/raw/External Data/COVID-19_Daily_Counts_of_Cases__Hospitalizations__and_Deaths.csv')

# select necessary features
covid_df_selected = covid_df[['date_of_interest', 'CASE_COUNT', 'HOSPITALIZED_COUNT', 'DEATH_COUNT']]

# select required data between dates 2021-12-01 and 2022-02-28
covid_df_selected['date_of_interest'] = pd.to_datetime(covid_df_selected['date_of_interest'], format='%m/%d/%Y')
mask = (covid_df_selected['date_of_interest'] >= '2021-12-01') & (covid_df_selected['date_of_interest'] <= '2022-2-28')
covid_df_selected = covid_df_selected.loc[mask]

covid_df_selected.to_parquet('../data/curated/nyc_covid_2021-2022_clean.parquet')