## Generate UDTF for Parallel Hyperparameter Tuning in Snowflake

This notebook is based on an example described in [Building and deploying a time series forecast with Hex + Snowflake](https://quickstarts.snowflake.com/guide/hex/index.html#0). The example highlights how we can use Snowflake to perform parallel hyperparameter tuning to forecast store foot traffic. For more information, see Chase Romano's article [Parallel Hyperparameter tuning using Snowpark](https://medium.com/snowflake/parallel-hyperparameter-tuning-using-snowpark-53cdec2faf77).

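We begin by establishing our Snowflake connection and Snowpark session. This demo assumes the user has access to the `SYSADMIN` role and that a virtual warehouse named `COMPUTE_WH` exists and is available for use. The remaining connection parameters are read from the `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, `SNOWFLAKE_DATABASE`, and `SNOWFLAKE_SCHEMA` environment variables.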

We assume the tables `CALENDAR_INFO`, `HOURLY_TRAFFIC`, and `MODEL_FEATURES` have already been created and populated by the [1-data-ingestion.ipynb](1-data-ingestion.ipynb) and [2-create-feature-table.ipynb](2-create-feature-table.ipynb) notebooks in this repository.

In [None]:
import os
from datetime import date, datetime, timedelta

import pandas as pd
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
import xgboost as xgb
from snowflake.snowpark import Session

connection_params = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
    "user": os.environ.get("SNOWFLAKE_USER"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD"),
    "database": os.environ.get("SNOWFLAKE_DATABASE"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA"),
    "role": "SYSADMIN",
    "warehouse": "COMPUTE_WH",
}

session = Session.builder.configs(connection_params).create()
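Because `connection_params` uses `os.environ.get`, any unset variable silently becomes `None` and the failure only shows up when the session is created. A minimal sketch of failing fast instead, using only the standard library; `REQUIRED_ENV_VARS`, `missing_env_vars`, and `build_connection_params` are hypothetical helpers, not part of Snowpark.

```python
import os

# Environment variables that the connection_params dict reads
REQUIRED_ENV_VARS = [
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
]


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in `environ` (defaults to os.environ)."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


def build_connection_params(environ=None):
    """Build the same dict as connection_params, but raise instead of passing None."""
    environ = os.environ if environ is None else environ
    missing = missing_env_vars(REQUIRED_ENV_VARS, environ)
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "account": environ["SNOWFLAKE_ACCOUNT"],
        "user": environ["SNOWFLAKE_USER"],
        "password": environ["SNOWFLAKE_PASSWORD"],
        "database": environ["SNOWFLAKE_DATABASE"],
        "schema": environ["SNOWFLAKE_SCHEMA"],
        "role": "SYSADMIN",
        "warehouse": "COMPUTE_WH",
    }
```

In the notebook this would be used as `Session.builder.configs(build_connection_params()).create()`.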

Let's look at the table we're using for our features.

In [None]:
session.table("MODEL_FEATURES").show()

### Create our UDTF for parallel hyperparameter tuning

Since this will be a permanent function, we need a stage to hold it. We'll use an internal stage named `python_models`, where Snowpark will upload the handler code and its dependencies.

We create the stage first if it does not already exist.

In [None]:
session.sql("CREATE STAGE IF NOT EXISTS PYTHON_MODELS").collect()

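Now we can create our UDTF. Note that the Anaconda terms and conditions must have been accepted in the Snowflake account before third-party packages from the Snowflake Anaconda channel, such as `pandas` and `xgboost`, can be used.

The `forecast` handler class buffers every row of its partition in `process`. In `end_partition` it builds a DataFrame, one-hot encodes the calendar features, trains an XGBoost regressor on history from 16 June 2018 through yesterday, and yields hourly predictions for tomorrow through four weeks from today.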
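For a partitioned call, Snowflake constructs the handler, calls `process` once per input row, and calls `end_partition` once after the partition's last row, returning whatever rows that method yields. The toy harness below imitates that flow locally with plain Python so the accumulate-then-model pattern used by `forecast` is easy to see. `run_partitioned`, `MeanHandler`, and the sample rows are hypothetical and not part of Snowpark, and unlike Snowflake the harness runs the partitions one after another rather than in parallel.

```python
from collections import defaultdict


def run_partitioned(handler_cls, rows, key):
    """Group rows by `key`, then drive one handler instance per partition,
    mimicking the __init__ -> process (per row) -> end_partition sequence."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key]].append(row)

    output = []
    for partition_key, partition_rows in partitions.items():
        handler = handler_cls()  # fresh state for each partition
        for row in partition_rows:
            # Pass every column except the partition key as positional args
            args = [value for name, value in row.items() if name != key]
            result = handler.process(*args)
            if result is not None:  # process() may also yield rows
                output.extend((partition_key, *r) for r in result)
        for out_row in handler.end_partition() or []:
            output.append((partition_key, *out_row))
    return output


class MeanHandler:
    """Toy handler: buffers traffic values and emits their mean at partition end."""

    def __init__(self):
        self.traffic = []

    def process(self, hourly_traffic):
        self.traffic.append(hourly_traffic)

    def end_partition(self):
        yield (sum(self.traffic) / len(self.traffic),)


rows = [
    {"STORE_ID": 1, "HOURLY_TRAFFIC": 10},
    {"STORE_ID": 2, "HOURLY_TRAFFIC": 4},
    {"STORE_ID": 1, "HOURLY_TRAFFIC": 20},
]
print(run_partitioned(MeanHandler, rows, key="STORE_ID"))
# [(1, 15.0), (2, 4.0)]
```

The `forecast` handler follows the same shape, except that `end_partition` fits a model instead of computing a mean.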

In [None]:
class forecast:
    def __init__(self):
        self.date_hour = []
        self.from_hour = []
        self.COLLEGE_TOWN = []
        self.DAYOFWEEK = []
        self.MONTH = []
        self.YEAR = []
        self.HOLIDAY_NAME = []
        self.HOURLY_TRAFFIC = []

    def process(
        self,
        date_hour,
        HOURLY_TRAFFIC,
        from_hour,
        COLLEGE_TOWN,
        DAYOFWEEK,
        MONTH,
        YEAR,
        HOLIDAY_NAME,
    ):
        self.date_hour.append(date_hour)
        self.HOURLY_TRAFFIC.append(HOURLY_TRAFFIC)
        self.from_hour.append(from_hour)
        self.COLLEGE_TOWN.append(COLLEGE_TOWN)
        self.DAYOFWEEK.append(DAYOFWEEK)
        self.MONTH.append(MONTH)
        self.YEAR.append(YEAR)
        self.HOLIDAY_NAME.append(HOLIDAY_NAME)

    def end_partition(self):
        df = pd.DataFrame(
            zip(
                self.date_hour,
                self.HOURLY_TRAFFIC,
                self.from_hour,
                self.COLLEGE_TOWN,
                self.DAYOFWEEK,
                self.MONTH,
                self.YEAR,
                self.HOLIDAY_NAME,
            ),
            columns=[
                "DATE_HOUR",
                "HOURLY_TRAFFIC",
                "HOUR",
                "COLLEGE_TOWN",
                "CALENDAR_WEEK_DAY_NBR",
                "CALENDAR_MTH",
                "CALENDAR_YEAR",
                "HOLIDAY_NAME",
            ],
        )

        # set the time column as our index
        df2 = df.set_index("DATE_HOUR")
        df2.index = pd.to_datetime(df2.index)

        # Converting features to categories for get_dummies
        df2["CALENDAR_WEEK_DAY_NBR"] = df2["CALENDAR_WEEK_DAY_NBR"].astype("category")
        df2["CALENDAR_MTH"] = df2["CALENDAR_MTH"].astype("category")
        df2["CALENDAR_YEAR"] = df2["CALENDAR_YEAR"].astype("category")
        df2["HOUR"] = df2["HOUR"].astype("category")
        df2["HOLIDAY_NAME"] = df2["HOLIDAY_NAME"].astype("category")
        df2["COLLEGE_TOWN"] = df2["COLLEGE_TOWN"].astype("category")

        # Use get_dummies for categorical features
        final = pd.get_dummies(
            data=df2,
            columns=[
                "HOLIDAY_NAME",
                "COLLEGE_TOWN",
                "CALENDAR_WEEK_DAY_NBR",
                "CALENDAR_MTH",
                "CALENDAR_YEAR",
                "HOUR",
            ],
        )

        # Train on history from 16 June 2018 through yesterday; forecast
        # the hours from tomorrow through four weeks from today
        today = date.today()
        yesterday = today - timedelta(days=1)
        tomorrow = today + timedelta(days=1)
        fourweek = today + timedelta(days=28)

        train = final[
            (final.index >= pd.to_datetime("16-Jun-2018"))
            & (final.index <= pd.to_datetime(yesterday))
        ]
        # .copy() so adding the PREDICTION column does not write into a slice of `final`
        future = final[
            (final.index >= pd.to_datetime(tomorrow))
            & (final.index <= pd.to_datetime(fourweek))
        ].copy()

        X_train = train.drop("HOURLY_TRAFFIC", axis=1)
        y_train = train["HOURLY_TRAFFIC"]
        X_forecast = future.drop("HOURLY_TRAFFIC", axis=1)

        # Fit one XGBoost regressor for this partition (store); n_jobs=1 trains
        # it on a single thread, and parallelism comes from the partitions
        model = xgb.XGBRegressor(n_estimators=200, n_jobs=1)
        model.fit(X_train, y_train, verbose=False)

        future["PREDICTION"] = model.predict(X_forecast)
        future = future.sort_index()
        # Foot traffic cannot be negative, so clip predictions at zero
        future.loc[future["PREDICTION"] < 0, "PREDICTION"] = 0

        # Emit one row per forecast hour as (DATE, HOUR_OF_DAY, HOURLY_FORECAST),
        # cast to plain Python types to match the registered output schema
        for ts, prediction in zip(future.index, future["PREDICTION"]):
            yield ts.date(), int(ts.hour), float(prediction)
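The train/forecast split in `end_partition` depends on the date the UDTF runs. A small standard-library sketch of the same window arithmetic, using a fixed, hypothetical run date so the boundaries are easy to check; `forecast_windows` is a hypothetical helper, not part of the handler.

```python
from datetime import date, timedelta


def forecast_windows(today):
    """Return ((train_start, train_end), (forecast_start, forecast_end))
    using the same offsets as end_partition."""
    train_start = date(2018, 6, 16)
    train_end = today - timedelta(days=1)       # yesterday
    forecast_start = today + timedelta(days=1)  # tomorrow
    forecast_end = today + timedelta(days=28)   # four weeks out
    return (train_start, train_end), (forecast_start, forecast_end)


train, future = forecast_windows(date(2023, 3, 1))
print(train)   # (datetime.date(2018, 6, 16), datetime.date(2023, 2, 28))
print(future)  # (datetime.date(2023, 3, 2), datetime.date(2023, 3, 29))
```

Because the DataFrame index holds hourly timestamps and `pd.to_datetime(yesterday)` is midnight, the training filter actually stops at 00:00 yesterday, and the forecast filter stops at 00:00 on the last day; the current day falls in neither window.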

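Register the handler as a permanent UDTF named `store_forecast`. The `input_types` must follow the order of the `process` arguments, and the `output_schema` columns correspond to the three values yielded by `end_partition`.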

In [None]:
store_forecast = F.udtf(
    forecast,
    output_schema=T.StructType(
        [
            T.StructField("DATE", T.DateType()),
            T.StructField("HOUR_OF_DAY", T.IntegerType()),
            T.StructField("HOURLY_FORECAST", T.FloatType()),
        ]
    ),
    input_types=[
        T.TimestampType(),
        T.LongType(),
        T.LongType(),
        T.BooleanType(),
        T.LongType(),
        T.LongType(),
        T.LongType(),
        T.StringType(),
    ],
    name="store_forecast",
    is_permanent=True,
    stage_location="@python_models",
    packages=["pandas", "xgboost"],
    replace=True,
    session=session,
)
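Since `input_types` is positional, its i-th entry describes the i-th argument of `process` after `self`. A quick local sanity check with the standard library's `inspect` module can catch a count mismatch before registering. `check_arity` is a hypothetical helper, and it only compares counts, not types.

```python
import inspect


def check_arity(handler_cls, input_types):
    """Raise if process() takes a different number of arguments than input_types."""
    params = list(inspect.signature(handler_cls.process).parameters)
    expected = params[1:]  # drop `self`
    if len(expected) != len(input_types):
        raise ValueError(
            f"process() takes {len(expected)} args {expected}, "
            f"but {len(input_types)} input types were given"
        )
    return expected
```

In this notebook the call would be `check_arity(forecast, [...])` with the same eight types passed to `F.udtf`, which should return the eight `process` argument names.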

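### Call the UDTF on a Snowpark-optimized warehouse to run models in parallel and get the forecast

Partitioning the call by `STORE_ID` means each store's rows are collected and modeled separately, so Snowflake can train the per-store models in parallel. The session above runs on `COMPUTE_WH`; to use a Snowpark-optimized warehouse instead, switch the session to it, for example with `session.use_warehouse(...)`, before running the cells below.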

In [None]:
df = session.table("MODEL_FEATURES")

In [None]:
forecast = df.select(
    df["STORE_ID"],
    (
        store_forecast(
            df["TIME_POINTS"],
            df["HOURLY_TRAFFIC"],
            df["HOUR"],
            df["COLLEGE_TOWN"],
            df["CALENDAR_WEEK_DAY_NBR"],
            df["CALENDAR_MTH_DAY_NBR"],
            df["CALENDAR_YEAR"],
            df["HOLIDAY_NAME"],
        ).over(partition_by=df["STORE_ID"])
    ),
)

In [None]:
forecast.show()

### Persist the forecast to Snowflake

Writing with `mode="overwrite"` replaces the `FOUR_WEEK_FORECAST` table if it already exists.

In [None]:
forecast.write.save_as_table("FOUR_WEEK_FORECAST", mode="overwrite")