In [None]:
import pandas as _hex_pandas
import datetime as _hex_datetime
import json as _hex_json

In [None]:
hex_scheduled = _hex_json.loads("false")

In [None]:
hex_user_email = _hex_json.loads("\"example-user@example.com\"")

In [None]:
hex_run_context = _hex_json.loads("\"logic\"")

In [None]:
hex_timezone = _hex_json.loads("\"UTC\"")

In [None]:
hex_project_id = _hex_json.loads("\"9afb1a80-4c77-4dc4-872a-5bd11dc967b4\"")

In [None]:
hex_project_name = _hex_json.loads("\"Daily Visitors Forecasting\"")

In [None]:
hex_status = _hex_json.loads("\"\"")

In [None]:
hex_categories = _hex_json.loads("[]")

In [None]:
hex_color_palette = _hex_json.loads("[\"#4C78A8\",\"#F58518\",\"#E45756\",\"#72B7B2\",\"#54A24B\",\"#EECA3B\",\"#B279A2\",\"#FF9DA6\",\"#9D755D\",\"#BAB0AC\"]")

#### 1- Upload Datasets into Snowflake Tables

Use Snowflake UI to upload DAILY_VISITORS.csv & DAILY_VISITORS_NEW.csv into Snowflake Internal Stage then create 2 Tables

1. _**DAILY_VISITORS **_,which contains 5 years of historical daily visitors.
2. _**DAILY_VISITORS_NEW**_, which is a new month (November) for which we want to predict the number of daily visitors.



#### 2- Import necessary libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark.functions import udf
from snowflake.snowpark import types as T
import snowflake.snowpark.functions as F
from snowflake.snowpark.version import VERSION

# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import OrdinalEncoder

# data science libs
import pandas as pd
import numpy as np
from snowflake.ml.modeling.metrics import r2_score

# misc
import joblib
import cachetools

#### 3- Create Snowpark Session

In [None]:
# Use HEX to establish a connection to your Snowflake Account
import hextoolkit
hex_snowflake_conn = hextoolkit.get_data_connection('MY_SNOWFLAKE_NEW')
session = hex_snowflake_conn.get_snowpark_session()

#### 4- Import Daily Visitors Historical Data as a Snowpark DF

In [None]:
# Create Snowpark DF for Table DAILY_VISITORS
DAILY_VISITORS_DF = session.table('DAILY_VISITORS')

# Show Snowpark DF 
DAILY_VISITORS_DF.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY"  |"LAST_YEAR_DAILY_VISITORS"  |"DAILY_VISITORS"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------
|2018-06-16       |SATURDAY   |16                      |6               |2018             |1          |71                          |89                |
|2018-06-17       |SUNDAY     |17                      |6               |2018             |1          |79                          |99                |
|2018-06-18       |MONDAY     |18                      |6               |2018             |0          |97                          |108               |
|2018-06-19       |TUESDAY    |19                      |6               |2018           

___**NOTES**___ : 

- ',**DAY**,' is a Categorical column. You will need to transform it into Numerical column using OrdinalEncoder Transformer. 
- ',**CALENDAR_DATE**,' will be replaced by 'CALENDAR_MTH_DAY_NBR', 'CALENDAR_MTH' and 'CALENDAR_YEAR' columns.
- ',**HOLIDAY**,' column indicates if that day was a public Holiday or a Weekend. 



#### 5- Split the dataset into Training and Test DFs

In [None]:
# Since our dataset is a timeseries, we will split it based on a cut-off date (not a random split) to preserve the order and structure.  
split_date = '01-Sep-2022'

# Create Train DF 
train_df = DAILY_VISITORS_DF\
    .select('DAY',\
            'CALENDAR_MTH_DAY_NBR',\
            'CALENDAR_MTH',\
            'CALENDAR_YEAR',\
            'HOLIDAY',\
            'LAST_YEAR_DAILY_VISITORS',\
            'DAILY_VISITORS').\
    filter((F.col('CALENDAR_DATE') < split_date))

# Create Test DF Similar to Train_DF 
test_df = DAILY_VISITORS_DF\
    .select('DAY',\
            'CALENDAR_MTH_DAY_NBR',\
            'CALENDAR_MTH',\
            'CALENDAR_YEAR',\
            'HOLIDAY',\
            'LAST_YEAR_DAILY_VISITORS',\
            'DAILY_VISITORS').\
    filter((F.col('CALENDAR_DATE') >= split_date))

# NOTE : both 'CALENDAR_DATE' & 'HOLIDAY_NAME' columns are dropped from our DF and will not be used further. 
# 'CALENDAR_DATE' will be replaced by 'CALENDAR_MTH_DAY_NBR', 'CALENDAR_MTH' and 'CALENDAR_YEAR' columns.


# Show train_df
train_df.show()

-------------------------------------------------------------------------------------------------------------------------------------
|"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY"  |"LAST_YEAR_DAILY_VISITORS"  |"DAILY_VISITORS"  |
-------------------------------------------------------------------------------------------------------------------------------------
|SATURDAY   |16                      |6               |2018             |1          |71                          |89                |
|SUNDAY     |17                      |6               |2018             |1          |79                          |99                |
|MONDAY     |18                      |6               |2018             |0          |97                          |108               |
|TUESDAY    |19                      |6               |2018             |0          |42                          |46                |
|WEDNESDAY  |20                      |6               |2018   

#### 6- Categorize columns & Create pipeline

