Import the necessary Python modules.


In [1]:
from datetime import date, timedelta
import pandas as pd
import xgboost as xgb
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

Let's establish a Snowpark session, reading the connection parameters with `SnowflakeLoginOptions`.

In [2]:
# Build (or reuse) a Snowpark session from the connection parameters resolved by SnowflakeLoginOptions.
login_options = SnowflakeLoginOptions()
session = Session.builder.configs(login_options).getOrCreate()

SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. 


In [3]:
# Point the session at the MOCKSERIES schema, then echo the active role / warehouse / schema.
session.use_schema('MOCKSERIES')
current_role = session.get_current_role()
current_wh = session.get_current_warehouse()
current_schema = session.get_fully_qualified_current_schema()
print(f"Role: {current_role} | WH: {current_wh} | DB.SCHEMA: {current_schema}")

Role: "SYSADMIN" | WH: "TIME_SERIES_M" | DB.SCHEMA: "CROMANO"."MOCKSERIES"


We start from an existing table of daily store traffic that will supply our time-series features. Its `ORDER_TIMESTAMP` column (renamed to `DATE` below) is the day the traffic was recorded, and its `TARGET` column is renamed to `TRAFFIC`. We then derive additional date features by joining it to a calendar table we've defined.



In [5]:
# Daily traffic per store; rename the source columns to the names used throughout this notebook.
store_traffic_info_df = (
    session.table("DAILY_TS_5000_PARTITIONS_STARTING_2021")
    .with_column_renamed('ORDER_TIMESTAMP', 'DATE')
    .with_column_renamed('TARGET', 'TRAFFIC')
)
# Calendar attributes keyed by CALENDAR_DATE (weekday, day of month, month, year, holiday name).
store_calendar_info_df = session.table("CALENDAR_INFO_2018")

Let's preview these tables. 

In [6]:
# Report how many distinct stores the traffic table covers, then preview a few rows.
n_unique_stores = store_traffic_info_df.select("STORE_ID").distinct().count()
print(f'Number of Unique STORE_IDs: {n_unique_stores}')
store_traffic_info_df.show()

Number of Unique STORE_IDs: 5000
--------------------------------------------------------------------------------------------------
|"DATE"               |"TRAFFIC"          |"STORE_ID"  |"FEATURE_1"        |"FEATURE_2"          |
--------------------------------------------------------------------------------------------------
|2022-03-08 00:00:00  |666.8192663331051  |4444        |654.1491925989498  |410.4926509173015    |
|2022-03-09 00:00:00  |638.5786270901655  |4444        |669.8177918631981  |6.350771200553481    |
|2022-03-10 00:00:00  |635.7952662731936  |4444        |784.9884373702064  |-109.93505762901668  |
|2022-03-11 00:00:00  |660.7159818576009  |4444        |603.3515104411116  |640.4774506148209    |
|2022-03-12 00:00:00  |669.6342070799896  |4444        |767.9436650881438  |-134.48868364016857  |
|2022-03-13 00:00:00  |691.3206560432528  |4444        |893.4186285859912  |868.4772587556463    |
|2022-03-14 00:00:00  |675.0721310403344  |4444        |735.714068572056   |

In [7]:
# Total number of rows in the traffic table (displayed as the cell's output).
store_traffic_info_df.count()

6735000

In [9]:
# Preview the calendar table that supplies the date features joined below.
store_calendar_info_df.show()

--------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
--------------------------------------------------------------------------------------------------------
|2018-01-01       |0               |1              |1               |2018             |New Year's Day  |
|2018-01-02       |1               |2              |1               |2018             |NULL            |
|2018-01-03       |2               |3              |1               |2018             |NULL            |
|2018-01-04       |3               |4              |1               |2018             |NULL            |
|2018-01-05       |4               |5              |1               |2018             |NULL            |
|2018-01-06       |5               |6              |1               |2018             |NULL            |
|2018-01-07       |6               |7              |1  

## Feature Engineering

Left-join the calendar table to the traffic table on the date, and fill missing holiday names with `No Holiday`.

In [10]:
# Left join: every traffic row is kept even when the calendar has no matching date.
calendar_join_cond = (
    store_calendar_info_df.col("CALENDAR_DATE") == store_traffic_info_df.col("DATE")
)
past_feature_cols = [
    F.col("DATE"),
    "STORE_ID",
    "WEEK_DAY_NBR",
    "MTH_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
    "TRAFFIC",
]
joined_history = store_traffic_info_df.join(store_calendar_info_df, calendar_join_cond, "left")
# Non-holiday days have a NULL HOLIDAY_NAME; give them an explicit category.
past_final = joined_history.select(*past_feature_cols).na.fill({"HOLIDAY_NAME": "No Holiday"})
past_final.show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"DATE"               |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"                        |"TRAFFIC"           |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-06-19 00:00:00  |4570        |0               |19             |6               |2023             |Juneteenth National Independence Day  |290.3111646293761   |
|2023-06-20 00:00:00  |4570        |1               |20             |6               |2023             |No Holiday                            |298.82095141159033  |
|2023-06-21 00:00:00  |4570        |2               |21             |6               |2023             |No Holiday                            |318.0628057275538   |
|2023-06-2

