In [1]:
# import pyspark to read parquet files
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session used for all preprocessing in this notebook.
# The options are applied one by one to the builder before the session is created.
_session_builder = SparkSession.builder.appName("Project1 preprocessing")
for _option, _value in (
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.debug.maxToStringFields", "100000"),
    ("spark.executor.memory", "8g"),
    ("spark.driver.memory", "8g"),
):
    _session_builder = _session_builder.config(_option, _value)
spark = _session_builder.getOrCreate()
# import functions in pyspark for date filtering
from pyspark.sql import functions as F

In [2]:
import pathlib #used to create new folder to store filtered dataset

# Working directories for the raw Yellow Taxi and High Volume For-Hire Vehicle (HVFH)
# trip records: one folder per taxi type for the training months and one for the test month.

########################################################################
# NOTE: `directory` must point at your local copy of the project.
# Either set the PROJECT1_DIR environment variable before starting the notebook,
# or replace the fallback path below with your own project directory.
import os

directory = os.environ.get("PROJECT1_DIR", "/Users/aobo/Desktop/project_1")

# Every path below is built as directory + "/data/...", so a trailing slash would
# produce a doubled separator; strip it here instead of relying on the user to omit it.
directory = directory.rstrip("/")
#########################################################################

Ytaxi_raw_train = directory+'/data/raw/ytaxi/train/'
HVFH_raw_train = directory+'/data/raw/HVFH/train/'
Ytaxi_raw_test = directory+'/data/raw/ytaxi/test/'
HVFH_raw_test = directory+'/data/raw/HVFH/test/'

# File names of the monthly raw trip data, one entry per month (MMYY prefix, 0219 .. 0919).
# The set of months is small and fixed, so the names are generated here rather than
# discovered with os.listdir.
_month_tags = [f"{_month:02d}19" for _month in range(2, 10)]
ytaxi_name = [_tag + "ytaxi.parquet" for _tag in _month_tags]
HVFH_name = [_tag + "HVFH.parquet" for _tag in _month_tags]



# Output locations for the filtered ("curated") data, one per taxi type and
# per training / testing split.
filter_ytaxi_train = f"{directory}/data/curated/ytaxi/train/"
filter_HVFH_train = f"{directory}/data/curated/HVFH/train/"
filter_ytaxi_test = f"{directory}/data/curated/ytaxi/test/"
filter_HVFH_test = f"{directory}/data/curated/HVFH/test/"

# Create each output folder (and any missing parents); existing folders are left alone.
for _curated_dir in (filter_HVFH_train, filter_ytaxi_train, filter_HVFH_test, filter_ytaxi_test):
    pathlib.Path(_curated_dir).mkdir(parents=True, exist_ok=True)

# Month boundaries: entry i is the first instant of the i-th month of trip data and
# entry i + 1 is the first instant of the following month (hence one more entry than months).
date = [f"2019-{_month:02d}-01 00:00:00" for _month in range(2, 11)]

In [3]:
# Jupyter notebook fails to read the files when given only the folder directory, raising
# [Errno 61] Connection refused; the kernel then has to be restarted, otherwise all other
# spark.read.parquet calls fail as well

In [6]:
# read the example New York weather parquet file (weather_2_2019)
# no `header` option is passed: it is a CSV-reader setting and has no effect on parquet
NY_weather_2 = spark.read.parquet(directory+"/data/raw/weather/parquet/weather_2_2019.parquet")


In [7]:
# check that the weather file was read correctly by displaying its first five rows
NY_weather_2.limit(5)

Date,TMid,Weather,day,end_day
2019-02-01,16.5,3,0,2019-02-02 00:00:00
2019-02-02,25.0,0,1,2019-02-03 00:00:00
2019-02-03,39.5,11,1,2019-02-04 00:00:00
2019-02-04,45.0,13,0,2019-02-05 00:00:00
2019-02-05,51.0,13,0,2019-02-06 00:00:00


