In [1]:
import pandas as _hex_pandas
import datetime as _hex_datetime
import json as _hex_json

In [2]:
# Run variable (presumably injected by Hex): the JSON literal "false" decodes to Python False.
hex_scheduled = _hex_json.loads("false")

In [3]:
# Run variable (presumably injected by Hex): decodes a JSON string into a Python str.
hex_user_email = _hex_json.loads("\"example-user@example.com\"")

In [4]:
# Run variable (presumably injected by Hex): the JSON "{}" decodes to an empty dict.
hex_user_attributes = _hex_json.loads("{}")

In [5]:
# Run variable (presumably injected by Hex): decodes to the string "logic".
hex_run_context = _hex_json.loads("\"logic\"")

In [6]:
# Run variable (presumably injected by Hex): decodes to the string "UTC".
hex_timezone = _hex_json.loads("\"UTC\"")

In [7]:
# Run variable (presumably injected by Hex): decodes to a UUID-formatted string.
hex_project_id = _hex_json.loads("\"70dd2a3e-c68b-4b52-b832-56ae5d562af4\"")

In [8]:
# Run variable (presumably injected by Hex): decodes to the project title as a Python str.
hex_project_name = _hex_json.loads("\"CUMULATIVE CHALLENGE: Daily Visitors Forecasting_Learners Guide\"")

In [9]:
# Run variable (presumably injected by Hex): decodes to an empty string.
hex_status = _hex_json.loads("\"\"")

In [10]:
# Run variable (presumably injected by Hex): the JSON "[]" decodes to an empty list.
hex_categories = _hex_json.loads("[]")

In [11]:
# Run variable (presumably injected by Hex): decodes to a list of ten hex colour strings.
hex_color_palette = _hex_json.loads("[\"#4C78A8\",\"#F58518\",\"#E45756\",\"#72B7B2\",\"#54A24B\",\"#EECA3B\",\"#B279A2\",\"#FF9DA6\",\"#9D755D\",\"#BAB0AC\"]")

#### 1- Upload Datasets into Snowflake Tables

Use the Snowflake UI to upload DAILY_VISITORS.csv and DAILY_VISITORS_NEW.csv into a Snowflake internal stage, then create 2 tables:

1. _**DAILY_VISITORS**_, which contains 5 years of historical daily visitors.
2. _**DAILY_VISITORS_NEW**_, which is a new month (November) for which we want to predict the number of daily visitors.



#### 2- Import necessary libraries

In [15]:
# Snowpark for Python
from snowflake.snowpark.functions import udf
from snowflake.snowpark import types as T
import snowflake.snowpark.functions as F
from snowflake.snowpark.version import VERSION


# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import OrdinalEncoder

# data science libs
import pandas as pd
import numpy as np
from snowflake.ml.modeling.metrics import r2_score

# misc
import joblib
import cachetools

#### 3- Create Snowpark Session

In [16]:
from snowflake.snowpark import Session

# Open a Snowpark session from the named connection 'VISITORS_CONNECTION'
session = Session.builder.config("connection_name", 'VISITORS_CONNECTION').create()

# Verify connectivity to Snowflake with a trivial query: one row of (user, server version)
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Current environment details, as (label, value) pairs in display order
environment_report = [
    ('User', snowflake_environment[0][0]),
    ('Role', session.get_current_role()),
    ('Database', session.get_current_database()),
    ('Schema', session.get_current_schema()),
    ('Warehouse', session.get_current_warehouse()),
    ('Snowflake version', snowflake_environment[0][1]),
    ('Snowpark for Python version',
     f'{snowpark_version[0]}.{snowpark_version[1]}.{snowpark_version[2]}'),
]

# Labels are left-aligned in a 28-character column so the colons line up
for label, value in environment_report:
    print(f'{label:<28}: {value}')

User                        : JAMESMCINTYRE
Role                        : "ACCOUNTADMIN"
Database                    : "VISITORS"
Schema                      : "PUBLIC"
Warehouse                   : "PC_HEX_WH"
Snowflake version           : 9.5.2
Snowpark for Python version : 1.28.0


#### 4- Import Daily Visitors Historical Data as a Snowpark DF