Since we will forecast 4 weeks ahead, we need calendar rows for the next 28 days, which have no traffic values yet.

In [12]:
# Number of days past today to forecast. The window below is inclusive, so it
# spans today .. today + FORECAST_HORIZON_DAYS.
FORECAST_HORIZON_DAYS = 28

# Reuse the calendar DataFrame loaded earlier instead of re-reading the table by
# name, so both the history and the future rows come from the same source.
future_cal = (
    store_calendar_info_df
    .select(
        "CALENDAR_DATE",
        "WEEK_DAY_NBR",
        "MTH_DAY_NBR",
        "CALENDAR_MTH",
        "CALENDAR_YEAR",
        "HOLIDAY_NAME",
    )
    # between() is inclusive on both ends, matching >= today and <= today + horizon.
    .filter(
        F.col("CALENDAR_DATE").between(
            F.current_date(), F.current_date() + FORECAST_HORIZON_DAYS
        )
    )
)

future_cal.show()

--------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
--------------------------------------------------------------------------------------------------------
|2024-09-11       |2               |11             |9               |2024             |NULL            |
|2024-09-12       |3               |12             |9               |2024             |NULL            |
|2024-09-13       |4               |13             |9               |2024             |NULL            |
|2024-09-14       |5               |14             |9               |2024             |NULL            |
|2024-09-15       |6               |15             |9               |2024             |NULL            |
|2024-09-16       |0               |16             |9               |2024             |NULL            |
|2024-09-17       |1               |17             |9  

In [13]:
# Forecast dates are taken directly from the filtered calendar window (future_cal).
# Taking them from there keeps the date list aligned with the calendar filter,
# rather than relying on a separately hard-coded day count.
df_date = (
    future_cal
    .select(F.to_date(F.col("CALENDAR_DATE")).as_("DATE"))
    .distinct()
)

## Cross join to make sure each store gets a value for the next 4 weeks
# NOTE(review): STORE_ID is cast to string here, while past_final keeps the source
# column's type; the two frames are later unioned by position — confirm the types agree.
df_store = (
    store_traffic_info_df
    .select(F.col("STORE_ID").cast("string").alias("STORE_ID"))
    .distinct()
)
stores = df_date.cross_join(df_store)

In [14]:
# Same NULL-holiday fill as the historical rows, written to a new name so the
# filtered calendar (future_cal) is not mutated in place.
future_cal_filled = future_cal.na.fill({"HOLIDAY_NAME": "No Holiday"})

## Join store info and calendar data
# Inner join: keep only (store, date) pairs that have calendar attributes. A right
# join would emit rows with NULL DATE/STORE_ID for any calendar date that had no
# store rows, and those rows would flow into FEATURES_TRAFFIC.
future_df = (
    stores.join(
        future_cal_filled,
        stores.col("DATE") == future_cal_filled.col("CALENDAR_DATE"),
        "inner",
    )
    .drop("CALENDAR_DATE")
)
future_df.show()

----------------------------------------------------------------------------------------------------------------
|"DATE"      |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
----------------------------------------------------------------------------------------------------------------
|2024-09-13  |1478        |4               |13             |9               |2024             |No Holiday      |
|2024-09-13  |1525        |4               |13             |9               |2024             |No Holiday      |
|2024-09-13  |1526        |4               |13             |9               |2024             |No Holiday      |
|2024-09-13  |1541        |4               |13             |9               |2024             |No Holiday      |
|2024-09-13  |1544        |4               |13             |9               |2024             |No Holiday      |
|2024-09-13  |2757        |4               |13             |9               |2024             |N

Add a `TRAFFIC` column pre-filled with zero. These placeholder rows are the ones forecast once our UDTF executes.

In [15]:
# Future rows get a placeholder TRAFFIC of 0. The UDTF trains only on dates
# before (today - 14 days), so these placeholders never enter the training set.
future_feature_cols = [
    "DATE",
    "STORE_ID",
    "WEEK_DAY_NBR",
    "MTH_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
]
# Column order matches past_final so the two frames can be unioned.
future_df = future_df.select(*future_feature_cols, F.lit(0).alias("TRAFFIC"))
future_df.show()