In [None]:
# Categorize all the features for modeling
CATEGORICAL_COLUMNS = ["DAY"]
CATEGORICAL_COLUMNS_OE = ["CALENDAR_WEEK_DAY_NBR"]
NUMERICAL_COLUMNS = ['CALENDAR_MTH_DAY_NBR','CALENDAR_MTH','CALENDAR_YEAR','HOLIDAY','LAST_YEAR_DAILY_VISITORS']
LABEL_COLUMNS = ['DAILY_VISITORS']
OUTPUT_COLUMNS = ['FORECASTED_DAILY_VISITORS']

# Create categories to be used in the OrdinalEncoder transformer. 
categories = {
    "DAY": np.array(["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"]),
}

# Define a pipeline that does the preprocessing (OrdinalEncoder) for column DAY and Regressor (using XGBRegressor model)
pipe = Pipeline(
    steps=[
        ("OE", OrdinalEncoder(
            input_cols= CATEGORICAL_COLUMNS,  
            output_cols= CATEGORICAL_COLUMNS_OE, 
            categories = categories,  
            drop_input_cols=True)
            ),
        ("regressor", XGBRegressor(
            input_cols=CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS, 
            label_cols=LABEL_COLUMNS, 
            output_cols=OUTPUT_COLUMNS, 
            n_jobs=-1)
            )
    ]
)

#### 7- Train the model and check its accuracy using R2

In [None]:
xgb_regressor = pipe.fit(train_df)

In [None]:
# Forecast daily visitors for test_df
result = xgb_regressor.predict(test_df)

# Show results
result.select("DAILY_VISITORS", "FORECASTED_DAILY_VISITORS").show()

# calculate Model Accuracy using R-2 Score 
print('Acccuracy:', r2_score(df=result,y_true_col_name="DAILY_VISITORS",y_pred_col_name="FORECASTED_DAILY_VISITORS"))

--------------------------------------------------
|"DAILY_VISITORS"  |"FORECASTED_DAILY_VISITORS"  |
--------------------------------------------------
|38                |37.847991943359375           |
|47                |46.41679382324219            |
|98                |97.78035736083984            |
|110               |110.70214080810547           |
|118               |118.01464080810547           |
|50                |50.40774154663086            |
|44                |45.02611541748047            |
|41                |41.21802520751953            |
|48                |48.43382263183594            |
|99                |99.28539276123047            |
--------------------------------------------------

Acccuracy: 0.9997674159736952


#### ___NOTE : ___No need for GridSearchCV for hyper-parameters tuning  



#### 8- Convert pipeline to sklearn file and Save it into Snowflake Stage

In [None]:
# call pipe.fit(train_df) then convert Model into SKLEARN Object using to_sklearn()
daily_visitors_model = pipe.fit(train_df).to_sklearn()

# Save the model locally as joblib
MODEL_FILE = 'daily_visitors_model.joblib'
joblib.dump(daily_visitors_model, MODEL_FILE)

['daily_visitors_model.joblib']

In [None]:
# Upload the model's joblib file into the Snowflake stage ML_FILES 
session.file.put(MODEL_FILE, "@ML_FILES", overwrite=True, auto_compress=False)

[PutResult(source='daily_visitors_model.joblib', target='daily_visitors_model.joblib', source_size=375682, target_size=375696, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

#### 9- Create Vectorised UDF for Batch Inference

In [None]:
# Define a function to read the model from a file
@cachetools.cached(cache={})
def read_file(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m


# Create a vectorized UDF for forecasting
@F.udf(name="daily_visitors_forecasting",
        is_permanent=True,
        stage_location = '@ML_FILES',
        imports=['@ML_FILES/daily_visitors_model.joblib'],
        packages=['snowflake-ml-python', 'joblib', 'scikit-learn==1.2.2', 'xgboost==1.7.3', 'cachetools'],
        replace=True,
        session=session)
def daily_visitors_forecasting(pd_input: T.PandasDataFrame[str, float, float, float, float, float]) -> T.PandasSeries[float]:
        # Make sure you have the columns in the expected order in the Pandas DataFrame
    features = ['DAY','CALENDAR_MTH_DAY_NBR','CALENDAR_MTH','CALENDAR_YEAR','HOLIDAY','LAST_YEAR_DAILY_VISITORS']
    pd_input.columns = features
    model =  read_file('daily_visitors_model.joblib')   
    if model is not None:
        forecasting = model.predict(pd_input)            
        return forecasting
    else:
        raise ValueError('Model is None, check the model loading process')  

#### 10- Call UDF to forecast Daily Visitors for DAILY_VISITORS_NEW Table and save results into a Snowflake Table

In [None]:
# Load DAILY_VISITORS_NEW Table into a Snowpark DF
new_dates = session.table("DAILY_VISITORS_NEW")

# Apply the UDF on the Snowpark DF
daily_visitors_forecasting = new_dates.with_column(
    "FORECASTED_DAILY_VISITORS",
    F.call_function(
        "daily_visitors_forecasting",
        F.col("DAY"),
        F.col("CALENDAR_MTH_DAY_NBR"),
        F.col("CALENDAR_MTH"),
        F.col("CALENDAR_YEAR"),
        F.col("HOLIDAY"),
        F.col("LAST_YEAR_DAILY_VISITORS"),
    ),
)
# Show the result
daily_visitors_forecasting.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY"  |"LAST_YEAR_DAILY_VISITORS"  |"FORECASTED_DAILY_VISITORS"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2022-11-01       |TUESDAY    |1                       |11              |2022             |0          |42                          |47.10953903198242            |
|2022-11-02       |WEDNESDAY  |2                       |11              |2022             |0          |50                          |49.39727020263672            |
|2022-11-03       |THURSDAY   |3                       |11              |2022             |0          |42                          |46.25577926635742            |
|2022-11-04       |FRI

In [None]:
# Write forecasting to a Snowflake table
daily_visitors_forecasting.write.mode('overwrite').save_as_table('daily_visitors_new_forecasting')