# Preprocessing the Data

In [38]:
from pyspark.sql import SparkSession, functions as F

# Create a spark session (which will run spark jobs)
# Session-wide Spark SQL settings for this notebook. Eager evaluation lets a
# DataFrame that ends a cell render as a table; the session time zone is UTC.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}

session_builder = SparkSession.builder.appName("ADS Project 1")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

# getOrCreate() returns the running session if one already exists.
spark = session_builder.getOrCreate()

### Reading in the Raw Layer for 2023 Yellow Taxi Trip Records 

In [39]:
# Load every file matched by the 2023 raw-layer glob into a single DataFrame.
raw_yellow_path = 'Raw/tlc_yellow_data/*'
sdf_yellow = spark.read.parquet(raw_yellow_path)

In [40]:
# Number of raw 2023 yellow-taxi records, before any cleaning is applied.
sdf_yellow.count()

38310226

### Filtering 2023 Trip Records for Yellow Taxis

In [41]:
def filter_yellow_trips(sdf, min_trip_distance=0.5, min_fare_amount=3.00,
                        min_duration_seconds=60):
    """Clean a DataFrame of raw yellow-taxi trip records.

    A row is kept only if it passes every rule below. ``store_and_fwd_flag``
    and ``total_amount`` are dropped *before* the final null check, so nulls
    in those two columns do not discard a row. A derived ``duration_seconds``
    column (pickup to dropoff) is added.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Raw yellow-taxi trips as read from the TLC parquet files.
    min_trip_distance : float, default 0.5
        Trips must be strictly longer than this many miles.
    min_fare_amount : float, default 3.00
        Fares must be strictly greater than this; $3.00 is the initial
        charge effective 19 December 2022.
    min_duration_seconds : int, default 60
        Trips must last at least this many seconds.

    Returns
    -------
    pyspark.sql.DataFrame
        The filtered trips, without ``store_and_fwd_flag`` and
        ``total_amount`` and with no nulls in any remaining column.
    """
    # Pickup-to-dropoff time in whole seconds.
    trip_duration = (F.unix_timestamp('tpep_dropoff_datetime')
                     - F.unix_timestamp('tpep_pickup_datetime'))

    return (
        sdf
        # at least one passenger
        .filter(F.col('passenger_count') > 0)
        # strict bound: trips of exactly min_trip_distance miles are dropped too
        .filter(F.col('trip_distance') > min_trip_distance)
        # NOTE(review): `<= 6` also keeps rate codes 0 and below; the TLC data
        # dictionary lists 1-6 (99 = unknown) -- confirm whether
        # `between(1, 6)` was intended.
        .filter(F.col('RateCodeID') <= 6)
        # fare must exceed the initial charge
        .filter(F.col('fare_amount') > min_fare_amount)
        .filter(F.col('extra') >= 0.00)
        # $0.50 MTA State Surcharge for trips that end in NYC or Nassau, else none
        .filter(F.col('mta_tax').isin(0.50, 0.00))
        .filter(F.col('tip_amount') >= 0.00)
        .filter(F.col('tolls_amount') >= 0.00)
        # airport fee for pickups at LaGuardia and John F. Kennedy airports
        .filter(F.col('airport_fee').isin(1.25, 0.00, 1.75))
        # congestion surcharge: $2.50 for non-shared yellow-taxi trips,
        # $0.75 for shared rides
        .filter(F.col('congestion_surcharge').isin(2.50, 0.00, 0.75))
        # drop these before dropna() so their nulls do not remove rows
        .drop('store_and_fwd_flag', 'total_amount')
        .withColumn('duration_seconds', trip_duration)
        .filter(F.col('duration_seconds') >= min_duration_seconds)
        # finally remove rows with a null in any remaining column
        .dropna()
    )


sdf_yellow = filter_yellow_trips(sdf_yellow)
sdf_yellow.count()


                                                                                

33834153

In [42]:
# Persist the cleaned 2023 trips to the processed layer. coalesce(1) collapses
# the data to one partition, so a single parquet part file is written (by a
# single task); 'overwrite' replaces the output of any previous run.
(
    sdf_yellow
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('Processed/tlc_yellow_data')
)

                                                                                

### Applying the same filtering conditions to the 2024 Trip Records for Yellow Taxis

In [48]:
# Load every file matched by the 2024 raw-layer glob into a single DataFrame.
raw_yellow_24_path = '2024_Raw/tlc_yellow_data/*'
sdf_yellow_24 = spark.read.parquet(raw_yellow_24_path)

In [49]:
# Align the column name with the lowercase `airport_fee` used by the filters.
sdf_yellow_24 = sdf_yellow_24.withColumnRenamed(existing="Airport_fee", new="airport_fee")

In [50]:
# Number of raw 2024 yellow-taxi records, before any cleaning is applied.
sdf_yellow_24.count()

20332093

In [51]:

# Same cleaning rules as the 2023 records, applied to the 2024 trips.
# NOTE(review): this mirrors the 2023 filtering cell rule for rule; keep the
# two in sync, or factor the rules into one shared function.
trip_duration_24 = (F.unix_timestamp('tpep_dropoff_datetime')
                    - F.unix_timestamp('tpep_pickup_datetime'))

sdf_yellow_24 = (
    sdf_yellow_24
    # at least one passenger
    .filter(F.col('passenger_count') > 0)
    # strict bound: trips of 0.5 miles or less are removed
    .filter(F.col('trip_distance') > 0.5)
    .filter(F.col('RateCodeID') <= 6)
    # $3.00 initial charge effective 19 December 2022
    .filter(F.col('fare_amount') > 3.00)
    .filter(F.col('extra') >= 0.00)
    # $0.50 MTA State Surcharge for trips that end in NYC or Nassau, else none
    .filter(F.col('mta_tax').isin(0.50, 0.00))
    .filter(F.col('tip_amount') >= 0.00)
    .filter(F.col('tolls_amount') >= 0.00)
    # airport fee for pickups at LaGuardia and John F. Kennedy airports
    .filter(F.col('airport_fee').isin(1.25, 0.00, 1.75))
    # congestion surcharge: $2.50 for non-shared yellow-taxi trips,
    # $0.75 for shared rides
    .filter(F.col('congestion_surcharge').isin(2.50, 0.00, 0.75))
    # drop these before dropna() so their nulls do not remove rows
    .drop('store_and_fwd_flag')
    .drop('total_amount')
    .withColumn('duration_seconds', trip_duration_24)
    .filter(F.col('duration_seconds') >= 60)
    # finally remove rows with a null in any remaining column
    .dropna()
)

sdf_yellow_24.count()

                                                                                

16593051

In [52]:
# Persist the cleaned 2024 trips to the processed layer. coalesce(1) collapses
# the data to one partition, so a single parquet part file is written (by a
# single task); 'overwrite' replaces the output of any previous run.
(
    sdf_yellow_24
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('2024_Processed/tlc_yellow_data')
)

                                                                                