# Preprocess the TLC Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Build (or reuse) the Spark session used by the rest of this notebook.
#
# No master is configured, so when this notebook is started as a plain Python
# kernel Spark runs in local mode and all work happens inside the driver JVM.
# In that mode the usable heap is governed by spark.driver.memory;
# spark.executor.memory on its own does not enlarge it. An undersized driver
# heap is a likely cause of the JVM dying during the large aggregation further
# down (seen from Python as Py4JNetworkError / ConnectionRefusedError), so the
# driver memory is set explicitly here.
#
# NOTE: spark.driver.memory is only honoured when the JVM is first launched.
# If a session already exists in this kernel, restart the kernel before
# re-running this cell so the setting takes effect.
spark = (
    SparkSession.builder.appName("preprocess")
    # Render DataFrames eagerly when they are the last expression in a cell.
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    # Heap for the driver JVM (this is what matters in local mode).
    .config("spark.driver.memory", "4g")
    # Kept for runs against a real cluster, where executors are separate JVMs.
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [3]:
# Read the raw yellow-taxi parquet dataset into a Spark DataFrame.
raw_trips_dir = '../data/raw/yellow_taxi_data_2019/'
sdf = spark.read.parquet(raw_trips_dir)

# Preliminary data analysis

In [4]:
# Count the raw rows and report the total with thousands separators.
n_raw_rows = sdf.count()
print(f'total of {n_raw_rows:,} rows')

total of 84,598,444 rows


In [5]:
# Print each column's name, data type and nullability for the raw DataFrame.
sdf.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [6]:
# Columns that play no part in this analysis; they are removed up front so the
# later steps only carry the attributes of interest.
unused_columns = [
    'VendorID',
    'store_and_fwd_flag',
    'fare_amount',
    'extra',
    'mta_tax',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'airport_fee',
]

new_sdf = sdf.drop(*unused_columns)

In [7]:
# Check summary statistics for the attributes of interest
# new_sdf.describe(['tip_amount', 'trip_distance', 'passenger_count', 'RatecodeID', 'payment_type']).show()

In [8]:
# Check each column for null values
#for column in new_sdf.columns:
    #print(f'{column} has {new_sdf.where(F.col(column).isNull()).count()} null value(s)')

In [9]:
# Clean the trimmed DataFrame and derive the features used downstream.
# Every step is explained by the comment directly above it.

KM_PER_MILE = 1.60934        # conversion factor applied to trip_distance
unknown_zones = [264, 265]   # location IDs treated as "unknown zone"

pickup_ts = F.col('tpep_pickup_datetime')
dropoff_ts = F.col('tpep_dropoff_datetime')

new_sdf_mdf = (
    new_sdf
    # Drop every row that has a null in any of the remaining columns.
    .dropna(how='any')
    # Keep only trips whose pick-up and drop-off both fall in 2019.
    .where((F.year(pickup_ts) == 2019) & (F.year(dropoff_ts) == 2019))
    # A reasonable tip amount lies in [0, 1000).
    .where((F.col('tip_amount') >= 0) & (F.col('tip_amount') < 1000))
    # A reasonable trip distance lies in [0, 1000).
    .where((F.col('trip_distance') >= 0) & (F.col('trip_distance') < 1000))
    # RatecodeID must be one of the integers 1-6.
    .where(F.col('RatecodeID').isin(list(range(1, 7))))
    # Only trips paid by cash or credit card are of interest.
    .where(F.col('payment_type').isin([1, 2]))
    # Passenger count must not be zero.
    .where(F.col('passenger_count') != 0)
    # Neither end of the trip may be in an unknown zone.
    .where(~F.col('PULocationID').isin(unknown_zones)
           & ~F.col('DOLocationID').isin(unknown_zones))
    # Pick-up month, day of month and hour.
    .withColumn('PUMonth', F.month(pickup_ts))
    .withColumn('PUDay', F.dayofmonth(pickup_ts))
    .withColumn('PUHour', F.hour(pickup_ts))
    # Trip distance converted to kilometres.
    .withColumn('trip_distance(km)', F.col('trip_distance') * KM_PER_MILE)
    # Trip duration in seconds: drop-off minus pick-up, both as epoch seconds.
    .withColumn('time_difference(s)',
                dropoff_ts.cast('long') - pickup_ts.cast('long'))
    # Weekend flag: dayofweek returns 1 for Sunday and 7 for Saturday.
    .withColumn('is_weekend', F.dayofweek(pickup_ts).isin([1, 7]))
    # Whether any tip was given.
    .withColumn('tips_given', F.col('tip_amount') > 0)
)
                     

In [10]:
# Summarise the cleaned trips for every combination of pick-up zone, drop-off
# zone, passenger count and pick-up time slot, then preview the result.

# Grouping keys: one output row per distinct combination of these columns.
group_keys = [
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "PUMonth",
    "PUDay",
    "PUHour",
    "is_weekend",
]

# Per-group metrics: maximum and mean of tip, distance and duration, plus the
# number of trips in the group.
summary_metrics = [
    F.max("tip_amount").alias("max_tip_amount_usd"),
    F.mean("tip_amount").alias("avg_tip_amount_usd"),
    F.max("trip_distance(km)").alias("max_trip_distance(km)"),
    F.mean("trip_distance(km)").alias("avg_trip_distance(km)"),
    F.max("time_difference(s)").alias("max_duration"),
    F.mean("time_difference(s)").alias("avg_duration"),
    F.count("PULocationID").alias("total_pickup_count"),
]

aggregated_results = (
    new_sdf_mdf
    .groupBy(*group_keys)
    .agg(*summary_metrics)
    # Present the summary chronologically by pick-up time.
    .orderBy("PUMonth", "PUDay", "PUHour")
)

aggregated_results.show()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3417, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-425dea3b16ef>", line 21, in <module>
    aggregated_results.show()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 606, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_va

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# Persist the aggregated table as parquet, replacing any previous output at
# this path.
# NOTE(review): some output column names contain parentheses (for example
# "max_trip_distance(km)"); older Spark releases reject such names when
# writing Parquet. Confirm against the Spark version in use.
curated_output_path = '../data/curated/aggregated_results_2019.parquet'
aggregated_results.write.mode('overwrite').parquet(curated_output_path)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
