In [None]:
#Not needed in Zeppelin
from pyspark.sql import *

# Reuse the active Spark session if there is one, otherwise start one named "dataCleaning".
spark = (
    SparkSession.builder
    .appName("dataCleaning")
    .getOrCreate()
)

In [None]:
import sys
from csv import reader
from pyspark.sql.functions import *
from datetime import datetime

In [None]:
# Final csv should be styled as such
# ('VendorID', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 
# 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 
# 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'month of trip', 
# 'trip length in seconds')

# Move data from green taxi set around to match above comment (minus last 2 columns)
def mapGreenTaxi(x):
    """Reorder one parsed green-taxi CSV row into the shared 18-column layout.

    The index order below places the green file's fields into the positions listed
    in the layout comment above this function. Source indices 14 and 18 are not
    carried over. Returns an 18-tuple; raises IndexError for rows with fewer than
    20 fields.
    """
    sourceOrder = (0, 1, 2, 7, 8, 4, 3, 5, 6, 17, 9, 10, 11, 12, 13, 15, 16, 19)
    return tuple(x[i] for i in sourceOrder)
    
# Ensure data from yellow taxi set is consistent with the new green taxi data format
def mapYellowTaxi(x):
    """Keep the first 18 fields of a parsed yellow-taxi row, in their original order.

    The identity index mapping means the yellow file's first 18 columns are taken
    to already match the shared layout. This only truncates the row and converts
    it to a tuple. Raises IndexError for rows with fewer than 18 fields.
    """
    return tuple(x[i] for i in range(18))
    
# Standardize missing values: blank (or "99" placeholder) fields become "NULL"; blank money fields become "0"
def removeBlankNulls(x):
    """Normalise missing/placeholder values in one 18-field taxi row.

    Fields are expected to be strings (each one checked is ``.strip()``-ed).
      - VendorID (0): kept only if it strips to "1" or "2", else "NULL".
      - pickup/dropoff datetimes (1, 2): passed through unchanged.
      - passenger_count, trip_distance (3, 4): blank -> "NULL".
      - RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID,
        payment_type (5-9): blank or "99" -> "NULL".
      - money columns fare_amount .. congestion_surcharge (10-17): blank -> "0".

    Surviving values are returned as-is (not stripped). Returns an 18-tuple.
    """
    def nullIfAny(value, sentinels):
        # Compare on the stripped text, but keep the original value when it survives.
        return "NULL" if value.strip() in sentinels else value

    vendorId = x[0] if x[0].strip() in ("1", "2") else "NULL"
    measured = [nullIfAny(x[i], ("",)) for i in (3, 4)]
    coded = [nullIfAny(x[i], ("", "99")) for i in range(5, 10)]
    amounts = ["0" if x[i].strip() == "" else x[i] for i in range(10, 18)]
    return (vendorId, x[1], x[2], *measured, *coded, *amounts)
    
# Remove the header lines
def filterHeaders(x):
    """Return True for data rows and False for a CSV header row.

    A header row is recognised by its first field being the literal column name
    "VendorID". A leading UTF-8 byte-order mark and surrounding whitespace are
    ignored, so the header is still caught if the file was exported with one.
    Always returns a real bool; the original returned None for header rows.
    """
    # str.strip() does not remove U+FEFF (BOM), so strip it explicitly first.
    firstField = x[0].lstrip("\ufeff").strip()
    return firstField != "VendorID"
        
# Keep only 2020 trips with no NULL fields, 1-4 passengers, positive trip_distance, fare_amount >= 2.5 and no negative charges
def removeInvalidValues(x):
    """Return True if a normalised 18-field taxi row passes basic sanity checks.

    A row is kept only when:
      * no field is the "NULL" placeholder,
      * the pickup date (field 1, "MM/DD/YYYY ..." text) is in 2020,
      * passenger_count (field 3) is an integer from 1 to 4,
      * trip_distance (field 4) is positive,
      * fare_amount (field 10) is at least 2.5, and
      * every other charge (fields 11-17) is non-negative.

    Rows whose date or numeric fields cannot be parsed are rejected (False) rather
    than raising, so one malformed line does not abort the whole Spark job.
    Always returns a bool.
    """
    # Check for placeholders first: a NULL row needs no parsing at all.
    if "NULL" in x:
        return False
    try:
        year = x[1].split(" ")[0].split("/")[2]
        passengers = int(x[3])
        distance = float(x[4])
        fare = float(x[10])
        charges = [float(x[i]) for i in range(11, 18)]
    except (ValueError, IndexError):
        # Unparseable date/number or a short row: treat it as bad data, not a crash.
        return False
    return (
        year == "2020"
        and 0 < passengers < 5
        and distance > 0
        and fare >= 2.5
        and all(charge >= 0 for charge in charges)
    )
            