In [5]:
# read the example 2019 New York sport-event parquet file (final_sports_2_2019)
# and display its first five rows
# no `header` option is passed: it is a CSV-reader setting and has no effect on parquet
sport_2 = spark.read.parquet(directory+"/data/raw/sport/parquet/final_sports_2_2019.parquet")
sport_2.limit(5)


start_time,end_time,event
2019-02-01 18:00:00,2019-02-01 19:30:00,1
2019-02-01 21:30:00,2019-02-01 23:00:00,1
2019-02-02 18:00:00,2019-02-02 19:00:00,1
2019-02-02 20:30:00,2019-02-02 21:30:00,1
2019-02-03 11:00:00,2019-02-03 14:00:00,1


In [3]:
# Columns kept from each data set: pickup time, pick-up / drop-off location,
# trip distance, trip time and the amount paid.
# HVFH data does not contain a passenger count, so that column is kept for yellow taxi only.
HVFH_required_col = [
    "pickup_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_miles",
    "trip_time",
    "total_amount",
]

ytaxi_required_col = [
    "tpep_pickup_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "total_amount",
    "trip_Time",
]

In [19]:
# test-read a single raw file (the HVFH test-month file 0919HVFH.parquet) and display
# its first five rows to inspect the available columns
test_0919 = spark.read.parquet(directory+"/data/raw/HVFH/test/0919HVFH.parquet")
test_0919.limit(5)

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
HV0003,B02764,B02764,2019-09-01 10:03:39,2019-09-01 10:08:57,2019-09-01 10:10:02,2019-09-01 10:17:07,136,169,1.63,426,6.91,0.0,0.0,0.6,0.0,,0.0,5.39,N,N,,N,N
HV0003,B02764,B02764,2019-09-01 10:14:31,2019-09-01 10:20:53,2019-09-01 10:22:37,2019-09-01 10:31:53,169,18,1.6,556,8.48,0.0,0.0,0.73,0.0,,0.0,6.34,N,N,,N,N
HV0003,B02764,B02764,2019-09-01 10:29:21,2019-09-01 10:33:26,2019-09-01 10:36:03,2019-09-01 11:04:01,94,198,18.01,1678,20.65,6.12,0.0,2.32,0.0,,0.0,39.68,Y,N,,N,N
HV0005,B02510,,2019-09-01 10:00:12,,2019-09-01 10:04:04,2019-09-01 10:26:58,114,112,4.834,1374,23.13,2.0,0.63,2.23,2.75,,4.61,16.64,N,N,N,N,N
HV0005,B02510,,2019-09-01 10:27:17,,2019-09-01 10:33:15,2019-09-01 10:41:07,112,112,1.941,472,4.44,0.0,0.11,0.39,0.0,,0.0,0.0,Y,N,N,N,N


In [4]:
from pyspark.sql.functions import col
from pyspark.sql.functions import date_format
from pyspark.sql.functions import *

# Filters one month of HVFH and yellow-taxi trip data, joins each filtered dataframe with the
# weather and sport-event data, and exports the result as parquet.

