In [160]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [27]:


# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [28]:
sdf_green = spark.read.parquet('../data/green_data/')

In [29]:
sdf_green.count()

868547

In [32]:
sdf_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [31]:
sdf_green.limit(5)

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
2,2021-09-30 18:39:03,2021-09-30 18:39:06,N,5.0,37,37,1.0,0.02,10.0,0.0,0.0,0.0,0.0,,0.3,10.3,1.0,2.0,0.0
2,2021-10-01 00:47:50,2021-10-01 01:00:04,N,5.0,92,82,2.0,3.44,20.0,0.0,0.0,0.0,0.0,,0.3,20.3,2.0,2.0,0.0
2,2021-10-01 00:23:10,2021-10-01 00:34:26,N,5.0,41,167,2.0,2.44,13.0,0.0,0.0,0.0,0.0,,0.3,13.3,2.0,2.0,0.0
2,2021-10-01 00:37:35,2021-10-01 00:43:37,N,1.0,134,135,1.0,1.67,7.0,0.5,0.5,0.0,0.0,,0.3,8.3,1.0,1.0,0.0
2,2021-10-01 00:43:28,2021-10-01 00:48:26,N,1.0,119,247,1.0,0.99,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2.0,1.0,0.0


In [110]:
relevant_cols = ('lpep_pickup_datetime', 'PULocationID', 'total_amount', 'tip_amount')
sdf_green_cur = sdf_green.select(*relevant_cols)
sdf_green_cur

868547

In [151]:
# Split the datetime to days(Mon,Tue,...,Sun) and Day(3am-6pm)/ Night(6pm -3am)
# 1 - Sunday, 2-Monday, ..., 7-Saturday

sdf_green_pre = sdf_green_cur.withColumn('day', F.dayofweek('lpep_pickup_datetime'))
sdf_green_pre = sdf_green_pre.withColumn('time', F.date_format('lpep_pickup_datetime', 'HH:mm:ss'))
sdf_green_pre = sdf_green_pre.withColumn('fare', F.round(F.col('total_amount')-F.col('tip_amount'),2))

relevant_cols = ('lpep_pickup_datetime', 'PULocationID', 'fare', 'day')
sdf_green_pre2 = sdf_green_pre.select(*relevant_cols)
sdf_green_pre3 = sdf_green_pre2.filter(F.col('fare')>0)
sdf_green_pre3 = sdf_green_pre3.withColumn('Date',F.to_date('lpep_pickup_datetime'))
sdf_green_pre3 = sdf_green_pre3.withColumn('hour' , F.date_format('lpep_pickup_datetime',"yyyy-MM-dd HH"))

# Now we want data aggregated to hours based on location
sdf_green_pre4 = sdf_green_pre3.groupBy('PULocationID','hour') \
                 .agg(
                        F.mean('fare').alias('fare'),
                        F.last('day').alias('day'),
                        F.last('Date').alias('Date')
                    ) \
                 .orderBy('hour')
sdf_green_pre5 = sdf_green_pre4.filter(F.col('Date') >='2021-07-01')
sdf_green_pre5.limit(5)

PULocationID,hour,fare,day,Date
74,2021-07-01 00,10.15,5,2021-07-01
140,2021-07-01 00,37.39,5,2021-07-01
24,2021-07-01 00,12.05,5,2021-07-01
7,2021-07-01 00,73.35,5,2021-07-01
127,2021-07-01 00,16.05,5,2021-07-01


In [56]:
# Now we import the weather dataset to be added
weather = spark.read.option("header",True).csv("../data/Weather.csv")
weather.head()

Row(STATION='USC00280907', NAME='BOONTON 1 SE, NJ US', DATE='2021-07-01', PRCP='0.23', SNOW='0.0', SNWD='0.0', TAVG=None, TOBS='73', WESD=None, WT01=None, WT02=None, WT03=None, WT04=None, WT05=None, WT06=None, WT08=None, WT09=None, WT11=None)

In [81]:
# Only want the 3 stations near New York City
rel_cols = ('NAME', 'DATE', 'TAVG')
weather_NYC = weather.select(*rel_cols).filter((F.col('NAME') == 'JFK INTERNATIONAL AIRPORT, NY US')
                                               |(F.col('NAME') == 'NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US')
                                               |(F.col('NAME') == 'NY CITY CENTRAL PARK, NY US')
                                              )

weather_NYC.filter(F.col('NAME') == 'NY CITY CENTRAL PARK, NY US').limit(5)

NAME,DATE,TAVG
NY CITY CENTRAL P...,2021-07-01,
NY CITY CENTRAL P...,2021-07-02,
NY CITY CENTRAL P...,2021-07-03,
NY CITY CENTRAL P...,2021-07-04,
NY CITY CENTRAL P...,2021-07-05,


In [118]:
# As NYC Central Park does not have data for temperature, we will use Newark and JFK instead
# Using the appropriated percentage according to weatherspark, we will split the weightage of 58% of central park accordingly to Newark and JFK (2:1 ratio)
# So Newark is 67% while JFK contributes 33%

weather_NYC = weather.select(*rel_cols).filter((F.col('NAME') == 'JFK INTERNATIONAL AIRPORT, NY US')
                                               |(F.col('NAME') == 'NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US')
                                              )
weather_NYC = weather_NYC.withColumn('WTAVG', F.when((F.col('NAME') == 'NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US'),
                                                        (F.col('TAVG')*2/3).cast('int')
                                                    ).otherwise((F.col('TAVG')/3)).cast('int')
                                    )
weather_NYC = weather_NYC.withColumn('Date', F.to_date('DATE'))
weather_NYC.limit(5)

NAME,Date,TAVG,WTAVG
NEWARK LIBERTY IN...,2021-07-01,80,53
NEWARK LIBERTY IN...,2021-07-02,75,50
NEWARK LIBERTY IN...,2021-07-03,65,43
NEWARK LIBERTY IN...,2021-07-04,71,47
NEWARK LIBERTY IN...,2021-07-05,77,51


In [157]:
# Now we aggregate the weighted average together
cols = ('Date', 'WTAVG')
weather_NYC_cur = weather_NYC.select(*cols)
agg_NYC_weather = weather_NYC_cur.groupby('Date') \
                                 .agg(
                                    F.sum('WTAVG').alias('temp')
                                  )
agg_NYC_weather.limit(5)

Date,temp
2021-08-27,83
2021-10-11,66
2021-11-13,52
2021-12-18,45
2022-03-28,29


In [158]:
# Now we want to add the temperature accordingly to the dates in the main dataframe

merged_sdf = sdf_green_pre5.join(agg_NYC_weather, on='Date', how='left')
merged_sdf.orderBy('Date')

Date,PULocationID,hour,fare,day,temp
2021-07-01,174,2021-07-01 11,33.465,5,78
2021-07-01,236,2021-07-01 13,10.675,5,78
2021-07-01,241,2021-07-01 22,26.74,5,78
2021-07-01,121,2021-07-01 11,23.30666666666667,5,78
2021-07-01,166,2021-07-01 10,14.22,5,78
2021-07-01,116,2021-07-01 17,13.8,5,78
2021-07-01,222,2021-07-01 11,37.2,5,78
2021-07-01,228,2021-07-01 14,27.0,5,78
2021-07-01,61,2021-07-01 10,26.096666666666668,5,78
2021-07-01,254,2021-07-01 09,16.93,5,78


In [159]:
merged_sdf.write.mode('overwrite').parquet('../data/curated/result')