In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("nyc-taxi-inspect-bronze")
    .master("local[*]")
    .config("spark.driver.memory", "6g")
    .config("spark.sql.shuffle.partitions", "16")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/05 10:29:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/05 10:29:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
26/02/05 10:29:33 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
26/02/05 10:29:33 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [2]:
cleanSilver_path = "../data/processed/silver/trips_clean/clean_silver_yellow_tripdata"
investigation_path = "../data/processed/silver/investigation_trips/invest_silver_yellow_tripdata"


investigationDf = spark.read.parquet(investigation_path)
cleanDf= spark.read.parquet(cleanSilver_path)

In [3]:
cleanDf.count()

9317654

In [6]:
investigationDf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: integer (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: integer (nullable = true)
 |-- extra: integer (nullable = true)
 |-- mta_tax: integer (nullable = true)
 |-- tip_amount: integer (nullable = true)
 |-- tolls_amount: integer (nullable = true)
 |-- improvement_surcharge: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- _source_file: string (nullable = true)
 |-- pickup_ts: timestamp (nullable = true)
 |-- dropoff_ts: timestamp (

In [7]:
investigationDf.show()

26/02/05 10:33:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------------------+-------------------+------------------+--------------------+---------------------+------------------+---------------------+-----------------------+-------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|        _source_file|          pickup_ts|         dropoff_ts| trip_duration_min|has_valid_timestamps|has_positive_duration|duration_within_6h|has_positive_distance|has_non_negative_amoun

In [8]:
invalid_trips = investigationDf.filter(~investigationDf.is_valid_trip)

In [9]:
invalid_trips.show()

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------------------+-------------------+------------------+--------------------+---------------------+------------------+---------------------+-----------------------+-------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|        _source_file|          pickup_ts|         dropoff_ts| trip_duration_min|has_valid_timestamps|has_positive_duration|duration_within_6h|has_positive_distance|has_non_negative_amoun

In [11]:
invalid_trips.groupBy("invalid_reason") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(truncate=False)

+---------------------+-------+
|invalid_reason       |count  |
+---------------------+-------+
|non_positive_distance|2861735|
|duration_gt_6h       |19004  |
|non_positive_duration|12559  |
+---------------------+-------+



In [14]:
invalid_trips.groupBy("pickup_date").count().orderBy("pickup_date", ascending=True).show()


+-----------+------+
|pickup_date| count|
+-----------+------+
| 2016-03-01| 92229|
| 2016-03-02|103904|
| 2016-03-03|108866|
| 2016-03-04|110747|
| 2016-03-05|102869|
| 2016-03-06| 81236|
| 2016-03-07| 83781|
| 2016-03-08| 90869|
| 2016-03-09| 91021|
| 2016-03-10| 95234|
| 2016-03-11|102308|
| 2016-03-12| 96108|
| 2016-03-13| 74038|
| 2016-03-14|104440|
| 2016-03-15| 93687|
| 2016-03-16| 96066|
| 2016-03-17| 97926|
| 2016-03-18|105495|
| 2016-03-19| 99461|
| 2016-03-20| 86213|
+-----------+------+
only showing top 20 rows



In [16]:
invalid_trips.select(
    "pickup_ts",
    "dropoff_ts",
    "trip_duration_min",
    "trip_distance",
    "total_amount",
    "invalid_reason",
    "payment_type"
).show(30, truncate=False)


+-------------------+-------------------+------------------+-------------+------------+---------------------+------------+
|pickup_ts          |dropoff_ts         |trip_duration_min |trip_distance|total_amount|invalid_reason       |payment_type|
+-------------------+-------------------+------------------+-------------+------------+---------------------+------------+
|2016-03-12 05:38:11|2016-03-12 05:41:38|3.45              |0            |6           |non_positive_distance|2           |
|2016-03-12 23:06:33|2016-03-12 23:11:46|5.216666666666667 |0            |8           |non_positive_distance|1           |
|2016-03-12 23:06:34|2016-03-12 23:13:24|6.833333333333333 |0            |8           |non_positive_distance|1           |
|2016-03-12 05:38:16|2016-03-12 05:42:46|4.5               |0            |7           |non_positive_distance|1           |
|2016-03-12 23:06:35|2016-03-12 23:09:46|3.183333333333333 |0            |5           |non_positive_distance|2           |
|2016-03-12 23:0