### Package Installation

In [None]:
!pip install tabulate

In [None]:
!pip install numpy==1.23.5

In [None]:
!pip install sqlglot

# SAMPLE FEATURE STORE

This Notebook walks through an example of creating a Snowflake Feature Store over a multi-table relational data model, using the Sample data available in all Snowflake accounts. As such, this Notebook should run as is in any Snowflake account, assuming the user has the required privileges to access the Sample data schema and to create the underlying database objects used by Feature Store (Schema, Tags, Dynamic Tables, Tables, Views & Datasets).  You can read more about the required privileges, and the sample script provided to grant them, in the Feature Store documentation [here](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store/rbac).  


We will work through creating the key components of a feature-store:
- creating multiple Entities representing the different things we collect data about
- deriving different types of features using dataframes within feature-views
- retrieving datasets for training combining (joining) feature-views from multiple entities and key considerations when doing so


## Entities
It is common for organisations to model and store features for different business entities (units of analysis) within their Feature Store so they can build ML models and make predictions about the behavior of those entities.  These entities are related to each other and form a business entity hierarchy.

For example, a retail organisation may need to store features representing a product-hierarchy.  At the lowest level we might have PART, rolling up to PART_SUB_CATEGORY, PART_CATEGORY, DEPARTMENT, and finally COMPANY.   There may be a geographic/location hierarchy which might roll up from PART, through SHELF, UNIT, AISLE, STORE, REGION and COMPANY.  We may also want to create features at the intersection of some of the levels in these hierarchies. For example, the features representing sales, and stocking levels at STORE-DEPARTMENT level.  Within the Snowflake Feature Store we represent these business-entities using the Entity class and its functions.  
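
As a rough illustration of the roll-up idea described above, the sketch below models the product hierarchy as a plain parent mapping and walks it from the lowest level to the top. The mapping and the `roll_up_path` helper are hypothetical names used only for this illustration; they are not part of the Snowflake Feature Store API.

```python
# Hypothetical parent mapping for the product hierarchy described in the text.
# Each level points at the level it rolls up to.
PRODUCT_HIERARCHY = {
    "PART": "PART_SUB_CATEGORY",
    "PART_SUB_CATEGORY": "PART_CATEGORY",
    "PART_CATEGORY": "DEPARTMENT",
    "DEPARTMENT": "COMPANY",
}


def roll_up_path(level, hierarchy):
    """Return the chain of levels from `level` up to the top of the hierarchy."""
    path = [level]
    while path[-1] in hierarchy:
        path.append(hierarchy[path[-1]])
    return path


product_path = roll_up_path("PART", PRODUCT_HIERARCHY)
print(" -> ".join(product_path))

# An "intersection" entity such as STORE-DEPARTMENT is identified by one key
# from each hierarchy, i.e. a compound key.
store_department_key = ("STORE", "DEPARTMENT")
```

An intersection level such as STORE-DEPARTMENT then simply becomes an entity with more than one key column.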

### Parameters, Packages & Snowpark Session
Firstly, we will define some parameters used within the Notebook, import the required packages and create a session for Snowpark.  Feel free to adjust the parameters to suit your needs.

#### Notebook Parameters



In [4]:
# Source Data Database and Schema
src_database = 'SNOWFLAKE_SAMPLE_DATA'
src_schema = 'TPCH_SF1' # <-- Modify this if you want to test with one of the larger data scale-factors. e.g. TPCH_SF1, TPCH_SF10, TPCH_SF100, TPCH_SF1000

# Database to use to create Schemas
sess_db = 'SIMON' # <-- Change this to a database in your account. The database within which we will create our Feature Store (schema), and data-source schema.

org = 'TPCHSF1'                       # Name for working Schema and used to derive Feature Store Name
fs_name = f"{org}_FEATURE_STORE"      # Feature Store Name.  This will create a Schema to contain our Feature Store database objects
mr_name = f"{org}_MODEL_REGISTRY"     # Model-Registry Name.
num_spine_rows = 10                   # Maximum number of rows to use when sampling source data for Spine Entity Keys.

#### Packages

In [None]:
### Packages
## Python packages
import os
from os import listdir
from os.path import isfile, join
from datetime import date, datetime
import time 
from decimal import Decimal
import pkg_resources
pkg_resources.require("numpy==1.23.5")
import numpy as np
import pandas as pd
pd.set_option('display.max_colwidth', 500)
import streamlit as st
import tabulate
from pathlib import Path
import json

## SNOWFLAKE

## Snowpark
#import snowflake.snowpark as S
#from snowflake.snowpark import Session
#from snowflake.snowpark import Analytics
from snowflake.snowpark.version import VERSION
from snowflake.snowpark import functions as F, types as T
from snowflake.snowpark.types import StringType
# Snowpark functions representing some SQL functions we need
tryparsejson = F.builtin('TRY_PARSE_JSON')
timestampadd = F.builtin('TIMESTAMPADD')

# Snowflake Feature Store
from snowflake.ml.feature_store import FeatureStore, CreationMode
from snowflake.ml.feature_store import Entity, FeatureView
from snowflake.ml.utils import connection_params

# Snowflake ML preprocessing
from snowflake.ml.modeling.preprocessing import OrdinalEncoder

# Model Registry
from snowflake.ml.registry import registry
from snowflake.ml.model import custom_model
from typing import Optional
from snowflake.ml.model.model_signature import FeatureSpec, DataType, ModelSignature



#### Session
Set up the Snowflake connection and database, and return the execution environment state.


In [None]:
# CREATE SESSION
# ## Using Snowflake Notebook
from snowflake.snowpark.context import get_active_session

session = get_active_session()

In [6]:
session.sql_simplifier_enabled = True

# Capture and Print the Current Environment Details
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION
session_role = session.get_current_role().replace('"', "")
session_database = session.get_current_database().replace('"', "")
session_schema = session.get_current_schema().replace('"', "")
session_vw = session.get_current_warehouse().replace('"', "")

vw_status = session.sql(f"""show warehouses like '{session_vw}' """).collect()[0]
vw_type = vw_status['type']
vw_state = vw_status['state']
vw_size = vw_status['size'].upper()
vw_available = vw_status['available']
print('================================================================================')
print('\nConnection Established with the following parameters:')
print(f'Account                      : {session.sql("select current_account()").collect()[0][0]}')
print(f'User                         : {snowflake_environment[0][0]}')
print(f'Role                         : {session_role}')
print(f'Database                     : {session_database}')
print(f'Schema                       : {session_schema}')
print(f'Warehouse Name               : {session_vw}')
print(f'Warehouse Type               : {vw_type}')
print(f'Warehouse State              : {vw_state}')
print(f'Warehouse Size               : {vw_size}')
print(f'Warehouse Available Resource : {vw_available}')
print(f'Snowflake version            : {snowflake_environment[0][1]}')
print(f'Snowpark for Python version  : {snowpark_version[0]}.{snowpark_version[1]}.{snowpark_version[2]} \n')

SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. 


Running in non-Snowflake Notebook

Connection Established with the following parameters:
Account                      : AK32940
User                         : SIMON
Role                         : SYSADMIN
Database                     : SIMON
Schema                       : SCRATCH
Warehouse Name               : SIMON_XS
Warehouse Type               : STANDARD
Warehouse State              : STARTED
Warehouse Size               : X-SMALL
Warehouse Available Resource : 100
SPCS Compute Name            : NA
Notebook Client Execution    : USER_CLIENT
Snowflake version            : 8.44.2
Snowpark for Python version  : 1.25.0 



In [None]:
# Name of the schema where we will persist our generated training datasets
session.sql(f''' Create schema if not exists {sess_db}.{org}''').collect()

To make a start, we need some easily accessible data to work with that contains recognisable business entities and a hierarchy. We will make use of the Transaction Processing Performance Council's ad-hoc decision support benchmark (TPC-H) dataset, which is made available as a data-share in the `SNOWFLAKE_SAMPLE_DATA` database in all accounts. You can read more about the TPC-H data [here](https://docs.snowflake.com/en/user-guide/sample-data-tpch) and see the ER diagram that represents that data below.

In [None]:
st.image("https://docs.snowflake.com/en/_images/sample-data-tpch-schema.png")


For this example, we will assume that all of the tables contain raw features that will form the foundation of our Feature Store.  The Primary Key for each table will be used as the Entity key of our Entities, and the other columns will form the Features.  Data-shared Snowflake tables do not have Primary Key constraints visible on them within Snowflake metadata. The following constraints show the Primary Key for each table.

```sql
ALTER TABLE PART
  ADD CONSTRAINT part_pkey
     PRIMARY KEY (P_PARTKEY);

ALTER TABLE SUPPLIER
  ADD CONSTRAINT supplier_pkey
     PRIMARY KEY (S_SUPPKEY);

ALTER TABLE PARTSUPP
  ADD CONSTRAINT partsupp_pkey
     PRIMARY KEY (PS_PARTKEY, PS_SUPPKEY);

ALTER TABLE CUSTOMER
  ADD CONSTRAINT customer_pkey
     PRIMARY KEY (C_CUSTKEY);

ALTER TABLE ORDERS
  ADD CONSTRAINT orders_pkey
     PRIMARY KEY (O_ORDERKEY);

ALTER TABLE LINEITEM
  ADD CONSTRAINT lineitem_pkey
     PRIMARY KEY (L_ORDERKEY, L_LINENUMBER);

ALTER TABLE NATION
  ADD CONSTRAINT nation_pkey
     PRIMARY KEY (N_NATIONKEY);

ALTER TABLE REGION
  ADD CONSTRAINT region_pkey
     PRIMARY KEY (R_REGIONKEY);
```       

Two of the tables contain multi-column (compound) Primary Keys; `LINEITEM` and `PARTSUPP`.
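
To make the key structure easier to refer to, the constraints above can be summarised as a small lookup table. This is only a sketch: the `PRIMARY_KEYS` dictionary is a hypothetical helper for this illustration, with its contents copied from the `ALTER TABLE` statements listed earlier.

```python
# Primary-key columns per TPC-H table, copied from the constraints above.
PRIMARY_KEYS = {
    "PART": ["P_PARTKEY"],
    "SUPPLIER": ["S_SUPPKEY"],
    "PARTSUPP": ["PS_PARTKEY", "PS_SUPPKEY"],
    "CUSTOMER": ["C_CUSTKEY"],
    "ORDERS": ["O_ORDERKEY"],
    "LINEITEM": ["L_ORDERKEY", "L_LINENUMBER"],
    "NATION": ["N_NATIONKEY"],
    "REGION": ["R_REGIONKEY"],
}

# Tables whose primary key spans more than one column (compound keys).
compound_key_tables = sorted(
    table for table, columns in PRIMARY_KEYS.items() if len(columns) > 1
)
print(compound_key_tables)
```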

Data models often have a mix of time-varying and non-time-varying tables. This TPC-H data model is simpler and not time-varying, in that there are no timestamps (e.g. LAST_MODIFIED_TS) to record multiple values for the features through time for a specific Entity. Each table records only the current (as-is) values for an Entity, so there is no record of changes (updates) to the data per primary key.  Therefore, we do not need to include a Timestamp column when creating the FeatureViews for this data. 

We will create a dataframe for each table that we will use as a proxy to represent the base level FeatureViews in our Feature Store.  In each dataframe we will rename any of the primary or secondary key columns, removing the table-name acronym prefix present in the source tables, in order to make the key column names consistent across all tables. e.g. 

`P_PARTKEY & PS_PARTKEY --> PARTKEY`
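
The renaming rule can be sketched in plain Python, independent of Snowpark. The helpers `strip_table_prefix` and `key_rename_map` below are hypothetical names for this illustration; the sketch assumes that every key column starts with a table acronym followed by an underscore, and it is applied to key columns only so that feature columns keep their prefix.

```python
def strip_table_prefix(column_name):
    """Drop the leading table acronym, e.g. 'PS_PARTKEY' becomes 'PARTKEY'.

    Names without an underscore are returned unchanged.
    """
    _prefix, separator, rest = column_name.partition("_")
    return rest if separator else column_name


def key_rename_map(key_columns):
    """Map each source key column to its prefix-free, consistent name."""
    return {column: strip_table_prefix(column) for column in key_columns}


rename_map = key_rename_map(["P_PARTKEY", "PS_PARTKEY", "PS_SUPPKEY", "L_LINENUMBER"])
for source_name, consistent_name in rename_map.items():
    print(f"{source_name} --> {consistent_name}")
```

Because `P_PARTKEY` and `PS_PARTKEY` both map to `PARTKEY`, the same Entity key can later be used to join features from either table.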

### Create base level dataframes for FeatureViews

In [83]:
# CUSTOMER
customer_table = 'CUSTOMER'
customer_sdf = session.table(f'{src_database}.{src_schema}.{customer_table}') \
                        .with_columns(['CUSTKEY','NATIONKEY'],[F.col('C_CUSTKEY'), F.col('C_NATIONKEY')]) \
                        .drop('C_CUSTKEY', 'C_NATIONKEY')
customer_nosk_sdf = customer_sdf.drop('NATIONKEY')


# LINEITEM
lineitem_table = 'LINEITEM'
lineitem_sdf = session.table(f'{src_database}.{src_schema}.{lineitem_table}') \
                        .with_columns(['ORDERKEY','LINENUMBER', 'SUPPKEY', 'PARTKEY'],[F.col('L_ORDERKEY'), F.col('L_LINENUMBER'),  F.col('L_SUPPKEY'), F.col('L_PARTKEY')]) \
                        .drop('L_ORDERKEY', 'L_LINENUMBER', 'L_PARTKEY', 'L_SUPPKEY')
lineitem_nosk_sdf = lineitem_sdf.drop('SUPPKEY','PARTKEY')


# NATION
nation_table = 'NATION'
nation_sdf = session.table(f'{src_database}.{src_schema}.{nation_table}') \
                        .with_columns(['NATIONKEY','REGIONKEY'],[F.col('N_NATIONKEY'),F.col('N_REGIONKEY')]) \
                        .drop('N_NATIONKEY', 'N_REGIONKEY')
nation_nosk_sdf = nation_sdf.drop('REGIONKEY')


# ORDERS
orders_table = 'ORDERS'
orders_sdf = session.table(f'{src_database}.{src_schema}.{orders_table}') \
                .with_columns(['ORDERKEY','CUSTKEY'],[F.col('O_ORDERKEY'),F.col('O_CUSTKEY')]) \
                .drop('O_ORDERKEY', 'O_CUSTKEY')
orders_nosk_sdf = orders_sdf.drop('CUSTKEY')


# PART
part_table = 'PART'
part_sdf = session.table(f'{src_database}.{src_schema}.{part_table}') \
                        .with_columns(['PARTKEY'],[F.col('P_PARTKEY')]) \
                        .drop('P_PARTKEY')


# PART SUPPLIER
partsupp_table = 'PARTSUPP'
partsupp_sdf = session.table(f'{src_database}.{src_schema}.{partsupp_table}') \
                        .with_columns(['SUPPKEY','PARTKEY'],[F.col('PS_SUPPKEY'), F.col('PS_PARTKEY')]) \
                        .drop('PS_SUPPKEY', 'PS_PARTKEY')


# REGION
region_table = 'REGION'
region_sdf = session.table(f'{src_database}.{src_schema}.{region_table}') \
                        .with_columns(['REGIONKEY'],[F.col('R_REGIONKEY')]) \
                        .drop('R_REGIONKEY')


# SUPPLIER
supplier_table = 'SUPPLIER'
supplier_sdf = session.table(f'{src_database}.{src_schema}.{supplier_table}') \
                        .with_columns(['SUPPKEY','NATIONKEY'],[F.col('S_SUPPKEY'),F.col('S_NATIONKEY')]) \
                        .drop('S_SUPPKEY', 'S_NATIONKEY')
supplier_nosk_sdf = supplier_sdf.drop('NATIONKEY')

## Feature Store

We now need to create a Feature Store to develop in.  FeatureStore creates and/or returns an instance of the FeatureStore class in Python.  Within Snowflake this creates a Schema and some database tags that are used to identify it as a FeatureStore.

In [84]:
# session.sql(f''' Drop schema if exists {org}_FEATURE_STORE''').collect()

fs =  FeatureStore(
        session=session,
        database=sess_db,
        name=fs_name,
        default_warehouse="SIMON_XS",
        creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

## Entities

We now need to create Entities for the various units of analysis, and their identifying keys, that are represented in our Source Tables and ER model.

You can think of Entities as the 'glue' that enables features stored across multiple FeatureViews in the Feature Store to be combined to return data for training/inference etc.
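
One way to picture the 'glue' role is that an Entity is little more than a name plus the key columns used for joining. The sketch below uses a hypothetical `EntitySketch` dataclass (a stand-in for illustration, not the Snowflake `Entity` class) to check whether a set of spine columns provides the join keys each entity needs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EntitySketch:
    """Hypothetical stand-in for an entity: a name plus its join-key columns."""
    name: str
    join_keys: tuple


def missing_join_keys(spine_columns, entities):
    """Return {entity name: [join keys absent from the spine]} for incomplete entities."""
    available = set(spine_columns)
    gaps = {}
    for entity in entities:
        missing = [key for key in entity.join_keys if key not in available]
        if missing:
            gaps[entity.name] = missing
    return gaps


lineitem = EntitySketch("LINEITEM", ("ORDERKEY", "LINENUMBER"))
part = EntitySketch("PART", ("PARTKEY",))

# A spine holding only the lineitem keys cannot be joined to PART features.
gaps = missing_join_keys(["ORDERKEY", "LINENUMBER"], [lineitem, part])
print(gaps)
```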

In [85]:
# Customer Entity Definition 
customer_entity = Entity(
    name="CUSTOMER",
    join_keys=["CUSTKEY"],
    desc="Customer entity"
    )
fs.register_entity(customer_entity)


# Lineitem Entity Definition 
lineitem_entity = Entity(
    name="LINEITEM",
    join_keys=['ORDERKEY','LINENUMBER'],
    desc="Lineitem entity"
    )
#    join_keys=['ORDERKEY','PARTKEY', 'SUPPKEY','LINENUMBER'],
fs.register_entity(lineitem_entity)


# Nation Entity Definition 
nation_entity = Entity(
    name="NATION",
    join_keys=["NATIONKEY"],
    desc="Nation entity"
    )
fs.register_entity(nation_entity)

# Order Entity Definition 
order_entity = Entity(
    name="ORDERS",
    join_keys=["ORDERKEY"],
    desc="Order entity"
    )
fs.register_entity(order_entity)


# Part Entity Definition 
part_entity = Entity(
    name="PART",
    join_keys=["PARTKEY"],
    desc="Part entity"
    )
fs.register_entity(part_entity)


# Part Supplier Entity Definition 
part_supplier_entity = Entity(
    name="PART_SUPPLIER",
    join_keys=["PARTKEY", "SUPPKEY"],
    desc="Part Supplier entity"
    )
fs.register_entity(part_supplier_entity)


# Region Entity Definition 
region_entity = Entity(
    name="REGION",
    join_keys=["REGIONKEY"],
    desc="Region entity"
    )
fs.register_entity(region_entity)


# Supplier Entity Definition 
supplier_entity = Entity(
    name="SUPPLIER",
    join_keys=["SUPPKEY"],
    desc="Supplier entity"
    )
fs.register_entity(supplier_entity)



Entity(name=SUPPLIER, join_keys=['SUPPKEY'], owner=None, desc=Supplier entity)

We can check out the Entities that we have just created.

In [87]:
fs.list_entities().sort('NAME').show()

-----------------------------------------------------------------------------
|"NAME"         |"JOIN_KEYS"              |"DESC"                |"OWNER"   |
-----------------------------------------------------------------------------
|CUSTOMER       |["CUSTKEY"]              |Customer entity       |SYSADMIN  |
|LINEITEM       |["ORDERKEY,LINENUMBER"]  |Lineitem entity       |SYSADMIN  |
|NATION         |["NATIONKEY"]            |Nation entity         |SYSADMIN  |
|ORDERS         |["ORDERKEY"]             |Order entity          |SYSADMIN  |
|PART           |["PARTKEY"]              |Part entity           |SYSADMIN  |
|PART_SUPPLIER  |["PARTKEY,SUPPKEY"]      |Part Supplier entity  |SYSADMIN  |
|REGION         |["REGIONKEY"]            |Region entity         |SYSADMIN  |
|SUPPLIER       |["SUPPKEY"]              |Supplier entity       |SYSADMIN  |
-----------------------------------------------------------------------------



## FeatureViews

FeatureViews contain a collection of Features that we want to organise, derive and store together.  When a FeatureView is registered it creates a Snowflake tabular object, either a Dynamic Table or View,  within the Feature Store schema with some tags used to identify metadata related to it.

We will create a FeatureView for each data-source base dataframe and entity. In each FeatureView definition we add all the applicable `entities` and, for compound keys, also include the primary `entities` that they are made of.  For example, for the Part-Supplier FeatureView we add the Part and Supplier entities.
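
The relationship between a FeatureView's entities and its columns can be sketched without Snowflake: the join keys of all attached entities are the key columns, and everything else is a feature. The helper `split_keys_and_features` is a hypothetical name for this illustration, and the column list is assumed to be that of the renamed PARTSUPP dataframe.

```python
def split_keys_and_features(columns, join_keys):
    """Split a dataframe's columns into entity key columns and feature columns.

    Raises ValueError if any required join key is absent from the columns.
    """
    missing = [key for key in join_keys if key not in columns]
    if missing:
        raise ValueError(f"join keys missing from dataframe: {missing}")
    keys = [column for column in columns if column in join_keys]
    features = [column for column in columns if column not in join_keys]
    return keys, features


# Join keys contributed by the two entities attached to the Part-Supplier FeatureView.
part_join_keys = ["PARTKEY"]
supplier_join_keys = ["SUPPKEY"]
partsupp_join_keys = part_join_keys + supplier_join_keys

# Assumed column list of the renamed PARTSUPP dataframe.
partsupp_columns = ["PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT", "SUPPKEY", "PARTKEY"]

keys, features = split_keys_and_features(partsupp_columns, partsupp_join_keys)
print("keys:", keys)
print("features:", features)
```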

In [88]:
# CUSTOMER FEATUREVIEW
customer_features = customer_sdf.drop("CUSTKEY").columns

# Create Customer FeatureView in Feature Store
customer_fv = FeatureView(
    name = f"FV_CUSTOMER",
    entities = [customer_entity],
    feature_df = customer_nosk_sdf,
    desc = f"Customer feature view"
)
# Register the Customer FeatureView in the schema, and add the python featureview to our list
customer_fv_v1 = fs.register_feature_view(
    feature_view = customer_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# LINEITEM FEATUREVIEW
lineitem_features = lineitem_sdf.drop("ORDERKEY", "PARTKEY", 'SUPPKEY', 'LINENUMBER').columns

# Create Lineitem FeatureView in Feature Store
lineitem_fv = FeatureView(
    name = f"FV_LINEITEM",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_nosk_sdf,
    desc = f"Lineitem feature view"
)
# Register the Lineitem FeatureView in the schema, and add the python featureview to our list
lineitem_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# NATION FEATUREVIEW
nation_features = nation_sdf.drop('NATIONKEY').columns

# Create Nation FeatureView in Feature Store
nation_fv = FeatureView(
    name = f"FV_NATION",
    entities = [nation_entity],
    feature_df = nation_nosk_sdf,
    desc = f"Nation feature view"
)
# Register the Nation FeatureView in the schema, and add the python featureview to our list
nation_fv_v1 = fs.register_feature_view(
    feature_view = nation_fv,   # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# ORDERS FEATUREVIEW
orders_features = orders_sdf.drop("ORDERKEY", "CUSTKEY").columns

# Create Order FeatureView in Feature Store
orders_fv = FeatureView(
    name = f"FV_ORDERS",
    entities = [order_entity],
    feature_df = orders_nosk_sdf,
    desc = f"Order feature view"
)
# Register the Order FeatureView in the schema, and add the python featureview to our list
orders_fv_v1 = fs.register_feature_view(
    feature_view = orders_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# PART FEATUREVIEW 
part_features = part_sdf.drop("PARTKEY").columns

# Create Part FeatureView in Feature Store
part_fv = FeatureView(
    name = f"FV_PART",
    entities = [part_entity],
    feature_df = part_sdf,
    desc = f"Part feature view"
)
# Register the Part FeatureView in the schema, and add the python featureview to our list
part_fv_v1 = fs.register_feature_view(
    feature_view = part_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# PART SUPPLIER FEATUREVIEW
part_supp_features = partsupp_sdf.drop("PARTKEY", "SUPPKEY").columns

# Create Part Supplier FeatureView in Feature Store
part_supplier_fv = FeatureView(
    name = f"FV_PART_SUPPLIER",
    entities = [part_entity, supplier_entity],
    feature_df = partsupp_sdf,
    desc = f"Part Supplier feature view"
)
# Register the Part Supplier FeatureView in the schema, and add the python featureview to our list
part_supplier_fv_v1 = fs.register_feature_view(
    feature_view = part_supplier_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# REGION FEATUREVIEW
region_features = region_sdf.drop('REGIONKEY').columns

# Create Region FeatureView in Feature Store
region_fv = FeatureView(
    name = f"FV_REGION",
    entities = [region_entity],
    feature_df = region_sdf,
    desc = f"Region feature view"
)
# Register the Region FeatureView in the schema, and add the python featureview to our list
region_fv_v1 = fs.register_feature_view(
    feature_view = region_fv,   # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)


# SUPPLIER FEATUREVIEW
supplier_features = supplier_sdf.drop("SUPPKEY", "NATIONKEY").columns

# Create Supplier FeatureView in Feature Store
supplier_fv = FeatureView(
    name = f"FV_SUPPLIER",
    entities = [supplier_entity],
    feature_df = supplier_nosk_sdf,
    desc = f"Supplier feature view"
)
# Register the Supplier FeatureView in the schema, and add the python featureview to our list
supplier_fv_v1 = fs.register_feature_view(
    feature_view = supplier_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

We can list the base level FeatureViews we have just created to see the metadata related to them.

In [90]:
fs.list_feature_views().sort('NAME').show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"NAME"            |"VERSION"  |"DATABASE_NAME"  |"SCHEMA_NAME"              |"CREATED_ON"                |"OWNER"   |"DESC"                      |"ENTITIES"    |"REFRESH_FREQ"  |"REFRESH_MODE"  |"SCHEDULING_STATE"  |"WAREHOUSE"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|FV_CUSTOMER       |V1         |SIMON            |FS_ENTITIES_FEATURE_STORE  |2024-11-26 06:45:16.579000  |SYSADMIN  |Customer feature view       |[             |NULL            |NULL            |NULL                |NULL         |
|                  |           |                 |                      

## Retrieve Data from the Feature Store

Now that we have set up the Feature Store objects we need, representing our Entities and source tables using FeatureViews, we are ready to use it to retrieve data for training or inference. 

To do this we need to create a spine dataframe containing the key columns, and their values, that we want returned in our training dataset. The spine could be created with SQL or a Snowpark DataFrame.  Here we create a simple dataframe that retrieves the required key columns from the source table and samples rows from the result to provide a sample of keys.  At this step for model training, you will also typically want to join the entity keys to a table containing the label/target column used to train the model. We can also add any other data to the Spine that we want to appear in our returned Dataset.

We step through a number of different examples for retrieving a training Dataset, from a single Entity and FeatureView through to multiple Entities and FeatureViews.

#### Single Key Column Entity - Customer Training Data 

First create a Spine dataframe.

> **_NOTE:_**  We save the Spine dataframe as a Temporary Table so we can get reproducible results when we use the same Spine.

Our spine contains a single entity-key column (CUSTKEY).
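
The reason for materialising the Spine can be illustrated with a small plain-Python sketch: a random sample may differ each time it is drawn, whereas a sample that is drawn once and stored gives the same keys on every later use. The `sample_keys` helper and the key values are hypothetical and only stand in for the distinct, sample and save steps; they do not reproduce Snowpark's sampling behaviour.

```python
import random


def sample_keys(keys, n, rng):
    """Return up to n distinct keys drawn at random, sorted for display."""
    distinct = sorted(set(keys))
    return sorted(rng.sample(distinct, min(n, len(distinct))))


rng = random.Random()
source_keys = list(range(1, 1001)) * 2  # hypothetical key column containing duplicates

# Draw the sample once and keep it (the analogue of saving to a temporary table).
customer_spine = sample_keys(source_keys, 10, rng)

# Every later use reads the stored sample, so the keys never change between uses.
first_use = list(customer_spine)
second_use = list(customer_spine)
print(first_use == second_use)
```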

In [92]:
customer_spine_tbl = [sess_db,org,'CUSTOMER_SPINE']

(customer_sdf.select('CUSTKEY')  
    .distinct()  
    .sample(n=num_spine_rows)  
    .write.saveAsTable(customer_spine_tbl,mode = 'overwrite', table_type = 'temp')
)

customer_spine_df = session.table(customer_spine_tbl)
    
customer_spine_df.sort('CUSTKEY').show()

-------------
|"CUSTKEY"  |
-------------
|40357      |
|48403      |
|58610      |
|64255      |
|66672      |
|82805      |
|88259      |
|92587      |
|112440     |
|138444     |
-------------



Now we have a Spine we can generate a training set for a single FeatureView (customer_fv_v1), which returns a Dataframe object.

In [94]:
customer_training_df = fs.generate_training_set(
    customer_spine_df,
    features = [customer_fv_v1]
)

customer_training_df.sort('CUSTKEY').show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTKEY"  |"C_NAME"            |"C_ADDRESS"                              |"C_PHONE"        |"C_ACCTBAL"  |"C_MKTSEGMENT"  |"C_COMMENT"                                         |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|40357      |Customer#000040357  |vAsvErZPb1i                              |14-707-231-4746  |3429.18      |HOUSEHOLD       |nto beans. slyly bold excuses wake slyly sly        |
|48403      |Customer#000048403  |LqPenCvrCPrz9AuB,MKIqGLMuHPlBt           |12-894-249-5575  |-466.51      |FURNITURE       |ges sleep furiously express, ironic theodolites...  |
|58610      |Customer#000058610  |ZliIen 3HmZWYsbGCbd ckuB1jr5             |29-787-573-8778  |4542.85    

We can see the CUSTKEY we supplied in the Spine dataframe, and the Features from the FeatureView are returned.

#### Multi Key Column Entity - Lineitem Training Data 

As before, we create a Spine dataframe, this time including the two key columns that we need for the Lineitem FeatureView and its corresponding Entity. This is because the unique identifier for Lineitems is a compound key consisting of both the ORDERKEY and the LINENUMBER.

> **_NOTE:_** The Spine must provide every join-key column of the Entities attached to the lineitem FeatureView, here `ORDERKEY` and `LINENUMBER`.

In [69]:
lineitem_spine_tbl = [sess_db,org,'LINEITEM_SPINE']

( lineitem_sdf.select('ORDERKEY','LINENUMBER') 
    .distinct() 
    .sample(n=num_spine_rows) 
    .write.saveAsTable(lineitem_spine_tbl,mode = 'overwrite', table_type = 'temp')
)

lineitem_spine_df = session.table(lineitem_spine_tbl)
    
lineitem_spine_df.sort('ORDERKEY', 'LINENUMBER').show()

-----------------------------
|"ORDERKEY"  |"LINENUMBER"  |
-----------------------------
|173280      |4             |
|564964      |5             |
|718275      |2             |
|1026627     |5             |
|1757382     |5             |
|1789542     |1             |
|3042596     |1             |
|4413249     |7             |
|4955171     |3             |
|5831846     |2             |
-----------------------------



Now we can retrieve the training dataframe from the single FeatureView `lineitem_fv_v1`, using the compound key columns.

In [95]:
lineitem_training = fs.generate_training_set(
    lineitem_spine_df,
    features = [lineitem_fv_v1]
)

lineitem_training.sort('ORDERKEY', 'LINENUMBER').show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"L_QUANTITY"  |"L_EXTENDEDPRICE"  |"L_DISCOUNT"  |"L_TAX"  |"L_RETURNFLAG"  |"L_LINESTATUS"  |"L_SHIPDATE"  |"L_COMMITDATE"  |"L_RECEIPTDATE"  |"L_SHIPINSTRUCT"   |"L_SHIPMODE"  |"L_COMMENT"                                 |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|173280      |4             |35.00         |68351.85           |0.07          |0.06     |R               |F               |1994-01-29    |1994-03-12      |1994-02-23       |TAKE BACK RETURN   |SHIP          | sly foxes. acc               

In the result we can see the entity-key columns that we explicitly added to our Spine and FeatureView definition and the 'features' all prefixed with `L_`.

####  Multiple Entities - Using Primary and Secondary Keys derived from a single Source table

Let's try adding additional FeatureViews into the `generate_training_set` function to return features that match on the additional secondary keys provided in the lineitem table. 

This will enable us to return features from the `part`, `supplier`, and `partsupp` FeatureViews by matching the secondary keys supplied in the `lineitem` table to the Entity key columns of those FeatureViews.

As before, we need to create a Spine that explicitly provides the keys for the additional entities, `part` and `supplier`.

In [96]:
lineitem_orders_part_supplier_spine_tbl = [sess_db, org,'LINEITEM_ORDERS_PART_SUPPLIER_SPINE']

lineitem_sdf.select('ORDERKEY','LINENUMBER','PARTKEY', 'SUPPKEY') \
    .distinct() \
    .sample(n=num_spine_rows) \
    .write.saveAsTable(lineitem_orders_part_supplier_spine_tbl, mode = 'overwrite', table_type = 'temp')

lineitem_orders_part_supplier_spine_df = session.table(lineitem_orders_part_supplier_spine_tbl)

lineitem_orders_part_supplier_spine_df.sort('ORDERKEY', 'LINENUMBER','PARTKEY', 'SUPPKEY').show()

-----------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"PARTKEY"  |"SUPPKEY"  |
-----------------------------------------------------
|173090      |6             |118494     |8495       |
|2360514     |6             |147980     |5523       |
|2365956     |3             |133959     |6473       |
|2644133     |3             |63027      |8040       |
|3242246     |4             |94382      |9401       |
|3502467     |4             |116181     |6182       |
|4045989     |3             |36656      |6657       |
|4700679     |1             |198776     |3815       |
|5288550     |2             |60542      |5555       |
|5371170     |4             |165853     |886        |
-----------------------------------------------------



If we just use the Lineitem FeatureView, as before, we can see that our result now includes the additional keys that we provided in the Spine.  The Spine is left-joined to each FeatureView, and the keys supplied in the Spine are included in the result.  For each FeatureView we want to retrieve features from, we only need to provide the specific entity key columns for that FeatureView within the Spine.  In effect, the Spine carries the entity-hierarchy information, through the relationships between those entity key columns.
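
The left-join behaviour described above can be sketched in plain Python with lists of dictionaries. This only illustrates the join semantics under simplifying assumptions (unique keys per feature row, made-up values); it is not how `generate_training_set` is implemented, and `left_join` is a hypothetical helper.

```python
def left_join(spine_rows, feature_rows, join_keys):
    """Left-join feature_rows onto spine_rows; unmatched spine rows get None features."""
    feature_columns = [
        column
        for column in (feature_rows[0] if feature_rows else {})
        if column not in join_keys
    ]
    index = {tuple(row[key] for key in join_keys): row for row in feature_rows}
    joined = []
    for spine_row in spine_rows:
        match = index.get(tuple(spine_row[key] for key in join_keys))
        out = dict(spine_row)  # every spine column is carried through
        for column in feature_columns:
            out[column] = match[column] if match else None
        joined.append(out)
    return joined


# Hypothetical spine and feature rows (values are made up for the example).
spine = [
    {"ORDERKEY": 1, "LINENUMBER": 1, "PARTKEY": 10, "SUPPKEY": 100},
    {"ORDERKEY": 1, "LINENUMBER": 2, "PARTKEY": 11, "SUPPKEY": 101},
]
lineitem_features = [
    {"ORDERKEY": 1, "LINENUMBER": 1, "L_QUANTITY": 5},
    {"ORDERKEY": 1, "LINENUMBER": 2, "L_QUANTITY": 7},
]
part_features = [{"PARTKEY": 10, "P_SIZE": 3}]

# Each feature set is joined using only its own entity key columns.
result = spine
for rows, keys in [
    (lineitem_features, ["ORDERKEY", "LINENUMBER"]),
    (part_features, ["PARTKEY"]),
]:
    result = left_join(result, rows, keys)

for row in result:
    print(row)
```

Spine columns that a given feature set does not join on, such as `SUPPKEY` here, simply pass through to the result, and a spine row with no matching feature row keeps its keys with empty feature values.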

In [74]:
line_item_training = fs.generate_training_set(
    lineitem_orders_part_supplier_spine_df,
    features = [lineitem_fv_v1]
)

line_item_training.sort('ORDERKEY', 'LINENUMBER','PARTKEY', 'SUPPKEY').show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"PARTKEY"  |"SUPPKEY"  |"L_QUANTITY"  |"L_EXTENDEDPRICE"  |"L_DISCOUNT"  |"L_TAX"  |"L_RETURNFLAG"  |"L_LINESTATUS"  |"L_SHIPDATE"  |"L_COMMITDATE"  |"L_RECEIPTDATE"  |"L_SHIPINSTRUCT"   |"L_SHIPMODE"  |"L_COMMENT"                                 |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|46822       |7             |112822     |2823       |12.00         |22017.84           |0.02          |0.07     |N               |O               |1997-04-16    |1997

If we now add the FeatureViews for the entities at higher levels of the hierarchy than lineitem, the required joins are created from the Spine key-columns to each of those FeatureViews.

In [97]:
lineitem_orders_part_supplier_training = fs.generate_training_set(
    lineitem_orders_part_supplier_spine_df,
    features = [lineitem_fv_v1, orders_fv_v1, part_fv_v1, supplier_fv_v1, part_supplier_fv_v1]
)

lineitem_orders_part_supplier_training.sort('ORDERKEY', 'LINENUMBER','PARTKEY', 'SUPPKEY').show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"PARTKEY"  |"SUPPKEY"  |"L_QUANTITY"  |"L_EXTENDEDPRICE"  |"L_DISCOUNT"  |"L_TAX"  |"L_RETURNFLAG"  |"L_LINESTATUS"  |"L_SHIPDATE"  |"L_

We can see in the result that the table-name prefix on each column denotes the source table those features were derived from.
- `L_` = lineitem
- `O_` = orders
- `P_` = part
- `S_` = supplier
- `PS_` = part-supplier

####  Multiple Entities - Using Primary and Secondary Keys derived from multiple Source tables

Let's further extend our Spine by joining to the `orders`, `customer` and `nation` tables and keeping only the key columns, i.e. any columns without an `_` in the column name.

This will add the customer (`CUSTKEY`) from `orders`, the nation (`NATIONKEY`) from `customer` and the region (`REGIONKEY`) from `nation`.
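
The key-column filter relies on the naming convention in this data: feature columns carry a table prefix such as `O_` or `C_`, while key columns do not. A minimal sketch with a made-up column list (the names below are an illustrative subset, not the full joined schema):

```python
# Hypothetical subset of the columns produced by the joins.
joined_columns = [
    "ORDERKEY", "LINENUMBER", "PARTKEY", "SUPPKEY",
    "CUSTKEY", "O_TOTALPRICE", "O_ORDERDATE",
    "NATIONKEY", "C_NAME", "REGIONKEY", "N_NAME",
]

# Key columns are the ones without an underscore; sorting gives a stable order.
keycols = sorted(c for c in joined_columns if "_" not in c)
print(keycols)
```

This convention only works while no key column contains an underscore, so it is worth checking before reusing the filter on other schemas.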

In [98]:
lineitem_orders_part_supplier_spine_tbl = [sess_db, org,'LINEITEM_ORDERS_PART_SUPPLIER_CUSTOMER_NATION_REGION_SPINE']

lineitem_order_part_supplier_customer_nation_region_spine_allcols_df = lineitem_sdf.select('ORDERKEY','LINENUMBER','PARTKEY', 'SUPPKEY') \
      .join(orders_sdf, on = 'ORDERKEY') \
      .join(customer_sdf, on = 'CUSTKEY') \
      .join(nation_sdf, on = 'NATIONKEY')

# Get key columns from result
keycols = [col for col in lineitem_order_part_supplier_customer_nation_region_spine_allcols_df.columns if '_' not in col]
keycols.sort()

lineitem_order_part_supplier_customer_nation_region_spine_allcols_df.select(keycols) \
      .distinct() \
      .sample(n=num_spine_rows) \
      .write.saveAsTable(lineitem_orders_part_supplier_spine_tbl, mode = 'overwrite', table_type = 'temp')

lineitem_order_part_supplier_customer_nation_region_spine_df  = session.table(lineitem_orders_part_supplier_spine_tbl)

lineitem_order_part_supplier_customer_nation_region_spine_df.sort(keycols).show(100)

---------------------------------------------------------------------------------------------
|"CUSTKEY"  |"LINENUMBER"  |"NATIONKEY"  |"ORDERKEY"  |"PARTKEY"  |"REGIONKEY"  |"SUPPKEY"  |
---------------------------------------------------------------------------------------------
|15610      |4             |1            |2635969     |171803     |1            |9355       |
|27628      |4             |12           |2610436     |72078      |2            |7093       |
|32033      |1             |4            |5555648     |51463      |4            |3969       |
|62255      |1             |0            |3290948     |184948     |0            |4949       |
|63613      |4             |20           |2016578     |52590      |4            |7601       |
|78599      |1             |0            |4834368     |35063      |0            |2573       |
|80230      |4             |7            |2766945     |189850     |3            |4887       |
|81821      |1             |2            |824896      |62093

This Spine includes all the key-column information across all FeatureViews so we should now be able to retrieve features from all the FeatureViews we have created, joining them together, and returning a Lineitem level set of features.  The higher entity-hierarchy level features have been duplicated appropriately down to Lineitem level.

In [None]:
lineitem_order_part_supplier_customer_nation_region_spine_df_training = fs.generate_training_set(
    lineitem_order_part_supplier_customer_nation_region_spine_df,
    features = [lineitem_fv_v1, nation_fv_v1, part_fv_v1, supplier_fv_v1, part_supplier_fv_v1, orders_fv_v1, region_fv_v1, customer_fv_v1]
)
lineitem_order_part_supplier_customer_nation_region_spine_df_training.sort(keycols).show()


We can see all the key-columns and all the features, prefixed by table synonym.

In [100]:
lineitem_order_part_supplier_customer_nation_region_spine_df_training.columns

['CUSTKEY',
 'LINENUMBER',
 'NATIONKEY',
 'ORDERKEY',
 'PARTKEY',
 'REGIONKEY',
 'SUPPKEY',
 'L_QUANTITY',
 'L_EXTENDEDPRICE',
 'L_DISCOUNT',
 'L_TAX',
 'L_RETURNFLAG',
 'L_LINESTATUS',
 'L_SHIPDATE',
 'L_COMMITDATE',
 'L_RECEIPTDATE',
 'L_SHIPINSTRUCT',
 'L_SHIPMODE',
 'L_COMMENT',
 'N_NAME',
 'N_COMMENT',
 'P_NAME',
 'P_MFGR',
 'P_BRAND',
 'P_TYPE',
 'P_SIZE',
 'P_CONTAINER',
 'P_RETAILPRICE',
 'P_COMMENT',
 'S_NAME',
 'S_ADDRESS',
 'S_PHONE',
 'S_ACCTBAL',
 'S_COMMENT',
 'PS_AVAILQTY',
 'PS_SUPPLYCOST',
 'PS_COMMENT',
 'O_ORDERSTATUS',
 'O_TOTALPRICE',
 'O_ORDERDATE',
 'O_ORDERPRIORITY',
 'O_CLERK',
 'O_SHIPPRIORITY',
 'O_COMMENT',
 'R_NAME',
 'R_COMMENT',
 'C_NAME',
 'C_ADDRESS',
 'C_PHONE',
 'C_ACCTBAL',
 'C_MKTSEGMENT',
 'C_COMMENT']

####  Multiple Entities - Working with lower-level FeatureViews from a higher Entity level

Sometimes we build an ML model at one Entity level in the hierarchy while other Entities sit below it at a lower level.  Recall that an Entity is essentially the Primary Key for a Business Entity.  That Primary Key may be used as a Secondary Key in other tables, or as part of a Compound Primary Key at a lower level.  It is therefore highly likely that the lower-level FeatureViews hold multiple rows for each distinct Entity key-value at the higher level we are working at.

Let's illustrate this with an example from our data.  We have an Orders table (FeatureView) and a Lineitem table (FeatureView).  We want to build a model to predict something related to Orders, so we need one record of training data per Order.  The Lineitem table holds multiple records per Order, because ORDERKEY is part of its Compound Primary Key and LINENUMBER provides uniqueness within each ORDERKEY.  Lineitem includes additional raw features that we may want to include within our training data.

Let's check this out within the data, and look at a few different ways we might handle this.

In [131]:
fs.list_entities().filter(F.in_([F.col('NAME')], ['ORDERS','LINEITEM'])).show()

-------------------------------------------------------------------
|"NAME"    |"JOIN_KEYS"              |"DESC"           |"OWNER"   |
-------------------------------------------------------------------
|LINEITEM  |["ORDERKEY,LINENUMBER"]  |Lineitem entity  |SYSADMIN  |
|ORDERS    |["ORDERKEY"]             |Order entity     |SYSADMIN  |
-------------------------------------------------------------------



We can see the compound-key includes ORDERKEY in the Lineitem entity above.

Let's get a sample of ORDERKEYs that have multiple rows in LINEITEM.

In [132]:
multi_linenumber_orders_tbl = [sess_db, org,'MULTI_LINENUMBER_ORDERS']

lineitem_sdf.select('ORDERKEY','LINENUMBER') \
    .distinct() \
    .group_by("ORDERKEY") \
    .agg((F.col("LINENUMBER"), "count")) \
    .with_column_renamed( F.col("COUNT(LINENUMBER)"),'LINEITEM_COUNT') \
    .filter(F.col("LINEITEM_COUNT") >1) \
    .sample(n= num_spine_rows) \
    .write.saveAsTable(multi_linenumber_orders_tbl, mode = 'overwrite', table_type = 'temp')

multi_linenumber_orders_df = session.table(multi_linenumber_orders_tbl)
multi_linenumber_orders_df.sort(F.col('LINEITEM_COUNT').desc() ).show()

---------------------------------
|"ORDERKEY"  |"LINEITEM_COUNT"  |
---------------------------------
|328135      |7                 |
|827271      |6                 |
|1959973     |6                 |
|2890464     |6                 |
|3659910     |3                 |
|3745763     |5                 |
|4000801     |2                 |
|4136992     |6                 |
|4210402     |5                 |
|4556836     |7                 |
---------------------------------



We create an Order Spine containing those ORDERKEYs with multiple Lineitems.

In [160]:
order_spine_tbl =  [sess_db, org,'ORDERS_SPINE']

lineitem_sdf.select('ORDERKEY','LINENUMBER') \
    .join(multi_linenumber_orders_df, on = 'ORDERKEY') \
    .write.saveAsTable(order_spine_tbl, mode = 'overwrite', table_type = 'temp')

order_spine_df = session.table(order_spine_tbl)

order_spine_df.sort('ORDERKEY','LINENUMBER').drop('LINEITEM_COUNT').show(100)

-----------------------------
|"ORDERKEY"  |"LINENUMBER"  |
-----------------------------
|328135      |1             |
|328135      |2             |
|328135      |3             |
|328135      |4             |
|328135      |5             |
|328135      |6             |
|328135      |7             |
|827271      |1             |
|827271      |2             |
|827271      |3             |
|827271      |4             |
|827271      |5             |
|827271      |6             |
|1959973     |1             |
|1959973     |2             |
|1959973     |3             |
|1959973     |4             |
|1959973     |5             |
|1959973     |6             |
|2890464     |1             |
|2890464     |2             |
|2890464     |3             |
|2890464     |4             |
|2890464     |5             |
|2890464     |6             |
|3659910     |1             |
|3659910     |2             |
|3659910     |3             |
|3745763     |1             |
|3745763     |2             |
|3745763  

We can use the Spine to generate a training dataframe from the orders and lineitem FeatureViews, and check out the result.

In [136]:
# Keep the dataframe in order_training; show() returns None, so call it separately
order_training = fs.generate_training_set(
    order_spine_df,
    features = [orders_fv_v1, lineitem_fv_v1]
)
order_training.sort('ORDERKEY', 'LINENUMBER').show(20)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"LINEITEM_COUNT"  |"O_ORDERSTATUS"  |"O_TOTALPRICE"  |"O_ORDERDATE"  |"O_ORDERPRIORITY"  |"O_CLERK"        |"O_SHIPPRIORITY"  |"O_COMMENT"                                         |"L_QUANTITY"  |"L_EXTENDEDPRICE"  |"L_DISCOUNT"  |"L_TAX"  |"L_RETURNFLAG"  |"L_LINESTATUS"  |"L_SHIPDATE"  |"L_COMMITDATE"  |"L_RECEIPTDATE"  |"L_SHIPINSTRUCT"   |"L_SHIPMODE"  |"L_COMMENT"                                |
--------------------------------------------------------------------------------------------------------------------------------------

We can see that we have multiple rows of training data per ORDERKEY that includes the order (`O_`) features repeated (denormalised) for each distinct Linenumber and row of lineitem (`L_`) features.  This might be OK for our needs as-is, or we may need to perform some post-processing on this dataset to prepare the data further for our needs.  For example, maybe we want to pivot the lineitem features to columns, or more likely create derived aggregated features from them.
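
As a rough plain-Python sketch of the pivot option mentioned above (toy values, not taken from the sample data), one Order with three Lineitems can be collapsed into a single row with one column per Linenumber:

```python
# Toy denormalised training rows: the order feature repeats on every lineitem row.
rows = [
    {"ORDERKEY": 1, "LINENUMBER": 1, "O_TOTALPRICE": 60.0, "L_QUANTITY": 10},
    {"ORDERKEY": 1, "LINENUMBER": 2, "O_TOTALPRICE": 60.0, "L_QUANTITY": 20},
    {"ORDERKEY": 1, "LINENUMBER": 3, "O_TOTALPRICE": 60.0, "L_QUANTITY": 30},
]

# Pivot: one row per ORDERKEY, one L_QUANTITY column per LINENUMBER.
pivoted = {}
for r in rows:
    out = pivoted.setdefault(
        r["ORDERKEY"],
        {"ORDERKEY": r["ORDERKEY"], "O_TOTALPRICE": r["O_TOTALPRICE"]},
    )
    out[f"L_QUANTITY_{r['LINENUMBER']}"] = r["L_QUANTITY"]

print(list(pivoted.values()))
```

A pivot produces a variable number of columns when Orders differ in their number of Lineitems, which is one reason aggregated features are often the more practical choice.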

We can of course create additional FeatureViews within our Feature Store that perform and maintain this processing for us.  Let's try creating some derived features at the Order entity level from the lineitem features.  We can easily create FeatureViews derived from other FeatureViews in our Feature Store to build data pipelines.

First we create some new derived Lineitem level features: Discount, Tax Due, and Return / Non-Return indicators.

In [163]:
lineitem_derived_features = lineitem_fv_v1.feature_df.with_columns(['L_DISCOUNT', 'L_TAX_DUE', 'L_RETURN_IND', 'L_NONRETURN_IND'],
                                                                   [F.col('L_EXTENDEDPRICE') * F.col('L_DISCOUNT'),  
                                                                    F.col('L_EXTENDEDPRICE') * F.col('L_TAX'),
                                                                    (F.when(F.col('L_RETURNFLAG') == 'R', F.lit(1)).otherwise(F.lit(0))),
                                                                    (F.when(F.col('L_RETURNFLAG') == 'R', F.lit(0)).otherwise(F.lit(1)))
                                                                    ]) \
    .select([ 'ORDERKEY', 'LINENUMBER', 'L_DISCOUNT', 'L_TAX_DUE', 'L_RETURN_IND', 'L_NONRETURN_IND', 'L_QUANTITY', 'L_EXTENDEDPRICE'])

lineitem_derived_features.show()

----------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"LINENUMBER"  |"L_DISCOUNT"  |"L_TAX_DUE"  |"L_RETURN_IND"  |"L_NONRETURN_IND"  |"L_QUANTITY"  |"L_EXTENDEDPRICE"  |
----------------------------------------------------------------------------------------------------------------------------------
|2400001     |1             |400.8900      |267.2600     |0               |1                  |10.00         |13363.00           |
|2400001     |2             |805.0056      |603.7542     |1               |0                  |14.00         |20125.14           |
|2400001     |3             |235.3014      |1647.1098    |0               |1                  |18.00         |23530.14           |
|2400001     |4             |301.9770      |268.4240     |0               |1                  |2.00          |3355.30            |
|2400001     |5             |1384.5923     |593.3967     |0               |1       

And we create a new FeatureView for these derived lineitem features.

In [None]:
# LINEITEM DERIVED FEATUREVIEW

# Create Lineitem Derived FeatureView in Feature Store
lineitem_derived_fv = FeatureView(
    name = f"FV_LINEITEM_DERIVED",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_derived_features,
    desc = f"Order/Lineitem level, Derived features"
)
# Register the Lineitem derived FeatureView in the schema
lineitem_derived_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_derived_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

We will now aggregate these Lineitem level features up to the Order level and use the result to create a new FeatureView derived from the Lineitem FeatureView.  Note how `lineitem_derived_fv_v1.feature_df` is used as the source dataframe for the Order level aggregated features.

In [None]:
order_lineitem_aggregated_features = lineitem_derived_fv_v1.feature_df.group_by('ORDERKEY') \
    .agg(F.sum("L_DISCOUNT").alias("OLA_ORDER_TOTAL_DISCOUNT_COST"),
        F.sum("L_TAX_DUE").alias("OLA_TOTAL_TAX_DUE"),
        F.avg("L_DISCOUNT").alias("OLA_ORDER_AVG_DISCOUNT_COST"),
        F.avg("L_TAX_DUE").alias("OLA_AVG_TAX_DUE"),
        F.count("*").alias("OLA_LINEITEM_COUNT"),         
        F.sum("L_RETURN_IND").alias("OLA_SUM_RETURN"),
        F.sum("L_NONRETURN_IND").alias("OLA_SUM_NONRETURN")
        ) \
    .with_column("OLA_RETURN_RATIO", F.div0(F.col("OLA_SUM_RETURN"), F.col("OLA_SUM_NONRETURN")))

order_lineitem_aggregated_features.show()

We can use the Order Lineitem aggregated features dataframe to create our new Order level FeatureView.

In [None]:
# ORDER_AGGREGATED_LINEITEM FEATUREVIEW

# Create Order Lineitem aggregated FeatureView in Feature Store
order_lineitem_aggregated_fv = FeatureView(
    name = f"FV_ORDER_LINEITEM_AGG",
    entities = [order_entity],
    feature_df = order_lineitem_aggregated_features,
    desc = f"Order level, Lineitem aggregated features"
)
# Register the Order Lineitem aggregated FeatureView in the schema
order_lineitem_aggregated_fv_v1 = fs.register_feature_view(
    feature_view = order_lineitem_aggregated_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

We create a new Orders Spine containing only the distinct ORDERKEYs, and then combine the Orders and Lineitem Aggregated features to create a training dataframe result.

In [165]:
order_multiline_spine_tbl =  [sess_db, org,'ORDERS_MULTILINE_SPINE']

order_spine_df.select('ORDERKEY').distinct().sample(n= num_spine_rows) \
    .write.saveAsTable(order_multiline_spine_tbl, mode = 'overwrite', table_type = 'temp')

order_multiline_spine_df = session.table(order_multiline_spine_tbl)

# Keep the dataframe in order_training; show() returns None, so call it separately
order_training = fs.generate_training_set(
    order_multiline_spine_df,
    features = [orders_fv_v1
                , order_lineitem_aggregated_fv_v1
                ]
)
order_training.sort('ORDERKEY').show(20)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERKEY"  |"O_ORDERSTATUS"  |"O_TOTALPRICE"  |"O_ORDERDATE"  |"O_ORDERPRIORITY"  |"O_CLERK"        |"O_SHIPPRIORITY"  |"O_COMMENT"                                         |"LA_ORDER_TOTAL_DISCOUNT_COST"  |"LA_TOTAL_TAX_DUE"  |"LA_ORDER_AVG_DISCOUNT_COST"  |"LA_AVG_TAX_DUE"  |"LA_SUM_RETURN"  |"LA_SUM_NONRETURN"  |"LA_RETURN_RATIO"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

We have produced a training dataframe containing a row per ORDERKEY value with the Order (`O_`) and Order Lineitem Aggregated (`OLA_`) features.
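
For intuition, the roll-up from Lineitem to Order level can be sketched in plain Python on a few made-up rows. The `div0` helper below is a stand-in that mimics the behaviour of Snowflake's `DIV0` function, which returns 0 when the divisor is 0; the values are illustrative only.

```python
from collections import defaultdict


def div0(numerator, denominator):
    """Stand-in for Snowflake's DIV0: return 0 instead of failing when dividing by 0."""
    return 0 if denominator == 0 else numerator / denominator


# Made-up lineitem-level derived features.
lineitems = [
    {"ORDERKEY": 1, "L_TAX_DUE": 10.0, "L_RETURN_IND": 1, "L_NONRETURN_IND": 0},
    {"ORDERKEY": 1, "L_TAX_DUE": 30.0, "L_RETURN_IND": 0, "L_NONRETURN_IND": 1},
    {"ORDERKEY": 2, "L_TAX_DUE": 5.0, "L_RETURN_IND": 1, "L_NONRETURN_IND": 0},
]

# Group the lineitem rows by ORDERKEY.
groups = defaultdict(list)
for row in lineitems:
    groups[row["ORDERKEY"]].append(row)

# Aggregate each group to a single row of Order level features.
order_features = {}
for orderkey, rows in groups.items():
    returns = sum(r["L_RETURN_IND"] for r in rows)
    nonreturns = sum(r["L_NONRETURN_IND"] for r in rows)
    total_tax = sum(r["L_TAX_DUE"] for r in rows)
    order_features[orderkey] = {
        "OLA_TOTAL_TAX_DUE": total_tax,
        "OLA_AVG_TAX_DUE": total_tax / len(rows),
        "OLA_LINEITEM_COUNT": len(rows),
        "OLA_RETURN_RATIO": div0(returns, nonreturns),
    }

print(order_features)
```

Order 2 has no non-returned Lineitems, so its return ratio falls back to 0 rather than raising a division error. Whether 0 is the right value for that case is a modelling decision.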

### Summary 
So far we have created:
- base level Feature Views from source tables with 'raw' features (columns) as they appear in the source data.
- Feature Views containing additional row-level derived and computed features like `L_DISCOUNT`, `L_TAX_DUE` and boolean indicators like `L_RETURN_IND`, `L_NONRETURN_IND`.
- Feature Views using aggregated features, aggregating from an entity-key lower in the entity-hierarchy (e.g. `LINEITEM`) up to a higher level (e.g. `ORDERS`).

We have shown how we can join these Feature Views together to retrieve data for training, combining features from different Entities, and ensuring we get valid outputs.

Next we will show additional feature preprocessing to prepare data into a more consumable model-ready state.  

# Derived Features - PreProcessing

A common question (or assumption) we hear is _“Where should the feature pre-processing required for a model reside?”_  or _“Can I store pre-processed features in the Feature Store?”_

__Feature pre-processing : a definition__
Feature pre-processing is an important step, perhaps the most important, in preparing raw data for machine learning.  It is the process of treating and standardising features so that they can be used directly within an ML model and are better suited to the modelling problem.   For example, many ML models make assumptions about the form of their input features, or may not perform well with untreated features.


### Model-specific data processing (transformations) - FeatureStore or Model-Pipeline
_How should we think about this, and where should model specific feature processing reside?_   
In general, model-specific processing transformations should be done as part of the model training pipeline, and not as a general transform applied to all data within the Feature Store using a FeatureView.   This is less of an issue for encoders (e.g. categorical), assuming the categories in the data are very stable or fixed, but it can be an issue for scaling-type processors that capture volatile state from the sample, e.g. a min/max scaler.  To avoid data leakage, the scaler must be fitted to the specific sample used at training time and that fitted state applied at testing and validation time, so the state will differ for each new sample of data used in training or cross-validation.   If this were done within FeatureViews, specific to each sample of data used for a model execution (training, testing, inference), you could end up with hundreds of FeatureViews, one per model version.  It is therefore generally more appropriate to save the fitted state within the model pipeline, which can then be stored in the Snowflake model registry, providing end-to-end lineage: raw source data -> model-generic feature-views -> model-specific preprocessing pipeline -> model.  Nevertheless, we will show an example of how these transformations can be performed within the Feature Store should that be required.
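
To make the leakage point concrete, here is a minimal plain-Python sketch of a min/max scaler whose state is fitted on the training sample only and then reused on the test sample. The `fit_min_max` and `transform` helpers and the numbers are hypothetical; in practice a library scaler inside the model pipeline would play this role.

```python
def fit_min_max(values):
    """Capture the scaler state (min and max) from one sample."""
    return {"min": min(values), "max": max(values)}


def transform(values, state):
    """Scale values with previously fitted state; a constant column maps to 0.0."""
    span = state["max"] - state["min"]
    return [0.0 if span == 0 else (v - state["min"]) / span for v in values]


train = [10.0, 20.0, 30.0]
test = [15.0, 40.0]

state = fit_min_max(train)             # fitted on the training sample only
train_scaled = transform(train, state)
test_scaled = transform(test, state)   # reuses the training state: no leakage

print(state, train_scaled, test_scaled)
```

A test value can land outside the 0 to 1 range because the state comes from the training sample. That is expected, and it is exactly the sample-specific state that belongs with the model pipeline rather than in a shared FeatureView.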


### Encoding Categorical Data
It is common for categorical data to be encoded into numeric values for use within a model.  There are various common approaches for this:
- Ordinal Encoding - Each distinct value is assigned an integer that represents the categorical value. This is the simplest feature encoding scheme.
- Label Encoding - As for Ordinal Encoding, but with no inherent ordering between the values and their index.  Typically used for encoding target variables rather than features.  
- One-Hot Encoding - Each distinct categorical value is represented as a new feature of boolean or 0/1 numeric type. This technique can result in significant feature expansion when the unencoded categorical feature has a large number of distinct values, and there are various techniques used to avoid this, e.g. only applying the encoding to features whose distinct-value count is below a set level, or bucketing infrequently occurring values in a feature into a single new category.
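
The schemes can be compared on a toy column in plain Python. This is a sketch only: the `MIN_COUNT` threshold, the `OTHER` bucket name, the `MODE_` column prefix and the sample values are assumptions chosen for illustration.

```python
from collections import Counter

values = ["AIR", "RAIL", "AIR", "SHIP", "AIR", "RAIL", "FOB"]

# Ordinal encoding: each distinct value maps to its index in the sorted category list.
categories = sorted(set(values))
ordinal = [categories.index(v) for v in values]

# Bucket infrequent values into a single category before one-hot encoding.
MIN_COUNT = 2  # assumed threshold
counts = Counter(values)
bucketed = [v if counts[v] >= MIN_COUNT else "OTHER" for v in values]

# One-hot encoding: one 0/1 column per remaining category.
onehot_categories = sorted(set(bucketed))
onehot = [{f"MODE_{c}": int(v == c) for c in onehot_categories} for v in bucketed]

print(categories, ordinal)
print(onehot_categories, onehot[3])
```

Bucketing reduces four distinct values to three one-hot columns here; on a real high-cardinality column the saving is much larger.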

There are many possible approaches that can be used. Here are a couple: 
- use Snowpark (or SQL) to write dataframe code that represents the transformation directly, and use it as the input dataframe for a FeatureView.
- use the preprocessing functions available in scikit-learn, or the scalable equivalents in `snowflake.ml.modeling`. Fit a preprocessing model/pipeline with the data from FeatureViews.  Save the model into the model registry.  Reuse the model/pipeline function within a dataframe that is then used as the input to derive a FeatureView.

We will show the first approach, demonstrating how a One-Hot Encoding scheme can be achieved using Snowpark DataFrame code alone.  For this, we capture the distinct values in the categorical columns and use them to derive the one-hot-encoded columns via case expressions.  For simplicity we have assumed that string-based columns are categorical.  We have also set a limit of 100 on the number of distinct values that we will accept in a categorical column for one-hot encoding before an alternative treatment is necessary. This prevents the column (feature) explosion issue mentioned earlier.

The code below :
- identifies string (categorical) features in our base lineitem FeatureView dataframe.
- gets the distinct value count for each string feature
- filters out string features with a higher distinct value count than our threshold to derive the categorical feature list
- gets the distinct values for each categorical feature
- creates a select clause consisting of a new column for each categorical feature/distinct value combination plus an additional column to represent unknown values
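
On a toy in-memory table, the steps above can be sketched as follows. This is a plain-Python analogue rather than the Snowpark code: the table, the threshold of 3 and the `one_hot_row` helper are made up for illustration.

```python
# Toy table: column name -> list of values (all strings here).
table = {
    "L_RETURNFLAG": ["N", "R", "A", "N"],
    "L_COMMENT": ["c1", "c2", "c3", "c4"],  # too many distinct values
}
THRESHOLD = 3  # assumed maximum number of distinct values for one-hot encoding

# Count distinct values and keep the columns at or below the threshold.
cat_fields = [name for name, vals in table.items() if len(set(vals)) <= THRESHOLD]

# Sorted distinct values per categorical column.
distinct_values = {name: sorted(set(table[name])) for name in cat_fields}


def one_hot_row(row):
    """One 0/1 column per known value plus an UNKNOWN column per categorical field."""
    encoded = {}
    for name, known in distinct_values.items():
        for value in known:
            encoded[f"{name}_{value}"] = int(row[name] == value)
        encoded[f"{name}_UNKNOWN"] = int(row[name] not in known)
    return encoded


print(cat_fields)
print(one_hot_row({"L_RETURNFLAG": "R"}))
print(one_hot_row({"L_RETURNFLAG": "X"}))  # a value not seen when the columns were built
```

The `UNKNOWN` column is what keeps the encoding usable when new category values arrive after the column list has been fixed.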

#### One Hot Encode

In [None]:
# Use the schema to retrieve only the String columns we want to one-hot-encode
li_fv_v1_schema = lineitem_fv_v1.feature_df.schema
string_fields = [field for field in li_fv_v1_schema.fields if isinstance(field.datatype, T.StringType)]
print(string_fields)
print(' ')

# Get the distinct value count for each field as a Dict.
field_dv_counts = lineitem_fv_v1.feature_df.select([F.count_distinct(F.col(field.name)).alias(field.name)  for field in string_fields]) \
    .select(F.object_construct('*').alias('FIELD_DV_COUNTS')) \
    .collect()[0][0]
field_dv_counts_dict = json.loads(field_dv_counts)
print('Field Distinct Value Counts: ',field_dv_counts_dict)
print(' ')

# Filter the Fields based on a threshold that determines whether we want to treat as Categorical, or leave untreated as String field
cat_fields_threshold = 100
cat_fields = [key for key, value in field_dv_counts_dict.items() if value <= cat_fields_threshold]
print('Categorical Fields: ', cat_fields)
print(' ')

# Get a table of distinct values for each categorical column
dv_fields = lineitem_fv_v1.feature_df.select([F.array_agg(F.replace(F.col(field)," ", "") , is_distinct = True).alias(field)  for field in cat_fields])
dv_fields.show()
print(' ')

# and create a Dict of sorted categorical values for each Field from the table
dv_fields_dict = json.loads(dv_fields.select(F.object_construct('*').alias('string_dv_dict')).collect()[0]['STRING_DV_DICT'])
dv_fields_dict = {key: sorted(value) for key, value in dv_fields_dict.items()}
print(dv_fields_dict)
print(' ')

# Create the select case expressions for the new columns 
select_onehenc_expr = ['ORDERKEY','LINENUMBER']
# For each Categorical Field
for field in dv_fields_dict:
    # For each distinct value
    for idx , v in enumerate(dv_fields_dict[field]):
        # Create a case expression to test for value in Field.
        # Spaces are stripped from the column so it matches the distinct values captured above, which were collected with spaces removed.
        select_onehenc_expr.append(F.when(F.replace(F.col(field), " ", "") == v, F.lit(1)).otherwise(F.lit(0)).alias(f'{field}_{v}'))
    # And create an additional Case Expression to handle any UNKNOWN values that appear in the table at a later date that we have not already handled
    select_onehenc_expr.append(F.when(F.replace(F.col(field), " ", "").in_(dv_fields_dict[field]), F.lit(0)).otherwise(F.lit(1)).alias(f'{field}_UNKNOWN'))
print('Snowpark Case Expressions: ', select_onehenc_expr)
print(' ')

Now that we have built the Select case-expression column list we can use it to process the raw categorical-features and derive our one-hot encoded feature columns.

In [None]:
lineitem_categorical_onehotenc_features = lineitem_fv_v1.feature_df.select(select_onehenc_expr)
lineitem_categorical_onehotenc_features.show()

We can now use the dataframe to create a new FeatureView with the One Hot Encoded features.  Our new dataframe is derived from the base `lineitem_fv_v1` FeatureView.  We could equally have chosen to derive it directly from the underlying source table dataframe (`lineitem_sdf`), but using the base-level FeatureView might give us additional useful lineage information.

In [None]:
# LINEITEM CATEGORICAL ONE-HOT_ENCODED FEATURES FEATUREVIEW

# Create Lineitem one-hot encoded FeatureView in Feature Store
lineitem_onehotenc_fv = FeatureView(
    name = f"FV_LINEITEM_CAT_1HENC",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_categorical_onehotenc_features,
    desc = f"Order / Lineitem level - Lineitem categorical features treated with one-hot encoding scheme"
)
# Register the Lineitem one-hot encoded FeatureView in the schema
lineitem_onehotenc_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_onehotenc_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,                            # whether function call blocks until initial data is available
    overwrite = True,                        # whether to replace existing feature view with same name/version
)

In [None]:
print(lineitem_onehotenc_fv_v1.feature_df.columns)

We will use the Spine we created earlier, `lineitem_order_part_supplier_customer_nation_region_spine_df`, which includes all the entity keys, to generate a training dataset containing the raw source, derived and categorical features for our lineitem FeatureViews.

In [None]:
lineitem_order_part_supplier_customer_nation_region_spine_df.sort(keycols).show(10)

In [None]:
lineitem_derived_and_categorical_df_training = fs.generate_training_set(
    lineitem_order_part_supplier_customer_nation_region_spine_df,
    features = [lineitem_fv_v1, lineitem_derived_fv_v1, lineitem_onehotenc_fv_v1]
)
lineitem_derived_and_categorical_df_training.sort(keycols).show()

In [None]:
lineitem_derived_and_categorical_df_training.columns

#### Ordinal Encode

In [None]:
def ordinal_to_snowpark_expr(encoded_fit):
    case_expressions = []
    # Loop over each column we want to encode
    for input_col, output_col in zip(encoded_fit.input_cols, encoded_fit.output_cols):
        # Fetch categories and ensure conversion to list
        categories_ndarray = encoded_fit.categories_.get(input_col, [])
        categories = categories_ndarray.tolist() if isinstance(categories_ndarray, np.ndarray) else categories_ndarray
        if not categories:
            raise ValueError(f"No categories found for column: {input_col}")
        
        # Start a fresh case expression for this column
        case_expr = None
        # Build the ordinal when chain for this column
        for idx, category in enumerate(categories):
            if case_expr is None:
                case_expr = F.when(F.col(input_col) == F.lit(category), F.lit(idx))
            else:
                case_expr = case_expr.when(F.col(input_col) == F.lit(category), F.lit(idx))

        # Handle unknown categories (assign -1 if not found in the categories list)
        if case_expr is not None:
            case_expr = case_expr.otherwise(F.lit(-1))
            case_expressions.append(case_expr.alias(output_col))
        else:
            raise ValueError(f"Failed to construct case expression for column: {input_col}")

    return case_expressions

import codecs
import pickle

def ss_snowpark_expression_builder(ss_pdf, array_colnames):
    # Decode each fitted StandardScaler and collect its state as a dict,
    # appending one dict per input row for every name in array_colnames
    res_row = []
    for r in ss_pdf:
        for a in array_colnames:
            ss = pickle.loads(codecs.decode(r['SS_MODEL_PKL'].encode(), "base64"))
            res_row.append({'PARTITION_COL':r['PARTITION_COL'], 'SCALES' : ss.scale_.tolist(), 'MEAN' : ss.mean_.tolist(), 'VARIANCE' : ss.var_.tolist(), 'NROWS_IN_FIT_GROUP' : int(ss.n_samples_seen_), 'N_FEATURES_IN_ARRAY' : ss.n_features_in_, 'PKL_SS': r['SS_MODEL_PKL']})
    return res_row

In [None]:
# Lineitem FeatureView V1 : Categorical Columns
lineitem_fv_v1_sdf = (lineitem_fv_v1.feature_df
    .select(F.col("ORDERKEY"),
            F.col("LINENUMBER"),
            F.col("L_RETURNFLAG"),
            F.col("L_LINESTATUS"), 
            F.col("L_SHIPINSTRUCT"), 
            F.col("L_SHIPMODE")
           )
    )
# Input and Output Columns
li_input_cols = ["L_RETURNFLAG", "L_LINESTATUS", "L_SHIPINSTRUCT","L_SHIPMODE"]
li_output_cols = [f"ORDINAL_{col}" for col in li_input_cols]

# Fit Ordinal Encoder Model
li_encoder = OrdinalEncoder(
    input_cols=li_input_cols,
    output_cols=li_output_cols
).fit(lineitem_fv_v1_sdf)

In [None]:
# Create Dataframe using Ordinal Encoded model, transformed to CASE expressions
lineitem_ordinalEncodedCats_df = (lineitem_fv_v1.feature_df
    .select(F.col("ORDERKEY"),
            F.col("LINENUMBER"),
            F.col("L_RETURNFLAG"),
            F.col("L_LINESTATUS"), 
            F.col("L_SHIPINSTRUCT"), 
            F.col("L_SHIPMODE"))
    .with_columns(li_encoder.output_cols, ordinal_to_snowpark_expr( li_encoder)))

# Create Lineitem Ordinal Encoded FeatureView in Feature Store
lineitem_ordinalEncodedCats_fv = FeatureView(
    name = f"FV_LINEITEM_ORDINALENCODEDCATS",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_ordinalEncodedCats_df,
    desc = f"Lineitem categorical columns original and ordinal encoded feature view"
)

# Register the FeatureView in the schema
lineitem_ordinalEncodedCats_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_ordinalEncodedCats_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

lineitem_ordinalEncodedCats_fv_v1.feature_df.show()

### Numeric Scaling Example

Numeric scaling is more typically applied as a model-specific transformation within the Model pipeline.  Once fitted, these transformations can be stored within the Snowflake Model-Registry and applied to new data when needed. Numeric scaling techniques store global state derived from the dataset used to fit them, e.g. the min and max values for a feature.

> **Note:**
> Snowflake Model-Registry supports transform only pipelines. You no longer need an ML model as the final step of the pipeline. If you have a common set of transformations you want to apply consistently, you can create a transformation only pipeline, fit and store it as a model in Model-Registry, and use it to transform whenever needed. 

That said, it may also be useful to maintain and persist a version of these features computed over the global state rather than over a training sample, e.g. for exploratory analysis and to support 'quick/approximate' experimentation.  Below we show an approach to deriving and maintaining these within the Feature Store via a downstream _statistical summary_ FeatureView.  As the upstream FeatureView changes, the global-state values in the downstream _statistical summary_ FeatureView change to reflect it.

Like before for the Categorical Encoding example we will use the `LINEITEM` table to demonstrate. We first need to identify the candidate numeric columns that we want to apply the numeric scaling on.

In [None]:
# Use the schema to retrieve only the Numeric columns we want to scale
li_fv_v1_schema = lineitem_fv_v1.feature_df.schema
numeric_types = (T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType) # Numeric types we want to scale
numeric_fields = [field.name for field in li_fv_v1_schema.fields if isinstance(field.datatype,  numeric_types )]
print(numeric_fields)
print(' ')

Now we need to compute global summary statistics for the `LINEITEM` FeatureView.  There are several ways to do this; for example, we can use the `describe()` helper function, which returns the count, mean, standard deviation, min and max for each column.  We can add other summary statistics where needed for other scaling techniques, for example the max-absolute value used in max absolute scaling.

We will convert the output into a format that will allow us to use it dynamically to derive the numeric-scaled features for every row in our data.  We package the statistical values up into a Snowflake object keyed by feature-name, with an object containing each of the statistics.  

In [None]:
# Compute the COUNT, MIN, MAX, MEAN & STDEV for each Numeric field
lineitem_numericstatsummary_features = lineitem_fv_v1.feature_df.describe([field for field in numeric_fields ])
# Add in the max-abs statistic for Max Absolute Scaling
lineitem_numericstatsummary_features = lineitem_numericstatsummary_features.union(lineitem_fv_v1.feature_df.select([F.lit('maxabs')]+[F.max(F.abs(F.col(field))).alias(field) for field in numeric_fields ]))
lineitem_numericstatsummary_features.show()

# Convert the statistical summary from describe into an object of objects (dict of dicts)
lineitem_numericstatsummary_features_dict = (  lineitem_numericstatsummary_features 
    .select([F.object_agg(F.upper(F.col("SUMMARY")), field).alias(field)  for field in numeric_fields]) 
    .select(F.lit(f'{lineitem_fv_v1.name}${lineitem_fv_v1.version}').alias('FEATUREVIEW$VERSION'), F.object_construct('*').alias('NUMCOL_STAT_DICT')))

lineitem_numericstatsummary_features_dict.show()
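
To make the shape of `NUMCOL_STAT_DICT` concrete, here is a minimal plain-Python sketch of the same dict-of-dicts layout. The column values are made up for illustration (they are not TPC-H output), the `summarise` helper is hypothetical, and we assume the standard deviation is the sample standard deviation.

```python
import statistics

def summarise(values):
    """Return the statistics used for scaling, keyed like the upper-cased SUMMARY labels."""
    return {
        "COUNT": len(values),
        "MEAN": statistics.fmean(values),
        "STDDEV": statistics.stdev(values),  # assumption: sample standard deviation
        "MIN": min(values),
        "MAX": max(values),
        "MAXABS": max(abs(v) for v in values),
    }

# Illustrative values only
columns = {
    "L_QUANTITY": [1.0, 2.0, 3.0, 4.0],
    "L_DISCOUNT": [-0.5, 0.0, 0.5],
}

# Outer key = feature name, inner key = statistic name
numcol_stat_dict = {name: summarise(vals) for name, vals in columns.items()}
print(numcol_stat_dict["L_QUANTITY"]["MAX"])
```

Keying by feature name first means a scaling expression can look up any statistic with two subscripts, e.g. `NUMCOL_STAT_DICT[field]['MIN']`.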

We could use this dataframe directly to join to our `LINEITEM` FeatureView data and derive the scaled features.  Another option is to create a new FeatureView using this Dataframe within our Feature Store. This will ensure the data is generally available and visible within the Feature Store and that the state values are always up to date as changes occur in the `LINEITEM` FeatureView. We have added an additional column to the data containing the FeatureView name and Version, to ensure we can identify what source FeatureView this relates to. That may be useful if we want to UNION all of these statistical summary FeatureViews together for exploratory analysis. We will also use this `FEATUREVIEW$VERSION` as the ENTITY key for the FeatureView.

##### METADATA : FEATUREVIEW ENTITY
We define the `FEATUREVIEW$VERSION` entity

In [None]:
# Metadata FEATUREVIEW Entity Definition 
featureview_version_entity = Entity(
    name="_METADATA_FEATUREVIEW$VERSION",
    join_keys=["FEATUREVIEW$VERSION"],
    desc="Metadata Entity to represent versioned FeatureViews within the FeatureStore"
    )
fs.register_entity(featureview_version_entity)

##### METADATA : LINEITEM NUMERIC STATISTICS
We use the statistical summary dataframe we created above as the source dataframe to create our FeatureView.  We will create this as a View based FeatureView to avoid the cost of recomputing it every time the upstream (`FV_LINEITEM$V1`) changes.  

In [None]:
# Metadata  LINEITEM NUMERIC AGGREGATE STATISTICS FEATURES FEATUREVIEW
# Create Lineitem numeric statistical summary FeatureView in Feature Store
lineitem_numericstatsummary_fv = FeatureView(
    name = f"_METADATA_FV_LINEITEM_NUMERICSTATS",
    entities = [featureview_version_entity],
    feature_df = lineitem_numericstatsummary_features_dict,
    desc = f"Metadata FeatureView : Order / Lineitem level - Lineitem Statistical Summary for Numeric columns"
)
# Register the Lineitem numeric statistical summary FeatureView in the schema
lineitem_numericstatsummary_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_numericstatsummary_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,                            # whether function call blocks until initial data is available
    overwrite = True,                        # whether to replace existing feature view with same name/version
)

lineitem_numericstatsummary_fv_v1.feature_df.show()

We could build the scaling expressions (formulas) directly within our dataframe, but as we might use them a lot we will create some Snowflake SQL functions that encapsulate the logic.  These are effectively inlined into the SQL wherever they are called, so they are very efficient. We create a SQL function for each of our scaling methods: standard scaling, min-max scaling and max absolute scaling.  Note: we have declared these as IMMUTABLE and MEMOIZABLE, which should also help their efficiency.

In [None]:
CREATE OR REPLACE FUNCTION {{sess_db}}.{{org}}.min_max_scale(feature FLOAT, X_min float, X_max float)
  RETURNS FLOAT
  IMMUTABLE MEMOIZABLE
  AS
  $$
    div0((feature - X_min) , (X_max - X_min))
  $$
  ;

In [None]:
CREATE OR REPLACE FUNCTION {{sess_db}}.{{org}}.standard_scale(feature FLOAT, X_mean float, X_stdev float)
  RETURNS FLOAT
  IMMUTABLE MEMOIZABLE  
  AS
  $$
    div0((feature - X_mean) , X_stdev)
  $$
  ;

In [None]:
CREATE OR REPLACE FUNCTION {{sess_db}}.{{org}}.maxabs_scale(feature FLOAT, X_maxabs float)
  RETURNS FLOAT
  IMMUTABLE MEMOIZABLE 
  AS
  $$
    div0(feature , X_maxabs)
  $$
  ;

We create Snowpark references to these new SQL functions with `F.builtin` so we can call them easily in our dataframe code.

In [None]:
min_max_scale = F.builtin(f'{sess_db}.{org}.min_max_scale')
standard_scale = F.builtin(f'{sess_db}.{org}.standard_scale')
maxabs_scale = F.builtin(f'{sess_db}.{org}.maxabs_scale')
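
For reference, the arithmetic inside the three SQL functions can be sketched in plain Python. The `_py` names and the `div0` helper are hypothetical stand-ins written for this illustration; `div0` mimics the SQL `DIV0` behaviour of returning 0 when the divisor is 0.

```python
def div0(numerator, denominator):
    """Divide, but return 0.0 instead of failing when the divisor is zero."""
    return 0.0 if denominator == 0 else numerator / denominator

def min_max_scale_py(feature, x_min, x_max):
    # Rescale to the [0, 1] range spanned by the observed min and max
    return div0(feature - x_min, x_max - x_min)

def standard_scale_py(feature, x_mean, x_stdev):
    # Centre on the mean and express in units of standard deviation
    return div0(feature - x_mean, x_stdev)

def maxabs_scale_py(feature, x_maxabs):
    # Rescale to [-1, 1] by the largest absolute value
    return div0(feature, x_maxabs)

print(min_max_scale_py(5.0, 0.0, 10.0))   # 0.5
print(standard_scale_py(3.0, 1.0, 2.0))   # 1.0
print(maxabs_scale_py(-4.0, 8.0))         # -0.5
print(min_max_scale_py(7.0, 7.0, 7.0))    # 0.0 (constant column, zero range)
```

Using `div0` means a constant column (zero range or zero standard deviation) yields 0 rather than a division error.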

We build up a list of expressions for each numeric column for each of the scaling functions, and combine them into a single list.

This is then used to perform the scaling on the LINEITEM featureview source data.  We perform a cross (product) join from our statistical summary feature-view to ensure we have the required statistics passed into every row for computation.


In [None]:
# Build up scaling expressions
min_max_expressions = [min_max_scale(F.col(field),F.col('NUMCOL_STAT_DICT')[field]['MIN'],F.col('NUMCOL_STAT_DICT')[field]['MAX']).alias(f'{field}_MNMX_SCALED') for field in numeric_fields]
standard_scaled_expressions = [standard_scale(F.col(field),F.col('NUMCOL_STAT_DICT')[field]['MEAN'],F.col('NUMCOL_STAT_DICT')[field]['STDDEV']).alias(f'{field}_STD_SCALED') for field in numeric_fields]
max_abs_scaled_expressions = [maxabs_scale(F.col(field),F.col('NUMCOL_STAT_DICT')[field]['MAXABS']).alias(f'{field}_MAXABS_SCALED') for field in numeric_fields]
scaled_numeric_expressions = min_max_expressions + standard_scaled_expressions + max_abs_scaled_expressions                     

# Apply the scaling expressions to derive the scaled values.  We perform a cross (product) join from our statistical summary feature-view to ensure we have the required statistics passed into every row for computation.
lineitem_numeric_scaled_features = lineitem_fv_v1.feature_df.join(lineitem_numericstatsummary_fv_v1.feature_df, how = "cross") \
              .select([F.col('ORDERKEY'), F.col('LINENUMBER')] + scaled_numeric_expressions)
lineitem_numeric_scaled_features.show()
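
The cross join works because the statistical summary has exactly one row, so every LINEITEM row is paired with the same statistics object. A minimal plain-Python sketch of that pairing, using made-up rows and only min-max scaling on a single column, looks like this:

```python
from itertools import product

# Illustrative feature rows (not real TPC-H values)
rows = [
    {"ORDERKEY": 1, "LINENUMBER": 1, "L_QUANTITY": 10.0},
    {"ORDERKEY": 1, "LINENUMBER": 2, "L_QUANTITY": 30.0},
    {"ORDERKEY": 2, "LINENUMBER": 1, "L_QUANTITY": 50.0},
]
# The summary side has a single row, so the cross join does not multiply rows
stats_rows = [{"NUMCOL_STAT_DICT": {"L_QUANTITY": {"MIN": 10.0, "MAX": 50.0}}}]

scaled = []
for row, stats in product(rows, stats_rows):  # cross (product) join
    s = stats["NUMCOL_STAT_DICT"]["L_QUANTITY"]
    span = s["MAX"] - s["MIN"]
    scaled.append({
        "ORDERKEY": row["ORDERKEY"],
        "LINENUMBER": row["LINENUMBER"],
        "L_QUANTITY_MNMX_SCALED": 0.0 if span == 0 else (row["L_QUANTITY"] - s["MIN"]) / span,
    })

for r in scaled:
    print(r)
```

If the summary ever held more than one row, the output would contain duplicated entity keys, so keeping it to a single row per source FeatureView matters.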

We can use this dataframe to create a new FeatureView with the scaled features which will be maintained as the LINEITEM source data changes.

In [None]:
# LINEITEM NUMERIC SCALED FEATURES FEATUREVIEW

# Create Lineitem numeric scaled FeatureView in Feature Store
lineitem_numeric_scaled_fv = FeatureView(
    name = f"FV_LINEITEM_NUM_SCALED",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_numeric_scaled_features,
    desc = f"Order / Lineitem level - Lineitem numeric features treated with standard, min/max and maxabs scaling"
)
# Register the Lineitem numeric scaled FeatureView in the schema
lineitem_numeric_scaled_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_numeric_scaled_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,                            # whether function call blocks until initial data is available
    overwrite = True,                        # whether to replace existing feature view with same name/version
)

lineitem_numeric_scaled_fv_v1.feature_df.show()

Finally, let's combine the new Scaled FeatureView with the others we created earlier to generate a training dataset containing all the LINEITEM level features we have created, including the raw untreated features.

In [None]:
lineitem_order_part_supplier_customer_nation_region_spine_df.sort(keycols).show(10)

lineitem_derived_and_categorical_and_numericscaled_df_training = fs.generate_training_set(
    lineitem_order_part_supplier_customer_nation_region_spine_df,
    features = [lineitem_fv_v1, lineitem_derived_fv_v1, lineitem_onehotenc_fv_v1, lineitem_numeric_scaled_fv_v1]
)
lineitem_derived_and_categorical_and_numericscaled_df_training.sort(keycols).show()

We can see all of the columns with the additional pre-processed columns appended to the end of our dataframe.

In [None]:
lineitem_derived_and_categorical_and_numericscaled_df_training.columns

## Feature Store & snowflake.ml.preprocessing

Below we show how you can use snowflake.ml.preprocessing functions to perform feature pre-processing with the Snowflake Feature Store.

These functions provide scalable 'drop-in' replacements for the equivalent functions in scikit-learn.  They achieve this scalability by generating SQL that performs the fitting and transformation, as opposed to retrieving the data into Python (pandas).   In this regard they are similar to the approach we applied earlier when we built our Categorical and Numeric preprocessing examples from first principles.

In [None]:
# Recreate a dataframe.  We will derive one from our Customer Feature View
customer_training_df = fs.generate_training_set(
    customer_spine_df,
    features = [customer_fv_v1]
)
customer_training_df.sort('CUSTKEY').show()

### Fit & Transform an Ordinal Encoder 

In [None]:
from snowflake.ml.modeling.preprocessing import OrdinalEncoder
encoder = OrdinalEncoder(
    input_cols=["C_MKTSEGMENT"],
    output_cols=["ORDINAL_C_MKTSEGMENT"]
)
encoded_fit = encoder.fit(customer_fv_v1.feature_df)

If we look at our Query History in Snowsight we should see that the `fit` has executed a couple of queries similar to these to capture the category state of the 'C_MKTSEGMENT' column.

```
CREATE  OR  REPLACE  TEMPORARY  TABLE  SNOWPARK_TEMP_TABLE_RFC3HM61NN("CUSTKEY" BIGINT NOT NULL , "C_NAME" STRING(25), "C_ADDRESS" STRING(40), "C_PHONE" STRING(15), "C_ACCTBAL" NUMBER(12, 2), "C_MKTSEGMENT" STRING(10), "C_COMMENT" STRING(117), "ORDINAL_C_MKTSEGMENT" DOUBLE)    AS  SELECT  *  FROM ( SELECT "CUSTKEY", "C_NAME", "C_ADDRESS", "C_PHONE", "C_ACCTBAL", "C_MKTSEGMENT", "C_COMMENT", "ORDINAL_C_MKTSEGMENT" FROM ( SELECT  *  FROM (( SELECT "CUSTKEY" AS "CUSTKEY", "C_NAME" AS "C_NAME", "C_ADDRESS" AS "C_ADDRESS", "C_PHONE" AS "C_PHONE", "C_ACCTBAL" AS "C_ACCTBAL", "C_MKTSEGMENT" AS "C_MKTSEGMENT", "C_COMMENT" AS "C_COMMENT" FROM (
                    SELECT
                        l_0.*,
                        r_0.* EXCLUDE (CUSTKEY)
                    FROM (SELECT  *  FROM SIMON.TPCHSF1.CUSTOMER_SPINE) l_0
                    LEFT JOIN (
                        SELECT CUSTKEY, C_NAME, C_ADDRESS, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT
                        FROM SIMON.TPCHSF1_FEATURE_STORE.FV_CUSTOMER$V1
                    ) r_0
                    ON l_0.CUSTKEY = r_0.CUSTKEY
                )) AS SNOWPARK_LEFT LEFT OUTER JOIN ( SELECT "_CATEGORY" AS "_CATEGORY", "ORDINAL_C_MKTSEGMENT" AS "ORDINAL_C_MKTSEGMENT" FROM ( SELECT "_CATEGORY", "_INDEX" AS "ORDINAL_C_MKTSEGMENT" FROM (( SELECT  *  FROM SNOWPARK_TEMP_TABLE_D1FK378MCT WHERE "_CATEGORY" IS NOT NULL) UNION ( SELECT "_COLUMN_NAME", "_CATEGORY", 'NAN' :: FLOAT AS "_INDEX" FROM SNOWPARK_TEMP_TABLE_D1FK378MCT WHERE "_CATEGORY" IS NULL)) WHERE ("_COLUMN_NAME" = 'C_MKTSEGMENT'))) AS SNOWPARK_RIGHT ON EQUAL_NULL( CAST ("C_MKTSEGMENT" AS STRING), "_CATEGORY"))))
```
and
```
( SELECT 'C_MKTSEGMENT' AS "_COLUMN_NAME",  CAST ("C_MKTSEGMENT" AS STRING) AS "_CATEGORY",  CAST ((dense_rank() OVER (  ORDER BY "C_MKTSEGMENT" ASC NULLS FIRST ) - 1) AS FLOAT) AS "_INDEX" FROM ( SELECT  *  FROM ( SELECT "C_MKTSEGMENT" FROM ( SELECT "C_MKTSEGMENT" FROM (
                    SELECT
                        l_0.*,
                        r_0.* EXCLUDE (CUSTKEY)
                    FROM (SELECT  *  FROM SIMON.TPCHSF1.CUSTOMER_SPINE) l_0
                    LEFT JOIN (
                        SELECT CUSTKEY, C_NAME, C_ADDRESS, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT
                        FROM SIMON.TPCHSF1_FEATURE_STORE.FV_CUSTOMER$V1
                    ) r_0
                    ON l_0.CUSTKEY = r_0.CUSTKEY
                )) GROUP BY "C_MKTSEGMENT") WHERE "C_MKTSEGMENT" IS NOT NULL ORDER BY "C_MKTSEGMENT" ASC NULLS FIRST)) UNION ( SELECT 'C_MKTSEGMENT' AS "_COLUMN_NAME",  CAST ("C_MKTSEGMENT" AS STRING) AS "_CATEGORY", 'NAN' :: FLOAT AS "_INDEX" FROM ( SELECT "C_MKTSEGMENT" FROM ( SELECT "C_MKTSEGMENT" FROM (
                    SELECT
                        l_0.*,
                        r_0.* EXCLUDE (CUSTKEY)
                    FROM (SELECT  *  FROM SIMON.TPCHSF1.CUSTOMER_SPINE) l_0
                    LEFT JOIN (
                        SELECT CUSTKEY, C_NAME, C_ADDRESS, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT
                        FROM SIMON.TPCHSF1_FEATURE_STORE.FV_CUSTOMER$V1
                    ) r_0
                    ON l_0.CUSTKEY = r_0.CUSTKEY
                )) GROUP BY "C_MKTSEGMENT") WHERE "C_MKTSEGMENT" IS NULL)
```                
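
The second query assigns each category its index with `dense_rank() ... - 1` over the sorted distinct values, and gives a null category a NaN index. A minimal plain-Python sketch of that fitting rule, with a hypothetical `fit_ordinal_index` helper and made-up input values, is:

```python
import math

def fit_ordinal_index(values):
    """Map each distinct non-null value to its zero-based rank in sorted order; None maps to NaN."""
    distinct = sorted({v for v in values if v is not None})
    index = {category: float(rank) for rank, category in enumerate(distinct)}
    if any(v is None for v in values):
        index[None] = math.nan
    return index

# Illustrative values only
segments = ["MACHINERY", "BUILDING", None, "AUTOMOBILE", "BUILDING"]
category_index = fit_ordinal_index(segments)
print(category_index)
```

The result is the small lookup table that the `transform` step later joins against.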

Next we transform the input dataframe with the encoder and look at the SQL generated in Query History.

In [None]:
encoded_df = encoder.transform(customer_fv_v1.feature_df)
encoded_df.show()

In Snowsight we can see that the SQL is referring to the temporary table created during the `fit`.
```
SELECT "'C_MKTSEGMENT'" AS "COLUMN_NAME", "C_MKTSEGMENT" AS "UNKNOWN_VALUE" 
FROM 
  ( SELECT 'C_MKTSEGMENT', "C_MKTSEGMENT" 
  FROM 
    ( SELECT "ORDINAL_C_MKTSEGMENT", "C_MKTSEGMENT" 
     FROM 
        ( SELECT "ORDINAL_C_MKTSEGMENT", "C_MKTSEGMENT" 
          FROM SNOWPARK_TEMP_TABLE_YXMJ72O95D) 
     GROUP BY "ORDINAL_C_MKTSEGMENT", "C_MKTSEGMENT") 
     WHERE "ORDINAL_C_MKTSEGMENT" IS NULL)
```     

This is a problem if we want to use this as a Feature View, as the temporary table will only persist for the duration of the session we are working in.  We need that captured data to be persisted so we can use it in a Feature View.

`snowflake.ml.modeling.preprocessing` generates SQL that executes against Snowflake tables to ensure that the preprocessing functions can scale over very large datasets.  This is different to the way that the `sklearn.preprocessing` equivalent functions work, as they execute over Pandas data in-memory.

We can see the cause in the queries above: a `CREATE OR REPLACE TEMPORARY TABLE` statement is issued to temporarily capture the information needed for the `transform` step.

The temporary table cannot be used within a View/Dynamic Table created via the FeatureView, as it only persists for the duration of the session it was executed in.

There are several ways that we can solve this.  We could extract an sklearn object from the fitted object ( `encoded_fit.to_sklearn()` ) , and then use that to create a Python UDF that performs the `transform` step.  

Or we could create the required SQL using Snowpark, that will likely be a bit more efficient. There are a number of attributes of our original fitted object.  Three of these we can make use of to generate the required encoded column expressions that we are after.

In [None]:
print(encoded_fit.input_cols)
print(encoded_fit.output_cols)
print(encoded_fit.categories_)

We can create a function that returns a list of Case Expressions for each of the Ordinal Encoded columns that we have fitted.

In [None]:
def ordinal_to_snowpark_expr(encoded_fit):
    case_expressions = []
    # Loop over each column we want to encode
    for input_col, output_col in zip(encoded_fit.input_cols, encoded_fit.output_cols):
        # Fetch categories and ensure conversion to list
        categories_ndarray = encoded_fit.categories_.get(input_col, [])
        categories = categories_ndarray.tolist() if isinstance(categories_ndarray, np.ndarray) else categories_ndarray
        if not categories:
            raise ValueError(f"No categories found for column: {input_col}")
        
        # Start a fresh case expression for this column
        case_expr = None
        # Build the ordinal when chain for this column
        for idx, category in enumerate(categories):
            if case_expr is None:
                case_expr = F.when(F.col(input_col) == F.lit(category), F.lit(idx))
            else:
                case_expr = case_expr.when(F.col(input_col) == F.lit(category), F.lit(idx))

        # Handle unknown categories (assign -1 if not found in the categories list)
        if case_expr is not None:
            case_expr = case_expr.otherwise(F.lit(-1))
            case_expressions.append(case_expr.alias(output_col))
        else:
            raise ValueError(f"Failed to construct case expression for column: {input_col}")

    return case_expressions
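
The CASE expression built for each column behaves like the following plain-Python lookup. This is a sketch of the semantics only, with a hypothetical `ordinal_encode` helper and made-up categories; it does not use Snowpark.

```python
def ordinal_encode(value, categories):
    """Return the position of value in categories, or -1 when it was not seen during fit."""
    for idx, category in enumerate(categories):
        if value == category:   # corresponds to one WHEN branch
            return idx
    return -1                   # corresponds to the OTHERWISE branch

categories = ["AUTOMOBILE", "BUILDING", "FURNITURE"]
encoded = [ordinal_encode(v, categories) for v in ["BUILDING", "AUTOMOBILE", "UNKNOWN", None]]
print(encoded)  # [1, 0, -1, -1]
```

Note that a null input also ends up as -1, because a comparison with NULL never matches a WHEN branch. This differs from the encoder's own `transform`, which in the SQL above assigns a NaN index to a null category.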

We can then use this in a dataframe, for example with `with_columns` to add the ordinal encoded columns to our original dataframe.

In [None]:
customer_ordinalEncodedCats_df = customer_fv_v1.feature_df.with_columns(encoded_fit.output_cols, ordinal_to_snowpark_expr( encoded_fit))['CUSTKEY','C_MKTSEGMENT', 'ORDINAL_C_MKTSEGMENT']
customer_ordinalEncodedCats_df.show(5)

In [None]:
# CUSTOMER ENCODED FEATUREVIEW

# Create Customer Encoded FeatureView in Feature Store
customer_ordinalEncodedCats_fv = FeatureView(
    name = f"FV_CUSTOMER_ORDINALENCODEDCATS",
    entities = [customer_entity],
    feature_df = customer_ordinalEncodedCats_df,
    desc = f"Customer categorical columns original and ordinal encoded feature view"
)

# Register the Customer Encoded  FeatureView in the schema, and add the python featureview to our list
customer_ordinalEncodedCats_fv_v1 = fs.register_feature_view(
    feature_view = customer_ordinalEncodedCats_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

In [None]:
customer_ordinalEncodedCats_fv_v1.feature_df.show()

Let's check that our function also works when we supply multiple categorical columns to be encoded.  We will use the lineitem training dataframe we created earlier.

In [None]:
lineitem_training = fs.generate_training_set(
    lineitem_spine_df,
    features = [lineitem_fv_v1]
)

lineitem_training.select(F.col("ORDERKEY"),F.col("LINENUMBER"),F.col("L_RETURNFLAG"),F.col("L_LINESTATUS"), F.col("L_SHIPINSTRUCT"), F.col("L_SHIPMODE")).sort('ORDERKEY', 'LINENUMBER').show()

In [None]:
from snowflake.ml.modeling.preprocessing import OrdinalEncoder
li_input_cols = ["L_RETURNFLAG", "L_LINESTATUS", "L_SHIPINSTRUCT","L_SHIPMODE"]
li_output_cols = [f"ORDINAL_{col}" for col in li_input_cols]
li_encoder = OrdinalEncoder(
    input_cols=li_input_cols,
    output_cols=li_output_cols
)
li_encoded_fit = li_encoder.fit(lineitem_training)
print(li_encoded_fit.input_cols)
print(li_encoded_fit.output_cols)
print(li_encoded_fit.categories_)

In [None]:
lineitem_ordinalEncodedCats_df = ( lineitem_training
    .select(F.col("ORDERKEY"),F.col("LINENUMBER"),F.col("L_RETURNFLAG"),F.col("L_LINESTATUS"), F.col("L_SHIPINSTRUCT"), F.col("L_SHIPMODE"))
    .with_columns(li_encoder.output_cols, ordinal_to_snowpark_expr( li_encoder))
                                 )
lineitem_ordinalEncodedCats_df.show()

In [None]:
# LINEITEM FEATUREVIEW

# Create Lineitem FeatureView in Feature Store
lineitem_ordinalEncodedCats_fv = FeatureView(
    name = f"FV_LINEITEM_ORDINALENCODEDCATS",
    entities = [order_entity, lineitem_entity],
    feature_df = lineitem_ordinalEncodedCats_df,
    desc = f"Lineitem categorical columns original and ordinal encoded feature view"
)

# Register the Lineitem FeatureView in the schema, and add the python featureview to our list
lineitem_ordinalEncodedCats_fv_v1 = fs.register_feature_view(
    feature_view = lineitem_ordinalEncodedCats_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

In [None]:
lineitem_ordinalEncodedCats_fv_v1.feature_df.show()

## Preprocessing on partitioned sub-samples.

Rather than pre-processing over an entire dataset, we might need to partition the dataset in some way and pre-process the data for each partition, for example if we are training many models, one per partition, or we know that there is a high degree of variance across the partitions.

There are a number of different ways of achieving this. Firstly, we can use scikit-learn or snowflake.ml to fit a preprocessing function on each sub-sample and capture the fitted model. For example we could use a partitioned table function to fit on each subset of data.

Once we have the fitted models we can make use of them via Snowpark or Snowflake.ml with model-registry. We can then use them with Snowpark/SQL to 'transform' data for each partition.
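
The idea of fitting once per partition and then transforming each row with its own partition's state can be sketched in plain Python. The helpers `fit_per_partition` and `transform` and the sample rows are hypothetical. We assume the same conventions as scikit-learn's `StandardScaler`: population standard deviation, and a scale of 1 for a constant feature.

```python
from collections import defaultdict
from statistics import fmean, pstdev

def fit_per_partition(rows, partition_key, feature):
    """Fit mean and scale separately for every value of partition_key."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row[feature])
    return {
        key: {"MEAN": fmean(vals), "SCALE": pstdev(vals) or 1.0, "N": len(vals)}
        for key, vals in groups.items()
    }

def transform(row, fitted, partition_key, feature):
    """Scale one row using the state fitted for that row's partition."""
    state = fitted[row[partition_key]]
    return (row[feature] - state["MEAN"]) / state["SCALE"]

# Illustrative rows only
rows = [
    {"L_RETURNFLAG": "R", "L_QUANTITY": 10.0},
    {"L_RETURNFLAG": "R", "L_QUANTITY": 20.0},
    {"L_RETURNFLAG": "R", "L_QUANTITY": 30.0},
    {"L_RETURNFLAG": "N", "L_QUANTITY": 5.0},
    {"L_RETURNFLAG": "N", "L_QUANTITY": 5.0},
]
fitted = fit_per_partition(rows, "L_RETURNFLAG", "L_QUANTITY")
print(fitted)
print([transform(r, fitted, "L_RETURNFLAG", "L_QUANTITY") for r in rows])
```

In Snowflake the grouping step is what the partitioned table function call performs for us, with each partition handed to a separate invocation of the fitting code.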

In [None]:
from snowflake.snowpark.types import PandasDataFrame
from sklearn.preprocessing import StandardScaler
import pickle
import cloudpickle
import codecs
import base64
import ast

# Create a stage for our Python function code to be stored in.  
# This is only needed if the Function is created as a permanent function.
session.sql('''create stage if not exists py_code;''').collect()

# Create functions to encode and decode a model so that it can be saved/reused
## Encode
def pickle_encode_model(model):
    return codecs.encode(cloudpickle.dumps(model), "base64").decode()
## Decode
def pickle_decode_model(model):
    return cloudpickle.loads(codecs.decode(model.encode(), "base64"))

# Create Python Class to be used in UDTF for fitting a model on sub-samples of data.
# The data will be partitioned by Snowflake using a partitioned table function call.
class pp_std_scaler:
    def end_partition(self, df: PandasDataFrame[str, list]) -> PandasDataFrame[str, str, str, str, int, int, str]:
        # Directly convert the array column to a proper 2D numpy array
        array_data = np.array(df['NUMERIC_COL_ARRAY'].tolist())

        # Fit the scaler on the array data
        scaler = StandardScaler()
        scaler.fit(array_data)

        # Encode the scaler model into a base64 pickle string
        return pd.DataFrame(data={'PARTITION_COL':df['PARTITION_COL'].iloc[0], 
                                  'SCALES' : str(scaler.scale_.tolist()),
                                  'MEAN' : str(scaler.mean_.tolist()),
                                  'VARIANCE' : str(scaler.var_.tolist()),
                                  'N_ROWS_IN_FIT_GROUP' : int(scaler.n_samples_seen_),
                                  'N_FEATURES_IN_ARRAY' : scaler.n_features_in_,
                                  'SS_MODEL_PKL': [pickle_encode_model(scaler)] })

# Create a Snowflake Python UDTF from the class (pp_std_scaler)
pp_standard_scaler_udtf_input_cols = ['PARTITION_COL','NUMERIC_COL_ARRAY']
pp_standard_scaler_udtf_output_cols = ['PARTITION_COL', 'SCALES', 'MEAN', 'VARIANCE','N_ROWS_IN_FIT_GROUP', 'N_FEATURES_IN_ARRAY', 'SS_MODEL_PKL']
pp_standard_scaler_udtf = session.udtf.register(
  pp_std_scaler,
  is_permanent = True,
  name = 'PP_STD_SCALER',
  replace = True,  
  stage_location = 'PY_CODE',  
  packages = ['numpy==1.23.5', 'pandas', 'scikit-learn==1.5.2'],
  input_names = pp_standard_scaler_udtf_input_cols, 
  output_schema = pp_standard_scaler_udtf_output_cols
)
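
The encode and decode helpers above turn a fitted model into a base64 text string so it can be returned in a string column and rebuilt later. A minimal round-trip sketch using only the standard library is shown here. As an assumption for illustration it uses `pickle` rather than `cloudpickle`, a plain dict in place of a fitted scaler, and the hypothetical names `encode_obj` and `decode_obj`.

```python
import codecs
import pickle

def encode_obj(obj):
    """Serialise an object to a base64 text string."""
    return codecs.encode(pickle.dumps(obj), "base64").decode()

def decode_obj(text):
    """Rebuild the object from its base64 text string."""
    return pickle.loads(codecs.decode(text.encode(), "base64"))

# Stand-in for a fitted scaler's state (illustrative values)
fitted_state = {"mean": [1.5, 2.5], "scale": [0.5, 2.0]}

encoded = encode_obj(fitted_state)
restored = decode_obj(encoded)
print(type(encoded).__name__, restored == fitted_state)
```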

#### Use the Scaler Fitting Function to Fit on subsets of data

We will use our `LINEITEM` table (FeatureView) to demonstrate.  We will use the `L_RETURNFLAG` field to create a fitted model per return flag category. We can see there is a somewhat uneven distribution of rows per `L_RETURNFLAG` value.

In [None]:
lineitem_fv_v1.feature_df.group_by("L_RETURNFLAG").agg(F.count("*").alias("COUNT")).show()

Define the Entity-Key columns, column for sub-sample (partitionkey), and the numeric features we want to work with.

In [None]:
# Entity Keys
entitykeys = ['ORDERKEY','LINENUMBER']

# Partition key - (standard-scaler model created for each partition)
partitionkey = 'L_RETURNFLAG'

# Feature columns - we can standard-scale any/all numeric features

## Identify all numeric columns programmatically
numeric_types = [T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType, T.LongType, T.ShortType]
numeric_columns = [field.name for field in lineitem_fv_v1.feature_df.schema.fields if type(field.datatype) in numeric_types]

## Remove key columns from list
numeric_feature_columns = [numcol for numcol in numeric_columns if '_' in numcol] # only include Feature columns (e.g. those with L_ as the column name)

## Create a comma delimited string form of Numeric columns ASIS
nfc_string = ', '.join(numeric_feature_columns)  

## Comma delimited string form of Numeric columns including cast as float and column rename
nfc_float_string = ', '.join([f'{nfc}::FLOAT {nfc}_FLT' for nfc in numeric_feature_columns ]) 

## Comma delimited string form of Numeric columns renamed for float type
nfc_float_colname_string = ', '.join([f'{nfc}_FLT' for nfc in numeric_feature_columns ])
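
To see what these helper strings contain, here is the same string building applied to a two-column illustrative subset (the real list is derived from the FeatureView schema):

```python
numeric_feature_columns = ["L_QUANTITY", "L_EXTENDEDPRICE"]  # illustrative subset

# Plain column list
nfc_string = ", ".join(numeric_feature_columns)
# Each column cast to FLOAT and renamed with a _FLT suffix
nfc_float_string = ", ".join(f"{nfc}::FLOAT {nfc}_FLT" for nfc in numeric_feature_columns)
# Just the renamed _FLT column names
nfc_float_colname_string = ", ".join(f"{nfc}_FLT" for nfc in numeric_feature_columns)

print(nfc_string)                # L_QUANTITY, L_EXTENDEDPRICE
print(nfc_float_string)          # L_QUANTITY::FLOAT L_QUANTITY_FLT, L_EXTENDEDPRICE::FLOAT L_EXTENDEDPRICE_FLT
print(nfc_float_colname_string)  # L_QUANTITY_FLT, L_EXTENDEDPRICE_FLT
```

These strings are dropped straight into the select list and the `array_construct(...)` call of a SQL statement, so the SQL adapts to whichever numeric columns the FeatureView contains.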

Here we show the SQL method of using the Python UDTF.  Rather than hardcoding the column names we are generating them programmatically and adding them to the SQL code using templating `{{ }}`

In [None]:
-- Using SQL with table function
-- # Prepare source data for input
with t as (select {{ ", ".join(entitykeys) }} , -- Key-Columns
                  {{partitionkey}},             -- Partition Column
                  {{nfc_float_string}}          -- Feature-Columns
             from {{lineitem_fv_v1.fully_qualified_name()}} 
             limit 1000 -- Limit the rows for demonstration purposes
             )
-- # Execute table-function over input data to FIT standard-scaler per L_RETURNFLAG category             
select 
  PARTITION_COL,                       -- The partitioning column - L_RETURNFLAG in our demonstration data
  parse_json(SCALES)   SCALES_ARRAY,   -- Contains an array of the scales needed to scale each input-column
  parse_json(MEAN)     MEAN_ARRAY,     -- Contains an array of the means for each input-column 
  parse_json(VARIANCE) VARIANCE_ARRAY, -- Contains an array of the Variance for each input-column 
  N_ROWS_IN_FIT_GROUP,                 -- Number of rows passed into each partition for the model fit.
  N_FEATURES_IN_ARRAY,                 -- Number of features being fitted.
  SS_MODEL_PKL                         -- Pickle encoded form of the StandardScaler model
from t
  , table(
      PP_STD_SCALER({{partitionkey}}, 
                 -- Create an array of feature columns.  
                 -- Using an array allows the function to operate over a variable number of feature-columns
                    array_construct({{nfc_float_colname_string}}))  
                 -- This is the column that the UDTF will use for partitioning the data.                      
                    over ( partition by {{partitionkey}} ) 
          );

Here is the Snowpark dataframe equivalent of the SQL statement above. 

In [None]:
# FIT - Using Snowpark with Table Function

# Prepare source data for input
in_df = (
  lineitem_fv_v1.feature_df.select(*[[F.col(ek) for ek in entitykeys] + # Entity Key columns
                                    [partitionkey] + # Partitioning Key column
                                    [F.col(nfc) for nfc in  numeric_feature_columns] + # AsIs Numeric Columns
                                    [ F.col(nfc).cast(T.FloatType()).as_(f'{nfc}_FLT') for nfc in  numeric_feature_columns] + # Numeric columns cast as Float
                                    [F.array_construct( *[F.col(f'{nfc}_FLT') for nfc in  numeric_feature_columns]).as_('NUMERIC_COL_ARRAY')] # Array of Numeric Column Float values
                                   ]) 
)
# Reference the table function from Snowpark
pp_standard_scaler_udtf = F.table_function("PP_STD_SCALER")
# Execute table-function over input data to FIT standard-scaler per L_RETURNFLAG category
rf_fit_ss = (
    in_df.selectExpr(f'''* exclude ({', '.join(entitykeys)})''').
    join_table_function(pp_standard_scaler_udtf(in_df[partitionkey], in_df["NUMERIC_COL_ARRAY"])
                                                      .over(partition_by=partitionkey) ) \
    .select(*[
          # [F.col(ek) for ek in entitykeys] +
           [F.col('PARTITION_COL'), 
            F.parse_json(F.col('SCALES')).as_('SCALES_ARRAY'),
            F.parse_json(F.col('MEAN')).as_('MEAN_ARRAY'),
            F.parse_json(F.col('VARIANCE')).as_('VARIANCE_ARRAY'),
            F.col('N_ROWS_IN_FIT_GROUP'),
            F.col('N_FEATURES_IN_ARRAY'),         
            F.col('SS_MODEL_PKL')] 
          ] )
)
rf_fit_tname = f'''{sess_db}.{fs_name}.LINEITEM_RETURNFLAG_STD_SCALER_FIT'''
rf_fit_ss.write.mode("overwrite").save_as_table(rf_fit_tname, table_type="")

rf_fit_sdf = session.table(rf_fit_tname)
rf_fit_sdf.show()

We can use the table containing the fitted values above to transform the source-data (`lineitem_fv_v1`) via a join and some simple transformations.

In [None]:
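# Standard Scaler Expressions.
# For each feature (array element i) we divide by the fitted scale for that feature (SCALES_ARRAY element i).
# Note: this divides by the scale only. To also centre the data, as StandardScaler.transform does with its
# default with_mean=True, subtract F.col('MEAN_ARRAY')[i] before dividing.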
ss_expr_dict = { f'{nfc}_SS' : (F.col('NUMERIC_COL_ARRAY')[i]/F.col('SCALES_ARRAY')[i]) for i, nfc in enumerate(numeric_feature_columns) }

# TRANSFORM 
# We join the source data to the fitted-models results using the scales extracted from the model within the UDTF
rf_transform_ss =  (
     rf_fit_sdf
    .join(in_df, (rf_fit_sdf['PARTITION_COL'] == in_df[partitionkey]) )
    .with_columns(list(ss_expr_dict.keys()), list(ss_expr_dict.values())) 
    .select(*[
              [F.col(ek) for ek in entitykeys] + # Entity Key columns
              [F.col(partitionkey)] + # Partitioning Key column
              [F.col(nfc) for nfc in  numeric_feature_columns] + # AsIs Numeric Feature Columns
              [F.col(nfc).cast(T.FloatType()).as_(f'{nfc}_FLT') for nfc in  numeric_feature_columns] + # Numeric Feature Columns cast as Float
              list(ss_expr_dict.keys()) # Standard Scaled Numeric Features
              ])
)

rf_transform_ss.sort('ORDERKEY', 'LINENUMBER', 'L_RETURNFLAG').show(20)
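The fit-then-join pattern used above can be sketched in plain Python. This is a minimal illustration with made-up rows, not the UDTF itself: it fits one mean and one population standard deviation per partition key, then looks those values up by key to transform each row. It shows both scale-only and centre-and-scale variants so the difference is visible.

```python
from collections import defaultdict
from statistics import fmean, pstdev

# Made-up rows: (partition key, feature value)
rows = [("A", 10.0), ("A", 20.0), ("A", 30.0), ("N", 1.0), ("N", 3.0)]

# FIT: one (mean, scale) pair per partition key.
# pstdev is the population standard deviation, which is what StandardScaler uses.
groups = defaultdict(list)
for key, value in rows:
    groups[key].append(value)
fit = {key: (fmean(vals), pstdev(vals)) for key, vals in groups.items()}

# TRANSFORM: look up the fitted values by key (the "join") and apply them per row
scaled_only = [(key, value / fit[key][1]) for key, value in rows]
centred_and_scaled = [(key, (value - fit[key][0]) / fit[key][1]) for key, value in rows]

print(fit)
print(centred_and_scaled)
```

Only the centred variant yields values with zero mean within each partition.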

As we have seen in the prior examples, we can use the Dataframe definition above to create a FeatureView in our Feature Store with the preprocessed values.  If the range of data in the source table is changing frequently we may want to retrain the standard-scaler FIT on the new data regularly.  This could be done via a scheduled TASK for example.  We may also need to keep a history of these values through time so that we can apply the correct values at a given point in time, which we can do by adding a timestamp column to the maintained value table. 
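As a rough sketch of the point-in-time idea, the example below keeps a history of fitted scales for a single partition key and picks the latest fit at or before a given event timestamp. The history values and the helper name `scale_as_of` are hypothetical, chosen only for illustration.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical fit history for one partition key: (fit timestamp, scale), sorted by time
fit_history = [
    (datetime(2024, 1, 1), 2.0),
    (datetime(2024, 2, 1), 2.5),
    (datetime(2024, 3, 1), 3.0),
]
fit_times = [ts for ts, _ in fit_history]

def scale_as_of(event_ts: datetime) -> float:
    """Return the scale from the latest fit at or before event_ts."""
    idx = bisect_right(fit_times, event_ts)
    if idx == 0:
        raise LookupError("no fit available at or before this timestamp")
    return fit_history[idx - 1][1]

print(scale_as_of(datetime(2024, 2, 15)))
```

In a table-based version the same lookup would be a join on the partition key plus a condition on the timestamp column.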

We can also combine the fit and transform steps into a single operation returning a dataframe. If this is used to create a `view` based FeatureView, it will always return the scaled results, as of the latest fitted state from the source FeatureView. 

In [None]:
# FIT & TRANSFORM 
# We join the source data to the fitted-models results using the scales extracted from the model within the UDTF
rf_fit_and_transform_ss =  (
     rf_fit_ss
    .join(in_df, (rf_fit_ss['PARTITION_COL'] == in_df[partitionkey]) )
    .with_columns(list(ss_expr_dict.keys()), list(ss_expr_dict.values())) 
    .select(*[
              [F.col(ek) for ek in entitykeys] + # Entity Key columns
              [F.col(partitionkey)] + # Partitioning Key column
              [F.col(nfc) for nfc in  numeric_feature_columns] + # AsIs Numeric Feature Columns
              [F.col(nfc).cast(T.FloatType()).as_(f'{nfc}_FLT') for nfc in  numeric_feature_columns] + # Numeric Feature Columns cast as Float
              list(ss_expr_dict.keys()) # Standard Scaled Numeric Features
              ])
)

rf_fit_and_transform_ss.sort('ORDERKEY', 'LINENUMBER', 'L_RETURNFLAG').show(20)

#### Model Registry - Many Model Inference

We can make use of the trained scaler models with the Snowflake Model Registry's many-model inference capabilities.



In [None]:
# Create the schema that will hold the Model Registry, if it does not already exist
session.sql(f''' Create schema if not exists {sess_db}.{mr_name}''').collect()

# Open the Model Registry in that schema
reg = registry.Registry(session=session, 
                        database_name=sess_db, 
                        schema_name=mr_name)

reg.show_models()

#### Define the Partitioned Model
We will now define a custom model. The partitioned custom model class inherits from `snowflake.ml.model.custom_model.CustomModel`, and inference methods are declared with the `@custom_model.partitioned_inference_api` decorator. Writing the model in this way allows it to run in parallel for each partition.

To make this flexible and generic to support varying numbers of numeric input columns we will pass the columns in as an `ARRAY`, rather than as individual columns.

In [None]:
partitionkey = 'L_RETURNFLAG'
pp_standard_scaler_udtf_input_cols = ['PARTITION_COL','NUMERIC_COL_ARRAY']
pp_standard_scaler_udtf_output_cols = ['PARTITION_COL', 'SCALES', 'MEAN', 'VARIANCE','N_ROWS_IN_FIT_GROUP', 'N_FEATURES_IN_ARRAY', 'SS_MODEL_PKL']
LReturnFlag_StandardScaler_input_cols =  ['PARTITION_COL','NUMERIC_COL_ARRAY']
LReturnFlag_StandardScaler_output_cols = ['PARTITION_COL', 'SCALES', 'MEAN', 'VARIANCE','N_ROWS_IN_FIT_GROUP', 'N_FEATURES_IN_ARRAY', 'SS_MODEL_PKL']
LReturnFlag_SS_NumFeatures = [ f'{nfc}_SS' for nfc in numeric_feature_columns ]

We will first create a many-model Custom Model where the models are persisted and maintained in a table by partitioning key (e.g. L_RETURNFLAG).  The models are joined to each row at query time, to perform the transformation.

In [None]:
# Create custom model class.
## Note the use of the  @custom_model.partitioned_inference_api to denote the model as partitioned

class LReturnFlag_StandardScaler_Table(custom_model.CustomModel):
    def __init__(self, context: Optional[custom_model.ModelContext] = None) -> None:
        super().__init__(context)
        self.partition_id = None
        self.model = None
        self._cache = {}  # Add a cache to store models

    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        current_flag = input['L_RETURNFLAG'].iloc[0]
        
        # Check the cache first.  This should only need to run for the first row in each partition
        if current_flag != self.partition_id:
            if current_flag not in self._cache:
                # Only decode and load model if not in cache
                model_pickle = input['SS_MODEL_PICKLE'].iloc[0]
                self._cache[current_flag] = cloudpickle.loads(
                    codecs.decode(model_pickle.encode(), "base64")
                )
            self.model = self._cache[current_flag]
            self.partition_id = current_flag

        # Convert numeric arrays more efficiently using numpy
        numeric_arrays = np.array([json.loads(x) for x in input['NUMERIC_COL_ARRAY']])
        transformed_arrays = self.model.transform(numeric_arrays)

        # Create result DataFrame more efficiently
        result = pd.DataFrame({
            'ORDERKEY_OUT': input['ORDERKEY'],
            'LINENUMBER_OUT': input['LINENUMBER'],
            'NUMERIC_COL_ARRAY_OUT': input['NUMERIC_COL_ARRAY'],  # name must match the output signature
            'NUMERIC_COL_ARRAY_SS': [json.dumps(row.tolist()) for row in transformed_arrays],
        })

        return result
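The decode expression in `predict` implies that the model is stored as a base64-encoded pickle held in a string column. The round trip can be sketched with the standard library alone. This is an assumption about how the encoding side works, inferred from the decode call; the standard `pickle` module stands in for `cloudpickle`, and a plain dict stands in for a fitted scaler.

```python
import codecs
import pickle

# Stand-in for a fitted scaler: any picklable object works for the round trip
fitted = {"mean": [1.5, 20.0], "scale": [0.5, 4.0]}

# Encode: pickle to bytes, base64-encode, then decode to str so it fits a string column
encoded = codecs.encode(pickle.dumps(fitted), "base64").decode()

# Decode: mirrors the shape of the expression used in predict()
restored = pickle.loads(codecs.decode(encoded.encode(), "base64"))

print(type(encoded).__name__, restored == fitted)
```

Decoding is comparatively expensive, which is why the class caches the decoded model per partition key rather than decoding it for every call.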


In [None]:
# Create model signatures 
# - Note we are passing in the entity-key columns for the data so that we can join these transformed features to other featureviews if needed
input_signature= [
  FeatureSpec(dtype=DataType.INT64 , name='ORDERKEY', nullable=True),
  FeatureSpec(dtype=DataType.INT64 , name='LINENUMBER', nullable=True),    
  FeatureSpec(dtype=DataType.STRING , name=partitionkey, nullable=True),
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY', nullable=True),
  FeatureSpec(dtype=DataType.STRING,  name='SS_MODEL_PICKLE', nullable=True),
]
# - Note we are passing out the entity-key columns for the data so that we can join these transformed features to other featureviews if needed.
# - We give them a different name to avoid a result set error with ambiguous names.
output_signature = [
  FeatureSpec(dtype=DataType.INT64 , name='ORDERKEY_OUT', nullable=True),
  FeatureSpec(dtype=DataType.INT64 , name='LINENUMBER_OUT', nullable=True),     
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY_OUT', nullable=True), 
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY_SS', nullable=True),     
]

signature = ModelSignature(
    inputs=input_signature,
    outputs=output_signature,
)

LReturnFlag_StandardScaler_Table_instance = LReturnFlag_StandardScaler_Table()
LReturnFlag_StandardScaler_Table_instance

We can test the many-model class locally with a dataframe.

In [None]:
# Convert Snowpark Dataframe of source data to be standard-scaled to Pandas
in10_pdf = in_df.limit(10).to_pandas()

# Convert Snowpark Dataframe containing models to pandas
rf_fit_pdf = rf_fit_sdf.select(F.col('PARTITION_COL'), F.col('SS_MODEL_PKL').as_('SS_MODEL_PICKLE') ).to_pandas()

# Join the two Pandas Dataframes together
predict_input_pdf = pd.merge(rf_fit_pdf, in10_pdf, left_on='PARTITION_COL', right_on='L_RETURNFLAG')

# Test that the predict method works on a Pandas Dataframe, which is what the data gets converted to for execution in the database.
rf_transform_scaled_pdf = LReturnFlag_StandardScaler_Table_instance.predict(predict_input_pdf)

rf_transform_scaled_pdf

####  Log Model to Model Registry
Next we will log the model to the Snowflake Model Registry, using the registry reference and the prediction-method signature defined above. We first delete any existing model of the same name so that the example can be re-run.

In [None]:
try:
    reg.delete_model("LReturnFlag_StandardScaler_Table")
except Exception:
    # Most likely the model has not been logged yet
    print('Model does not exist in model registry')

In [None]:
start = time.time() 

options = {
    "function_type": "TABLE_FUNCTION",
    "relax_version": False
}

mv_table = reg.log_model(
    LReturnFlag_StandardScaler_Table_instance,
    model_name="LReturnFlag_StandardScaler_Table",
    version_name="V1",
    options=options,
    conda_dependencies=['numpy==1.23.5', 'pandas', 'scikit-learn==1.5.2', "cloudpickle==2.2.1"], # cloudpickle version should be greater than 2.0.0 in notebook as well
    signatures={"predict": signature}
)

end = time.time() 
print(end - start)

reg.show_models()

### Transform with Table many-model custom model.
We can now use the registered model to transform new input data.

We join the source data to the fitted-models results to attach the model to each row based on the partitioning key.


In [None]:

rf_transform_input_df =  (
     rf_fit_sdf
    .join(in_df, (rf_fit_sdf['PARTITION_COL'] == in_df[partitionkey]) )
    .select(F.col('ORDERKEY'),  F.col('LINENUMBER'), # Primary Key Columns
            F.col('L_RETURNFLAG'), # Partitioning Key
            #F.col('L_QUANTITY_FLT'), F.col('L_EXTENDEDPRICE_FLT'), F.col('L_DISCOUNT_FLT'), F.col('L_TAX_FLT'), # Source Columns
            F.col('NUMERIC_COL_ARRAY').cast(T.StringType()).as_('NUMERIC_COL_ARRAY'),  # Source Columns as Numeric Array
            F.col('SS_MODEL_PKL').as_('SS_MODEL_PICKLE'),
           )
)

rf_transform_input_df.sort('ORDERKEY', 'LINENUMBER', 'L_RETURNFLAG').show(2)

We can now use the `mv_table` model instance we created to transform our data.

In [None]:
rf_transform_input_df_10000 = rf_transform_input_df.limit(10000)

transformed_mm_result = mv_table.run(rf_transform_input_df_10000, 
                                     partition_column="L_RETURNFLAG") \
    .select('ORDERKEY_OUT', 'LINENUMBER_OUT', 'NUMERIC_COL_ARRAY_OUT', 'NUMERIC_COL_ARRAY_SS')

transformed_mm_result.show(5,100)

### MEMORY MANY-MODEL CUSTOM MODEL
We can also create a many-model Custom Model where the models are persisted in the model-registry via the custom model-class at registration time. In this case the models are stored in a Dict keyed on the partitioning key (`L_RETURNFLAG`) values, which we use to retrieve the relevant model for transformation for each partition key value. 
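The dictionary-dispatch idea can be sketched without Snowflake. In the example below, `Scale` is a tiny hypothetical stand-in for a fitted scaler (it is not scikit-learn), and rows are grouped by partition key so that each group is transformed by the model stored under that key.

```python
from itertools import groupby
from operator import itemgetter

class Scale:
    """Tiny stand-in for a fitted scaler (hypothetical, not scikit-learn)."""
    def __init__(self, mean, scale):
        self.mean, self.scale = mean, scale

    def transform(self, values):
        return [(v - self.mean) / self.scale for v in values]

# One model per partition key, held in memory
models = {"A": Scale(20.0, 10.0), "N": Scale(2.0, 1.0)}

# Made-up rows: (partition key, feature value)
rows = [("A", 10.0), ("N", 3.0), ("A", 30.0), ("N", 1.0)]

def predict_partition(key, values):
    """Transform one partition using the model stored under its key."""
    return models[key].transform(values)

# sorted() is stable, so row order within each key is preserved
results = {}
for key, grp in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
    results[key] = predict_partition(key, [v for _, v in grp])

print(results)
```

Compared with the table-based approach, nothing has to be joined to the input rows, at the cost of re-logging the model whenever the fitted values change.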

In [None]:
try:
    reg.delete_model("LReturnFlag_StandardScaler_Memory")
except Exception:
    # Most likely the model has not been logged yet
    print('Model does not exist in model registry')

In [None]:
# Convert Snowpark Dataframe containing models to model-dictionary

# Create dict of models keyed on partitioning key
models = {
    row['PARTITION_COL']: pickle_decode_model(row['SS_MODEL_PICKLE'])
    for _, row in 
      rf_fit_sdf.select(F.col('PARTITION_COL'), F.col('SS_MODEL_PKL').as_('SS_MODEL_PICKLE') ).to_pandas().iterrows()
}
# create model context 
# Note: Which is used in the __init__ class definition below
mem_mc = custom_model.ModelContext(
  models=models
)

# Create model class
class LReturnFlag_StandardScaler_Memory(custom_model.CustomModel):
    def __init__(self, context: Optional[custom_model.ModelContext] = mem_mc) -> None: 
        super().__init__(context)
        self.partition_id = None
        #self.model = None
        self._cache = {}  # Add a cache to store models

    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        current_flag = input['L_RETURNFLAG'].iloc[0]
        model = self.context.model_ref(current_flag)
 
        # Convert numeric arrays more efficiently using numpy
        transformed_arrays = model.transform(
            np.array([json.loads(x) for x in input['NUMERIC_COL_ARRAY']])
        )

        # Create result DataFrame more efficiently
        result = pd.DataFrame({
            'ORDERKEY_OUT': input['ORDERKEY'],
            'LINENUMBER_OUT': input['LINENUMBER'],
            'NUMERIC_COL_ARRAY_OUT': input['NUMERIC_COL_ARRAY'],  # name must match the output signature
            'NUMERIC_COL_ARRAY_SS': [json.dumps(row.tolist()) for row in transformed_arrays],
        })

        return result

# Create model signatures 
# - Note we are passing in the entity-key columns for the data so that we can join these transformed features to other featureviews if needed
input_signature= [
  FeatureSpec(dtype=DataType.INT64 , name='ORDERKEY', nullable=True),
  FeatureSpec(dtype=DataType.INT64 , name='LINENUMBER', nullable=True),    
  FeatureSpec(dtype=DataType.STRING , name=partitionkey, nullable=True),
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY', nullable=True),
]
# - Note we are passing out the entity-key columns for the data so that we can join these transformed features to other featureviews if needed.
# - We give them a different name to avoid a result set error with ambiguous names.
output_signature = [
  FeatureSpec(dtype=DataType.INT64 , name='ORDERKEY_OUT', nullable=True),
  FeatureSpec(dtype=DataType.INT64 , name='LINENUMBER_OUT', nullable=True),     
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY_OUT', nullable=True), 
  FeatureSpec(dtype=DataType.STRING , name='NUMERIC_COL_ARRAY_SS', nullable=True),     
]

signature = ModelSignature(
    inputs=input_signature,
    outputs=output_signature,
)

# Create instance of class
LReturnFlag_StandardScaler_Memory_instance = LReturnFlag_StandardScaler_Memory()

start = time.time() 

# Define Model Options
options = {
    "function_type": "TABLE_FUNCTION",
    "relax_version": False
}

mv_memory = reg.log_model(
    LReturnFlag_StandardScaler_Memory_instance,
    model_name="LReturnFlag_StandardScaler_Memory",
    version_name="V1",
    options=options,
    conda_dependencies=['numpy==1.23.5', 'pandas', 'scikit-learn==1.5.2', "cloudpickle==2.2.1"], # cloudpickle version should be greater than 2.0.0 in notebook as well
    signatures={"predict": signature}
)

end = time.time() 
print(end - start)

reg.show_models()

We can test the class function locally on a pandas dataframe

In [None]:
# Convert Snowpark Dataframe of source data to be standard-scaled to Pandas
in_mem_10_pdf = in_df.limit(10).select(F.col('ORDERKEY'), F.col('LINENUMBER'), F.col('L_RETURNFLAG'), F.col('NUMERIC_COL_ARRAY')).to_pandas()
# Test that the predict method works on a Pandas Dataframe, which is what the data gets converted to for execution in the database.
LReturnFlag_StandardScaler_Memory_instance.predict(in_mem_10_pdf)


### Transform
We can now apply the in-memory model transformation.

First we create some source data for transformation.  __Note:__ there is no need for the models to be joined/attached to the rows in this case as they are persisted within the model.

In [None]:
# TRANSFORM 

rf_transform_mem_input_df =  (
     in_df.select(F.col('ORDERKEY'),  F.col('LINENUMBER'), # Primary Key Columns
                  F.col('L_RETURNFLAG'), # Partitioning Key
                  F.col('NUMERIC_COL_ARRAY').cast(T.StringType()).as_('NUMERIC_COL_ARRAY'),  # Source Columns as Numeric Array
           )
)

rf_transform_mem_input_df.sort('ORDERKEY', 'LINENUMBER', 'L_RETURNFLAG').show(3)

We can now use the `mv_memory` model instance we created to transform our data.

In [None]:
rf_transform_mem_input_df_10000 = rf_transform_mem_input_df.limit(10000)

transformed_mm_mem_result = mv_memory.run(rf_transform_mem_input_df_10000, partition_column="L_RETURNFLAG") \
    .select('ORDERKEY_OUT', 'LINENUMBER_OUT', 'NUMERIC_COL_ARRAY_OUT', 'NUMERIC_COL_ARRAY_SS')

transformed_mm_mem_result.show(5,100)

## ROLLING WINDOWS

In [None]:
## Mock up some device data

session.sql(f'''
create or replace table {sess_db}.{org}.device_events  as
with 
devices as (
SELECT seq4() device_id, 
  FROM TABLE(GENERATOR(ROWCOUNT => 10000)) v ) -- adjust the literal for the number of devices needed
,
timepoints as (
SELECT ROW_NUMBER() OVER (PARTITION BY null ORDER BY null) AS hrs, 
  FROM TABLE(GENERATOR(ROWCOUNT => (24*365*2))) v )  -- adjust the literal calculation for the number of hours needed
  
-- Join two CTES with product join to create a device reading for every hour.  
SELECT device_id,
  dateadd('hr',  hrs*-1 , date_trunc('hr',current_timestamp()) ) event_ts,
  uniform(1, 1000, RANDOM(12)) device_reading
  FROM devices, timepoints v 
  ORDER BY 1, 2 desc;
  
''').collect()  


In [None]:
device_events_sdf = session.table(f'''{sess_db}.{org}.DEVICE_EVENTS''')
device_events_sdf.sort(F.col('DEVICE_ID'),F.col('EVENT_TS').desc()).limit(100).collect()

In [None]:
#fs.delete_feature_view('FV_DEVICE_RAW','V1')
#fs.delete_feature_view('FV_DEVICE_MVAVG','V1')
#fs.delete_feature_view('FV_DEVICE_MVAVG_ARRAY','V1')

In [None]:
# Device Entity Definition 
device_entity = Entity(
    name="DEVICE",
    join_keys=["DEVICE_ID"],
    desc="Device entity"
    )
fs.register_entity(device_entity)

# RAW DEVICE FEATUREVIEW

# Create Raw Device FeatureView in Feature Store
device_raw_fv = FeatureView(
    name = f"FV_DEVICE_RAW",
    entities = [device_entity],
    feature_df = device_events_sdf,
    timestamp_col = 'EVENT_TS',
    # refresh_freq = '1 hours',
    desc = f"Device raw features feature view"
)
# Register the raw Device FeatureView in the Feature Store
device_raw_fv_v1 = fs.register_feature_view(
    feature_view = device_raw_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

device_raw_fv_v1.feature_df.show()

In [None]:
device_mvavg_sdf = device_raw_fv_v1.feature_df.analytics.moving_agg(
    aggs={"DEVICE_READING": ["AVG"]}, # Only using AVG here but you can add additional aggregation functions if needed
    window_sizes=[4, 8, 16], # 4 makes the results easy to check by hand; substitute or add any number of time periods here
    order_by=["EVENT_TS"],
    group_by=["DEVICE_ID"],
)

# DEVICE MOVING_AVERAGE FEATURES FEATUREVIEW

# Create Device moving-average FeatureView in Feature Store
device_mvavg_fv = FeatureView(
    name = f"FV_DEVICE_MVAVG",
    entities = [device_entity],
    feature_df = device_mvavg_sdf,
    timestamp_col = 'EVENT_TS',
    refresh_freq = '1 hours',
    desc = f"Device moving average features feature view"
)
# Register the Device moving-average FeatureView in the Feature Store
device_mvavg_fv_v1 = fs.register_feature_view(
    feature_view = device_mvavg_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

device_mvavg_fv_v1.feature_df.sort(F.col('DEVICE_ID'),F.col('EVENT_TS').desc()).show(10)
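To make it easier to check a column such as `DEVICE_READING_AVG_4` by hand, here is a small pure-Python sketch of a trailing moving average. It assumes the window covers the current row plus the preceding `window_size - 1` rows, with shorter windows at the start of each series; that framing is an assumption about `moving_agg` that is worth verifying against the output above.

```python
def moving_avg(values, window_size):
    """Trailing average over the current row and up to window_size - 1 prior rows."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - window_size + 1): i + 1]
        out.append(sum(window) / len(window))
    return out

# Made-up readings for a single device, ordered by event time
readings = [10, 20, 30, 40, 50, 60]
avg_4 = moving_avg(readings, 4)
print(avg_4)  # [10.0, 15.0, 20.0, 25.0, 35.0, 45.0]
```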

In [None]:
from snowflake.snowpark import Window

# Gather the readings from the four prior rows (the current row is excluded).
window4 = Window.partition_by("DEVICE_ID").order_by("EVENT_TS").rows_between(-4, -1)
# Gather the moving average readings into arrays based on window4
mvavg_array_sdf = device_mvavg_fv_v1.feature_df.select(
                    F.col("DEVICE_ID"),
                    F.col("EVENT_TS"),
                    F.array_agg("DEVICE_READING_AVG_4", False).over(window4).alias("DEVICE_READING_AVG_4_ARRAY")
                   )

# DEVICE MOVING_AVERAGE ARRAY FEATURES FEATUREVIEW

# Create Device moving-average array FeatureView in Feature Store
device_mvavg_array_fv = FeatureView(
    name = f"FV_DEVICE_MVAVG_ARRAY",
    entities = [device_entity],
    feature_df = mvavg_array_sdf,
    timestamp_col = 'EVENT_TS',
    refresh_freq = '1 hours', # Consider omitting refresh_freq to create a view-based FeatureView here; computing the arrays on the fly avoids duplicating data across the arrays.
    desc = f"Device moving average features as array feature view"
)
# Register the Device moving-average array FeatureView in the Feature Store
device_mvavg_array_fv_v1 = fs.register_feature_view(
    feature_view = device_mvavg_array_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

device_mvavg_array_fv_v1.feature_df.sort(F.col('DEVICE_ID'),F.col('EVENT_TS').desc()).show(10)

# UserWarning: Your pipeline won't be incrementally refreshed due to: "This dynamic table contains a complex query. 
# Refresh mode has been set to FULL. If you wish to override this automatic choice, please re-create the dynamic table 
# and specify REFRESH_MODE=INCREMENTAL. For best results, we recommend reading 
# https://docs.snowflake.com/user-guide/dynamic-table-performance-guide 
# before setting the refresh mode to INCREMENTAL.".
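The `rows_between(-4, -1)` frame used for the array feature can be illustrated in plain Python: for each row it collects up to four preceding values and excludes the current one. This is only a sketch of the frame semantics with made-up values; how Snowflake represents the empty frame for the first row of each device is not covered here.

```python
def prior_values(values, n_prior=4):
    """For each position, collect up to n_prior preceding values, excluding the current one."""
    return [values[max(0, i - n_prior): i] for i in range(len(values))]

# Made-up moving averages for a single device, ordered by event time
avg_4 = [10.0, 15.0, 20.0, 25.0, 35.0, 45.0]
arrays = prior_values(avg_4)

for value, arr in zip(avg_4, arrays):
    print(value, arr)
```

Each array repeats most of the values held by its neighbours, which is the data duplication that the comment on `refresh_freq` refers to.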

## SECOND FEATURE STORE

In this section we show how we can reference and work with objects in one Feature Store from another, and how we can create and retrieve objects that span more than one Feature Store.

In [None]:
# Source Data Database and Schema
src_database = 'SNOWFLAKE_SAMPLE_DATA'
src_schema = 'TPCH_SF1' # <-- Modify this if you want to test with one of the larger data scale-factors. e.g. TPCH_SF1, TPCH_SF10, TPCH_SF100, TPCH_SF1000

# Database to use to create Schemas
sess_db = 'SIMON' # The database within which we will create our Feature Store (schema), and data-source schema.

org2 = f'{org}_2'                          # Name for working Schema and used to derive Feature Store Name
fs_name_2 = f"{org2}_FEATURE_STORE"      # Feature Store Name.  This will create a Schema to contain our Feature Store database objects
mr_name_2 = f"{org2}_MODEL_REGISTRY"     # Model-Registry Name.
num_spine_rows = 10                   # Maximum number of rows to use when sampling source data for Spine Entity Keys.

In [None]:
# Name of the schema where we will persist our generated training datasets
session.sql(f''' Create schema if not exists {sess_db}.{org}''').collect()

Create a second Feature Store for demonstration purposes.

In [None]:
fs_2 =  FeatureStore(
        session=session,
        database=sess_db,
        name=fs_name_2,
        default_warehouse="SIMON_XS",
        creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

__Recreate Entities__

In [None]:
# CUSTOMER ENTITY DEFINITION 

## Either using original source-code definition for our first Feature Store
# customer_entity = Entity(
#     name="CUSTOMER",
#     join_keys=["CUSTKEY"],
#     desc="Customer entity"
#     )

## Or we can retrieve it from our first Feature Store
customer_entity = fs.get_entity('CUSTOMER')

## We can retrieve it from the first feature-store and use it directly to register in the 2nd feature-store
fs_2.register_entity(fs.get_entity('CUSTOMER'))

In [None]:
fs_2.list_entities()

__Recreate FeatureViews__

There are two options.  

1) We can recreate the object as-is in the new feature store, using its original definition.  In this case, if the Feature View in the original Feature Store is a Dynamic Table, we will be duplicating the compute effort and storage for the Dynamic Table.
2) We can create the FeatureView and reference the underlying database object from the original Feature View, sharing the compute and storage from the first Feature Store.


In [None]:
# CUSTOMER FEATUREVIEW

## Either using original source-code definition
# Create Customer FeatureView in Feature Store
# customer_fv = FeatureView(
#     name = f"FV_CUSTOMER",
#     entities = [customer_entity],
#     feature_df = customer_nosk_sdf,
#     desc = f"Customer feature view"
# )

customer_fv_v1 = fs.get_feature_view("FV_CUSTOMER", 'V1')

## Or reconstruct from FeatureView attributes
customer_fv = FeatureView(
    name = customer_fv_v1.name,
    entities = customer_fv_v1.entities,
    feature_df = customer_fv_v1.feature_df,
    timestamp_col = customer_fv_v1.timestamp_col,
    refresh_freq= customer_fv_v1.refresh_freq,
    desc = customer_fv_v1.desc,
    warehouse =  customer_fv_v1.warehouse,
    initialize = customer_fv_v1.initialize,
    refresh_mode = customer_fv_v1.refresh_mode,
    cluster_by = customer_fv_v1.cluster_by
).attach_feature_desc(customer_fv_v1.feature_descs)
customer_fv

fs2_customer_fv_v1 = fs_2.register_feature_view(
    feature_view = customer_fv,    # feature view created above, could also use external_fv
    version = customer_fv_v1.version,
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

In [None]:
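# Option 2: reference the database object that backs the original FeatureView, rather than re-running its definition
customer_fv = FeatureView(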
    name = customer_fv_v1.name,
    entities = customer_fv_v1.entities,
    feature_df = session.table(customer_fv_v1.fully_qualified_name()),
    timestamp_col = customer_fv_v1.timestamp_col,
    refresh_freq= customer_fv_v1.refresh_freq,
    desc = customer_fv_v1.desc,
    warehouse =  customer_fv_v1.warehouse,
    initialize = customer_fv_v1.initialize,
    refresh_mode = customer_fv_v1.refresh_mode,
    cluster_by = customer_fv_v1.cluster_by
).attach_feature_desc(customer_fv_v1.feature_descs)
customer_fv

fs2_customer_fv_v1 = fs_2.register_feature_view(
    feature_view = customer_fv,    # feature view created above, could also use external_fv
    version = customer_fv_v1.version,
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

In [None]:
fs_2.list_feature_views()

In [None]:
print(fs2_customer_fv_v1.feature_df.queries['queries'][0])

In [None]:
customer_spine_tbl = [sess_db,org,'CUSTOMER_SPINE']

customer_sdf.select('CUSTKEY') \
    .distinct() \
    .sample(n=num_spine_rows) \
    .write.saveAsTable(customer_spine_tbl,mode = 'overwrite', table_type = 'temp')

customer_spine_df = session.table(customer_spine_tbl)
    
customer_spine_df.sort('CUSTKEY').show()

In [None]:
fs.list_feature_views()

In [None]:
fs_2.generate_training_set(customer_spine_df, 
                           [fs.get_feature_view('FV_CUSTOMER', 'V1'),    # FeatureView in Feature Store 1
                           fs_2.get_feature_view('FV_CUSTOMER', 'V1')]   # FeatureView in Feature Store 2
                          ).show()

In [None]:
show tags in database {{sess_db}};

In [None]:
fs_database = 'SIMON'
session.sql(f'''show tags in database {fs_database};''').collect()

In [None]:
def fs_get_database_entities(fs_database):
    session.sql(f'''show tags in database {fs_database};''').collect()

    result_df = session.sql(f'''
select 
       "database_name",
       "schema_name",
       split_part("name",'_', 5) "entity_name",
       "allowed_values" "entity_keys",
       "comment"
  from table(result_scan(last_query_id()))
 where startswith("name", 'SNOWML_FEATURE_STORE_ENTITY_')
                 ''')

    return result_df

In [None]:
fs_get_database_entities('SIMON').to_pandas().to_markdown()
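One thing to watch in the query above is that `split_part("name", '_', 5)` returns only the fifth underscore-delimited token, so an entity whose name itself contains an underscore would be truncated. The sketch below shows the difference in plain Python; the tag prefix is taken from the query, while the entity name `ORDER_LINE` and the helper names are hypothetical.

```python
ENTITY_TAG_PREFIX = "SNOWML_FEATURE_STORE_ENTITY_"

def fifth_token(tag_name):
    """Mimics split_part(name, '_', 5): the fifth underscore-delimited token."""
    return tag_name.split("_")[4]

def entity_name_from_tag(tag_name):
    """Strip the known prefix so the full entity name is kept; None for other tags."""
    if not tag_name.startswith(ENTITY_TAG_PREFIX):
        return None
    return tag_name[len(ENTITY_TAG_PREFIX):]

for tag in ("SNOWML_FEATURE_STORE_ENTITY_CUSTOMER", "SNOWML_FEATURE_STORE_ENTITY_ORDER_LINE"):
    print(tag, fifth_token(tag), entity_name_from_tag(tag))
```

For single-word entity names such as CUSTOMER the two approaches agree.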

### Using Data Driven approach with parameter tables
We can use a parameter table to store small sets of values that we want to use within FeatureViews, to avoid hard-coding literal values in the Dataframes/SQL used as the source for FeatureViews.

As a trivial example, let's say that we need to create a FeatureView for priority order categories, but we want the flexibility to change these categories.   We will use the O_ORDERPRIORITY column in ORDERS to illustrate.

In [None]:
orders_nosk_sdf.select('O_ORDERPRIORITY').distinct().sort('O_ORDERPRIORITY').show(20)

In [None]:
session.sql(f"""
create or replace table {sess_db}.{fs_name}.fv_parameters
 (
  fvname    varchar,
  fvver     varchar,
  parameter_name varchar,
  parameter_value variant
 ) 
""").collect()

Instead of hardcoding the categories within the SQL/Snowpark dataframe that is used as the source for the FeatureView, we create a parameter table containing those literal values.  We will join to this parameter table from our FeatureView dataframe definition to apply these rather than hard-coding them as literals. This table can be updated when needed, and the FeatureView will automatically refresh to apply the changes.  We don't need to recreate the FeatureView with a new definition.
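The effect of driving a filter from a parameter value, rather than from literals in the query, can be sketched in plain Python. The order rows below are made up, and `expedited` is a hypothetical helper; the parameter value is a JSON array of category names, in the same spirit as a `variant` column.

```python
import json

# Made-up order rows
orders = [
    {"O_ORDERKEY": 1, "O_ORDERPRIORITY": "1-URGENT"},
    {"O_ORDERKEY": 2, "O_ORDERPRIORITY": "5-LOW"},
    {"O_ORDERKEY": 3, "O_ORDERPRIORITY": "2-HIGH"},
]

def expedited(orders, parameter_value):
    """Return the keys of orders whose priority is listed in the JSON parameter value."""
    categories = set(json.loads(parameter_value))
    return [o["O_ORDERKEY"] for o in orders if o["O_ORDERPRIORITY"] in categories]

# The filter logic stays the same; only the stored parameter changes
print(expedited(orders, '["1-URGENT", "2-HIGH"]'))  # [1, 3]
print(expedited(orders, '["1-URGENT"]'))            # [1]
```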

In [None]:
session.sql(f"""
Insert into  {sess_db}.{fs_name}.fv_parameters
select 
  fvname, fvver, parameter_name,  parse_json(parameter_value) parameter_value
from (values
  ('FV_ORDERS_EXPEDITED', 'V1', 'EXPEDITE_CATEGORIES', '["1-URGENT", "2-HIGH"]')
 ) as oe (fvname, fvver, parameter_name, parameter_value)
""").show()


In [None]:
session.table(f'{sess_db}.{fs_name}.fv_parameters').show()

In [None]:
f'{sess_db}.{fs_name}.fv_parameters'

In [None]:
# ORDER
orders_table = 'ORDERS'
orders_sdf = session.table(f'{src_database}.{src_schema}.{orders_table}') \
                .with_columns(['ORDERKEY','CUSTKEY'],[F.col('O_ORDERKEY'),F.col('O_CUSTKEY')]) \
                .drop('O_ORDERKEY', 'O_CUSTKEY')
orders_nosk_sdf = orders_sdf.drop('CUSTKEY')

fv_parameters = session.table(f'{sess_db}.{fs_name}.fv_parameters')

fv_parameters_oe_flat =  (fv_parameters.filter( (F.col('FVNAME') == 'FV_ORDERS_EXPEDITED') &
                                               (F.col('FVVER') == 'V1') &
                                               (F.col('PARAMETER_NAME') == 'EXPEDITE_CATEGORIES')
                                             )
                                       .flatten(input = 'PARAMETER_VALUE')
                                       .select(F.col('VALUE').as_('EXPEDITE_CATEGORY'))
                         )
fv_parameters_oe_flat.show()

In [None]:
session.sql("""
select * 
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS 
where O_ORDERPRIORITY in ('1-URGENT', '2-HIGH')
""").show()

In [None]:
session.sql(f"""
with parms as (
  select PARAMETER_VALUE 
  from {sess_db}.{fs_name}.fv_parameters
  where FVNAME = 'FV_ORDERS_EXPEDITED'
    and FVVER = 'V1'
    and PARAMETER_NAME = 'EXPEDITE_CATEGORIES'
)
select * exclude PARAMETER_VALUE
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS, parms
where array_contains(O_ORDERPRIORITY::variant, PARAMETER_VALUE)
""").show()

In [None]:
select min(O_Orderdate), max(O_Orderdate) from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS

In [None]:
fv_parameters = session.table(f'{sess_db}.{fs_name}.fv_parameters')

fv_parameters_oe_flat =  (fv_parameters
                                 .filter( (F.col('FVNAME') == 'FV_ORDERS_EXPEDITED') &
                                               (F.col('FVVER') == 'V1') &
                                               (F.col('PARAMETER_NAME') == 'EXPEDITE_CATEGORIES')
                                             )
                                       .flatten(input = 'PARAMETER_VALUE')
                                       .select(F.col('VALUE').as_('EXPEDITE_CATEGORY'))
                         )
fv_parameters_oe_flat.show()

# ORDER
orders_table = 'ORDERS'
orders_sdf = (session.table(f'{src_database}.{src_schema}.{orders_table}') 
                .with_columns(['ORDERKEY'],[F.col('O_ORDERKEY')]) 
                .drop('O_ORDERKEY') 
             )

orders_expedited_sdf = (orders_sdf.join(fv_parameters_oe_flat, 
                                             (orders_sdf["O_ORDERPRIORITY"] == fv_parameters_oe_flat["EXPEDITE_CATEGORY"]))
                                       .drop("EXPEDITE_CATEGORY"))

orders_expedited_sdf.show()

In [None]:
# ORDERS FEATUREVIEW
orders_features = orders_sdf.drop("ORDERKEY", "CUSTKEY").columns

# Create Order FeatureView in Feature Store
orders_expedited_fv = FeatureView(
    name = f"FV_ORDERS_EXPEDITED",
    entities = [order_entity],
    feature_df = orders_expedited_sdf,
    # refresh_freq = '1 minutes',
    desc = f"Orders Expedited feature view"
)
# Register the expedited Orders FeatureView in the Feature Store schema
orders_expedited_fv_v1 = fs.register_feature_view(
    feature_view = orders_expedited_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

### Data Retention Dates

We can use the same principle to apply a data-retention period to the FeatureView. Storing the retention period (in days) in the parameter table allows us to adjust it without having to recreate the FeatureView.
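
As a minimal sketch of the retention arithmetic, the plain-Python example below keeps only the dates that fall within the retention period counted back from the latest order date. The sample dates are made up for illustration and the helper names are hypothetical. The comparison mirrors adding the retention period to each order date and comparing the result with the maximum order date.

```python
from datetime import date, timedelta

def retention_cutoff(max_order_date, retention_days):
    """Earliest order date that is still retained."""
    return max_order_date - timedelta(days=retention_days)

def within_retention(order_date, max_order_date, retention_days):
    """True when order_date + retention period reaches the latest order date."""
    return order_date + timedelta(days=retention_days) >= max_order_date

# Made-up sample of order dates
order_dates = [date(1992, 1, 1), date(1995, 8, 2), date(1996, 6, 30), date(1998, 8, 2)]
max_od = max(order_dates)
retention_days = 1095  # three years, as stored in the parameter table

kept = [d for d in order_dates if within_retention(d, max_od, retention_days)]
print(retention_cutoff(max_od, retention_days))
print(kept)
```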

In [None]:
session.sql(f"""
Insert into  {sess_db}.{fs_name}.fv_parameters
select 
  fvname, fvver, parameter_name,  parse_json(parameter_value) parameter_value
from (values
  ('FV_ORDERS_EXPEDITED', 'V1', 'ORDER_RETENTION', '[1095]') -- Three years!
 ) as oe (fvname, fvver, parameter_name, parameter_value)
""").show()

In [None]:
session.sql(f"""
with parms as (
  select PARAMETER_VALUE 
  from {sess_db}.{fs_name}.fv_parameters
  where FVNAME = 'FV_ORDERS_EXPEDITED'
    and FVVER = 'V1'
    and PARAMETER_NAME = 'ORDER_RETENTION'
),
max_od as (
select max(O_ORDERDATE) maxod
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
)
select min(O_ORDERDATE)
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS, parms, max_od
where dateadd('DAYS', PARAMETER_VALUE[0]::integer, O_ORDERDATE ) >= max_od.maxod
""").show()

In [None]:
drop view FV_ORDERS_EXPEDITED$V1;

### Object Wrap Features

In [None]:
import json
import snowflake.snowpark.functions as F

# Using SQL Expression
# NOTE: this definition is replaced by the pure Snowpark version defined after it.
def object_wrap_features(fq_tbl, non_feature_cols):
    sdf = session.table(fq_tbl)
    # Accept the non-feature columns either as a list or as a JSON array string
    if isinstance(non_feature_cols, str):
        try:
            non_feature_cols = json.loads(non_feature_cols)
        except json.JSONDecodeError:
            raise ValueError(f'Non-feature columns argument is not valid JSON: {non_feature_cols}')

    non_feature_cols_str = ','.join(non_feature_cols)

    # Pack every column except the non-feature columns into a single OBJECT column
    object_sdf = (sdf.with_column('F_OBJECT',
                                  F.object_construct(F.sql_expr(f'''* exclude ({non_feature_cols_str})''')))
                     .select(non_feature_cols + ['F_OBJECT']))
    return object_sdf

# Using list comprehension and pure Snowpark code
def object_wrap_features(fq_tbl, non_feature_cols):
    # Limited to 10 rows to keep the demonstration output small
    sdf = session.table(fq_tbl).limit(10)
    # Accept the non-feature columns either as a list or as a JSON array string
    if isinstance(non_feature_cols, str):
        try:
            non_feature_cols = json.loads(non_feature_cols)
        except json.JSONDecodeError:
            raise ValueError(f'Non-feature columns argument is not valid JSON: {non_feature_cols}')

    # Get feature columns (columns not in non_feature_cols)
    feature_cols = [col for col in sdf.columns if col not in non_feature_cols]

    # Create object_construct with explicit (key, value) column references
    object_sdf = (sdf.with_column('F_OBJECT',
                                  F.object_construct(*[item for name in feature_cols for item in (F.lit(name), F.col(name))]))
                  .select(non_feature_cols + ['F_OBJECT'])
                 )
    return object_sdf
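
To illustrate what the wrapping does to a single row, here is a minimal plain-Python sketch that runs without Snowflake. The non-feature (key) columns stay as top-level columns and all remaining columns are packed into one `F_OBJECT` value. The helper `object_wrap_row` and the sample row are hypothetical, and dropping `None` values is an assumption made to mirror how `OBJECT_CONSTRUCT` omits NULL values.

```python
def object_wrap_row(row, non_feature_cols):
    """Keep the non-feature columns and pack the remaining columns into F_OBJECT."""
    wrapped = {col: row[col] for col in non_feature_cols}
    # Assumption: skip None values, mirroring OBJECT_CONSTRUCT omitting NULLs
    wrapped["F_OBJECT"] = {
        col: val for col, val in row.items()
        if col not in non_feature_cols and val is not None
    }
    return wrapped

# Made-up sample row for illustration
row = {
    "O_ORDERKEY": 1,
    "O_ORDERDATE": "1996-01-02",
    "O_ORDERSTATUS": "O",
    "O_TOTALPRICE": 173665.47,
    "O_COMMENT": None,
}
print(object_wrap_row(row, ["O_ORDERKEY", "O_ORDERDATE"]))
```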

In [None]:
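non_feature_cols = lineitem_fv_v1.list_columns().filter(F.col('CATEGORY') != 'FEATURE').select(F.col('NAME')).to_pandas()['NAME'].to_list()
# All column names of the FeatureView (used to derive the non-feature columns in the next cell)
all_cols = lineitem_fv_v1.list_columns().select(F.col('NAME')).to_pandas()['NAME'].to_list()
all_cols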

In [None]:


fcols = list(map(str, lineitem_fv_v1.feature_names))
non_fcols = list(set(all_cols) - set(fcols))
non_fcols

In [None]:

result_df = object_wrap_features(lineitem_fv_v1.fully_qualified_name(),  json.dumps(non_fcols)) 
result_df

In [None]:
result_df = object_wrap_features('FEATURE_STORE_ONLINE_TEST_3.SOURCE_DATA.EVENT', '["EVENT_KEY", "ENTITY_KEY", "CTD_TS", "LST_UPD_TS"]' )
result_df.show(2)

In [None]:
result_df = object_wrap_features('FEATURE_STORE_ONLINE_TEST_3.SOURCE_DATA.EVENT', ["EVENT_KEY", "ENTITY_KEY", "CTD_TS", "LST_UPD_TS"] )

result_df.select('ENTITY_KEY', 'CTD_TS', 'LST_UPD_TS', 'F_OBJECT' ).show(10)

In [None]:
original_list = ["FF_00000", "FF_00001", "FF_00002", "FF_0003","FF_0004","FF_0005"]
new_list = [item for name in original_list for item in (F.lit(name), F.col(name))]
print(new_list)

In [None]:
result_df = object_wrap_features('SIMON.TPCHSF1.ORDERS', '["O_ORDERKEY", "O_ORDERDATE"]' )

result_df.to_pandas().to_markdown()

In [None]:
result_df = object_wrap_features(orders_fv_v1.fully_qualified_name(), '["ORDERKEY"]' )
result_df.show(2)

### Format Dataframe generated SQL

The SQL generated from Snowpark dataframe operations is not formatted for human readability. As the Snowflake Feature Store converts these DataFrames into database objects (Dynamic Tables or Views), passing the query through a formatting tool can make the objects easier to read and review directly.

The following function can be used to format the SQL nicely for readability, and if preferred can convert sub-queries to Common Table Expressions.

In [None]:
!pip install sqlglot

In [None]:
import sqlglot
from sqlglot.optimizer.eliminate_subqueries import eliminate_subqueries

def formatSQL (df_in, subq_to_cte = False, print_sql = True):
  """
  Prettify the SQL generated by the input dataframe, nesting/indenting it appropriately.
  Optionally replace subqueries with CTEs.
  df_in       : The Snowpark dataframe whose generated SQL is to be prettified
  subq_to_cte : When True convert nested sub-queries to CTEs
  print_sql   : When True print the original and the formatted SQL

  return: a dataframe built from the formatted SQL
  """

  # Take the first query generated for the dataframe
  query_in = df_in.queries['queries'][0]

  if subq_to_cte:
      # Parse and regenerate with the Snowflake dialect so Snowflake-specific syntax is preserved
      expression = sqlglot.parse_one(query_in, read='snowflake')
      query_in = eliminate_subqueries(expression).sql(dialect='snowflake')
  pretty_sql = sqlglot.transpile(query_in,
                                 read='snowflake', write='snowflake', pretty=True)[0]

  if print_sql:
      print(query_in)
      print(pretty_sql)
  return session.sql(pretty_sql)

In [None]:
print(orders_nosk_sdf.queries['queries'][0])
print(' ')

formatSQL(orders_nosk_sdf)


### Monitoring Feature Quality with DMFs

Snowflake has built-in data quality monitoring functionality, Data Metric Functions (DMFs), which can easily be applied to a Feature Store. Below we demonstrate how these can be used to set up and record changes in our features' statistics over time.

First we will create a copy of the ORDERS table from SNOWFLAKE_SAMPLE_DATA. That database is a data share, which does not support data metric functions being applied to its tables directly. We also want to be able to add records to the table so we can demonstrate the DMFs triggering and capturing statistical changes.

In [None]:
# session.sql(f''' Drop schema if exists {org}_FEATURE_STORE''').collect()

In [None]:
create table if not exists {{sess_db}}.{{org}}.ORDERS as
select *
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
order by O_ORDERDATE
;

Next we set up a new Dynamic Table based Orders FeatureView that we will use for testing.

In [None]:
# FEATURE STORE
fs =  FeatureStore(
        session=session,
        database=sess_db,
        name=fs_name,
        default_warehouse="SIMON_XS",   # replace with a warehouse available in your account
        creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

# ORDER
orders_table = 'ORDERS'
orders_sdf = session.table(f'{sess_db}.{org}.{orders_table}') \
                .with_columns(['ORDERKEY','CUSTKEY'],[F.col('O_ORDERKEY'),F.col('O_CUSTKEY')]) \
                .drop('O_ORDERKEY', 'O_CUSTKEY')
orders_nosk_sdf = orders_sdf.drop('CUSTKEY')

# ORDER ENTITY 
order_entity = Entity(
    name="ORDERS",
    join_keys=["ORDERKEY"],
    desc="Order entity"
    )
fs.register_entity(order_entity)

# ORDERS FEATUREVIEW
orders_features = orders_sdf.drop("ORDERKEY", "CUSTKEY").columns

# Create Order FeatureView in Feature Store
orders_dmfs_fv = FeatureView(
    name = f"FV_ORDERS_DMFS",
    entities = [order_entity],
    feature_df = orders_nosk_sdf,
    refresh_freq = '1 minute',
    desc = f"Order feature view"
)
# Register the Order FeatureView in the schema, and add the python featureview to our list
orders_dmfs_fv_v1 = fs.register_feature_view(
    feature_view = orders_dmfs_fv,    # feature view created above, could also use external_fv
    version = "V1",
    block = True,               # whether function call blocks until initial data is available
    overwrite = True,           # whether to replace existing feature view with same name/version
)

We can use the data metric functions directly within SQL to return the current state.  For example:

In [None]:
SELECT SNOWFLAKE.CORE.UNIQUE_COUNT(
  SELECT
    O_ORDERSTATUS
  FROM {{orders_dmfs_fv_v1.fully_qualified_name()}}
);

We need to monitor features differently by type.  We create some simple functions that return the features by type.

In [None]:
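from snowflake.snowpark.types import (
    IntegerType, DoubleType, DecimalType, FloatType, LongType,
    StringType, TimestampType, TimeType, DateType,
)

def get_numeric_columns(dataframe):
    # Names of the columns with a numeric Snowpark datatype
    numeric_types = (IntegerType, DoubleType, DecimalType, FloatType, LongType)
    return [field.name for field in dataframe.schema.fields 
            if isinstance(field.datatype, numeric_types)]

def get_string_columns(dataframe):
    # Names of the columns with a string datatype (trailing comma makes this a tuple)
    string_types = (StringType,)
    return [field.name for field in dataframe.schema.fields 
            if isinstance(field.datatype, string_types)]

def get_time_date_columns(dataframe):
    # Names of the columns with a date, time or timestamp datatype
    time_date_types = (TimestampType, TimeType, DateType)
    return [field.name for field in dataframe.schema.fields 
            if isinstance(field.datatype, time_date_types)]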

In [None]:
# Usage
numeric_cols = get_numeric_columns(orders_dmfs_fv_v1.feature_df)
print('Numeric features :',numeric_cols)
string_columns = get_string_columns(orders_dmfs_fv_v1.feature_df)
print('Character features :',string_columns)
time_date_columns = get_time_date_columns(orders_dmfs_fv_v1.feature_df)
print('Temporal features :',time_date_columns)

Below is a function to define and create Data Metric Functions on the features of a given Feature View.

In [None]:
def fv_create_dmfs(fv):
    # A FeatureView without a refresh frequency is backed by a View rather than a Dynamic Table
    if fv.refresh_freq is None:
        print(f'FeatureView {fv.fully_qualified_name()} is a View, not a Dynamic Table.')
        print('DMFs can only be defined on Dynamic Tables.')
        return

    fv_name = fv.fully_qualified_name()
    dmf_cols = fv.feature_names
    anytype_dmf = ['DUPLICATE_COUNT','NULL_COUNT', 'NULL_PERCENT', 'UNIQUE_COUNT' ]
    string_dmf = ['BLANK_COUNT','BLANK_PERCENT' ]    
    numeric_dmf = ['AVG', 'MAX', 'MIN', 'STDDEV']
    fv_df_features = fv.feature_df.select(fv.feature_names)

    # Evaluate the DMFs whenever the data in the Dynamic Table changes
    try:    
        session.sql(f'''ALTER TABLE {fv_name} SET DATA_METRIC_SCHEDULE = 'TRIGGER_ON_CHANGES';''').collect()
    except Exception as e:
        print(f'Schedule TRIGGER_ON_CHANGES on featureview {fv_name} failed: {e}')

    # Numeric features: distribution statistics
    for nc in get_numeric_columns(fv_df_features):
        for dmf_fun in numeric_dmf:
            try:    
                session.sql(f''' ALTER TABLE {fv_name} ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.{dmf_fun} ON ({nc});''').collect()
            except Exception as e:
                print(f'Function {dmf_fun} failed on column {nc} on featureview {fv_name}: {e}')

    # String features: blank checks
    for sc in get_string_columns(fv_df_features):
        for dmf_fun in string_dmf:
            try:    
                session.sql(f''' ALTER TABLE {fv_name} ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.{dmf_fun} ON ({sc});''').collect()
            except Exception as e:
                print(f'Function {dmf_fun} failed on column {sc} on featureview {fv_name}: {e}')

    # All features, regardless of type: null, duplicate and uniqueness checks
    for ac in dmf_cols:
        for dmf_fun in anytype_dmf:
            try:                    
                session.sql(f''' ALTER TABLE {fv_name} ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.{dmf_fun} ON ({ac});''').collect()
            except Exception as e:
                print(f'Function {dmf_fun} failed on column {ac} on featureview {fv_name}: {e}')
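
Before attaching DMFs to a real Feature View, it can help to preview the statements that would be issued. The sketch below is a plain-Python dry run of the same per-type dispatch used by `fv_create_dmfs`: it only builds the `ALTER TABLE` statements as strings and executes nothing. The function name `plan_dmf_statements`, the type labels (`numeric`, `string`, `other`) and the table name are hypothetical.

```python
ANYTYPE_DMF = ["DUPLICATE_COUNT", "NULL_COUNT", "NULL_PERCENT", "UNIQUE_COUNT"]
STRING_DMF = ["BLANK_COUNT", "BLANK_PERCENT"]
NUMERIC_DMF = ["AVG", "MAX", "MIN", "STDDEV"]

def plan_dmf_statements(table_name, column_types):
    """Build (but do not run) the ALTER TABLE statements for each column.

    column_types maps a column name to 'numeric', 'string' or 'other'.
    """
    statements = []
    for col, kind in column_types.items():
        dmfs = list(ANYTYPE_DMF)          # applied to every feature
        if kind == "numeric":
            dmfs += NUMERIC_DMF           # distribution statistics
        elif kind == "string":
            dmfs += STRING_DMF            # blank checks
        for dmf in dmfs:
            statements.append(
                f"ALTER TABLE {table_name} ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.{dmf} ON ({col});"
            )
    return statements

# Hypothetical table name and column typing for illustration
stmts = plan_dmf_statements(
    'MY_DB.MY_FS."FV_ORDERS_DMFS$V1"',
    {"O_TOTALPRICE": "numeric", "O_ORDERSTATUS": "string", "O_ORDERDATE": "other"},
)
for s in stmts:
    print(s)
```

Reviewing the generated list first makes it easier to see how many metrics will be scheduled per feature before any are created.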

Create the DMFs on our dynamic table Feature View.

In [None]:
fv_create_dmfs(orders_dmfs_fv_v1)

We will simulate modifying some data in our ORDERS source table which will then reflect changes in the dynamic table Feature View.

In [None]:
select min(O_ORDERDATE), max(O_ORDERDATE)
from {{sess_db}}.{{org}}.ORDERS

First delete some records from the table, and monitor the changes in the Dynamic Table and DMFs with the code below. Then insert some records and do the same. You can cycle through this a few times and you should see changes recorded in the values from the DMFs, e.g. the averages for the numeric columns will change.

In [None]:
delete from {{sess_db}}.{{org}}.ORDERS where O_ORDERDATE >= '1998-01-01'

In [None]:
insert into {{sess_db}}.{{org}}.ORDERS 
select *
from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
where O_ORDERDATE >= '1998-01-01'
order by O_ORDERDATE
;

#### Monitoring the dynamic table and data-metric functions

The code below provides examples of monitoring the state of the Dynamic Table and the data metric functions.

In [None]:
fs.get_refresh_history(orders_dmfs_fv_v1)

In [None]:
session.sql(f"""SELECT
  database_name,
  schema_name feature_store,
  name feature_view,  
  scheduling_state,
  target_lag_type,
  target_lag_sec,
FROM
  TABLE ( {fs._config.database}.INFORMATION_SCHEMA.DYNAMIC_TABLES() )
WHERE True
  and database_name = '{fs._config.database}'
  and schema_name = '{fs._config.schema}'
  -- add additional filters to see a subset of the Feature Views
  and name = 'FV_ORDERS_DMFS$V1'
ORDER BY
  name
""").show()


In [None]:
session.sql(f"""SELECT
  name feature_view,
  state,
  state_code,
  state_message,
  query_id,
  data_timestamp,
  refresh_start_time,
  refresh_end_time,
  statistics,
  refresh_action 
FROM
  TABLE (
    {fs._config.database}.INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY (
      NAME_PREFIX => '{fs._config.database}.{fs._config.schema}.FV_ORDERS_DMFS$V1' --, ERROR_ONLY => TRUE
    )
  )
ORDER BY
  name,
  data_timestamp desc
""").show(10)  

Retrieve data metric function results for the dynamic table FeatureView.

In [None]:
session.sql(f'''
select 
  table_schema feature_store,
  table_name feature_view,
  argument_names[0]::varchar feature_name,
  metric_name,
  measurement_time,
  value measurement_value
from SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS 
where 
  table_database = '{orders_dmfs_fv_v1.database}' and 
  table_schema   = '{orders_dmfs_fv_v1.schema}' and 
  table_name     = '{orders_dmfs_fv_v1.name}${orders_dmfs_fv_v1.version}' 
order by table_name, feature_name, metric_name, change_commit_time desc  
''').to_pandas().to_markdown()
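
Once results like those returned by the query above have accumulated, a simple way to spot changes is to compare the two most recent measurements of each metric. The plain-Python sketch below assumes the results have been reduced to `(feature_name, metric_name, measurement_time, value)` tuples. The function name `latest_changes` and the sample measurements are made up for illustration.

```python
from collections import defaultdict

def latest_changes(measurements):
    """Difference between the two most recent values per (feature, metric).

    measurements: iterable of (feature_name, metric_name, measurement_time, value).
    Series with fewer than two measurements are skipped.
    """
    series = defaultdict(list)
    for feature, metric, ts, value in measurements:
        series[(feature, metric)].append((ts, value))

    changes = {}
    for key, points in series.items():
        points.sort()  # oldest first, by measurement time
        if len(points) >= 2:
            changes[key] = points[-1][1] - points[-2][1]
    return changes

# Made-up measurements; ISO timestamps sort correctly as strings
measurements = [
    ("O_TOTALPRICE", "AVG", "2024-01-01T10:00", 151219.5),
    ("O_TOTALPRICE", "AVG", "2024-01-01T10:05", 151000.0),
    ("O_ORDERSTATUS", "NULL_COUNT", "2024-01-01T10:00", 0),
    ("O_ORDERSTATUS", "NULL_COUNT", "2024-01-01T10:05", 0),
    ("O_CLERK", "UNIQUE_COUNT", "2024-01-01T10:05", 1000),
]
changes = latest_changes(measurements)
print(changes)
```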

To list the data metric functions configured on a given Feature View:

In [None]:
session.sql(f"""SELECT 
ref_entity_schema_name feature_store,
ref_entity_name feature_view,
parse_json(ref_arguments)[0]:"name"::varchar feature_name,
metric_name, 
metric_signature, 
metric_data_type, 
  FROM TABLE(
    INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES(
      REF_ENTITY_NAME => '{orders_dmfs_fv_v1.database}.{orders_dmfs_fv_v1.schema}.{orders_dmfs_fv_v1.name}${orders_dmfs_fv_v1.version}',
      REF_ENTITY_DOMAIN => 'table'
    )
  )
order by 1,2,3,4  
""").to_pandas().to_markdown()

In [None]:
ALTER TABLE {{orders_dmfs_fv_v1.fully_qualified_name()}} SET DATA_METRIC_SCHEDULE = '5 MINUTE';

## SUMMARY

We have seen how we can create a number of Entities in the Feature Store representing the different business-entity levels that we need to store and maintain features at.  

We can easily retrieve features from different levels of the entity-hierarchy to enrich our training and inference datasets, by adding additional entity key-columns to our Spine dataframes and adding the additional FeatureViews that we want Features from to our data retrieval function (`generate_training_set`, `generate_dataset`, `retrieve_feature_values`). 

We can also retrieve and combine features from lower entity-hierarchy levels, and perform additional processing as needed to derive features at the required level.

We have seen how we can use common categorical and numeric preprocessing techniques to derive new features, and how we can, if needed, create new FeatureViews to contain and maintain these.

We have demonstrated techniques for working across multiple feature stores.

We have seen how we can monitor the refresh state of our feature views and the data quality of their features.