# Preprocessing
The first part of the preprocessing step applies basic transformations, such as converting data types and renaming columns.
The second part covers filtering, feature selection, feature engineering, and aggregation.

In [1]:
import os

# USE_PYGEOS must be set before geopandas is first imported to take effect
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

# Create a Spark session (which will run Spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1 - Preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)



### Fixing the schema
Column data types differ between the monthly files. To fix this, we take the schema of the February 2023 file as the common schema and cast every other file to it. We also convert all column names to lower case.
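The idea can be sketched in plain Python, independent of Spark: take the column names and types of one reference file, then lower-case and cast the records of every other file to match. The `reference_schema` mapping, the `align_to_schema` helper, and the two records below are hypothetical and only illustrate the approach.

```python
# Hypothetical reference schema: lower-cased column name -> target type
reference_schema = {"vendorid": int, "passenger_count": float, "airport_fee": float}


def align_to_schema(record, schema):
    """Lower-case the keys of `record` and cast each value to the schema type."""
    lowered = {key.lower(): value for key, value in record.items()}
    return {
        name: (None if lowered.get(name) is None else cast(lowered[name]))
        for name, cast in schema.items()
    }


# Two hypothetical records whose column casing and value types disagree
january = {"VendorID": 1, "passenger_count": 2, "Airport_fee": None}
february = {"VendorID": "2", "passenger_count": 1.0, "airport_fee": 1.25}

aligned = [align_to_schema(record, reference_schema) for record in (january, february)]
print(aligned)
```

After alignment both records share the same keys and value types, which is what allows the monthly files to be read back as one dataset.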

In [2]:
sdf_feb_yellow = spark.read.parquet('../data/landing/yellow-2023-02.parquet')

low_casing = [F.col(col_name).alias(col_name.lower()) for col_name in sdf_feb_yellow.columns]
sdf_feb_yellow = sdf_feb_yellow.select(*low_casing)

sdf_schema_yellow = sdf_feb_yellow.schema


In [3]:
# Months to process: all of 2022, then January to May 2023
timelines = [f"2022-{month:02d}" for month in range(1, 13)]
timelines += [f"2023-{month:02d}" for month in range(1, 6)]

for timeline in timelines:
    # Read one month and lower-case its column names
    sdf_yellow = spark.read \
        .parquet('../data/landing/yellow-'+timeline+'.parquet') \
        .select(*low_casing)
    # Cast each column, by position, to the type in the reference schema
    sdf_yellow = sdf_yellow \
        .select([F.col(c).cast(sdf_schema_yellow[i].dataType) for i, c in enumerate(sdf_yellow.columns)])
    # Write the month back out as a single Parquet file
    sdf_yellow \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet('../data/raw/yellow/yellow-'+timeline)


In [4]:
sdf = spark.read.parquet('../data/raw/yellow/*')

## Data Wrangling
In this section, we derive the trip duration in seconds, cast the passenger count to an integer, and filter out invalid records.
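As a small worked example of the duration feature, the plain-Python sketch below computes the same difference in seconds for the first trip shown in the data preview that follows (pickup 00:03:41, drop-off 00:18:39 on 2022-10-01). It uses the standard library rather than Spark, so it is an illustration of the arithmetic only.

```python
from datetime import datetime

# Pickup and drop-off times of the first trip in the data preview
pickup_time = datetime(2022, 10, 1, 0, 3, 41)
dropoff_time = datetime(2022, 10, 1, 0, 18, 39)

# Same quantity as unix_timestamp(dropoff) - unix_timestamp(pickup)
duration_sec = int((dropoff_time - pickup_time).total_seconds())
print(duration_sec)  # 898
```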

In [5]:
sdf = sdf \
    .withColumn('duration_sec', F.unix_timestamp(F.col('tpep_dropoff_datetime')) - F.unix_timestamp(F.col('tpep_pickup_datetime'))) \
    .withColumn('passenger_count', F.col('passenger_count').cast(IntegerType()))

In [6]:
sdf.show(5, truncate=100)
sdf.printSchema()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|duration_sec|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------+
|       1| 2022-10-01 00:03:41|  2022-10-01 00:18:39|              1|          1.7|         1|                 N|         249|         107|         

### Fare features
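All fare features must be non-negative. Although these features are not used later, an invalid value in any of them makes the whole record untrustworthy. The same filter also keeps only records whose vendor, rate code, and payment type codes fall in their expected ranges.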

In [7]:
print("All values must be positive")
initial_count = sdf.count()
print("Before removing: ", initial_count)
# Keep rows with valid category codes and non-negative monetary amounts
sdf = sdf.filter((F.col('vendorid').between(1, 2)) &
                 (F.col('ratecodeid').between(1, 6)) &
                 (F.col('payment_type').between(1, 6)) &
                 (F.col('fare_amount') >= 0) &
                 (F.col('extra') >= 0) &
                 (F.col('improvement_surcharge') >= 0) &
                 (F.col('tip_amount') >= 0) &
                 (F.col('tolls_amount') >= 0) &
                 (F.col('total_amount') >= 0) &
                 (F.col('congestion_surcharge') >= 0) &
                 (F.col('airport_fee') >= 0))
# Count once and reuse the result, since every count() triggers a Spark job
final_count = sdf.count()
print("After removing: ", final_count)
print("Difference: ", initial_count - final_count)

All values must be positive
Before removing:  55842484
After removing:  53435612
Difference:  2406872

In [8]:
# Passenger count: keep trips with 1 to 4 passengers
print("Passenger count must be positive")

initial_count = sdf.count()
print("Before removing: ", initial_count)
sdf = sdf.filter(F.col('passenger_count').between(1, 4))
# Count once and reuse the result, since every count() triggers a Spark job
final_count = sdf.count()
print("After removing: ", final_count)
print("Difference: ", initial_count - final_count)

Passenger count must be positive
Before removing:  53435612
After removing:  50902107
Difference:  2533505

In [9]:
# Trip distance: keep trips within the specified range
print("Trip distance must be within the specified range")
MIN_DISTANCE = 0.1
MAX_DISTANCE = 50

initial_count = sdf.count()
print("Before removing: ", initial_count)
sdf = sdf.filter(F.col('trip_distance').between(MIN_DISTANCE, MAX_DISTANCE))
# Count once and reuse the result, since every count() triggers a Spark job
final_count = sdf.count()
print("After removing: ", final_count)
print("Difference: ", initial_count - final_count)

Trip distance must be within the specified range
Before removing:  50902107
After removing:  50180416
Difference:  721691

In [10]:
# Location ID: Pick-up and Drop-off
print("Location IDs must be between 1 and 263")

initial_count = sdf.count()
print("Before removing: ", initial_count)
sdf = sdf \
    .filter(F.col('pulocationid').between(1, 263)) \
    .filter(F.col('dolocationid').between(1, 263))
# Count once and reuse the result, since every count() triggers a Spark job
final_count = sdf.count()
print("After removing: ", final_count)
print("Difference: ", initial_count - final_count)

Location IDs must be between 1 and 263
Before removing:  50180416
After removing:  49373274
Difference:  807142

In [11]:
# Trip dates and duration: keep trips that start on or after start_date,
# end before end_date, and last between MIN_DURATION and MAX_DURATION
start_date = "2022-01-01"
end_date = "2023-06-01"
MIN_DURATION = 120 # 2 minutes
MAX_DURATION = 14400 # 4 hours

initial_count = sdf.count()
print("Before removing: ", initial_count)
sdf = sdf \
    .filter(F.col('tpep_pickup_datetime') >= start_date) \
    .filter(F.col('duration_sec').between(MIN_DURATION, MAX_DURATION)) \
    .filter(F.col('tpep_dropoff_datetime') < end_date)

# Count once and reuse the result, since every count() triggers a Spark job
final_count = sdf.count()
print("After removing: ", final_count)
print("Difference: ", initial_count - final_count)

Before removing:  49373274
After removing:  48894121
Difference:  479153

## Feature Selection
We drop the trip-level features because the analysis uses the total number of pickups and drop-offs per time period rather than individual trips.

In [12]:
sdf = sdf.drop('ratecodeid', 'store_and_fwd_flag', 'payment_type', 
        'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
        'improvement_surcharge', 'total_amount', 'congestion_surcharge',
        'airport_fee', 'vendorid')

## Feature Engineering
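We round the pickup and drop-off timestamps to the nearest hour and extract the day of the week, month, year, and season. Spark's dayofweek numbers the days from 1 (Sunday) to 7 (Saturday).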
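To make the rounding concrete, here is a plain-Python sketch of the same arithmetic, assuming UTC timestamps as in the session configuration. The helper names `nearest_hour` and `spark_dayofweek` are hypothetical, and the second timestamp is an invented example.

```python
from datetime import datetime, timezone


def nearest_hour(ts):
    """Round a UTC datetime to the nearest hour (half-hours round up)."""
    seconds = int(ts.timestamp())
    rounded = (seconds + 1800) // 3600 * 3600
    return datetime.fromtimestamp(rounded, tz=timezone.utc)


def spark_dayofweek(ts):
    """Day of week in Spark's convention: 1 = Sunday, ..., 7 = Saturday."""
    return ts.isoweekday() % 7 + 1


early = datetime(2022, 10, 1, 0, 3, 41, tzinfo=timezone.utc)
late = datetime(2022, 10, 1, 23, 45, 0, tzinfo=timezone.utc)  # invented example

print(nearest_hour(early).hour, spark_dayofweek(early))  # 0 7
print(nearest_hour(late).hour, spark_dayofweek(late))    # 0 7
```

The second trip lands in hour 0 while keeping Saturday as its day of the week, because the notebook takes the day of the week from the unrounded timestamp. Trips in the last half hour before midnight are therefore grouped with hour 0 of the same day.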

In [13]:
sdf = sdf \
    .withColumn('pickup_hour', F.hour((F.round(F.unix_timestamp('tpep_pickup_datetime')/3600)*3600).cast("timestamp"))) \
    .withColumn('pickup_dayofweek', F.dayofweek('tpep_pickup_datetime')) \
    .withColumn('pickup_month', F.month('tpep_pickup_datetime')) \
    .withColumn('dropoff_hour', F.hour((F.round(F.unix_timestamp('tpep_dropoff_datetime')/3600)*3600).cast("timestamp"))) \
    .withColumn('dropoff_dayofweek', F.dayofweek('tpep_dropoff_datetime')) \
    .withColumn('dropoff_month', F.month('tpep_dropoff_datetime')) \
    .withColumn('year', F.year('tpep_pickup_datetime'))

In [14]:
# Assign one season per trip from the pickup month. Trips last at most
# 4 hours, so a trip very rarely spans two seasons.
sdf = sdf.withColumn('season', F.when((F.col('pickup_month').between(3, 5)), 'spring')
                            .when((F.col('pickup_month').between(6, 8)), 'summer')
                            .when((F.col('pickup_month').between(9, 11)), 'autumn')
                            .otherwise('winter'))

## Aggregation
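Group the dataset by its spatial and temporal features (zone, hour, day of the week, season, and year) and count the trips in each group. Pickups and drop-offs are aggregated separately.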
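The grouping amounts to counting how many trips share the same key. A minimal plain-Python analogue, using a handful of hypothetical trips rather than the real data, could look like this:

```python
from collections import Counter

# Hypothetical trips: (pulocationid, pickup_hour, pickup_dayofweek, season, year)
trips = [
    (249, 0, 7, "autumn", 2022),
    (249, 0, 7, "autumn", 2022),
    (249, 1, 7, "autumn", 2022),
    (132, 0, 7, "autumn", 2022),
]

# Counting identical keys mirrors a group-by followed by a count
pickup_count = Counter(trips)
for key, count in sorted(pickup_count.items()):
    print(key, count)
```

Each distinct key becomes one row of the aggregated table, so four trips collapse to three rows here.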

In [15]:
pickup = sdf.groupBy("pulocationid", "pickup_hour", "pickup_dayofweek", "season", "year").agg(
    F.count(F.col("pulocationid")).alias("pickup_count")
)
dropoff = sdf.groupBy("dolocationid", "dropoff_hour", "dropoff_dayofweek", "season", "year").agg(
    F.count(F.col("dolocationid")).alias("dropoff_count")
)

In [16]:
pickup.printSchema()
print("Total instances of pickup data: ", pickup.count())
dropoff.printSchema()
print("Total instances of dropoff data: ", dropoff.count())

root
 |-- pulocationid: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- season: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- pickup_count: long (nullable = false)

Total instances of pickup data:  129673
root
 |-- dolocationid: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- dropoff_dayofweek: integer (nullable = true)
 |-- season: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- dropoff_count: long (nullable = false)

Total instances of dropoff data:  231653

Join the aggregated taxi counts to the public transport data on the taxi zone ID. The inner join keeps only zones that appear in both datasets.
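The effect of an inner join can be illustrated without Spark. In the sketch below, the rows, the zone attributes, and the `num_stops` values are all hypothetical; the point is only that a row without a matching zone is dropped.

```python
# Hypothetical aggregated pickup rows
pickup_rows = [
    {"pulocationid": 249, "pickup_count": 2},
    {"pulocationid": 57, "pickup_count": 5},  # absent from zone_attributes below
]

# Hypothetical zone attributes keyed by LocationID
zone_attributes = {
    249: {"borough": "Manhattan", "num_stops": 12},
}

# Inner join: keep a pickup row only when its zone ID has attributes
joined = [
    {**row, **zone_attributes[row["pulocationid"]]}
    for row in pickup_rows
    if row["pulocationid"] in zone_attributes
]
print(joined)
```

Only the row for zone 249 survives, now carrying the borough and stop count alongside its pickup count.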

In [17]:
public_transport = spark.read.csv('../data/raw/public_transport.csv', header=True, inferSchema=True)
# Zone-level attributes to attach to both aggregated tables
zone_features = public_transport.select('LocationID', 'Shape_Area', 'borough', 'num_stops')
pickup = pickup.join(zone_features,
                pickup['pulocationid'] == zone_features['LocationID'], how='inner')
dropoff = dropoff.join(zone_features,
                dropoff['dolocationid'] == zone_features['LocationID'], how='inner')

In [18]:
pickup = pickup.drop('LocationID')
dropoff = dropoff.drop('LocationID')

### Airport
Set the borough to Airport for every taxi zone whose name contains "Airport", so that airport zones form their own category.
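A plain-Python sketch of this relabelling is shown below. The rows are illustrative stand-ins for the shapefile attributes, not a copy of the real table, and serve only to show the name match and the override.

```python
# Illustrative (LocationID, zone name, borough) rows; not the full shapefile
zones = [
    (1, "Newark Airport", "EWR"),
    (132, "JFK Airport", "Queens"),
    (138, "LaGuardia Airport", "Queens"),
    (249, "West Village", "Manhattan"),
]

# Zone IDs whose name contains the word "Airport"
airport_ids = {location_id for location_id, name, _ in zones if "Airport" in name}

# Override the borough for airport zones, keep it unchanged otherwise
relabelled = {
    location_id: ("Airport" if location_id in airport_ids else borough)
    for location_id, _, borough in zones
}
print(relabelled)
```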

In [19]:
zone = gpd.read_file("../data/landing/taxi_zones/taxi_zones.shp")
# IDs of the taxi zones whose name contains "Airport"
airport_location_id = zone.loc[zone['zone'].str.contains('Airport'), 'LocationID'].tolist()
# Overwrite the borough for airport zones and keep it unchanged elsewhere
pickup = pickup.withColumn("borough", F.when(F.col("pulocationid").isin(airport_location_id), 'Airport').otherwise(F.col("borough")))
dropoff = dropoff.withColumn("borough", F.when(F.col("dolocationid").isin(airport_location_id), 'Airport').otherwise(F.col("borough")))

The data is now clean and ready to be analyzed. We save the pickup and drop-off tables as CSV files in the curated data folder.

In [20]:
pickup.toPandas().to_csv('../data/curated/pickup.csv', index=False)
dropoff.toPandas().to_csv('../data/curated/dropoff.csv', index=False)