def _enrich_and_write(trips, pickup_col, required_cols, weather, sport, out_path):
    """Attach weather and sport-event columns to cleaned trips and write them to ``out_path``.

    Parameters
    ----------
    trips : DataFrame of already filtered trips containing ``required_cols``.
    pickup_col : name of the pickup-timestamp column in ``trips``.
    required_cols : trip columns to keep in the output.
    weather : DataFrame with ``Date``, ``end_day``, ``TMid``, ``Weather`` and ``day``.
    sport : DataFrame with ``start_time``, ``end_time`` and ``event``.
    out_path : destination of the parquet output (overwritten if present).
    """
    # A trip gets the weather row whose [Date, end_day) interval contains its pickup time.
    # Left joins are sufficient: rows that exist only on the weather/sport side carry null
    # trip columns and would be removed by dropna() below anyway, so a full outer join only
    # adds work. Both lookup tables are small monthly files, hence the broadcast hints.
    with_weather = trips.join(
        F.broadcast(weather),
        [weather.end_day > trips[pickup_col], weather.Date <= trips[pickup_col]],
        how="left",
    )

    # A trip gets an event row when its pickup time falls inside [start_time, end_time).
    # NOTE(review): a pickup inside several overlapping events yields one row per event.
    with_events = with_weather.join(
        F.broadcast(sport),
        [sport.end_time > with_weather[pickup_col], sport.start_time <= with_weather[pickup_col]],
        how="left",
    )

    result = (
        with_events
        .select([*required_cols, "TMid", "Weather", "day", "event"])
        # trips outside every event window have no event value; they are coded as 2
        .fillna(int(2), "event")
        # only the pickup hour (0-23) is kept, as an int, so every column is numeric
        .withColumn(pickup_col, F.date_format(pickup_col, "HH").cast("int"))
        # drop every row that still has a null in any column (e.g. no matching weather row)
        .dropna("any")
    )
    result.write.mode("overwrite").parquet(out_path)


def filter_file(ytaxi_dir, HVFH_dir, num, sport, weather, ytaxi_save_dir, HVFH_save_dir):
    """Clean one month of yellow-taxi and HVFH trips, enrich them and export them as parquet.

    Parameters
    ----------
    ytaxi_dir, HVFH_dir : folders (with trailing slash) holding the raw monthly parquet files.
    num : index of the month; selects ``ytaxi_name[num]``, ``HVFH_name[num]`` and the
        pickup window ``[date[num], date[num + 1])``.
    sport, weather : DataFrames with that month's sport events and weather.
    ytaxi_save_dir, HVFH_save_dir : output folders (with trailing slash); the results are
        written as ``final_<raw file name>`` and overwrite any existing output.

    Returns
    -------
    None. The function's only effect is the two parquet outputs.
    """
    # Outlier limits: trips of 100 miles or more, $300 or more, or 2 hours (7200 s) or more
    # are discarded, as are trips with a non-positive distance, amount or duration.
    max_miles, max_amount, max_seconds = 100, 300, 7200
    month_start, month_end = date[num], date[num + 1]

    # read raw data parquet files
    ytaxi_trips = spark.read.parquet(ytaxi_dir + ytaxi_name[num])
    hvfh_trips = spark.read.parquet(HVFH_dir + HVFH_name[num])

    # ---- HVFH -------------------------------------------------------------------------
    # a null airport_fee would turn the whole sum below into null, so treat it as 0
    hvfh_trips = hvfh_trips.fillna(int(0), "airport_fee")

    # HVFH has no total_amount column: add up the individual cost components
    fare_parts = ["base_passenger_fare", "tolls", "bcf", "sales_tax",
                  "airport_fee", "congestion_surcharge"]
    hvfh_total = F.col(fare_parts[0]).cast("double")
    for part in fare_parts[1:]:
        hvfh_total = hvfh_total + F.col(part).cast("double")
    hvfh_trips = hvfh_trips.withColumn("total_amount", F.round(hvfh_total, 3))

    hvfh_is_valid = (
        (F.col("trip_miles") > 0) & (F.col("trip_miles") < max_miles)
        & (F.col("total_amount") > 0) & (F.col("total_amount") < max_amount)
        & (F.col("trip_time") > 0) & (F.col("trip_time") < max_seconds)
        # only Uber (license HV0003) is kept, the full HVFH data is too big for each month
        & (F.col("hvfhs_license_num") == "HV0003")
        # only pickups inside the month are valid
        & (F.col("pickup_datetime") >= month_start)
        & (F.col("pickup_datetime") < month_end)
    )
    # rows where the condition is null (a null in any tested column) are dropped as well;
    # selecting early keeps the joins narrow
    hvfh_trips = hvfh_trips.filter(hvfh_is_valid).select([*HVFH_required_col])

    # ---- Yellow taxi ------------------------------------------------------------------
    # trip time in seconds, from the dropoff and pickup timestamps
    ytaxi_trips = ytaxi_trips.withColumn(
        "trip_Time",
        F.col("tpep_dropoff_datetime").cast("int") - F.col("tpep_pickup_datetime").cast("int"),
    )

    ytaxi_is_valid = (
        (F.col("trip_distance") > 0) & (F.col("trip_distance") < max_miles)
        & (F.col("total_amount") > 0) & (F.col("total_amount") < max_amount)
        & (F.col("trip_Time") > 0) & (F.col("trip_Time") < max_seconds)
        # only pickups inside the month are valid
        & (F.col("tpep_pickup_datetime") >= month_start)
        & (F.col("tpep_pickup_datetime") < month_end)
    )
    ytaxi_trips = ytaxi_trips.filter(ytaxi_is_valid).select([*ytaxi_required_col])

    # ---- join with weather / sport data and export ------------------------------------
    _enrich_and_write(ytaxi_trips, "tpep_pickup_datetime", ytaxi_required_col,
                      weather, sport, ytaxi_save_dir + "final_" + ytaxi_name[num])
    _enrich_and_write(hvfh_trips, "pickup_datetime", HVFH_required_col,
                      weather, sport, HVFH_save_dir + "final_" + HVFH_name[num])
    