In [18]:
# Build the fully-qualified name of the DAILY_VISITORS table from the
# session's current database and schema
VISITORS_TABLE = 'DAILY_VISITORS'
input_tbl = f"{session.get_current_database()}.{session.get_current_schema()}.{VISITORS_TABLE}"

# Read the Snowflake table into a Snowpark DataFrame
DAILY_VISITORS_DF = session.table(input_tbl)

# Let's visualise the data
DAILY_VISITORS_DF.show()

# Describe the Snowpark DataFrame (summary statistics per column)
DAILY_VISITORS_DF.describe().show()


-------------------------------------------------------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY"  |"LAST_YEAR_DAILY_VISITORS"  |"DAILY_VISITORS"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------
|2018-06-16       |SATURDAY   |16                      |6               |2018             |1          |71                          |89                |
|2018-06-17       |SUNDAY     |17                      |6               |2018             |1          |79                          |99                |
|2018-06-18       |MONDAY     |18                      |6               |2018             |0          |97                          |108               |
|2018-06-19       |TUESDAY    |19                      |6               |2018           

___**NOTES**___ : 

- **DAY** is a categorical column. You will need to transform it into a numerical column using the OrdinalEncoder transformer.
- **CALENDAR_DATE** will be replaced by the 'CALENDAR_MTH_DAY_NBR', 'CALENDAR_MTH' and 'CALENDAR_YEAR' columns.
- **HOLIDAY** indicates whether that day was a public holiday or a weekend.



#### 5- Split the dataset into Training and Test DFs

In [27]:
# Since our dataset is a time series, we split it on a cut-off date (not a random split)
# to preserve the order and structure.
# NOTE(review): the comparison below relies on Snowflake converting this string to a DATE;
# confirm the session's date input format accepts 'DD-Mon-YYYY'.
split_date = '01-Sep-2022'

# Columns kept for modelling: the six features followed by the label.
# Any column not listed here (e.g. 'CALENDAR_DATE') is not carried forward;
# 'CALENDAR_DATE' is represented by 'CALENDAR_MTH_DAY_NBR', 'CALENDAR_MTH' and 'CALENDAR_YEAR'.
MODEL_COLUMNS = [
    'DAY',
    'CALENDAR_MTH_DAY_NBR',
    'CALENDAR_MTH',
    'CALENDAR_YEAR',
    'HOLIDAY',
    'LAST_YEAR_DAILY_VISITORS',
    'DAILY_VISITORS',
]

# Filter on CALENDAR_DATE *before* projecting it away, so the predicate never
# references a column that the preceding select() has already dropped.

# Create Train DF: every day strictly before the cut-off date
train_df = (
    DAILY_VISITORS_DF
    .filter(F.col('CALENDAR_DATE') < split_date)
    .select(MODEL_COLUMNS)
)

# Create Test DF: the cut-off date and everything after it
test_df = (
    DAILY_VISITORS_DF
    .filter(F.col('CALENDAR_DATE') >= split_date)
    .select(MODEL_COLUMNS)
)

# Show summary statistics for test_df
test_df.describe().show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SUMMARY"  |"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"      |"CALENDAR_YEAR"  |"HOLIDAY"            |"LAST_YEAR_DAILY_VISITORS"  |"DAILY_VISITORS"   |
----------------------------------------------------------------------------------------------------------------------------------------------------------------
|count      |61         |61.0                    |61.0                |61.0             |61.0                 |61.0                        |61.0               |
|mean       |NULL       |15.754098               |9.508197            |2022.0           |0.327869             |61.95082                    |72.672131          |
|stddev     |NULL       |8.880044538176596       |0.5040813426422367  |0.0              |0.47333286384953244  |25.63553927655902           |32.05657567489079  |
|max        |WEDNESDAY  |31.0     

#### 6- Categorize columns & Create pipeline

In [20]:
# Categorize all the features for modeling
CATEGORICAL_COLUMNS = ["DAY"]                       # raw categorical input
CATEGORICAL_COLUMNS_OE = ["CALENDAR_WEEK_DAY_NBR"]  # ordinal-encoded output of DAY
NUMERICAL_COLUMNS = ['CALENDAR_MTH_DAY_NBR', 'CALENDAR_MTH', 'CALENDAR_YEAR', 'HOLIDAY', 'LAST_YEAR_DAILY_VISITORS']
LABEL_COLUMNS = ['DAILY_VISITORS']
OUTPUT_COLUMNS = ['FORECASTED_DAILY_VISITORS']

