Now that we have cleaned the data and put it in the desired form, we can begin joining it with our external datasets.

These datasets include: 
- Weather (daily temperature) [https://www.ncdc.noaa.gov/cdo-web/search]
- Public/special holidays
- COVID-19 cases (7-day average case count)

In [1]:
# Open Spark session 
from pyspark.sql import SparkSession, functions as F


# Create a Spark session (the entry point that runs Spark jobs)
spark = (
    SparkSession.builder.appName("Data_Explorer")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [2]:
taxi_data = spark.read.parquet('../data/curated/taxi_data.parquet')

# Join & Reformat the Attribute Datasets
Now we can join the weather/climate, COVID-19 and public holiday datasets onto the taxi data.
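As a small pandas illustration of why left joins are used here (toy data, not the real datasets; the `trips` column and the values are assumptions for the example): every taxi row survives the join, and dates with no match in the holiday table come back as missing values, which is why `Holiday` is later filled with `False`.

```python
import pandas as pd

# Toy stand-ins for the taxi and holiday tables (illustrative values only)
taxi = pd.DataFrame({
    "Date": pd.to_datetime(["2019-12-24", "2019-12-25", "2019-12-26"]),
    "trips": [120, 45, 98],
})
holidays = pd.DataFrame({
    "Date": pd.to_datetime(["2019-12-25"]),
    "Holiday": pd.array([True], dtype="boolean"),  # nullable boolean dtype
})

# A left join keeps every taxi row; dates absent from `holidays` get <NA>
joined = taxi.merge(holidays, on="Date", how="left")

# Unmatched dates are not holidays, so fill the gaps with False
joined["Holiday"] = joined["Holiday"].fillna(False)
print(joined)
```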

In [3]:
# Load the external datasets with pandas
import pandas as pd
daily_climate = pd.read_csv('../data/curated/Hourly_climate_processed.csv', index_col=[0])
holiday_data = pd.read_csv('../data/curated/holiday_data.csv', index_col=[0])
covid_data = pd.read_csv('../data/curated/COVID-19_7-AVG.csv', index_col=[0])

In [4]:
# Convert the pandas DataFrames to Spark DataFrames
daily_climate = spark.createDataFrame(daily_climate)
holiday_data = spark.createDataFrame(holiday_data)
covid_data = spark.createDataFrame(covid_data)

In [5]:
# Rename columns so the shared join key is called 'Date' in each dataset,
# and the climate date/hour columns do not clash with the taxi columns
daily_climate = daily_climate.withColumnRenamed('DATE', 'Date_climate')
daily_climate = daily_climate.withColumnRenamed('Hour', 'Hour_climate')
holiday_data = holiday_data.withColumnRenamed('date', 'Date')
covid_data = covid_data.withColumnRenamed('date_of_interest', 'Date')
covid_data = covid_data.withColumnRenamed('ALL_CASE_COUNT_7DAY_AVG', 'Covid_7AVG')
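The renaming cell gives the join keys common names, but it does not change their types: `pd.read_csv` without `parse_dates` reads date columns as plain strings, and `spark.createDataFrame` keeps them as strings. A minimal pandas sketch of parsing the keys before conversion, assuming the raw values below (the `MM/DD/YYYY` format for `date_of_interest` is an assumption, not confirmed by the source):

```python
import pandas as pd

# Hypothetical raw key columns as read_csv returns them: plain strings (object dtype)
climate = pd.DataFrame({"hourly_timestamp": ["2019-01-01 00:00:00", "2019-01-01 01:00:00"]})
covid = pd.DataFrame({"date_of_interest": ["01/01/2019", "01/02/2019"]})

# Parse the hourly key into datetime64 so it can match a timestamp column in the taxi data
climate["hourly_timestamp"] = pd.to_datetime(climate["hourly_timestamp"])

# Rename and parse the COVID date in one step; the MM/DD/YYYY format is an assumption
covid["Date"] = pd.to_datetime(covid.pop("date_of_interest"), format="%m/%d/%Y")

print(climate.dtypes)
print(covid.dtypes)
```

Parsed `datetime64` columns are converted to Spark timestamps by `spark.createDataFrame`, so the join compares like with like instead of relying on implicit string casts.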

In [6]:
# Left-join each external dataset onto the taxi data so that no taxi rows are dropped
taxi_dataset = taxi_data.join(daily_climate, on = 'hourly_timestamp', how = 'left')
taxi_dataset = taxi_dataset.join(holiday_data, on = 'Date', how = 'left')
taxi_dataset = taxi_dataset.join(covid_data, on = 'Date', how = 'left')
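A left join only preserves the row count when each key appears at most once in the right-hand table; a repeated date in, say, the COVID table would silently duplicate taxi rows. In Spark this can be checked by comparing `count()` before and after the joins. The pandas sketch below (toy frames, illustrative values) shows the same guard with `merge(..., validate="many_to_one")`, which raises `MergeError` when the right-hand keys are not unique.

```python
import pandas as pd

taxi = pd.DataFrame({"Date": ["2020-03-01", "2020-03-01", "2020-03-02"], "trips": [10, 12, 7]})
covid = pd.DataFrame({"Date": ["2020-03-01", "2020-03-02"], "Covid_7AVG": [0.0, 1.5]})

# validate="many_to_one" raises MergeError if a Date repeats in `covid`
joined = taxi.merge(covid, on="Date", how="left", validate="many_to_one")
assert len(joined) == len(taxi)  # the join added no rows

# A duplicated key on the right-hand side is caught instead of multiplying rows
dup = pd.concat([covid, covid.iloc[[0]]], ignore_index=True)
try:
    taxi.merge(dup, on="Date", how="left", validate="many_to_one")
    caught = False
except pd.errors.MergeError:
    caught = True
print(caught)
```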

In [7]:
# Drop the redundant climate date column
taxi_dataset = taxi_dataset.drop('Date_climate')

In [8]:
# Dates with no matching holiday are null after the left join; mark them as non-holidays
taxi_dataset = taxi_dataset.fillna({'Holiday': False})
# Possible refinement: label every holiday except Christmas as a public holiday
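The refinement suggested in the comment could look like the pandas sketch below. It assumes a hypothetical `Holiday_name` column holding the holiday's name (null for ordinary days) and writes a hypothetical `Holiday_type` column; neither name comes from the source data, so adapt them to the actual holiday schema.

```python
import pandas as pd

# Hypothetical holiday names after the join (None = not a holiday)
df = pd.DataFrame({"Holiday_name": ["Christmas Day", "Thanksgiving", None, "Halloween"]})

is_holiday = df["Holiday_name"].notna()
is_christmas = df["Holiday_name"].eq("Christmas Day")

# Keep Christmas as its own level; collapse every other holiday into "public"
df["Holiday_type"] = "none"
df.loc[is_holiday, "Holiday_type"] = "public"
df.loc[is_christmas, "Holiday_type"] = "christmas"
print(df)
```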

In [9]:
# Hour will be treated as a factor (categorical) variable, so extract it into its own column
taxi_dataset = taxi_dataset.withColumn('Hour', F.hour('hourly_timestamp'))
taxi_dataset_final = taxi_dataset.drop('hourly_timestamp')
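Because `Hour` is meant to be a factor, a model that expects numeric inputs would otherwise treat 0 to 23 as an ordered quantity. A minimal pandas sketch of one-hot encoding the extracted hour (toy timestamps; `pd.get_dummies` is one option, and a modelling library's own categorical handling is another):

```python
import pandas as pd

df = pd.DataFrame({"hourly_timestamp": pd.to_datetime(
    ["2019-01-01 00:00", "2019-01-01 13:00", "2019-01-02 13:00"])})

# Extract the hour of day (0-23), as the Spark hour-extraction cell does
df["Hour"] = df["hourly_timestamp"].dt.hour
df = df.drop(columns="hourly_timestamp")

# One-hot encode so each observed hour becomes its own indicator column
encoded = pd.get_dummies(df["Hour"].astype("category"), prefix="Hour")
print(encoded.columns.tolist())
```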

In [10]:
# Our table is in its final form and is ready for modeling
# Save the data (toPandas() collects every row onto the driver, so the table must fit in driver memory)

taxi_dataset_final.toPandas().to_csv('../data/curated/model_data.csv', index=False)