## Initialization

### Import Libraries

In [1]:
import pandas as pd
import numpy as np
import pandas_gbq
import datetime as dt
from collections import defaultdict
import time
import datetime as dt
from pytz import timezone
tz = timezone('EST')
from tqdm import tqdm

In [2]:
# PySpark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [3]:
# FBProphet Modelling
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
from prophet.plot import add_changepoints_to_plot

In [4]:
# sklearn metrics
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error

In [5]:
# data visualization
import seaborn as sns
%config InlineBackend.figure_format = 'retina'
%matplotlib inline
from matplotlib import pyplot as plt
from matplotlib import style
sns.set()

### Default constants

In [6]:
BUCKET_NAME = "homework_rl3154_data"
PROJECT_BUCKET = "project"
FOLDER_NAME = "forecast_data"
BIG_QUERY_TABLE_NAME = "project_dataset.agg_electricity_load_data_hourly"


### Spark initialization

In [7]:
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName("spark-bigquery-prophet")
# BigQuery Connector
conf.set("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.22.0")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [8]:
sqlContext._sc.version

'3.1.2'

# Get Data

In [9]:
sql_query = f"""
    SELECT
        zone_name
        , hourly_timestamp AS ds
        , SUM(sum_rtd_actual_load) AS y
    FROM {BIG_QUERY_TABLE_NAME}
    /*
    WHERE
        hourly_timestamp >= '2018-01-01'
        AND hourly_timestamp <= '2020-12-31'
        AND zone_name in ('N.Y.C.', 'CAPITL')
    */
    GROUP BY 
        1,2
"""
load_df = pd.read_gbq(sql_query)

In [10]:
load_df['ds'] = pd.to_datetime(load_df['ds']).apply(lambda x: x.replace(tzinfo=None))

load_df

Unnamed: 0,zone_name,ds,y
0,WEST,2018-01-01 00:00:00,22213.3
1,WEST,2018-01-01 01:00:00,21540.5
2,WEST,2018-01-01 02:00:00,21201.3
3,WEST,2018-01-01 03:00:00,21163.6
4,WEST,2018-01-01 04:00:00,20885.4
...,...,...,...
1140882,N.Y.C.,2017-12-31 19:00:00,84250.0
1140883,N.Y.C.,2017-12-31 20:00:00,82599.6
1140884,N.Y.C.,2017-12-31 21:00:00,80392.6
1140885,N.Y.C.,2017-12-31 22:00:00,91022.5


In [11]:
## Convert to Spark RDD
spark_df = sqlContext.createDataFrame(load_df)

# Modelling

In [12]:
### Modelling function

def forecast_load(history_pd: pd.DataFrame) -> pd.DataFrame: 
    
    # remove missing values (more likely at high granularity level)
    history_pd = history_pd.dropna()
    
    # instantiate the model, configure the parameters
    model = Prophet(
        changepoint_prior_scale=0.5, 
        seasonality_mode='multiplicative', 
        interval_width=0.95, 
    )
    model.add_country_holidays(country_name='US')
    
    # fit the model
    model.fit(history_pd)
    
    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=24*30, 
        freq='H',
        include_history=True
    )
    
    # make predictions
    forecast_pd = model.predict(future_pd)
    
    
    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # get relevant fields from forecast
    f_pd = forecast_pd[[
        'ds','yhat', 'yhat_upper', 'yhat_lower', 'trend', 'holidays', 'yearly', 'daily', 'weekly'
    ]]
    # get relevant fields from history
    h_pd = history_pd[['ds','zone_name','y']]
    
    # left join
    results_pd = pd.merge(f_pd, h_pd, how='left', on=['ds'])
    results_pd['zone_name'] = results_pd['zone_name'].fillna(method="ffill")
    
    # return predictions
    return results_pd

In [13]:
# define function to calculate metrics

def evaluate_forecast(evaluation_pd: pd.DataFrame) -> pd.DataFrame:
    
    # drop NA from in incoming data set
    evaluation_pd = evaluation_pd.dropna()
    
    #zone_name = (evaluation_pd['zone_name'].unique())[0]
    zone_name = evaluation_pd['zone_name'].iloc[0]
    
    training_date = dt.datetime.now(tz).strftime('%Y-%m-%d')

    # calulate evaluation metrics
    mae = mean_absolute_error(evaluation_pd['y'], evaluation_pd['yhat'])
    mse = mean_squared_error(evaluation_pd['y'], evaluation_pd['yhat'])
    mape = mean_absolute_percentage_error(evaluation_pd['y'], evaluation_pd['yhat'])
    rmse = np.sqrt(mse)
    
    # assemble result set
    results = {
        'training_date':[training_date], 
        'zone_name':[zone_name], 
        'mae':[mae], 
        'mse':[mse], 
        'mape':[mape], 
        'rmse':[rmse]
    }

    return pd.DataFrame(results)