In [5]:
import os

# Collect the monthly sport-event and weather parquet files in month order, so that
# entry i of each list belongs to the same month as entry i of the trip-data file lists.

def _list_parquet_by_month(folder):
    """Return the entries of ``folder`` whose name ends in "parquet", in numeric order.

    The names are ordered by the numbers they contain (e.g. ``weather_2_2019`` before
    ``weather_10_2019``); a plain string sort would place month 10 before month 2.
    Names are compared as strings only to break ties.
    """
    import re  # local import: only needed by this helper

    def numeric_key(name):
        return ([int(token) for token in re.findall(r"\d+", name)], name)

    return sorted(
        (name for name in os.listdir(folder) if name.endswith("parquet")),
        key=numeric_key,
    )


sport_work_dir = directory+"/data/raw/sport/parquet/"
sport_filename = _list_parquet_by_month(sport_work_dir)

weather_work_dir = directory+"/data/raw/weather/parquet/"
weather_filename = _list_parquet_by_month(weather_work_dir)


In [6]:
import time 

# measure the total time for filtering and merging all months
start = time.time()

# Every month of trip data needs its own sport and weather file. Check this before
# starting, because a missing file would otherwise only surface as an IndexError
# part-way through a run that takes hours.
n_months = len(ytaxi_name)
if len(sport_filename) < n_months or len(weather_filename) < n_months:
    raise ValueError(
        "expected at least %d sport and weather files, found %d sport and %d weather files"
        % (n_months, len(sport_filename), len(weather_filename))
    )

# now merge all monthly data: trip data + sport data + weather data
# the range is len(ytaxi_name) since there is one yellow-taxi file (and one HVFH file) per month
for i in range(n_months):
    # read this month's sport and weather parquet files
    # (no `header` option: it is a CSV-reader setting and has no effect on parquet)
    sport = spark.read.parquet(sport_work_dir+sport_filename[i])
    weather = spark.read.parquet(weather_work_dir+weather_filename[i])

    # the last file in the name lists is the test month and uses the test directories;
    # every earlier month is training data
    if i != n_months - 1:
        filter_file(Ytaxi_raw_train, HVFH_raw_train, i, sport, weather,
                    filter_ytaxi_train, filter_HVFH_train)
    else:
        filter_file(Ytaxi_raw_test, HVFH_raw_test, i, sport, weather,
                    filter_ytaxi_test, filter_HVFH_test)

print(time.time() - start)

# Took more than 4 hours to join and write all 8 months of data on an 8GB, 4-core,
# 8th-generation i5. Jupyter notebook might raise an error due to memory usage; if that
# happens, run filter_file for each month separately.