# Explicit category order for the OrdinalEncoder transformer (Monday first)
categories = {
    "DAY": np.array(["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"]),
}

# Define a pipeline that does the preprocessing (OrdinalEncoder) for column DAY
# followed by the regressor (XGBRegressor model)
pipe = Pipeline(
    steps=[
        (
            "OE",
            OrdinalEncoder(
                input_cols=CATEGORICAL_COLUMNS,
                output_cols=CATEGORICAL_COLUMNS_OE,
                categories=categories,
                drop_input_cols=True,  # DAY is replaced by its encoded column
            ),
        ),
        (
            "regressor",
            XGBRegressor(
                learning_rate=0.1,  # replace with the tuned best_params_ value if a search is run
                n_estimators=200,   # replace with the tuned best_params_ value if a search is run
                input_cols=CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS,
                label_cols=LABEL_COLUMNS,
                output_cols=OUTPUT_COLUMNS,
                n_jobs=-1,
            ),
        ),
    ]
)

#### 7- Train the model and check its accuracy using R2

In [21]:
# Train the model by calling .fit() on the training split.
# As the last expression in the cell, the returned Pipeline object is displayed.
pipe.fit(train_df)

Package 'snowflake-telemetry-python' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.


<snowflake.ml.modeling.pipeline.pipeline.Pipeline at 0x2e901c09c40>

In [23]:
# Score the hold-out split with the fitted pipeline
result = pipe.predict(test_df)

# Names of the actual and forecast columns, defined once and reused below
actual_col, forecast_col = "DAILY_VISITORS", "FORECASTED_DAILY_VISITORS"

# Preview actual vs. forecast values side by side
result.select(actual_col, forecast_col).show()

# Model accuracy as the R-2 score; last expression, so the value is displayed
r2_score(df=result, y_true_col_name=actual_col, y_pred_col_name=forecast_col)

--------------------------------------------------
|"DAILY_VISITORS"  |"FORECASTED_DAILY_VISITORS"  |
--------------------------------------------------
|38                |37.7777099609375             |
|47                |46.33698272705078            |
|98                |97.64281463623047            |
|110               |110.20867156982422           |
|118               |117.42937469482422           |
|50                |50.233856201171875           |
|44                |44.536285400390625           |
|41                |41.11422348022461            |
|48                |48.12749481201172            |
|99                |99.13192749023438            |
--------------------------------------------------



0.9998043185608769

#### ___NOTE:___ No need for GridSearchCV hyper-parameter tuning, since the model already reaches an R² of about 0.9998 on the test set



#### 8- Convert the pipeline to a scikit-learn object and save it to a Snowflake stage

In [25]:
# Convert the pipeline into a scikit-learn object using to_sklearn().
# `pipe` was already fitted on train_df in the training cell, so it is reused here
# rather than fitted a second time: the exported model is exactly the one that was
# evaluated, and no extra training run is needed.
model = pipe.to_sklearn()

# Save the model locally as a joblib file
MODEL_FILE = 'daily_visitors_model.joblib'

joblib.dump(model, MODEL_FILE)

# Upload the model's joblib file into the Snowflake stage ML_FILES.
# auto_compress=False keeps the staged file name unchanged (see target in the PutResult).
# NOTE(review): the stage @ML_FILES is not created in this notebook; it must already exist.
session.file.put(MODEL_FILE, "@ML_FILES", overwrite=True, auto_compress=False)

Package 'snowflake-telemetry-python' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.


