In [1]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034_Yellow_Taxi_Preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

22/08/17 12:29:36 WARN Utils: Your hostname, DESKTOP-85B961I resolves to a loopback address: 127.0.1.1; using 172.17.43.247 instead (on interface eth0)
22/08/17 12:29:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/17 12:29:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sdf_full = spark.read.parquet('../data/raw/tlc_data/tlc_yellow_data/2019*')
sdf_full.show(1, vertical=True, truncate=100)

[Stage 1:>                                                          (0 + 1) / 1]

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2019-03-01 11:24:41 
 tpep_dropoff_datetime | 2019-03-01 11:25:31 
 passenger_count       | 1.0                 
 trip_distance         | 0.0                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 145                 
 DOLocationID          | 145                 
 payment_type          | 2                   
 fare_amount           | 2.5                 
 extra                 | 0.5                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 3.8                 
 congestion_surcharge  | 0.0                 
 airport_fee           | null                
only showing top 1 row



                                                                                

In [4]:
sdf_full.count()

                                                                                

84598444

In [5]:
# for working a sub sample is used
sdf = sdf_full.sample(fraction=0.025, seed=0)

In [6]:
sdf.count()

                                                                                

2113986

In [7]:
sdf.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [8]:
from pyspark.sql import functions as F

In [9]:
# not sure about extra + mta tax + tolls_amount + imrpovement surcharge
#retained_features = {"RatecodeID", "PULocationID", "DOLocationID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "payment_type", "fare_amount", "tip_amount", #"congestion_surcharge", "extra", "trip_distance"}

# all features retained except for total_amount
retained_features = {"VendorID", "tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance","RatecodeID","store_and_fwd_flag","PULocationID","DOLocationID","payment_type","fare_amount","extra","mta_tax","tip_amount","tolls_amount","improvement_surcharge","congestion_surcharge"}

integer_features = {"VendorID", "RatecodeID", "PULocationID", "DOLocationID", "payment_type"}
time_features = {"tpep_pickup_datetime", "tpep_dropoff_datetime"}
boolean_features = {"store_and_fwd_flag"}
non_categorical_features = retained_features - integer_features

In [10]:
# Cast to correct types
#for field in ("VendorID", "PULocationID", "DOLocationID", "passenger_count", "RatecodeID", "payment_type"):
#    sdf = sdf.withColumn(
#        field,
##        F.col(field).cast('INT')
 #   )

# cast relevant attributes to int
for field in integer_features:
    sdf = sdf.withColumn(
        field,
        F.col(field).cast('INT')
    )

# convert store_and_fwd_flag to boolean
sdf = sdf.withColumn(
    'store_and_fwd_flag',
    (F.col("store_and_fwd_flag") == 'Y').cast('BOOLEAN')
)



In [11]:
sdf.describe(*non_categorical_features).show()



+-------+------------------+--------------------+------------------+------------------+-----------------+------------------+-----------------+---------------------+-------------------+
|summary|             extra|congestion_surcharge|           mta_tax|   passenger_count|       tip_amount|     trip_distance|      fare_amount|improvement_surcharge|       tolls_amount|
+-------+------------------+--------------------+------------------+------------------+-----------------+------------------+-----------------+---------------------+-------------------+
|  count|           2113986|             1981211|           2113986|           2102726|          2113986|           2113986|          2113986|              2113986|            2113986|
|   mean|1.0907474174379583|  2.1939951120804397|0.4941986465378673|1.5621084249683506|2.192062042984222|3.0210200114853976|13.36594043669184|  0.29859317895276305|0.38923132414314887|
| stddev|1.2493076618343268|  0.8307012172968692|0.0620966689124669|1.20727

                                                                                

# Findings #
* Fare amount has outliers
* congestion surcharge shouldnt be negative
* extra shouldnt be negative
* passenger count should be at most 7
* tip amount should be positive only
* trip distance should be positive