# Validate the fares based on TLC taxi fare guidelines
def validateFares(x):
    """Return True if a row's surcharges are consistent with its rate code.

    Every accepted row must have improvement_surcharge (field 15) == 0.3. Then,
    by RatecodeID (field 5):
      - "1" / "6": extra in {0, 0.5, 1}, mta_tax in {0, 0.5}, allowed congestion.
      - "2": fare_amount == 52, extra in {0, 4.5}, mta_tax == 0.5, allowed congestion.
        (NOTE(review): presumably the flat-fare airport code — confirm.)
      - "3": mta_tax == 0 and congestion_surcharge == 0.
      - "4" / "5": mta_tax in {0, 0.5} and allowed congestion.
      - anything else: rejected.
    "Allowed congestion" means congestion_surcharge (field 17) in {0, 2.75, 2.5, 0.75}.
    Always returns a bool.
    """
    rateCode = x[5]
    fareAmount = float(x[10])
    extraCharge = float(x[11])
    mtaTax = float(x[12])
    improvementSurcharge = float(x[15])
    congestionSurcharge = float(x[17])

    if improvementSurcharge != 0.3:
        return False

    congestionAllowed = congestionSurcharge in (0, 2.75, 2.5, 0.75)
    taxAllowed = mtaTax in (0, 0.5)

    if rateCode in ("1", "6"):
        return extraCharge in (0, 0.5, 1) and taxAllowed and congestionAllowed
    if rateCode == "2":
        return fareAmount == 52 and extraCharge in (0, 4.5) and mtaTax == 0.5 and congestionAllowed
    if rateCode == "3":
        return mtaTax == 0 and congestionSurcharge == 0
    if rateCode in ("4", "5"):
        return taxAllowed and congestionAllowed
    return False
        
# Join list by comma to create CSV lines
def formatAsCSV(x):
    """Render a sequence of fields as one CSV line, without a trailing newline.

    Uses the csv module's default (minimal) quoting. Fields containing commas,
    quotes or line breaks are quoted (embedded quotes doubled) instead of
    silently splitting into extra columns, as a plain ','.join would. Rows of
    plain fields produce exactly ','.join(x). Non-string values are written via
    csv's conversion (None -> empty) instead of raising TypeError.
    """
    import csv
    import io

    buffer = io.StringIO()
    # Keep the default "\r\n" terminator so fields containing \r or \n are quoted,
    # then drop the single terminator the writer appends.
    csv.writer(buffer).writerow(x)
    return buffer.getvalue().rstrip("\r\n")
    
#get month and trip length data
def mapExtraInfo(x):
    """Append the trip month and the trip duration to an 18-field taxi row.

    Pickup (field 1) and dropoff (field 2) are parsed with the format
    "%m/%d/%Y %I:%M:%S %p". The month is the leading "MM" text of the pickup
    date, kept as a string. The duration is (dropoff - pickup) in seconds,
    rendered with str() of a float (e.g. "1230.0"). Returns a 20-tuple: the
    first 18 input fields followed by (month, duration).
    Raises ValueError if either timestamp does not match the format.
    """
    stampFormat = "%m/%d/%Y %I:%M:%S %p"
    pickupText = x[1]
    monthOfTrip = pickupText.split(" ")[0].split("/")[0]
    started = datetime.strptime(pickupText, stampFormat)
    ended = datetime.strptime(x[2], stampFormat)
    durationSeconds = (ended - started).total_seconds()
    return tuple(x[i] for i in range(18)) + (monthOfTrip, str(durationSeconds))

In [None]:
# Zeppelin injects `sc`; in Jupyter only `spark` is created above, so derive it here.
sc = spark.sparkContext

green_taxi_raw = sc.textFile("2020_Green_Taxi_Trip_Data.csv", 1)
yellow_taxi_raw = sc.textFile("2020_Yellow_Taxi_Trip_Data.csv", 1)

# Parse each file from its OWN raw lines and map it onto the shared 18-column layout.
green_taxi = green_taxi_raw.mapPartitions(lambda lines: reader(lines)).map(mapGreenTaxi)
yellow_taxi = yellow_taxi_raw.mapPartitions(lambda lines: reader(lines)).map(mapYellowTaxi)

# Combine both fleets, then clean, validate and enrich.
# Cached because the full and the reduced output cells below both consume `taxi`.
taxi = (
    green_taxi.union(yellow_taxi)
    .filter(filterHeaders)
    .map(removeBlankNulls)
    .filter(removeInvalidValues)
    .filter(validateFares)
    .map(mapExtraInfo)
    .cache()
)

In [None]:
# Output all the taxi data (only if needed): the full 20-column cleaned rows as CSV lines.
# NOTE(review): saveAsTextFile fails if the output directory already exists, so
# delete it before re-running this cell.
fullOutputPath = "2020_Taxi_Trip_Data.out"
outputTaxi = taxi.map(formatAsCSV)
outputTaxi.saveAsTextFile(fullOutputPath)

In [None]:
# Final csv should be styled as such
# ('month of trip', 'trip length in seconds', 'passenger_count', 'trip_distance', 'RatecodeID', 'total_amount')

# Remove data that we might not need for our analysis
def removeMiscData(x):
    """Project a 20-field enriched row down to the six columns used for analysis.

    Output order: (month of trip, trip length in seconds, passenger_count,
    trip_distance, RatecodeID, total_amount), i.e. source indices
    18, 19, 3, 4, 5, 16.
    """
    keptColumns = (18, 19, 3, 4, 5, 16)
    return tuple(x[i] for i in keptColumns)

In [None]:
# Output the reduced taxi data (preferred): six analysis columns per trip, as CSV lines.
# NOTE(review): saveAsTextFile fails if the output directory already exists, so
# delete it before re-running this cell.
reducedOutputPath = "reduced_2020_Taxi_Trip_Data.out"
reduced_taxi = taxi.map(removeMiscData)
outputRedTaxi = reduced_taxi.map(formatAsCSV)
outputRedTaxi.saveAsTextFile(reducedOutputPath)

In [None]:
# Not needed in Zeppelin (the interpreter manages the session).
# SparkContext has no close() method; SparkSession.stop() stops the session and
# its underlying SparkContext.
spark.stop()