[PutResult(source='daily_visitors_model.joblib', target='daily_visitors_model.joblib', source_size=701851, target_size=701856, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

#### 9- Create Vectorised UDF for Batch Inference

In [33]:
# Define a function to read the model from a file
@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib-serialised object from the import directory.

    Results are memoised per ``filename`` by ``cachetools.cached``, so a given
    file is deserialised only once per Python process.

    Args:
        filename: Name of the file inside the import directory.

    Returns:
        The object deserialised by ``joblib.load``.

    Raises:
        KeyError: If ``sys._xoptions`` has no "snowflake_import_directory" entry.
        RuntimeError: If that entry is empty. Previously the function returned
            ``None`` in this case, which was then cached and only failed later
            when the caller tried to use the model.
    """
    import joblib
    import os
    import sys

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if not import_dir:
        raise RuntimeError(
            f"'{IMPORT_DIRECTORY_NAME}' is empty; cannot locate '{filename}'"
        )

    # joblib.load is pickle-based and can execute arbitrary code: only load
    # files that come from a trusted source.
    with open(os.path.join(import_dir, filename), 'rb') as file:
        return joblib.load(file)


# Register a permanent vectorized UDF for batch inference
@F.udf(
    name="predict_daily_visitors",
    is_permanent=True,
    stage_location='@ML_FILES',
    imports=['@ML_FILES/daily_visitors_model.joblib'],
    packages=['snowflake-ml-python', 'joblib', 'scikit-learn==1.5.2', 'xgboost==2.1.1', 'cachetools'],
    replace=True,
    session=session,
)
def predict_daily_visitors(pd_input: T.PandasDataFrame[str, float, float, float, float, float]) -> T.PandasSeries[float]:
    """Forecast daily visitors for a batch of rows.

    Args:
        pd_input: Pandas DataFrame with six columns, in the order DAY,
            CALENDAR_MTH_DAY_NBR, CALENDAR_MTH, CALENDAR_YEAR, HOLIDAY,
            LAST_YEAR_DAILY_VISITORS.

    Returns:
        The output of the loaded model's ``predict`` for the batch.
    """
    # Label the columns with the feature names; the caller must pass the
    # arguments in exactly this order.
    pd_input.columns = [
        'DAY',
        'CALENDAR_MTH_DAY_NBR',
        'CALENDAR_MTH',
        'CALENDAR_YEAR',
        'HOLIDAY',
        'LAST_YEAR_DAILY_VISITORS',
    ]
    # read_file memoises the loaded model, so it is deserialised once per process
    forecaster = read_file('daily_visitors_model.joblib')
    return forecaster.predict(pd_input)

#### 10- Call UDF to forecast Daily Visitors for DAILY_VISITORS_NEW Table and save results into a Snowflake Table

In [34]:
# Snowpark DataFrame over the DAILY_VISITORS_NEW table
NEW_VISITORS_TABLE = session.table('DAILY_VISITORS_NEW')

# UDF arguments, in the order predict_daily_visitors expects them
udf_args = [
    F.col(column_name)
    for column_name in (
        'DAY',
        'CALENDAR_MTH_DAY_NBR',
        'CALENDAR_MTH',
        'CALENDAR_YEAR',
        'HOLIDAY',
        'LAST_YEAR_DAILY_VISITORS',
    )
]

# Add the forecast as a new column by calling the registered UDF by name
new_visitors_w_prediction = NEW_VISITORS_TABLE.with_column(
    "FORECASTED_DAILY_VISITORS",
    F.call_function("predict_daily_visitors", *udf_args),
)

# Show the result
new_visitors_w_prediction.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"DAY"      |"CALENDAR_MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY"  |"LAST_YEAR_DAILY_VISITORS"  |"FORECASTED_DAILY_VISITORS"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2022-11-01       |TUESDAY    |1                       |11              |2022             |0          |42                          |47.17447280883789            |
|2022-11-02       |WEDNESDAY  |2                       |11              |2022             |0          |50                          |49.20104217529297            |
|2022-11-03       |THURSDAY   |3                       |11              |2022             |0          |42                          |46.12674331665039            |
|2022-11-04       |FRI

In [35]:
# Write the forecast DataFrame to the Snowflake table 'new_visitors_w_prediction'.
# mode('overwrite') replaces any existing table of that name, so re-running the cell is safe.
new_visitors_w_prediction.write.mode('overwrite').save_as_table('new_visitors_w_prediction')

In [36]:
# Close the Snowpark session now that the forecasts have been written.
session.close()

# ------- END OF THE CUMULATIVE CHALLENGE -------

