In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [4]:
spark = (
    SparkSession.builder.appName("preprocessing of taxi data")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/08 22:16:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sdf_all = spark.read.parquet('../data/raw/')
raw_data_count = sdf_all.count()

sdf_all.printSchema()



root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



                                                                                

In [6]:
raw_data_count

244632132

In [7]:
sdf_reduce_col = sdf_all.select("tpep_pickup_datetime", 
                                "tpep_dropoff_datetime", 
                                "PULocationID",
                                "DOLocationID"
                                )

In [8]:
sdf_reduce_col.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)



In [9]:
Dict_Null = {col:sdf_reduce_col.filter(sdf_reduce_col[col].isNull()).count() for col in sdf_reduce_col.columns}
Dict_Null
    

                                                                                

{'tpep_pickup_datetime': 0,
 'tpep_dropoff_datetime': 0,
 'PULocationID': 0,
 'DOLocationID': 0}

In [10]:
valid_location_sdf = sdf_reduce_col.filter((F.col('PULocationID').between(1, 263)) & 
                                            (F.col('DOLocationID').between(1, 263)))
valid_loc_count = valid_location_sdf.count()
msg = (
        f"Out of {raw_data_count} raw data points, {raw_data_count - valid_loc_count} "
        f"are out of location ID range [1, 263] and has been cleaned"
)
print(msg)
print("Remaining data count:",valid_loc_count)



Out of 244632132 raw data points, 4495024 are out of location ID range [1, 263] and has been cleaned
Remaining data count: 240137108


                                                                                

In [11]:
# generate trip duration in seconds
trip_duration_add_sdf = valid_location_sdf.withColumn('duration (sec)',
                                            F.col("tpep_dropoff_datetime").cast("long")\
                                            - F.col('tpep_pickup_datetime').cast("long"))
trip_duration_add_sdf.show()


+--------------------+---------------------+------------+------------+--------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|PULocationID|DOLocationID|duration (sec)|
+--------------------+---------------------+------------+------------+--------------+
| 2016-09-01 00:05:53|  2016-09-01 00:13:33|         261|         158|           460|
| 2016-09-01 00:18:25|  2016-09-01 00:29:54|         246|          79|           689|
| 2016-09-01 00:15:45|  2016-09-01 00:32:59|          90|          75|          1034|
| 2016-09-01 00:40:44|  2016-09-01 00:49:34|          43|          48|           530|
| 2016-09-01 00:14:48|  2016-09-01 00:23:34|          95|         102|           526|
| 2016-09-01 00:45:45|  2016-09-01 00:48:25|          95|          95|           160|
| 2016-09-01 00:29:57|  2016-09-01 00:53:55|         170|         129|          1438|
| 2016-09-01 00:43:56|  2016-09-01 00:57:12|         238|         244|           796|
| 2016-09-01 00:08:45|  2016-09-01 00:19:35|         1

In [12]:
trip_duration_add_sdf.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- duration (sec): long (nullable = true)



In [13]:
trip_duration_add_sdf.describe("duration (sec)")

                                                                                

summary,duration (sec)
count,240137108.0
mean,978.7926779229806
stddev,98854.6891599128
min,-631151066.0
max,45466304.0


In [14]:
# from tlc the maximum per 24 hour period is 10 hours and the minimum would be zero
# with such assumption invalid data is removed.
valid_duration_sdf = trip_duration_add_sdf.filter(F.col("duration (sec)").between(0, 10*60*60))
valid_duration_count = valid_duration_sdf.count()

msg = (
        f"Out of {valid_loc_count} valid location data points, {valid_loc_count - valid_duration_count} "
        f"are out of duration range [0 hours, 10 hours] and has been cleaned"
)
print(msg)
print("Remaining data count:", valid_duration_count)



Out of 240137108 valid location data points, 378783 are out of duration range [0 hours, 10 hours] and has been cleaned
Remaining data count: 239758325


                                                                                

In [16]:
valid_datetime_sdf = valid_duration_sdf.filter((F.col("tpep_pickup_datetime") >= "2016-01-01") & \
                                                (F.col("tpep_pickup_datetime") <= "2017-12-31"))

valid_datetime_count = valid_datetime_sdf.count()

msg = (
        f"Out of {valid_duration_count} valid location data points, {valid_duration_count - valid_datetime_count} "
        f"are out of the range 2016-01-01 to 2017-12-31 and has been cleaned"
)
print(msg)
print("Remaining data count:", valid_datetime_count)



Out of 239758325 valid location data points, 237416 are out of the range 2016-01-01 to 2017-12-31 and has been cleaned
Remaining data count: 239520909


                                                                                

In [17]:
valid_datetime_sdf.write.mode('overwrite').parquet('../data/curated/preprocess_result1')


                                                                                

In [19]:
valid_datetime_sdf.show()
print(valid_datetime_sdf.count())

+--------------------+---------------------+------------+------------+--------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|PULocationID|DOLocationID|duration (sec)|
+--------------------+---------------------+------------+------------+--------------+
| 2016-09-01 00:05:53|  2016-09-01 00:13:33|         261|         158|           460|
| 2016-09-01 00:18:25|  2016-09-01 00:29:54|         246|          79|           689|
| 2016-09-01 00:15:45|  2016-09-01 00:32:59|          90|          75|          1034|
| 2016-09-01 00:40:44|  2016-09-01 00:49:34|          43|          48|           530|
| 2016-09-01 00:14:48|  2016-09-01 00:23:34|          95|         102|           526|
| 2016-09-01 00:45:45|  2016-09-01 00:48:25|          95|          95|           160|
| 2016-09-01 00:29:57|  2016-09-01 00:53:55|         170|         129|          1438|
| 2016-09-01 00:43:56|  2016-09-01 00:57:12|         238|         244|           796|
| 2016-09-01 00:08:45|  2016-09-01 00:19:35|         1



239520909


                                                                                