* VendorID should be 1 or 2, contains 4,5...
* RatecodeID should be 1:6 -> contains nulls + 99's?
* PULocation/DOLocationID's should be between 1-263
* Payment type should be 1 or 2

In [19]:
for x in integer_features:
    sdf.groupBy(x).count().sort(x).show()

                                                                                

+------------+-------+
|payment_type|  count|
+------------+-------+
|           0|  44578|
|           1|6060970|
|           2|2289811|
|           3|  44738|
|           4|  18624|
|           5|      3|
+------------+-------+



                                                                                

+------------+-----+
|PULocationID|count|
+------------+-----+
|           1|  858|
|           2|   13|
|           3|  248|
|           4|14498|
|           5|   45|
|           6|   21|
|           7|11756|
|           8|  119|
|           9|  187|
|          10| 3171|
|          11|  179|
|          12| 3687|
|          13|73320|
|          14|  918|
|          15|  190|
|          16|  293|
|          17| 1714|
|          18|  320|
|          19|  230|
|          20|  212|
+------------+-----+
only showing top 20 rows



                                                                                

+------------+-----+
|DOLocationID|count|
+------------+-----+
|           1|17647|
|           2|   10|
|           3|  734|
|           4|36530|
|           5|   95|
|           6|  146|
|           7|31977|
|           8|  162|
|           9|  831|
|          10| 6384|
|          11|  679|
|          12| 5372|
|          13|78176|
|          14| 6658|
|          15|  911|
|          16| 1465|
|          17|10415|
|          18| 1485|
|          19|  783|
|          20|  887|
+------------+-----+
only showing top 20 rows



                                                                                

+----------+-------+
|RatecodeID|  count|
+----------+-------+
|      null|  44578|
|         1|8115236|
|         2| 223798|
|         3|  19149|
|         4|   6595|
|         5|  48915|
|         6|     55|
|        99|    398|
+----------+-------+





+--------+-------+
|VendorID|  count|
+--------+-------+
|       1|3036220|
|       2|5395640|
|       4|  26842|
|       5|     22|
+--------+-------+



                                                                                

In [20]:
print(integer_features)
print(non_categorical_features)
sdf.count()

{'payment_type', 'PULocationID', 'DOLocationID', 'RatecodeID', 'VendorID'}
{'tpep_pickup_datetime', 'extra', 'tolls_amount', 'store_and_fwd_flag', 'mta_tax', 'congestion_surcharge', 'tip_amount', 'improvement_surcharge', 'passenger_count', 'fare_amount', 'trip_distance', 'tpep_dropoff_datetime'}


8458724

TypeError: when() missing 1 required positional argument: 'value'

In [13]:
# Filter out bad records
sdf = sdf.withColumn(
    'is_valid_record',
    F.when(
        (F.col('trip_distance') > 0)
        & (F.col('fare_amount') > 0)
        & ((F.col('PULocationID') >= 1) & (F.col('PULocationID') <= 263))
        & ((F.col('DOLocationID') >= 1) & (F.col('DOLocationID') <= 263))
        & (F.col('congestion_surcharge') >= 0)
        & (F.col('extra') >= 0)
        & (F.col('passenger_count') <= 6)
        & (F.col('tip_amount') >= 0)
        & ((F.col('VendorID') <=2) & (F.col('VendorID') >=1))
        & ((F.col('payment_type') <= 2) & (F.col('payment_type') >= 1))
        & ((F.col('RatecodeID') >= 1) & (F.col('RatecodeID') <= 6))
        & (F.col('tolls_amount') >= 0)
        & (F.col('tolls_amount') <= 30)
        & (F.col('mta_tax') >= 0)
        & (F.col('tip_amount') >= 0)
        & (F.col('improvement_surcharge') >= 0)
        & (F.col('total_amount') > 0),
        True
    ).otherwise(False)
)

In [14]:
sdf.groupBy("is_valid_record").count().show()



+---------------+-------+
|is_valid_record|  count|
+---------------+-------+
|           true|1923409|
|          false| 190577|
+---------------+-------+



                                                                                

