In [0]:
import os
# To work with Amazon S3 storage, supply your AWS Access Key and Secret Key.
# Prefer exporting AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY before starting the
# notebook so real secrets are never saved inside it; the placeholder strings
# below are only used when those environment variables are missing or empty.
access_key = os.environ.get('AWS_ACCESS_KEY_ID') or 'xxxxxxxxxxxxxx'
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') or 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
# Set the environment variables so boto3 can pick them up later
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
# Copy of the secret key with "/" percent-encoded as "%2F" (not used in this cell)
encoded_secret_key = secret_key.replace("/", "%2F")
# Set the Region to where your files are stored in S3.
aws_region = "us-east-2"

# Update the Spark options to work with our AWS Credentials.
# Fetch the Hadoop configuration once and set all fs.s3a.* options on it.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# Load the 2023-01 fhvhv trip data Parquet file from the landing folder
landing_path = 's3a://hvfhv-project-mc/landing/fhvhv_tripdata_2023-01.parquet'
sdf = spark.read.parquet(landing_path)

In [0]:
# Import some functions we will need later on
from pyspark.sql.functions import col, isnan, when, count, udf

# Restrict Spark logging to ERROR-level messages
spark_log_level = "ERROR"
sc.setLogLevel(spark_log_level)

In [0]:
# Count the rows in the loaded trip data; the bare name on the last line displays the total
row_count = sdf.count()
row_count

Out[10]: 18479031

In [0]:
# Print the column names, data types and nullability of the loaded trip data
sdf.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [0]:
# Columns to remove from the trip data (11 in total)
unneeded_cols = [
    'hvfhs_license_num',
    'dispatching_base_num',
    'originating_base_num',
    'PULocationID',
    'DOLocationID',
    'driver_pay',
    'shared_request_flag',
    'shared_match_flag',
    'wav_request_flag',
    'wav_match_flag',
    'access_a_ride_flag',
]
# Build a new DataFrame without those columns; sdf itself is left untouched
dropped_cols_sdf = sdf.drop(*unneeded_cols)

In [0]:
# Print the schema of dropped_cols_sdf to see which columns remain after the drop
dropped_cols_sdf.printSchema()

root
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)



In [0]:
# Count the rows whose "on_scene_datetime" value is null
target_col = "on_scene_datetime"
# when(...) yields a non-null marker only for null rows, so count() tallies exactly those
null_marker = when(col(target_col).isNull(), target_col)
dropped_cols_sdf.select(count(null_marker).alias(target_col)).show()

+-----------------+
|on_scene_datetime|
+-----------------+
|          4891992|
+-----------------+



In [0]:
# Keep only the rows that have a value in "on_scene_datetime"
clean_sdf = dropped_cols_sdf.dropna(subset=["on_scene_datetime"])

In [0]:
# Verify the drop: first show how many rows have "on_scene_datetime" populated,
# then show how many rows still have a null in that column
checked_col = "on_scene_datetime"
on_scene = col(checked_col)
clean_sdf.select(count(when(on_scene.isNotNull(), checked_col)).alias(checked_col)).show()
clean_sdf.select(count(when(on_scene.isNull(), checked_col)).alias(checked_col)).show()

+-----------------+
|on_scene_datetime|
+-----------------+
|         13587039|
+-----------------+

+-----------------+
|on_scene_datetime|
+-----------------+
|                0|
+-----------------+



In [0]:
# Print the schema of clean_sdf, the DataFrame left after removing rows with a null "on_scene_datetime"
clean_sdf.printSchema()

root
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)



In [0]:
# Count the null values in every column of clean_sdf. The column list is taken
# from the DataFrame itself, so the check keeps covering the whole schema even
# if the set of dropped columns changes.
clean_sdf.select([count(when(col(c).isNull(), c)).alias(c) for c in clean_sdf.columns]).show()

+----------------+-----------------+---------------+----------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+
|request_datetime|on_scene_datetime|pickup_datetime|dropoff_datetime|trip_miles|trip_time|base_passenger_fare|tolls|bcf|sales_tax|congestion_surcharge|airport_fee|tips|
+----------------+-----------------+---------------+----------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+
|               0|                0|              0|               0|         0|        0|                  0|    0|  0|        0|                   0|          0|   0|
+----------------+-----------------+---------------+----------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+



In [0]:
# Write clean_sdf back to a Parquet file in the raw folder.
# The s3a:// scheme matches the fs.s3a.* Hadoop settings and the s3a:// path used
# to read the input, so the write goes through the same configured connector.
# mode("overwrite") replaces an existing output instead of failing, which keeps
# the notebook re-runnable from a fresh kernel.
clean_sdf.write.mode("overwrite").parquet('s3a://hvfhv-project-mc/raw/cleaned_fhvhv_tripdata_2023-01.parquet')

In [0]:
# Drop unnecessary columns (11), and save it as dropped_cols_sdf. DONE
# Drop all nulls from "on_scene_datetime" columns, and save it as clean_sdf. DONE
# Write clean_sdf back to parquet file and save it in raw folder. DONE

In [0]:
# Confirm that "access_a_ride_flag" is no longer among the columns of dropped_cols_sdf
myColumns = dropped_cols_sdf.columns
flag_still_present = "access_a_ride_flag" in myColumns
flag_still_present

Out[25]: False

In [0]:
# clean_sdf.select([count(when(isnan(c) | col(c).isNotNull(), c)).alias(c) for c in ["originating_base_num"]] ).show()
# clean_sdf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ["originating_base_num"]] ).show()
# new_sdf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ["access_a_ride_flag"]] ).show()