----------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |"TRAFFIC"  |
----------------------------------------------------------------------------------------------------------------------------
|2024-09-13  |22          |4               |13             |9               |2024             |No Holiday      |0          |
|2024-09-13  |60          |4               |13             |9               |2024             |No Holiday      |0          |
|2024-09-13  |65          |4               |13             |9               |2024             |No Holiday      |0          |
|2024-09-13  |113         |4               |13             |9               |2024             |No Holiday      |0          |
|2024-09-13  |3037        |4               |13             |9               |2024             |No Holiday      |0          |


Union the historical and future rows and write the result back to the `FEATURES_TRAFFIC` Snowflake table.

In [16]:
# UNION ALL stacks history and future rows as-is. Plain union() is SQL UNION
# (DISTINCT), which would silently collapse duplicate rows and force a costly
# dedup over millions of rows. Both frames select the same columns in the same
# order, so the positional union lines up.
unionDF = past_final.union_all(future_df)

unionDF.write.save_as_table('FEATURES_TRAFFIC', mode='overwrite')

## Creating the User Defined Table Function for multi-node parallelized model training

Create the stage, output schema and UDTF for training and forecasting.

In [17]:
# Add stage for UDFs and Stored Procs
# Stage that holds the code for the permanent UDTF registered below (stage_location="@pymodels").
CREATE_STAGE_SQL = """
create stage if not exists pymodels
"""
session.sql(CREATE_STAGE_SQL).collect()

[Row(status='Stage area PYMODELS successfully created.')]

### Inside `end_partition` you can put any Python modeling code (XGBoost, Prophet, ARIMA, etc.)

In [24]:
# Output of the UDTF: one (date, forecast) row per forecast date in the store partition.
schema = T.StructType([
    T.StructField("DATE1", T.DateType()),
    T.StructField("TRAFFIC_FORECAST", T.IntegerType())
])

@F.udtf(output_schema = schema,
     input_types = [T.TimestampType(), T.FloatType(), T.StringType(), T.StringType(), T.StringType(), T.StringType()],
     name = "store_traffic_forecast", is_permanent=True, stage_location="@pymodels", session=session,
     packages=["pandas","xgboost","scikit-learn"], replace=True)
class forecast:
    """Per-partition XGBoost traffic forecaster (the query partitions by STORE_ID).

    ``process`` buffers each input row. ``end_partition`` trains on dates before
    ``today - 14 days`` and yields predictions for ``today - 14 days`` through
    ``today + 28 days``.
    """

    def __init__(self):
        # Column buffers, filled row by row in process().
        self.DATE=[]
        self.DAYOFWEEK=[]
        self.MONTH=[]
        self.YEAR=[]
        self.HOLIDAY_NAME=[]
        self.TRAFFIC=[]

    def process(self, DATE, TRAFFIC, DAYOFWEEK, MONTH, YEAR, HOLIDAY_NAME):
        """Buffer one input row. Nothing is emitted until end_partition."""
        self.DATE.append(DATE)
        self.TRAFFIC.append(TRAFFIC)
        self.DAYOFWEEK.append(DAYOFWEEK)
        self.MONTH.append(MONTH)
        self.YEAR.append(YEAR)
        self.HOLIDAY_NAME.append(HOLIDAY_NAME)

    def end_partition(self):
        """Fit a model on this partition's history and yield (date, forecast) rows.

        Yields nothing if the partition has no training rows or no rows in the
        forecast window, rather than raising and failing the whole query.
        Predictions are clipped at 0 and rounded to int to match the
        IntegerType TRAFFIC_FORECAST column.
        """
        df = pd.DataFrame(zip(self.DATE,
                              self.TRAFFIC,
                              self.DAYOFWEEK,
                              self.MONTH,
                              self.YEAR,
                              self.HOLIDAY_NAME),
                          columns = ['DATE','TRAFFIC','WEEK_DAY_NBR',
                                     'CALENDAR_MTH','CALENDAR_YEAR','HOLIDAY_NAME'])

        # Index by timestamp so the train/forecast split can filter on dates.
        df2 = df.set_index('DATE')
        df2.index = pd.to_datetime(df2.index)

        # One-hot encode the calendar features. Encoding the whole partition
        # (history + future rows) keeps train and forecast columns aligned.
        categorical_cols = ['HOLIDAY_NAME', 'WEEK_DAY_NBR', 'CALENDAR_MTH', 'CALENDAR_YEAR']
        df2[categorical_cols] = df2[categorical_cols].astype("category")
        final = pd.get_dummies(data=df2, columns=categorical_cols)

        # Train / forecast split relative to today.
        # NOTE(review): date.today() is evaluated on the Snowflake worker; confirm its
        # date matches the session CURRENT_DATE used to build the future rows.
        today = date.today()
        last_14 = today - timedelta(days=14)
        fourweek = today + timedelta(days = 28)

        train = final[(final.index >= pd.to_datetime('01-Jan-2018')) & (final.index < pd.to_datetime(last_14))]
        # .copy(): this is a slice of `final`; work on an independent frame.
        horizon = final[(final.index >= pd.to_datetime(last_14)) & (final.index <= pd.to_datetime(fourweek))].copy()

        if train.empty or horizon.empty:
            return

        X_train = train.drop('TRAFFIC', axis = 1)
        y_train = train['TRAFFIC']
        X_forecast = horizon.drop('TRAFFIC', axis = 1)

        # XGBoost regressor; n_jobs=1 keeps each partition single-threaded.
        model = xgb.XGBRegressor(n_estimators=200,n_jobs=1)
        model.fit(X_train, y_train,
                verbose=False)

        # Traffic cannot be negative: clip predictions at zero.
        predictions = model.predict(X_forecast).clip(min=0)
        predicted = pd.Series(predictions, index=horizon.index).sort_index()

        # Emit Python date / int values matching the DateType / IntegerType output schema.
        for ts, value in predicted.items():
            yield ts.date(), int(round(float(value)))

