In [1]:
from pyspark.sql import SparkSession, functions as F

# Create (or reuse) the Spark session for this notebook.
# The settings below are applied to the builder one by one, in the listed order.
#   - eagerEval makes a DataFrame render as a table when it is a cell's last expression
#   - the session time zone is pinned to UTC so timestamp -> date / hour conversions
#     do not depend on the local machine's zone
# NOTE(review): spark.sql.parquet.cacheMetadata looks like a legacy option; confirm it
# still has an effect on the Spark version in use.
SPARK_APP_NAME = "MAST30034 ASSIGNMENT 1 DUSTIN"
SPARK_SETTINGS = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "4g",
}

spark_builder = SparkSession.builder.appName(SPARK_APP_NAME)
for setting_key, setting_value in SPARK_SETTINGS.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()

22/08/26 01:27:12 WARN Utils: Your hostname, DESKTOP-3ADPNV0 resolves to a loopback address: 127.0.1.1; using 172.25.27.116 instead (on interface eth0)
22/08/26 01:27:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/26 01:27:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/26 01:27:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/26 01:27:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Load every raw yellow-taxi parquet file under this directory as a single DataFrame.
YELLOW_RAW_PATH = '../../mast30034-project-1-dustintano10/data/raw/yellow_taxi/'
yellow = spark.read.parquet(YELLOW_RAW_PATH)

                                                                                

In [3]:
# Row count of the raw data, before any filtering is applied.
n_rows_raw = yellow.count()
n_rows_raw

                                                                                

55274200

In [4]:
# Flag weekend trips using the pickup timestamp.
# date_format(..., "E") gives the abbreviated day name ("Sat", "Sun", ...). The name
# is only needed to compute the flag, so it is built inline rather than stored in a
# helper column that would immediately be dropped again.
# The wildcard import is kept because other cells may use its names unqualified
# (e.g. to_date, round, unix_timestamp, percentile_approx, hour).

from pyspark.sql.functions import *

pickup_day_name = date_format(col("tpep_pickup_datetime"), "E")
yellow = yellow.withColumn("is_weekend", pickup_day_name.isin(["Sat", "Sun"]).cast("boolean"))

In [5]:
# Derive the calendar date of pickup and dropoff, keep only trips with at least one
# recorded passenger (rows with a null passenger_count are dropped as well, because
# `null > 0` is not true), then discard columns that the analysis does not use.
COLUMNS_NOT_NEEDED = [
    "extra", "mta_tax", "congestion_surcharge", "airport_fee",
    "improvement_surcharge", "passenger_count", "store_and_fwd_flag",
]

yellow = (
    yellow
    .withColumn("pickup_date", F.to_date(F.col("tpep_pickup_datetime"), "yyyy-MM-dd"))
    .withColumn("dropoff_date", F.to_date(F.col("tpep_dropoff_datetime"), "yyyy-MM-dd"))
    .where(F.col('passenger_count') > 0)
    .drop(*COLUMNS_NOT_NEEDED)
)


In [6]:
# Narrow the trips to those usable for the tip analysis:
#   - payment_type 1 only, since tips are only counted for credit-card payments
#   - pickups on or after 2018-10-01
#   - RatecodeID 1 or 2 (the other codes make up a very small share of the data)
#   - VendorID strictly between 0 and 3, i.e. vendors 1 and 2
yellow_credit = (
    yellow
    .filter(F.col('payment_type') == 1)
    .filter(F.col('pickup_date') >= '2018-10-01')
    .where(F.col('RatecodeID').isin(1, 2))
    .where((F.col('VendorID') > 0) & (F.col('VendorID') < 3))
)



In [7]:
# Basic sanity filters:
#   - fare_amount must be at least the 2.5 initial charge
#   - trip_distance must be strictly positive
#   - pickup and dropoff zone IDs must lie in 1..263
pickup_zone_in_range = (F.col('PULocationID') > 0) & (F.col('PULocationID') < 264)
dropoff_zone_in_range = (F.col('DOLocationID') > 0) & (F.col('DOLocationID') < 264)

yellow_credit = (
    yellow_credit
    .where(F.col('fare_amount') >= 2.5)
    .where(F.col('trip_distance') > 0)
    .where(pickup_zone_in_range & dropoff_zone_in_range)
)

In [8]:
# Trip duration in minutes, rounded to 4 decimal places, computed from the
# difference between the dropoff and pickup Unix timestamps (in seconds).
trip_seconds = F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')
yellow_credit = yellow_credit.withColumn('trip_length', F.round(trip_seconds / 60, 4))

