In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import unix_timestamp, hour, to_date, dayofweek, when
import pandas as pd


In [2]:
# Spark configuration for this preprocessing run, applied to the builder in this order.
SPARK_CONF = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.executor.memory": "4g",
    "spark.driver.memory": "2g",
}

# Create a spark session (which will run spark jobs)
_builder = SparkSession.builder.appName("tlc_preprocessing")
for _conf_key, _conf_value in SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/08/21 14:38:40 WARN Utils: Your hostname, MinhVu resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/08/21 14:38:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/21 14:38:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/21 14:38:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Lower-casing the column names

In [3]:
def rename(year, month, input_path='../data/landing/'):
    """Load one monthly landing parquet file and lower-case all of its column names.

    Parameters
    ----------
    year : int or str
        Year part of the file name.
    month : int or str
        Month part of the file name; zero-padded to two digits, so ``1`` and
        ``'01'`` both read ``<year>-01.parquet``.
    input_path : str, default '../data/landing/'
        Directory prefix (including the trailing slash) holding the files.

    Returns
    -------
    pyspark.sql.DataFrame
        The file's contents with every column name lower-cased.
    """
    month = str(month).zfill(2)
    sdf = spark.read.parquet(f'{input_path}{year}-{month}.parquet')
    # toDF renames every column in a single projection, instead of stacking
    # one withColumnRenamed projection per column onto the plan.
    return sdf.toDF(*[c.lower() for c in sdf.columns])

In [4]:
data = {2023: [12], 2024: [1,2,3,4,5]}
sdfs = {}

# Load every (year, month) landing file, keyed as 'sdf_<year>_<MM>'.
for year, months in data.items():
    for month in (str(m).zfill(2) for m in months):
        print(f"Begin rename {year}-{month}")
        sdfs[f'sdf_{year}_{month}'] = rename(year, month)
        print(f"End rename {year}-{month}")

Begin rename 2023-12


                                                                                

End rename 2023-12
Begin rename 2024-01
End rename 2024-01
Begin rename 2024-02
End rename 2024-02
Begin rename 2024-03
End rename 2024-03
Begin rename 2024-04
End rename 2024-04
Begin rename 2024-05
End rename 2024-05


#### Inspecting the original data (before cleaning and feature selection)

In [5]:
# types
# Print the schema of the first month (Dec 2023) to check column names and types after lower-casing.
sdfs['sdf_2023_12'].printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- pulocationid: integer (nullable = true)
 |-- dolocationid: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

In [6]:
# Total records before cleaning: the row counts of all monthly DataFrames added together.
total = sum(monthly.count() for monthly in sdfs.values())
total

                                                                                

121257739

### Outlier detection and feature selection

#### 1. Removing records whose fields fall outside their valid ranges

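1. `'2023-12-01' <= request_datetime <= on_scene_datetime <= pickup_datetime < dropoff_datetime < '2024-06-01'`
2. `1 <= pulocationid, dolocationid <= 263`
3. `0.1 <= trip_miles <= 360`
4. `0 <= trip_time <= 8*3600` (seconds, i.e. at most 8 hours)
5. `7 <= base_passenger_fare`
6. `0 <= tolls, bcf, sales_tax, congestion_surcharge, airport_fee, tips, driver_pay`; in addition, `airport_fee` must be a multiple of 2.5
7. The flag columns (`shared_request_flag`, `shared_match_flag`, `access_a_ride_flag`, `wav_request_flag`, `wav_match_flag`) contain only `Y` or `N`
8. Only trips with `hvfhs_license_num == 'HV0003'` (Uber, per the TLC data dictionary) are kept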

In [9]:
def range_remove(sdf, start='2023-12-01', end='2024-06-01', max_trip_seconds=8 * 3600):
    """Keep only rows whose fields lie inside the valid ranges listed above.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        A monthly trip DataFrame with lower-cased column names.
    start : str, default '2023-12-01'
        Inclusive lower bound for ``request_datetime``.
    end : str, default '2024-06-01'
        Exclusive upper bound for ``dropoff_datetime``.
    max_trip_seconds : int, default 8 * 3600
        Inclusive upper bound for ``trip_time``.

    Returns
    -------
    pyspark.sql.DataFrame
        The filtered DataFrame. A row with a null in any checked column is also
        dropped, because a comparison against null is never true in ``where``.
    """
    sdf = sdf.where(
        (F.col('hvfhs_license_num') == 'HV0003') &
        # Timestamps must be ordered and fall inside the [start, end) study window.
        (F.col('request_datetime') >= start) &
        (F.col('request_datetime') <= F.col('on_scene_datetime')) &
        (F.col('on_scene_datetime') <= F.col('pickup_datetime')) &
        (F.col('pickup_datetime') < F.col('dropoff_datetime')) &
        (F.col('dropoff_datetime') < end) &
        (F.col('pulocationid').between(1, 263)) &
        (F.col('dolocationid').between(1, 263)) &
        (F.col('trip_miles').between(0.1, 360)) &
        (F.col('trip_time').between(0, max_trip_seconds)) &
        (F.col('base_passenger_fare') >= 7) &
        (F.col('tolls') >= 0) &
        (F.col('bcf') >= 0) &
        (F.col('sales_tax') >= 0) &
        (F.col('congestion_surcharge') >= 0) &
        (F.col('tips') >= 0) &
        (F.col('driver_pay') >= 0) &
        # The modulo check alone lets negative multiples (e.g. -2.5) through, so
        # the documented non-negativity rule is enforced explicitly.
        (F.col('airport_fee') >= 0) &
        (F.col('airport_fee') % 2.5 == 0) &
        (F.col('shared_request_flag').isin('Y', 'N')) &
        (F.col('shared_match_flag').isin('Y', 'N')) &
        (F.col('access_a_ride_flag').isin('Y', 'N')) &
        (F.col('wav_request_flag').isin('Y', 'N')) &
        (F.col('wav_match_flag').isin('Y', 'N'))
    )
    return sdf

In [10]:
# Apply the range filter to every month, replacing each entry in place.
for name in list(sdfs):
    print(f'Begin {name}')
    sdfs[name] = range_remove(sdfs[name])
    print(f'End {name}\n')
    

Begin sdf_2023_12
End sdf_2023_12

Begin sdf_2024_01
End sdf_2024_01

Begin sdf_2024_02
End sdf_2024_02

Begin sdf_2024_03
End sdf_2024_03

Begin sdf_2024_04
End sdf_2024_04

Begin sdf_2024_05
End sdf_2024_05



#### Keep only data between the approximate 0.1th and 99.9th percentiles (computed separately for each month)

In [13]:
def quantile_remove(sdf, lower=0.001, upper=0.999, relative_error=0.0001, cols=None):
    """Drop rows outside the approximate [lower, upper] quantile range of each column.

    Quantiles are computed on ``sdf`` itself, so every DataFrame passed in is
    trimmed against its own distribution.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        DataFrame containing the numeric columns in ``cols``.
    lower, upper : float, default 0.001 / 0.999
        Quantile probabilities used as the inclusive lower / upper bounds.
    relative_error : float, default 0.0001
        Relative target precision passed to ``approxQuantile``.
    cols : iterable of str, optional
        Columns to trim on; defaults to the nine fare/trip columns used here.

    Returns
    -------
    pyspark.sql.DataFrame
        The filtered DataFrame. It is returned unchanged when no column yields bounds,
        for example when ``sdf`` is empty.
    """
    import functools
    import operator

    if cols is None:
        cols = ['trip_miles', 'trip_time', 'base_passenger_fare', 'tolls', 'bcf',
                'sales_tax', 'congestion_surcharge', 'tips', 'driver_pay']
    cols = list(cols)
    if not cols:
        return sdf

    # A single multi-column call computes every column's bounds in one pass,
    # instead of launching a separate Spark job per column.
    bounds = sdf.approxQuantile(cols, [lower, upper], relative_error)

    conditions = [
        F.col(c).between(b[0], b[1])
        for c, b in zip(cols, bounds)
        # approxQuantile returns [] for a column with no non-null values; skip it
        # rather than fail with an IndexError.
        if len(b) == 2
    ]
    if not conditions:
        return sdf
    return sdf.where(functools.reduce(operator.and_, conditions))

In [14]:
# Trim every month to its own approximate percentile range.
# NOTE: not idempotent. Re-running this cell recomputes quantiles on data that is
# already trimmed and trims it again; restart from the load cell instead.
for name in list(sdfs):
    print(f'Begin {name}')
    sdfs[name] = quantile_remove(sdfs[name])
    print(f'End {name}\n')
    

Begin sdf_2023_12


                                                                                

End sdf_2023_12

Begin sdf_2024_01


                                                                                

End sdf_2024_01

Begin sdf_2024_02


                                                                                

End sdf_2024_02

Begin sdf_2024_03


                                                                                

End sdf_2024_03

Begin sdf_2024_04


                                                                                

End sdf_2024_04

Begin sdf_2024_05




End sdf_2024_05



                                                                                

In [15]:
# Total records after cleaning: the row counts of all filtered monthly DataFrames added together.
total_c = sum(monthly.count() for monthly in sdfs.values())

print(f"Number of records before cleaning: {total}")
print(f"Number of records after cleaning: {total_c}")
# Guard against division by zero, and use the '%' format spec instead of ': .4f'
# (whose space flag printed a stray leading space before the number).
if total:
    print(f"There are {total_c / total:.4%} of the original data kept")
else:
    print("There were no records before cleaning, so the kept ratio is undefined")



Number of records before cleaning: 121257739
Number of records after cleaning: 82592829
There are  68.1134% of the original data kept


                                                                                

#### 2. Deriving date, hour, day-of-week and weekend features from request_datetime

In [18]:
def date_hour(sdf):
    """Add calendar features derived from ``request_datetime``.

    The columns are added in this order:
    - ``day_of_week``: ``dayofweek`` of the request (1 = Sunday ... 7 = Saturday).
    - ``hour``: hour of the request.
    - ``is_weekend``: 0 when ``day_of_week`` is in [2, 6], otherwise 1.
    - ``date``: the request timestamp truncated to a date.
    """
    # Each builder receives the frame as it stands at that step, so is_weekend can
    # read the day_of_week column added just before it.
    derived_columns = [
        ('day_of_week', lambda frame: dayofweek(frame['request_datetime'])),
        ('hour', lambda frame: hour(frame['request_datetime'])),
        ('is_weekend', lambda frame: when(frame['day_of_week'].between(2, 6), 0).otherwise(1)),
        ('date', lambda frame: to_date(frame['request_datetime'])),
    ]
    for column_name, build in derived_columns:
        sdf = sdf.withColumn(column_name, build(sdf))
    return sdf

In [19]:
# Add the calendar features to every month, updating the existing dict in place.
sdfs.update({name: date_hour(frame) for name, frame in sdfs.items()})

In [20]:
# Dec 2023 - Apr 2024 form the training set; May 2024 is held out as the test set.
TRAIN_KEYS = ['sdf_2023_12', 'sdf_2024_01', 'sdf_2024_02', 'sdf_2024_03', 'sdf_2024_04']
TEST_KEY = 'sdf_2024_05'

sdf_train = sdfs[TRAIN_KEYS[0]]
for train_key in TRAIN_KEYS[1:]:
    # unionByName matches columns by name. A positional union would silently shift
    # values into the wrong column if a month's file stored columns in another order.
    sdf_train = sdf_train.unionByName(sdfs[train_key])

sdf_test = sdfs[TEST_KEY]

In [21]:
# Hourly weather features joined onto trips by (date, hour).
WEATHER_COLS = ['wnd', 'tmp', 'dew', 'slp', 'date', 'hour']
# usecols stops pandas from loading (and Spark from serializing) columns that are
# dropped immediately anyway.
weather = spark.createDataFrame(
    pd.read_csv('../data/curated/weather_processed.csv', usecols=WEATHER_COLS)
)
# usecols keeps the file's column order, so select restores the intended order.
weather = weather.select(*WEATHER_COLS)


#### Join with weather data and save (inner join: trips without a matching weather hour are dropped)

In [22]:
JOIN_KEYS = ["date", "hour"]
# Inner join: trips with no weather row for their (date, hour) are dropped.
# NOTE(review): duplicate (date, hour) rows in `weather` would duplicate trips.
# The join presumably assumes one row per hour; verify.
train_data = sdf_train.join(weather, on=JOIN_KEYS, how="inner")
test_data = sdf_test.join(weather, on=JOIN_KEYS, how="inner")

In [23]:
# Write each dataset to its own directory, overwriting any previous run.
# coalesce(1) funnels each output through a single partition, so each directory gets one part file.
# NOTE(review): with ~82M cleaned rows this is slow and memory-heavy. Drop the
# coalesce if downstream readers accept multi-file parquet directories.
for frame, out_dir in (
    (train_data, '../data/curated/train_data'),
    (test_data, '../data/curated/test_data'),
):
    frame.coalesce(1).write.mode('overwrite').parquet(out_dir)

24/08/21 14:56:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [24]:
# Shut down the Spark session now that both datasets have been written.
spark.stop()