## Notebook Index
1. [Feature Store ](https://app.snowflake.com/sfpscogs/rpegu_aiml/#/notebooks/ML_MODELS.DS.%2201_FeatureStore_Creation%22)
2. [Feature Reduction ](https://app.snowflake.com/sfpscogs/rpegu_aiml/#/notebooks/ML_MODELS.DS.%2202_Feature_Reduction%22)
3. [Model Training ](https://app.snowflake.com/sfpscogs/rpegu_aiml/#/notebooks/ML_MODELS.DS.%2203_Model_Training%22)
4. Model Inference & scheduling 👈



## Notebook Overview

In this notebook, you'll learn how to perform batch inferencing using Snowflake ML by:
* Leveraging features stored in the feature store
* Loading the model signature from the Snowflake Model Registry
* Running predictions at scale within Snowflake

This approach enables you to generate model predictions for large datasets efficiently, all within the Snowflake platform. Additionally, the notebook can be scheduled to run at regular intervals, allowing for fully automated, production-grade batch scoring workflows.

In [None]:
# Import python packages
import pandas as pd

# Snowpark ML
import snowflake.snowpark.functions as F
from snowflake.ml.modeling.pipeline import Pipeline 

# Snowflake Feature Store
from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,  CreationMode)
## Snowflake Model Registry
from snowflake.ml.registry import Registry
import joblib

from snowflake.snowpark.types import DecimalType, DoubleType, StringType
from snowflake.snowpark.version import VERSION
import joblib
# We can also use Snowpark for our analyses!
# `session` is the handle every later cell uses to query tables, read stage
# files and talk to the Feature Store / Model Registry.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:

# --- Databases -------------------------------------------------------------
input_database = 'ML_MODELS'
working_database = 'ML_MODELS'

# --- Schemas ---------------------------------------------------------------
input_schema = 'DS'
working_schema = 'DS'
fs_schema = 'FEATURE_STORE'
model_registry_schema = 'ML_REGISTRY'

# --- Stage that holds serialized model artefacts ---------------------------
stage_name = 'MODEL_OBJECT'
stage = "@" + ".".join([working_database, working_schema, stage_name])

# --- Warehouses ------------------------------------------------------------
warehouse = 'DS_W'
snowpark_opt_warehouse = 'SNOWPARK_OPT_WH'
session.use_warehouse(warehouse)

# --- Current environment details -------------------------------------------
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Each entry pairs a report line template with the values that fill it.
environment_report = [
    ('User                        : {}', (snowflake_environment[0][0],)),
    ('Role                        : {}', (session.get_current_role(),)),
    ('stage                        : {}', (stage,)),
    ('Database                    : {}', (session.get_current_database(),)),
    ('Schema                      : {}', (session.get_current_schema(),)),
    ('Warehouse                   : {}', (session.get_current_warehouse(),)),
    ('Snowflake version           : {}', (snowflake_environment[0][1],)),
    ('Snowpark for Python version : {}.{}.{}',
     (snowpark_version[0], snowpark_version[1], snowpark_version[2])),
]

print('\nConnection Established with the following parameters:')
for line_template, line_args in environment_report:
    print(line_template.format(*line_args))

In [None]:



# Model to score with (name and version as registered in the model registry).
model_name = 'ML_XGBOOST_MODEL'
version = 'V1'

# Reference period to score; matched against the REF_MMYY column of the
# universe table.
ref_mmyyyy = '052025'

# One dataset version per reference period.
dataset_version = 'V1_' + ref_mmyyyy
print(dataset_version)


Connect to the Snowflake Feature Store to retrieve the features.

In [None]:
# Connect to the Feature Store that holds the model's input features.
#
# CREATE_IF_NOT_EXIST attaches to an existing feature store and only creates
# one when it is missing. That is exactly what the former try/except amounted
# to (try "fail if missing", fall back to "create if missing"), but without a
# bare `except:` that would also hide unrelated failures such as permission,
# warehouse or attribute errors behind a silent retry.
fs = FeatureStore(
    session=session,
    database=working_database,
    name=fs_schema,
    default_warehouse=warehouse,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

In [None]:
## Retrieve the feature views, the scoring universe and the model signature

# (name, version) of every feature view that supplies model inputs.
feature_view_keys = [
    ("FV_FEATURE_ENT_1", "V_1"),
    ("FV_FEATURE_ENT_2", "V_1"),
    ("FV_FEATURE_ENT_3", "V_1"),
    ("FV_FEATURE_ENT_4", "V_1"),
]
fv_list = [
    fs.get_feature_view(fv_name, fv_version)
    for fv_name, fv_version in feature_view_keys
]

# Spine for the dataset: rows of the targets table for the reference period.
universe_tbl = '.'.join([input_database, input_schema, 'DEMO_TARGETS_TBL'])
universe_sdf = session.table(universe_tbl).filter(F.col("REF_MMYY") == ref_mmyyyy)

# Fetch the model version to score with. The version comes from the
# `version` parameter instead of a hard-coded literal, so changing the
# parameter cell is enough to score with another version.
reg = Registry(session, database_name=working_database, schema_name=model_registry_schema)
mv = reg.get_model(model_name).version(version)

# Read the input signature of the function that is actually used for scoring
# (PREDICT). Taking whichever function happens to be listed first could pick
# up the signature of a different method; the first entry is kept only as a
# fallback when no function is named PREDICT.
model_functions = mv.show_functions()
predict_function = next(
    (fn for fn in model_functions if str(fn.get("name", "")).upper() == "PREDICT"),
    model_functions[0],
)
input_signature = predict_function.get("signature").inputs
input_cols = [c.name for c in input_signature]



In [None]:
--ALTER DATASET INFERENCE_DATASET drop version 'V1_052025'

In [None]:
# Build feature-view slices restricted to the columns the model needs, then
# join them onto the spine and persist the result as a versioned Dataset.

# Raw categorical columns are listed by hand: one-hot encoding changes their
# names, so they do not appear under these names in the model's input
# signature.
categorical_source_cols = {"CAT_1", "CAT_2", "CAT_3", "CAT_4", "CAT_5"}

ds_cols = []     # feature columns already covered by an earlier feature view
slice_list = []  # one slice per feature view that contributes any column

for fv in fv_list:
    # NOTE(review): _feature_desc is a private attribute of the feature view;
    # iterating it yields the feature column names.
    fv_cols = list(fv._feature_desc)
    slice_cols = [
        col for col in fv_cols
        if col not in ds_cols
        and (col in input_cols or col in categorical_source_cols)
    ]

    if slice_cols:
        slice_list.append(fv.slice(slice_cols))
        ds_cols += fv_cols

# Stored under its own name: the name `dataset` is rebound to the
# snowflake.ml.dataset module by the import in the following cell.
# NOTE: generate_dataset is called with a fixed version per reference period;
# re-running for the same period requires dropping that version first.
inference_dataset = fs.generate_dataset(
    name=f"{working_database}.{working_schema}.INFERENCE_DATASET",
    spine_df=universe_sdf,
    features=slice_list,
    version=dataset_version,
    #output_type="table",
    spine_label_cols=["TARGET"],
    desc="inference dataset for ml demo"
)




In [None]:
# Import only the loader so that no notebook variable named `dataset` is
# overwritten by the module of the same name.
from snowflake.ml.dataset import load_dataset

# Load the dataset by the same fully qualified name it was generated under,
# so the lookup does not depend on the session's current database/schema.
ds = load_dataset(
    session,
    f"{working_database}.{working_schema}.INFERENCE_DATASET",
    dataset_version,
)

# Get a Snowpark DataFrame
df = ds.read.to_snowpark_dataframe()

In [None]:
# Preview the rows loaded from the dataset before any cleaning is applied.
df.show()

In [None]:
# Identifier / label / period columns: these must pass through untouched.
# 'MEMBER_ID' is the identifier name used by the rest of the notebook.
excluded = ['MEMBER_ID', 'TARGET', 'REF_MMYY']
features = [col for col in df.columns if col not in excluded]

# String-typed feature columns. Membership is tested with `not in`; comparing
# a column name to the whole list with `!=` is always True and excludes nothing.
cat_cols = [
    field.name
    for field in df.schema.fields
    if isinstance(field.datatype, StringType) and field.name not in excluded
]


def fix_values(columnn):
    """Return a normalized expression for the string column `columnn`.

    Every run of characters other than letters and digits is replaced by a
    single underscore and the result is upper-cased.
    """
    return F.upper(F.regexp_replace(F.col(columnn), '[^a-zA-Z0-9]+', '_'))


# Normalize the categorical values.
for col in cat_cols:
    df = df.with_column(col, fix_values(col))

# Cast every DECIMAL column to DOUBLE. The cast reads the column from `df`
# itself (the previously referenced `sdf` is not defined in this notebook).
num_cols = [field.name for field in df.schema.fields if isinstance(field.datatype, DecimalType)]
for colname in num_cols:
    df = df.with_column(colname, df[colname].cast(DoubleType()))

# Null handling: 'UNKNOWN' for categorical columns, 0.00 for numeric columns.
fill_values_cat = {col_name: 'UNKNOWN' for col_name in cat_cols}
fill_values_num = {col_name: 0.00 for col_name in num_cols}

# Merge both dictionaries
fill_values = {**fill_values_cat, **fill_values_num}

# Apply fillna to the Snowpark DataFrame
df = df.fillna(fill_values)

In [None]:
# Preview the rows again after the cleaning cell above has run.
df.show()

In [None]:
## Column groups (kept for reference; the loaded pipeline carries its own column lists)
excluded = ['MEMBER_ID', 'TARGET','REF_MMYY','CAT_1','CAT_2','CAT_3','CAT_4','CAT_5']
num_cols = [col for col in df.columns if col not in excluded]

excluded = ['MEMBER_ID', 'TARGET','REF_MMYY']
# Get string columns
string_columns = [field.name for field in df.schema.fields if isinstance(field.datatype, StringType)]

# Filter out excluded columns
cat_cols = [col for col in string_columns if col not in excluded]


# Download the serialized preprocessing pipeline from the stage to local /tmp
# and load it with joblib.
# NOTE(review): if the file was uploaded with auto-compression, the staged
# (and downloaded) name would end in ".gz" - confirm the name on the stage.
session.file.get(f'{stage}/preprocessing_pipeline.joblib', '/tmp')
PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'

# joblib.load unpickles the file, which can execute arbitrary code: only load
# pipeline files from a stage whose contents are trusted.
preprocessing_pipeline = joblib.load(PIPELINE_FILE)

# Apply the pipeline exactly as it was fitted during training. Calling fit()
# here would re-learn encodings/scalings from the inference data, so the
# model would receive features on a different basis than it was trained on.
df = preprocessing_pipeline.transform(df)

In [None]:
# Score the preprocessed rows with the model version's PREDICT function.
prediction_result = mv.run(df, function_name ="PREDICT")

In [None]:
# Keep the identifier, the prediction and the reference period only.
output_cols = ['MEMBER_ID', 'PREDICTED_TARGET', 'REF_MMYY']
result = prediction_result.select(output_cols)
result.show()

# Append this run's predictions to the (transient) results table.
prediction_writer = result.write.mode("append")
prediction_writer.save_as_table("TARGET_CUSTOMER_PREDICTION", table_type="transient")

In [None]:
-- Inspect the predictions written by the previous cell
SELECT * FROM TARGET_CUSTOMER_PREDICTION