16247.472387075424


In [7]:
# Combine all monthly yellow-taxi training outputs into one parquet data set.
# Each monthly output written by Spark is itself a folder, and the author found that Spark
# could not read the folder of these outputs directly, so they are read one by one and unioned.

start = time.time()

final_ytaxi_directory = directory+"/data/curated/ytaxi/train"

# Keep only parquet outputs and skip the combined "full..." output, in case this cell is
# run more than once. Sorting makes the union order independent of os.listdir's ordering.
monthly_ytaxi_outputs = sorted(
    name for name in os.listdir(final_ytaxi_directory)
    if name[-7:] == "parquet" and name[:4] != "full"
)

# Without this check an empty folder would fail with a NameError, or silently rewrite a
# dataframe left over from an earlier run of this cell.
if not monthly_ytaxi_outputs:
    raise FileNotFoundError("no monthly parquet output found in " + final_ytaxi_directory)

# start from the first month and append every remaining month
final_ytaxi_train = spark.read.parquet(final_ytaxi_directory + "/" + monthly_ytaxi_outputs[0])
for name in monthly_ytaxi_outputs[1:]:
    final_ytaxi_train = final_ytaxi_train.union(
        spark.read.parquet(final_ytaxi_directory + "/" + name)
    )

# write the full training data as parquet, in the same directory as the monthly outputs
final_ytaxi_train.write.mode('overwrite').parquet(final_ytaxi_directory+"/full_ytaxi_train.parquet")

print(time.time() - start)


31.114586114883423


In [8]:
# Combine all monthly HVFH training outputs into one parquet data set, reading the monthly
# outputs one by one and unioning them.

start = time.time()

final_HVFH_directory = directory+"/data/curated/HVFH/train"

# Keep only parquet outputs and skip the combined "full..." output, in case this cell is
# run more than once. Sorting makes the union order independent of os.listdir's ordering.
monthly_HVFH_outputs = sorted(
    name for name in os.listdir(final_HVFH_directory)
    if name[-7:] == "parquet" and name[:4] != "full"
)

# Without this check an empty folder would fail with a NameError, or silently rewrite a
# dataframe left over from an earlier run of this cell.
if not monthly_HVFH_outputs:
    raise FileNotFoundError("no monthly parquet output found in " + final_HVFH_directory)

# start from the first month and append every remaining month
final_HVFH_train = spark.read.parquet(final_HVFH_directory + "/" + monthly_HVFH_outputs[0])
for name in monthly_HVFH_outputs[1:]:
    final_HVFH_train = final_HVFH_train.union(
        spark.read.parquet(final_HVFH_directory + "/" + name)
    )

# write the full training data as parquet, in the same directory as the monthly outputs
final_HVFH_train.write.mode('overwrite').parquet(final_HVFH_directory+"/full_HVFH_train.parquet")

print(time.time() - start)



53.549479961395264


In [5]:
# check that the combined yellow-taxi training file was written successfully:
# read it back and display its first five rows
test_ytaxi = spark.read.parquet(directory+"/data/curated/ytaxi/train/full_ytaxi_train.parquet")
test_ytaxi.limit(5)

tpep_pickup_datetime,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,total_amount,trip_Time,TMid,Weather,day,event
10,1.0,1.5,1.0,145,145,4.3,93,53.0,3,0,2
10,1.0,1.5,1.0,145,145,3.8,4,53.0,3,0,2
10,1.0,0.7,1.0,161,161,8.8,314,53.0,3,0,2
10,1.0,2.0,1.0,163,141,15.3,634,53.0,3,0,2
10,1.0,2.5,1.0,260,56,11.3,577,53.0,3,0,2


In [10]:
# check that the combined HVFH training file was written successfully:
# read it back and count its rows
test_HVFH = spark.read.parquet(directory+"/data/curated/HVFH/train/full_HVFH_train.parquet")
test_HVFH.count()

99757143