In [52]:
sdf = sdf.filter(sdf.is_valid_record == True)

In [53]:
sdf.count()

                                                                                

7696464

In [50]:
# inspect tolls_amount
# investigate -> tolls amount, extra, mta_tax, congestion surcharge, tip_amount, improvement_surcharge, trip_distance

sdf.sort(sdf.trip_distance.desc()).limit(20)

                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,is_valid_record
1,2019-05-16 03:22:11,2019-05-16 03:33:40,1.0,401.4,1,False,152,74,1,8.5,1.0,0.5,1.55,0.0,0.3,11.85,0.0,,True
1,2019-07-17 23:42:23,2019-07-18 00:15:25,1.0,307.5,1,False,161,138,1,28.5,2.5,0.5,5.0,0.0,0.3,36.8,2.5,,True
2,2019-04-05 19:49:23,2019-04-06 02:19:54,1.0,162.77,4,False,229,233,2,907.0,0.0,0.5,0.0,12.24,0.3,922.54,2.5,,True
2,2019-10-14 23:40:33,2019-10-15 05:17:49,1.0,133.52,5,False,50,48,1,300.0,0.0,0.0,83.0,12.24,0.3,398.04,2.5,,True
1,2019-03-16 15:16:16,2019-03-16 17:49:31,1.0,99.2,5,False,179,138,2,310.0,0.0,0.0,0.0,0.0,0.3,310.3,0.0,,True
1,2019-08-23 12:09:08,2019-08-23 15:41:05,1.0,97.4,4,False,68,68,1,514.5,3.0,0.5,0.0,6.12,0.3,524.42,2.5,,True
1,2019-07-20 01:07:05,2019-07-20 05:47:45,1.0,95.0,1,False,164,39,1,283.0,2.5,0.5,289.5,6.12,0.3,581.92,2.5,,True
2,2019-04-26 12:28:31,2019-04-26 14:49:12,5.0,85.88,5,False,164,130,2,40.0,2.5,0.0,0.0,21.0,0.3,63.8,0.0,,True
1,2019-01-22 19:54:28,2019-01-22 22:34:32,2.0,84.3,2,False,132,48,1,52.0,0.0,0.5,5.0,0.0,0.3,57.8,0.0,,True
1,2019-08-13 07:18:36,2019-08-13 09:52:19,1.0,84.0,1,False,186,186,1,229.0,3.0,0.5,236.53,6.12,0.3,475.45,2.5,,True


Tolls:
Observe that tolls has a mean of 0.37 and std 1.6 => the entries with outlandish tolls should be removed.
E.g. entry with 1.6 miles distance and 66 usd of tolls? yikes
Tolls will be capped at 20 usd

In [54]:
sdf.describe(*non_categorical_features).show()



+-------+------------------+-------------------+--------------------+--------------------+------------------+---------------------+------------------+------------------+------------------+
|summary|             extra|       tolls_amount|             mta_tax|congestion_surcharge|        tip_amount|improvement_surcharge|   passenger_count|       fare_amount|     trip_distance|
+-------+------------------+-------------------+--------------------+--------------------+------------------+---------------------+------------------+------------------+------------------+
|  count|           7696464|            7696464|             7696464|             7696464|           7696464|              7696464|           7696464|           7696464|           7696464|
|   mean|1.1358013809978202|0.36445805502360723|  0.4984667049699707|  2.2294679738643617|2.2150395181481195|   0.2999679982885375|1.5675646634610387|13.048213169060483|2.9877111489120476|
| stddev|1.2593628512748982| 1.5562518064305295|0.02784

                                                                                

In [59]:
export_relative_dir = '../data/curated/'
sdf.write.mode('overwrite').parquet(export_relative_dir + "yellow_taxi_cleaned_subsampled.parquet")


                                                                                

In [60]:
# trying to stop spark session and free up memory...
sdf.unpersist()
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x7ff7fbddd280>>