# Training loop

## Get Forecast Results

In [14]:
# Schema of FBProphet Results
result_schema =StructType([
    StructField('ds', TimestampType()),
    StructField('zone_name', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('trend', FloatType()),
    StructField('holidays', FloatType()),
    StructField('yearly', FloatType()),
    StructField('daily', FloatType()),
    StructField('weekly', FloatType()),
])

In [15]:
forecast = (
    spark_df
    .groupBy("zone_name")
    .applyInPandas(forecast_load, schema=result_schema)
)
forecast.createOrReplaceTempView('new_forecasts')

In [None]:
forecast_df = forecast.toPandas()

21/12/09 17:31:47 WARN org.apache.spark.scheduler.TaskSetManager: Stage 0 contains a task of very large size (12282 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [None]:
forecast_df

Unnamed: 0,ds,zone_name,y,yhat,yhat_upper,yhat_lower,trend,holidays,yearly,daily,weekly
0,2010-01-01 00:00:00,CAPITL,12518.099609,12613.607422,16561.296875,9120.713867,14812.665039,-0.095238,0.056130,-0.130641,0.021291
1,2010-01-01 01:00:00,CAPITL,13092.299805,11989.854492,15666.069336,8186.646973,14813.125000,-0.095238,0.056180,-0.172363,0.020830
2,2010-01-01 02:00:00,CAPITL,12562.200195,11588.465820,15455.477539,7901.891602,14813.585938,-0.095238,0.056229,-0.199018,0.020314
3,2010-01-01 03:00:00,CAPITL,12101.299805,11466.584961,15152.679688,7877.807129,14814.046875,-0.095238,0.056278,-0.206743,0.019738
4,2010-01-01 04:00:00,CAPITL,11989.799805,11715.201172,15239.740234,8260.172852,14814.506836,-0.095238,0.056327,-0.189391,0.019095
...,...,...,...,...,...,...,...,...,...,...,...
1148802,2021-11-30 20:00:00,MILLWD,,4885.288086,6073.982422,3612.519531,4080.098145,0.000000,0.001962,0.172233,0.023151
1148803,2021-11-30 21:00:00,MILLWD,,4621.094238,5895.692383,3381.287354,4080.104492,0.000000,0.002109,0.107065,0.023418
1148804,2021-11-30 22:00:00,MILLWD,,4264.531250,5530.609863,2964.978516,4080.110840,0.000000,0.002257,0.019235,0.023708
1148805,2021-11-30 23:00:00,MILLWD,,3896.691162,5133.661621,2598.658447,4080.117188,0.000000,0.002405,-0.071380,0.024019


In [None]:
# Save to GCS
forecast_df.to_pickle(f"gs://{BUCKET_NAME}/{PROJECT_BUCKET}/{FOLDER_NAME}/forecast_res-{dt.datetime.now(tz).strftime('%Y-%m-%d')}.pkl")


## Forecast Evaluation

In [19]:
eval_df = forecast_df.groupby('zone_name').apply(evaluate_forecast).reset_index(drop=True)

In [20]:
eval_df

Unnamed: 0,training_date,zone_name,mae,mse,mape,rmse
0,2021-12-09,CAPITL,1338.88147,3543972.0,0.079469,1882.543945
1,2021-12-09,CENTRL,1629.996704,5235488.0,0.07201,2288.118896
2,2021-12-09,DUNWOD,801.376038,1314109.0,0.092441,1146.345947
3,2021-12-09,GENESE,1127.091919,2572215.0,0.0801,1603.812744
4,2021-12-09,HUD VL,1307.080322,3498391.0,0.093496,1870.398682
5,2021-12-09,LONGIL,3057.967041,19472320.0,0.100375,4412.745605
6,2021-12-09,MHK VL,874.259399,1425834.0,0.07986,1194.082764
7,2021-12-09,MILLWD,464.994049,407818.3,0.117714,638.606567
8,2021-12-09,N.Y.C.,5754.780762,68619350.0,0.077383,8283.679688
9,2021-12-09,NORTH,356.26062,290110.3,0.049971,538.618896


In [21]:
# Save to GCS
eval_df.to_pickle(f"gs://{BUCKET_NAME}/{PROJECT_BUCKET}/{FOLDER_NAME}/forecast_eval-{dt.datetime.now(tz).strftime('%Y-%m-%d')}.pkl")
