### Prophet
<a href="https://facebook.github.io/prophet/" target="_blank">Facebook's Prophet</a> is widely considered the easiest way to forecast because it generally does all the heavy lifting for the user. Let's take a look at how Prophet works with a monthly sales revenue dataset.

References:
- <a href="https://cran.r-project.org/web/packages/prophet/prophet.pdf" target="_blank">Prophet Library Docs (R)</a>

## Additional Prophet Features for the Final Project
- [Diagnostics](http://facebook.github.io/prophet/docs/diagnostics.html) explains how to cross-validate a model and which hyperparameters to tune, using a parallelization mechanism such as threads to speed up the search. An example parameter grid:
```
param_grid = {  
    'changepoint_prior_scale': [0.001, 0.01, 0.1, 0.5],
    'seasonality_prior_scale': [0.01, 0.1, 1.0, 10.0],
}
```

- [Regressors](https://facebook.github.io/prophet/docs/seasonality,_holiday_effects,_and_regressors.html) will be helpful for modeling external factors, such as the effect of weather on the forecast

In [0]:
import mlflow
import json
import pandas as pd
import numpy as np
from prophet import Prophet, serialize
from prophet.diagnostics import cross_validation, performance_metrics

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Hyperparameter tuning
import itertools


# Prophet's bundled example dataset: monthly retail sales with columns `ds` and `y`.
SOURCE_DATA = (
    "https://raw.githubusercontent.com/facebook/prophet/"
    "master/examples/example_retail_sales.csv"
)

# Artifact path for every logged model; also reused as the registered-model name.
ARTIFACT_PATH = "Gxx-model"

# Seed NumPy's global random generator for repeatability.
np.random.seed(12345)

## Helper routine to extract the parameters that were used to train a specific instance of the model
def extract_params(pr_model):
    return {attr: getattr(pr_model, attr) for attr in serialize.SIMPLE_ATTRIBUTES}


# Load the sales series and report its size.
sales_data = pd.read_csv(SOURCE_DATA)
print(sales_data.head())
print(sales_data.shape)
n_months = len(sales_data)
print(f"{n_months} months of sales data loaded ({round(n_months / 12, 2)} years)")

# Visualize data using seaborn. read_csv leaves `ds` as text (no parse_dates),
# and matplotlib plots strings as categories, one tick label per row, which is
# unreadable for hundreds of months. Convert to datetimes for the plot only;
# `sales_data` itself is left unchanged.
sns.set(rc={'figure.figsize': (12, 8)})
sns.lineplot(x=pd.to_datetime(sales_data['ds']), y=sales_data['y'])
plt.legend(['Sales Data'])

In [0]:
# Preview the first 10 rows; the notebook renders the cell's final expression.
sales_data.head(10)

In [0]:
# Load the station-status table as a Spark DataFrame. The table name is
# qualified with its database because the `use database g04_db` statement only
# runs in a later cell, so an unqualified name would resolve against whatever
# database the session happens to be using at this point.
# NOTE(review): assumes silver_station_status_dynamic lives in g04_db — confirm.
df = spark.sql('select * from g04_db.silver_station_status_dynamic')

In [0]:
# Render the station-status DataFrame with the notebook's built-in `display` helper.
display(df)

In [0]:
# Switch the Spark session's current database.
spark.sql('use database g04_db')

#--------------------------------------------#
# Baseline Model Using Default Hyperparameters
# - Horizon - period over which we forecast
# - Initial - amount of initial training data
# - Period - time between cutoffs (usually H/2)
# - Cutoff - beginning of the Horizon forecast period
#--------------------------------------------#

# Initiate the model with Prophet's default hyperparameters
baseline_model = Prophet()
print(sales_data.shape)
# Fit the model on the full sales series
baseline_model.fit(sales_data)

# Cross validation: 710 days of initial data, a cutoff every 180 days, and a
# 365-day horizon after each cutoff; parallel="threads" runs cutoffs in threads.
baseline_model_cv = cross_validation(
    model=baseline_model,
    initial='710 days',
    period='180 days',
    horizon='365 days',
    parallel="threads",
)
# Only a notebook cell's last expression is rendered, so intermediate previews
# must be printed explicitly or they are silently discarded.
print(baseline_model_cv.head())

# Model performance metrics (rolling_window=1: a single window over all forecasts)
baseline_model_p = performance_metrics(baseline_model_cv, rolling_window=1)
print(baseline_model_p.head())

# Get the performance value
print(f"MAPE of baseline model: {baseline_model_p['mape'].values[0]}")

In [0]:
#--------------------------------------------#
# Automatic Hyperparameter Tuning
#--------------------------------------------#

# Set up parameter grid. The trailing comments list further candidate values
# that are currently left out of the search.
param_grid = {
    'changepoint_prior_scale': [0.001],  # , 0.05, 0.08, 0.5
    'seasonality_prior_scale': [0.01],  # , 1, 5, 10, 12
    'seasonality_mode': ['additive', 'multiplicative'],
}

# Generate all combinations of parameters (Cartesian product of the grid values)
all_params = [dict(zip(param_grid.keys(), v)) for v in itertools.product(*param_grid.values())]

print(f"Total training runs {len(all_params)}")

# One (MAPE, model URI) tuple per combination, in the same order as `all_params`
mapes = []

# Metrics averaged from performance_metrics and logged to MLflow for each run
metric_keys = ["mse", "rmse", "mae", "mape", "mdape", "smape", "coverage"]

# Use cross validation to evaluate all parameters; each combination is one MLflow run
for params in all_params:
    with mlflow.start_run():
        # Fit a model using one parameter combination + US country holidays
        m = Prophet(**params)
        m.add_country_holidays(country_name='US')
        m.fit(sales_data)

        # Cross-validation (same settings as the baseline model)
        df_cv = cross_validation(
            model=m,
            initial='710 days',
            period='180 days',
            horizon='365 days',
            parallel="threads",
        )
        # Model performance
        df_p = performance_metrics(df_cv, rolling_window=1)

        metrics = {k: df_p[k].mean() for k in metric_keys}
        # Full model configuration for logging. Stored under its own name so the
        # loop variable `params` (the grid combination) is not overwritten.
        model_params = extract_params(m)

        print(f"Logged Metrics: \n{json.dumps(metrics, indent=2)}")
        print(f"Logged Params: \n{json.dumps(model_params, indent=2)}")

        mlflow.prophet.log_model(m, artifact_path=ARTIFACT_PATH)
        mlflow.log_params(model_params)
        mlflow.log_metrics(metrics)
        model_uri = mlflow.get_artifact_uri(ARTIFACT_PATH)
        print(f"Model artifact logged to: {model_uri}")

        # Save model performance metrics for this combination of hyper parameters
        mapes.append((df_p['mape'].values[0], model_uri))
        

In [0]:
# Tuning results: one row per parameter combination plus its MAPE and the URI
# of the model logged for it. `mapes` is in the same order as `all_params`.
mape_values, model_uris = zip(*mapes)
tuning_results = pd.DataFrame(all_params)
tuning_results['mape'] = mape_values
tuning_results['model'] = model_uris

# idxmin returns an index *label*, so select the row with label-based .loc
# (positional .iloc only matched because of the default RangeIndex).
best_params = dict(tuning_results.loc[tuning_results['mape'].idxmin()])

print(json.dumps(best_params, indent=2))

In [0]:
# Load the best model found during tuning from its logged MLflow artifact.
loaded_model = mlflow.prophet.load_model(best_params['model'])

# Extend 36 monthly periods past the history. Use month-start dates ("MS"):
# "m"/"M" is month-end and would place every future point on the last day of
# the month. NOTE(review): assumes the history is dated on month starts — the
# Prophet docs forecast this same retail-sales series with freq='MS'.
forecast = loaded_model.predict(loaded_model.make_future_dataframe(36, freq="MS"))

print(f"forecast:\n{forecast.tail(40)}")

In [0]:
# Plot the forecast with Prophet's built-in plotting helper.
prophet_plot = loaded_model.plot(forecast)

In [0]:
# Plot the forecast broken down into its individual model components.
prophet_plot2 = loaded_model.plot_components(forecast)

In [0]:
# Line up each forecast row with the observed sales for the same date. Joining
# on `ds` (rather than on row position) stays correct even if the forecast and
# the history are not in the same row order. `ds` is parsed to datetimes first
# because read_csv leaves it as text while Prophet's forecast `ds` is datetime.
observed = sales_data[['ds', 'y']].assign(ds=pd.to_datetime(sales_data['ds']))
results = forecast[['ds', 'yhat']].merge(observed, on='ds', how='left')
# Residual = observed - predicted; NaN for forecast dates with no observation.
results['residual'] = results['y'] - results['yhat']

In [0]:
# Plot the residuals against the fitted values: a scatter with a fitted linear
# regression line plus marginal distributions, drawn with seaborn (already
# imported by this notebook). Rows without an observed `y` have NaN residuals
# and are dropped before plotting.
residual_grid = sns.jointplot(
    data=results.dropna(subset=['residual']),
    x='yhat',
    y='residual',
    kind='reg',
)
plt.show()

In [0]:
# Register the best tuned model's logged artifact in the MLflow Model Registry
# under the name ARTIFACT_PATH; the returned version's .name/.version are used below.
model_details = mlflow.register_model(model_uri=best_params['model'], name=ARTIFACT_PATH)

In [0]:
# MLflow client used below to change and query the registered model's stage.
from mlflow.tracking.client import MlflowClient

client = MlflowClient()

In [0]:
# Move the newly registered model version into the "Staging" stage.
# NOTE(review): newer MLflow releases deprecate registry stages in favor of
# version aliases — confirm this call against the MLflow version in use.
staging_transition = {
    "name": model_details.name,
    "version": model_details.version,
    "stage": 'Staging',
}
client.transition_model_version_stage(**staging_transition)

In [0]:
# Re-read the registered version from the registry to confirm its current stage.
version_lookup = {"name": model_details.name, "version": model_details.version}
model_version_details = client.get_model_version(**version_lookup)
print(f"The current model stage is: '{model_version_details.current_stage}'")

In [0]:
# Ask the registry for the newest version of the model in the "Staging" stage.
latest_version_info = client.get_latest_versions(ARTIFACT_PATH, stages=["Staging"])
newest_staging = latest_version_info[0]
latest_staging_version = newest_staging.version

print(f"The latest staging version of the model '{ARTIFACT_PATH}' is '{latest_staging_version}'.")

In [0]:
# Registry URI resolving to whichever version of the model is in the staging stage.
model_staging_uri = f"models:/{ARTIFACT_PATH}/staging"
print(f"Loading registered model version from URI: '{model_staging_uri}'")
model_staging = mlflow.prophet.load_model(model_staging_uri)

In [0]:
# Forecast 36 monthly periods past the staged model's history and plot it.
# Month-start dates ("MS") are used; "m"/"M" is month-end and would misalign the
# future points with month-start history. NOTE(review): assumes month-start
# history, as in the Prophet docs' retail-sales example (freq='MS').
staging_forecast = model_staging.predict(model_staging.make_future_dataframe(36, freq="MS"))
model_staging.plot(staging_forecast)