Build the query that calls the UDTF, partitioned by `STORE_ID`.

In [25]:
# Handle to the registered UDTF so it can be applied to DataFrame columns.
store_forecast_test = F.table_function("store_traffic_forecast")

df = session.table('FEATURES_TRAFFIC')

# Argument order must match the UDTF's process(DATE, TRAFFIC, DAYOFWEEK, MONTH, YEAR, HOLIDAY_NAME).
udtf_arg_names = ("DATE", "TRAFFIC", "WEEK_DAY_NBR", "CALENDAR_MTH", "CALENDAR_YEAR", "HOLIDAY_NAME")
udtf_args = [df[name] for name in udtf_arg_names]

# Partitioning by STORE_ID runs end_partition once per store, i.e. one model per store.
per_store_forecast = store_forecast_test(*udtf_args).over(partition_by=df["STORE_ID"])
forecast = df.select(df["STORE_ID"], per_store_forecast)

Writing the output to a table is what actually executes the UDTF: it trains a model per store and forecasts the next 4 weeks.

Optionally, switch to a larger warehouse for more parallelism.

In [26]:
#session.use_warehouse('time_series_l')

On a Large warehouse, we were able to train and generate 5,000 forecasts in 22.4 seconds.

In [27]:
# Rename the UDTF's DATE1 output to DATE, then materialize the forecasts
# (this is the step that actually runs the UDTF).
forecast_to_save = forecast.with_column_renamed("DATE1", "DATE")
forecast_to_save.write.save_as_table("FORECAST", mode="overwrite")

Join the forecasts to the actuals and visualize the results.

In [28]:
traffic = session.table("FEATURES_TRAFFIC")
forecast = session.table("FORECAST")

# Match each forecast row to the feature row for the same store and date.
same_store_and_day = (traffic.col("DATE") == forecast.col("DATE")) & (
    traffic.col("STORE_ID") == forecast.col("STORE_ID")
)

# Rows dated today or later hold placeholder TRAFFIC values, so report them as NULL actuals.
actual_traffic = F.cast(
    F.when(traffic.col("DATE") >= F.current_date(), F.lit(None)).otherwise(
        traffic.col("TRAFFIC")
    ),
    T.IntegerType(),
).alias("ACTUAL")

act_vs_pred = (
    traffic.join(forecast, same_store_and_day)
    .select(
        F.cast(traffic.col("STORE_ID"), T.IntegerType()).alias("STORE_ID"),
        traffic.col("DATE").alias("DATE"),
        actual_traffic,
        forecast.col("TRAFFIC_FORECAST").alias("FORECAST"),
    )
    # Keep rows from the last 14 days onward.
    .filter(traffic.col("DATE") > F.current_date() - 15)
)

In [29]:
# Preview three rows of the actual-vs-forecast comparison.
act_vs_pred.show(3)

------------------------------------------------------------
|"STORE_ID"  |"DATE"               |"ACTUAL"  |"FORECAST"  |
------------------------------------------------------------
|1567        |2024-09-02 00:00:00  |126       |152         |
|921         |2024-08-29 00:00:00  |477       |466         |
|921         |2024-08-30 00:00:00  |474       |475         |
------------------------------------------------------------



In [30]:
# Persist the comparison, then read it back as a table-backed DataFrame.
comparison_table = 'actual_vs_forecast'
act_vs_pred.write.save_as_table(comparison_table, mode='overwrite', create_temp_table=False)
act_v_pred = session.table(comparison_table)

____