# Keep only trips lasting strictly more than 2 minutes; this also removes
# zero-length and negative durations.
# NOTE(review): there is no upper bound, so multi-hour trips are kept.
yellow_credit = yellow_credit.where(F.col('trip_length') > 2)


In [9]:
# Remove fare_amount outliers with Tukey's fences: fares more than
# IQR_MULTIPLIER * IQR below Q1 or above Q3 are dropped.
#
# The quartiles are computed from the current (already filtered) DataFrame
# rather than hard-coded, so the fences stay in sync with any change to the
# upstream filters. percentile_approx is approximate (relative rank error of
# roughly 1 / FARE_QUANTILE_ACCURACY), which is why repeated runs can report
# slightly different quartiles.
FARE_QUANTILE_ACCURACY = 10000
IQR_MULTIPLIER = 1.5

fare_quartiles = (
    yellow_credit
    .select(
        F.percentile_approx("fare_amount", [0.25, 0.75], FARE_QUANTILE_ACCURACY)
        .alias("quantiles_fare")
    )
    .first()["quantiles_fare"]
)
if fare_quartiles is None:
    # percentile_approx yields null when there are no non-null fares to rank.
    raise ValueError("fare_amount quartiles are undefined: yellow_credit has no non-null fares")
lower_q_fare, upper_q_fare = fare_quartiles

IQ_fare = upper_q_fare - lower_q_fare

borderline_upper_fare = upper_q_fare + IQR_MULTIPLIER * IQ_fare
# With quartiles around 6.5 / 15.0 this lower fence is negative, i.e. below the
# 2.5 minimum fare enforced earlier, so in practice only the upper fence removes rows.
borderline_lower_fare = lower_q_fare - IQR_MULTIPLIER * IQ_fare

# between() is inclusive on both ends, matching the original >= / <= bounds.
yellow_credit = yellow_credit.where(
    F.col('fare_amount').between(borderline_lower_fare, borderline_upper_fare)
)




In [10]:
# Row count after all trip-level filters, before joining the NBA game data.
n_rows_before_merge = yellow_credit.count()
n_rows_before_merge

                                                                                

33711796

In [11]:
# Load the curated Knicks game data with pandas, parse the yyyymmdd Date column
# into datetimes, then convert it to a Spark DataFrame using an explicit schema
# in which every field is nullable.
# NOTE(review): the schema fields are presumably matched to the pandas columns by
# position, so keep their order identical to the CSV's column order.
import pandas as pd
from pyspark.sql.types import *

NBA_ATTENDANCE_PATH = '../../mast30034-project-1-dustintano10/data/curated/nba_attendance_new.csv'

nba_attendance = pd.read_csv(NBA_ATTENDANCE_PATH)
nba_attendance['Date'] = pd.to_datetime(nba_attendance['Date'], format='%Y%m%d')

nba_columns = [
    ("Date", DateType()),
    ("Start(ET)", StringType()),
    ("Attendance", IntegerType()),
    ("Win", StringType()),
    ("margin_victory/loss", IntegerType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in nba_columns])

nba_attendance_spark = spark.createDataFrame(nba_attendance, schema)

In [12]:
# Attach the game information to each trip whose pickup date equals a game date.
# This is a left join: trips on non-game days get nulls in the NBA columns.
pickup_on_game_date = yellow_credit.pickup_date == nba_attendance_spark.Date
yellow_credit = yellow_credit.join(nba_attendance_spark, on=pickup_on_game_date, how='left')

In [13]:
# Keep only trips that:
#   - happened on a Knicks game day (the left join left Date non-null), and
#   - start and end on the same calendar day (trips crossing midnight are a
#     small share of the data and fall outside the single-game-day focus).
on_game_day = F.col('Date').isNotNull()
same_day_trip = F.col("pickup_date") == F.col("dropoff_date")
yellow_credit = yellow_credit.filter(on_game_day & same_day_trip)


In [14]:
# Drop columns that were only needed for filtering and joining.
FILTER_ONLY_COLUMNS = ["VendorID", "payment_type", "RatecodeID", "pickup_date", "dropoff_date"]
yellow_credit = yellow_credit.drop(*FILTER_ONLY_COLUMNS)

In [15]:
# Row count after preprocessing and restricting to game days.
n_rows_clean = yellow_credit.count()
n_rows_clean

                                                                                

6405294

In [17]:
# Derive numeric features for modelling:
#   - pickup_hour / dropoff_hour: hour of day (0-23) in the session time zone
#   - is_weekend_binary: the is_weekend flag as 0/1
#   - Win_binary: 0 when Win is "No", otherwise 1
# Location IDs are left unchanged here; this cell does not convert them to strings.
yellow_credit = (
    yellow_credit
    .withColumn('pickup_hour', F.hour("tpep_pickup_datetime"))
    .withColumn('dropoff_hour', F.hour("tpep_dropoff_datetime"))
    # Cast the boolean directly (true -> 1, false -> 0) instead of comparing it to
    # the string 'false', which depended on Spark implicitly coercing the literal.
    # is_weekend is non-null here: a null pickup time gives a null pickup_date,
    # which cannot match a game date and is removed by the game-day filter.
    .withColumn('is_weekend_binary', F.col('is_weekend').cast('int'))
    # NOTE(review): any Win value other than "No" (including null) maps to 1;
    # confirm the column only ever holds "Yes"/"No".
    .withColumn('Win_binary', F.when(F.col('Win') == 'No', 0).otherwise(1))
)

In [18]:
# Encode the categorical game start time "Start(ET)" as "Start(ET)_NUMERIC".
# StringIndexer assigns 0-based indices ordered by label frequency by default
# (the most frequent start time gets 0.0), so the codes are neither 1-based nor
# in chronological order.
# The indexers are passed to the Pipeline unfitted so that Pipeline.fit performs
# the single fit, rather than fitting each indexer in a loop and then calling
# fit on a pipeline of already-fitted models.

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


categorical = ['Start(ET)']

indexers = [
    StringIndexer(inputCol=category_col, outputCol=category_col + "_NUMERIC")
    for category_col in categorical
]

pipeline = Pipeline(stages=indexers)
yellow_indexed = pipeline.fit(yellow_credit).transform(yellow_credit)
yellow_indexed.limit(10)

                                                                                

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,PULocationID,DOLocationID,fare_amount,tip_amount,tolls_amount,total_amount,is_weekend,trip_length,Date,Start(ET),Attendance,Win,margin_victory/loss,pickup_hour,dropoff_hour,is_weekend_binary,Win_binary,Start(ET)_NUMERIC
2018-12-25 00:39:10,2018-12-25 01:04:33,2.56,162,263,16.5,4.58,0.0,22.88,False,25.3833,2018-12-25,12:00,19812,No,-14,0,1,0,0,1.0
2018-12-25 01:15:08,2018-12-25 08:45:04,7.85,140,213,26.0,5.56,0.0,33.36,False,449.9333,2018-12-25,12:00,19812,No,-14,1,8,0,0,1.0
2018-12-25 09:08:51,2018-12-25 09:13:27,0.9,163,162,5.0,1.45,0.0,7.25,False,4.6,2018-12-25,12:00,19812,No,-14,9,9,0,0,1.0
2018-12-25 10:29:03,2018-12-25 10:31:40,0.63,100,48,4.0,0.96,0.0,5.76,False,2.6167,2018-12-25,12:00,19812,No,-14,10,10,0,0,1.0
2018-12-25 10:58:19,2018-12-25 11:14:08,8.08,161,13,23.5,0.0,0.0,24.3,False,15.8167,2018-12-25,12:00,19812,No,-14,10,11,0,0,1.0
2018-12-25 15:52:23,2018-12-25 16:07:03,2.13,162,142,11.5,1.0,0.0,13.3,False,14.6667,2018-12-25,12:00,19812,No,-14,15,16,0,0,1.0
2018-12-25 00:31:15,2018-12-25 00:42:04,2.88,230,236,11.0,2.36,0.0,14.16,False,10.8167,2018-12-25,12:00,19812,No,-14,0,0,0,0,1.0
2018-12-25 00:51:53,2018-12-25 00:58:03,0.98,161,229,6.0,1.36,0.0,8.16,False,6.1667,2018-12-25,12:00,19812,No,-14,0,0,0,0,1.0
2018-12-25 03:53:56,2018-12-25 04:06:21,1.64,246,107,9.5,0.0,0.0,10.3,False,12.4167,2018-12-25,12:00,19812,No,-14,3,4,0,0,1.0
2018-12-25 17:03:49,2018-12-25 17:07:48,0.83,237,141,5.0,2.0,0.0,7.8,False,3.9833,2018-12-25,12:00,19812,No,-14,17,17,0,0,1.0


In [19]:
# Persist the indexed trips as parquet, replacing any earlier output at this path.
YELLOW_INDEXED_PATH = '../../mast30034-project-1-dustintano10/data/curated/yellow/yellow_indexed'
yellow_indexed.write.parquet(YELLOW_INDEXED_PATH, mode='overwrite')

                                                                                