## Snowflake Snowpark Setup

Import the Snowpark and Snowflake ML modules needed for data processing and machine learning tasks, then print the Snowpark version for reference.

[![Snowflake](https://miro.medium.com/v2/resize:fit:1400/format:webp/0*8ie8h8yM00XAN_Kx)](https://www.google.com/)


In [None]:
from snowflake.snowpark.functions import *
from snowflake.snowpark import Session
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.types import Variant
import snowflake.snowpark.functions as F
import json
from pprint import pprint
import pandas as pd
from snowflake.snowpark.types import FloatType
import numpy as np
import snowflake.ml
from matplotlib import pyplot as plt
from snowflake.snowpark.functions import udf
from snowflake.ml.feature_store.feature_store import FeatureStore, CreationMode


from snowflake.snowpark import version
print(version.VERSION)


## Get Active Snowpark Session
Retrieve and print the active Snowflake Snowpark session.

In [None]:

#Get Active Snowpark Session
from snowflake.snowpark.context import get_active_session
session = get_active_session()
print(session)
     

# PART 1. DATA & FEATURE ENGINEERING

## Load Data: Full Source Code
Full source code: [IoTDB Tutorial](https://tutorials.demohub.dev/demo/iotdb).

1. Created a database named `IoTDB`.
2. Defined a file format named `CSV_SCHEMA` for CSV files with specific parsing settings.
3. Created a stage named `DEMOHUB_S3_INT` referencing external data from 's3://demohubpublic/data/'.
4. Created and populated a table named `sensor_data` using schema inference from CSV data in the stage.
5. Loaded the actual data from the stage into the `sensor_data` table.


In [None]:
--Load the Data: Full Source Code - https://tutorials.demohub.dev/demo/iotdb 


-- +----------------------------------------------------+
-- |             1. DATABASE AND SCHEMA SETUP          |
-- +----------------------------------------------------+

-- Create or replace the database
CREATE OR REPLACE DATABASE IoTDB;

-- Use the database
USE IoTDB;

-- +----------------------------------------------------+
-- |            2. CREATE FILE FORMAT                 |
-- +----------------------------------------------------+

-- Create a file format to specify CSV structure
CREATE OR REPLACE FILE FORMAT CSV_SCHEMA
    TYPE = CSV
    PARSE_HEADER = TRUE
    SKIP_BLANK_LINES = TRUE
    TRIM_SPACE = TRUE
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;

-- +----------------------------------------------------+
-- |              3. CREATE STAGE                      |
-- +----------------------------------------------------+

-- Create a stage to reference external data
CREATE OR REPLACE STAGE DEMOHUB_S3_INT 
    URL = 's3://demohubpublic/data/'
    DIRECTORY = ( ENABLE = true )
    COMMENT = 'DemoHub S3 datasets';

-- +----------------------------------------------------+
-- |        4. LOAD DATA USING SCHEMA INFERENCE        |
-- +----------------------------------------------------+
-- Create and populate the table using schema inference
CREATE OR REPLACE TABLE sensor_data USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM TABLE (INFER_SCHEMA(
    LOCATION=>'@demohub_s3_int/iot/sensor_data/',
    FILE_FORMAT=>'CSV_SCHEMA')));

-- +----------------------------------------------------+
-- |                5. COPY DATA INTO TABLE             |
-- +----------------------------------------------------+
-- Load the actual data from the stage into the table
COPY INTO sensor_data FROM '@demohub_s3_int/iot/sensor_data/'
FILE_FORMAT = 'CSV_SCHEMA'
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

# 👍 Load Raw Sensor Data
Retrieve the raw sensor data using a Snowpark DataFrame (not a Pandas DataFrame).

In [None]:
raw_sensor_df = session.table("sensor_data")

In [None]:
raw_sensor_df

# 🤯 Drop Uninformative Columns
Identify and remove columns with limited analytical value or variance.

1. Get all column names from the raw sensor data.
2. Specify columns to drop: "TRA", "W31", "W32".
3. Select columns to keep based on those not in the drop list.
4. Create a new DataFrame with only the retained columns.


In [None]:
# 🤯 Drop columns that have little to no analytical value/variance. 

# Get all column names
all_columns = raw_sensor_df.columns

# Columns to drop
columns_to_drop = ["TRA", "W31", "W32"]

# Select columns to keep
columns_to_keep = [col for col in all_columns if col not in columns_to_drop]

# Create a new DataFrame with only the kept columns
raw_sensor_df = raw_sensor_df.select(*columns_to_keep)

# Convert to Pandas DataFrame
Display the contents of the Snowpark DataFrame as a Pandas DataFrame.


In [None]:
# Convert the Snowpark DataFrame to a Pandas DataFrame for display
raw_sensor_df.to_pandas()

# Visual Data Exploration
Explore the data visually by plotting histograms for all continuous variables.

1. Convert the Snowpark DataFrame to a Pandas DataFrame.
2. Plot histograms for all continuous variables with 30 bins.
3. Display the histograms with a figsize of 15x15.
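
To make the binning step concrete, here is a minimal pure-Python sketch of equal-width histogram counting. It only illustrates the idea and is not the exact algorithm that pandas or matplotlib use; the function name `equal_width_histogram` and the sample values are hypothetical.

```python
def equal_width_histogram(values, bins=30):
    """Count values into `bins` equal-width intervals spanning [min, max]."""
    lo, hi = min(values), max(values)
    if lo == hi:
        # Degenerate case in this sketch: all values identical, use the first bin
        return [len(values)] + [0] * (bins - 1)
    width = (hi - lo) / bins
    counts = [0] * bins
    for v in values:
        # The maximum value is counted in the last bin rather than a new one
        idx = min(int((v - lo) / width), bins - 1)
        counts[idx] += 1
    return counts


# Illustrative data, not taken from the sensor dataset
sample = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
counts = equal_width_histogram(sample, bins=5)
print(counts)
```

A column whose counts pile up in a single bin is a candidate for the low-variance check in the next step.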


In [None]:
# Let's explore the data some more, visually, by plotting histograms for all continuous variables
sensor_pdf = raw_sensor_df.toPandas()
sensor_pdf.hist(bins=30, figsize=(15,15))
plt.show()

# Identify Low/No Variance Columns

Identify columns with low variance, as they have little to no impact on model prediction.

- Get column names.
- Compute variances for each column.
- Transform the result into a Pandas DataFrame, melt it, and sort by variance values.
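
The ranking described above can be illustrated without a Snowflake connection. The following is a minimal pure-Python sketch, assuming a small hypothetical table held as a dict of column lists; the values are made up for illustration. `statistics.variance` computes the sample variance, which is also what Snowflake's `VARIANCE` function returns.

```python
from statistics import variance

# Hypothetical sample: column name -> values (illustrative, not from the dataset)
columns = {
    "T24": [641.8, 642.1, 642.4, 642.3],
    "P2": [14.62, 14.62, 14.62, 14.62],  # constant column: zero variance
    "NF": [2388.0, 2388.1, 2388.0, 2388.1],
}


def rank_by_variance(table):
    """Return (column, sample variance) pairs sorted from lowest to highest."""
    pairs = ((name, variance(vals)) for name, vals in table.items())
    return sorted(pairs, key=lambda pair: pair[1])


ranked = rank_by_variance(columns)
for name, var in ranked:
    print(f"{name}: {var:.6f}")
```

Columns at the top of this ranking (variance at or near zero) are the ones worth considering for removal.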


In [None]:
#Identify low/no variance columns so they can be dropped. Low-variance columns have little to no impact on the predictive power of our model.
train_cols = raw_sensor_df.columns
variance_cols = list(map(variance,list(map(col,train_cols))))
raw_sensor_df.select(variance_cols).to_pandas().melt().sort_values('value')

# Drop Non-Useful Columns

Remove columns that are not useful for model prediction.

- Specify columns to remove: 'NF_DMD', 'PCNFR_DMD', 'P2', 'T2', 'FARB', 'EPR', 'W31', 'W32'. 'UNIT_NUMBER' is kept because later steps group and join on it.
- Drop the specified columns from the dataset.


In [None]:
#Drop Non-Useful Columns
#remove_cols = ['NF_DMD', 'PCNFR_DMD', 'P2', 'T2', 'FARB', 'EPR', 'W31', 'W32', 'UNIT_NUMBER']
remove_cols = ['NF_DMD', 'PCNFR_DMD', 'P2', 'T2', 'FARB', 'EPR', 'W31', 'W32']
raw_sensor_df=raw_sensor_df.drop(remove_cols)


# Examine Target Value
Determine the maximum time in cycles (max_tic) for each unit number.

1. Group the raw sensor data by 'unit_number'.
2. Aggregate the maximum 'time_in_cycles' as 'max_tic' for each group.
3. Convert the result to a Pandas DataFrame for examination.


In [None]:
#Let's Examine Our Target Value
max_cycles=raw_sensor_df.group_by('unit_number').agg(F.max("time_in_cycles").alias("max_tic"))
max_cycles.to_pandas()

# Re-arrange Column Order
Adjust column order by moving 'unit_number' from the front to the end and add the 'max_tic' column.

1. Get the column names of the raw sensor data.
2. Pop 'unit_number' from the front.
3. Reorder columns and append 'unit_number' and 'max_tic'.


In [None]:
#Re-arrange Column orders. Pop Unit_number from front and move to end. Then add max_tic column
col_names=raw_sensor_df.columns

#Display the Column names
col_names

#Pop and Reorder (pop(0) assumes UNIT_NUMBER is the first column)
col_names.pop(0)
col_names = list(map(F.col, col_names))
col_names.append(raw_sensor_df.unit_number.alias("unit_number"))
col_names.append(max_cycles.max_tic)

#Display again
col_names

# Data Manipulation and Filtering

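1. Join raw sensor data with 'max_cycles' on 'unit_number'.
2. Calculate Remaining Useful Life (RUL) as 'max_tic' minus 'time_in_cycles'.
3. Drop the helper column 'max_tic'.
4. Keep only rows where 'time_in_cycles' is greater than 0.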
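
As a worked example of the RUL calculation, the sketch below reproduces the same logic in plain Python on a tiny hypothetical dataset. The function name `add_rul` and the readings are illustrative assumptions; the notebook itself does this with a Snowpark join.

```python
# Hypothetical readings: (unit_number, time_in_cycles); values are illustrative
readings = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 2)]


def add_rul(rows):
    """Attach RUL = (max cycle seen for the unit) - (current cycle) to each row."""
    max_tic = {}
    for unit, tic in rows:
        max_tic[unit] = max(max_tic.get(unit, tic), tic)
    # Keep only rows with a positive cycle count, mirroring the notebook's filter
    return [(unit, tic, max_tic[unit] - tic) for unit, tic in rows if tic > 0]


with_rul = add_rul(readings)
for row in with_rul:
    print(row)
```

Each unit's final observed cycle gets an RUL of 0, and earlier cycles count up from there.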


In [None]:
raw_sensor_df=raw_sensor_df.join(max_cycles, max_cycles.unit_number == raw_sensor_df.unit_number,join_type = 'inner' )\
         .select(col_names)\
         .with_column("RUL", F.col("MAX_TIC")-F.col("TIME_IN_CYCLES") )\
         .drop('MAX_TIC')\
         .filter(F.col("time_in_cycles")>0)

raw_sensor_df

# Describe Time in Cycles (TIC) and Remaining Useful Life (RUL)

Compute descriptive statistics for 'time_in_cycles' and 'RUL'.

1. Select columns 'time_in_cycles' and 'RUL'.
2. Calculate statistics (mean, stddev, min, max, etc.).
3. Display the descriptive statistics. 


In [None]:
#Describe TIC and RUL
raw_sensor_df.select(['time_in_cycles','RUL']).describe().show()

# Data Engineering: Deduplicate Dataset

Remove duplicate rows from the dataset.

1. Drop duplicate rows based on all columns.
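
For illustration, dropping duplicates "based on all columns" means two rows are duplicates only when every column value matches. A minimal pure-Python sketch, assuming rows are represented as tuples; the helper name `drop_duplicate_rows` is hypothetical, and keeping the first occurrence in order is a choice of this sketch rather than a guarantee of the Snowpark call.

```python
def drop_duplicate_rows(rows):
    """Return rows with exact duplicates removed, keeping first occurrences."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


# Illustrative rows: (unit_number, time_in_cycles, sensor_value)
rows = [(1, 1, 0.5), (1, 1, 0.5), (1, 2, 0.5), (2, 1, 0.7)]
deduped = drop_duplicate_rows(rows)
print(deduped)
```

Note that `(1, 1, 0.5)` and `(1, 2, 0.5)` are both kept because they differ in one column.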


In [None]:
#Do some data engineering by De-duplicating the dataset
raw_sensor_df=raw_sensor_df.drop_duplicates()

# Create Feature Store for Up-to-date Features

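Create a view named 'sensor_data_features' that exposes up-to-date features, then build a Feature Store on top of it.

- Use the Snowflake UI to verify successful creation of the view.
- Create (or reuse) the Feature Store `sensor_data_fs` in the `IOTDB` database.
- Register the entity `SENSOR_ENTITY` with join keys `UNIT_NUMBER` and `TIME_IN_CYCLES`.
- Register the feature view `SENSOR_DATA_FEATURE_VIEW` (version "1") over the view.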


In [None]:
#Create a view for up-to-date features. Check the Snowflake UI to see that the view has been created successfully.
raw_sensor_df.createOrReplaceView('sensor_data_features')

In [None]:

fs = FeatureStore(
    session=session,
    database="IOTDB",
    name="sensor_data_fs",
    default_warehouse="DEMO_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

In [None]:
from snowflake.ml.feature_store import Entity

# Example entity definition (adjust based on your data)
sensor_entity = Entity(
    name="SENSOR_ENTITY",
    join_keys=["UNIT_NUMBER", "TIME_IN_CYCLES"] # Primary key or identifier for your sensor data
)

fs.register_entity(sensor_entity)

fs.list_entities().show()


In [None]:
from snowflake.ml.feature_store import FeatureView

sensor_data_df = session.table("IOTDB.PUBLIC.SENSOR_DATA_FEATURES")

# Create the FeatureView object
sensor_feature_view = FeatureView(
    name="SENSOR_DATA_FEATURE_VIEW",
    entities=[sensor_entity],  # Include your entity if you defined one
    feature_df=sensor_data_df,  # Select the columns to be features
    #timestamp_col="TIMESTAMP_COL",  # Assuming you have a timestamp column in your DataFrame
    refresh_freq=None, #CHANGE AS NEEDED ****
    desc="Feature view for sensor data"
)

In [None]:
registered_fv: FeatureView = fs.register_feature_view(
    feature_view=sensor_feature_view,    # feature view created above, could also use external_fv
    version="1",
    block=True         # whether function call blocks until initial data is available
)

# Create Model Stage in Snowflake

Create or replace a stage named 'model_stage' in Snowflake to store the trained model. 

- Use this stage to bring your own pre-trained model if needed.


In [None]:
# Create a stage in Snowflake to hold the trained model. Note: you can also use this stage to bring your own pre-trained model
session.sql('CREATE OR REPLACE STAGE  model_stage').show()

# PART 2. MODEL TRAINING

# Define GBM Training Method

Define a method to train Gradient Boosting Machines (GBM) using the specified session, features table, and version number.

- The method uses scikit-learn's GradientBoostingRegressor for training.
- It splits the data into train and test sets and fits the model.
- The trained model is then logged to the Snowflake ML Model Registry.
- Returns model information and training performance metrics.
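
The performance metrics returned by the method come from the regressor's `score` method, which for scikit-learn regressors is the coefficient of determination (R²). The sketch below computes R² by hand in plain Python to show what the numbers mean; the function name `r2_score_manual` and the sample values are hypothetical.

```python
def r2_score_manual(y_true, y_pred):
    """Coefficient of determination: 1 - (residual sum of squares / total sum of squares)."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R^2 is undefined in this sketch when y_true is constant")
    return 1 - ss_res / ss_tot


# Illustrative RUL values and predictions, not real model output
y_true = [3, 5, 7, 9]
y_pred = [2.5, 5, 7.5, 9]
r2 = r2_score_manual(y_true, y_pred)
print(round(r2, 3))
```

A value of 1.0 means perfect predictions, while 0.0 means the model does no better than always predicting the mean. A train score far above the test score suggests overfitting.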


In [None]:
#Define a METHOD to do the Gradient Boosting Machines Training

def train_time_to_fail_gbm(session:Session, features_table:str, version_num:str)-> Variant:
    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.model_selection import train_test_split
    from snowflake.ml.registry import Registry  # Import from Snowflake ML library
    import os
    import datetime
    from joblib import dump
    
    df_in = session.table(features_table)

    #Use this option for reading from the feature store and using the dataset object. 
    #dataset = generate_fs_dataset(session, 'SENSOR_DATA_FEATURE_VIEW','SENSOR_DATA_FEATURE_VIEW', 'SENSOR_DATA_DATASET')
    #training_df = dataset.read.to_pandas()
   
    
    training_df = df_in.to_pandas()

    
    
    
    X = training_df.iloc[:,:-1].to_numpy()
    Y = training_df.iloc[:,-1:].to_numpy()
    Y = np.ravel(Y)
    

    n_estimators = 200 
    reg = GradientBoostingRegressor(random_state =0, verbose = True, max_depth =2, n_estimators = n_estimators)
    
    X_train, X_test, y_train, y_test = train_test_split(X,Y,random_state = 42, test_size = 0.1)
    fit_model = reg.fit(X_train, y_train)

    version_name = version_num
    #version_name = datetime.datetime.now().strftime("v_%Y_%m_%d_%H_%M_%S")
    model_name="time_to_fail_model"
    
    #utils_log_model(session=session, model=fit_model, model_name="time_to_fail_model", input_example=X_train[:5], description="A GBM for IoT data predicting remaining useful life, and time to fail")

    registry = Registry(session=session)
    # Log the model to the registry
    model_info = registry.log_model(
        model=fit_model,
        model_name=model_name,
        version_name=version_name,
        sample_input_data=X_train[:5],
        comment="A GBM for IoT data predicting remaining useful life, and time to fail",
        options={'relax_version': False}
    )
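    # str(model_info) keeps the return value serializable; this assumes the raw registry object cannot be returned as a VARIANT from a stored procedure
    return {"Model_Name": model_name,"Model_Info": str(model_info),"R2_Train": fit_model.score(X_train,y_train),"R2_Test":fit_model.score(X_test,y_test)}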
    #return {"Model_Name": model_name,"Model_Version": version_name,"Model_Info": model_info,"R2_Train": fit_model.score(X_train,y_train),"R2_Test":fit_model.score(X_test,y_test)}

# Train Gradient Boost Model (GBM)

Train the GBM model by calling the training function directly from the notebook with the active Snowpark session.

- The function runs in the notebook's Python environment and reads its data through Snowpark.
- Use the active session, the features table ("SENSOR_DATA_FEATURES"), and the version name 'v1'.


In [None]:
#Train Gradient Boost Model (GBM) by calling the function directly in the notebook with the active Snowpark session
train_time_to_fail_gbm(session, "SENSOR_DATA_FEATURES", 'v1')

# Register Gradient Boost Model Training Procedure

Register a permanent stored procedure named "train_time_to_fail_gbm_model" in Snowflake.

- The procedure trains the GBM model using the function "train_time_to_fail_gbm".
- The packages the function needs are listed in `packages`.
- The procedure's code is uploaded to the stage location "@model_stage"; the trained model itself is logged to the Model Registry by the function.


In [None]:
#Train Gradient Boost Model On Snowflake - With A Stored Procedure That Can Be Called, Monitored, Governed or Orchestrated in Snowflake
#Note: 'snowflake-ml-python' is the package name of the Snowflake ML library in the Snowflake Anaconda channel
session.sproc.register(func = train_time_to_fail_gbm, \
                       name = "train_time_to_fail_gbm_model",\
                       packages = ['snowflake-snowpark-python','scikit-learn','joblib','snowflake-ml-python'],\
                       is_permanent=True,stage_location="@model_stage",\
                       replace=True)

## Train Model Using Features Views

Pass the features view ("SENSOR_DATA_FEATURES") into the stored procedure to train the model.

- Call the stored procedure "train_time_to_fail_gbm_model".
- Provide the features view and version name ("v2") as inputs.


In [None]:
## Passing the views (features) in to the SP to train the model 
session.call('train_time_to_fail_gbm_model',"SENSOR_DATA_FEATURES", "v2")

## Training Time-to-Fail GBM Model in SQL

Trains a time-to-fail GBM model on the `SENSOR_DATA_FEATURES` table from the `iotdb.public` schema by calling the `train_time_to_fail_gbm_model` stored procedure from SQL. The model version is specified as `v3`.


In [None]:
--- 💪 Task: Switch to Snowsight and Invoke the StoredProc In a Worksheet

call IOTDB.SENSOR_DATA_FS.TRAIN_TIME_TO_FAIL_GBM_MODEL('SENSOR_DATA_FEATURES','v3');

# PART 3. USING MODELS FOR INFERENCE

## Selecting Actual and Predicted Time to Fail

Retrieves actual and predicted time to fail from the `SENSOR_DATA_FEATURES` table. The `RUL` column is aliased as `ACTUAL_TIME_TO_FAIL`, while the prediction is generated using a time to fail model applied to various sensor features, resulting in the `PREDICTED_TIME_TO_FAIL` column.


In [None]:
SELECT
    UNIT_NUMBER,TIME_IN_CYCLES,
    RUL as ACTUAL_TIME_TO_FAIL,
    time_to_fail_model!predict(TIME_IN_CYCLES, SETTING_1, SETTING_2, T24, T30, T50, P15, P30, NF, NC, PS30, PHI, NRF, NRC, BPR, HTBLEED, UNIT_NUMBER):output_feature_0::INT AS PREDICTED_TIME_TO_FAIL
FROM
    SENSOR_DATA_FEATURES order by 1 asc, 2 asc;

## Creating Prediction Results View

A view named `PREDICTION_RESULTS` is created to store actual and predicted time to fail. The `ACTUAL_TIME_TO_FAIL` column is derived from the `RUL` column, while `PREDICTED_TIME_TO_FAIL` is generated using a time to fail model applied to various sensor features from the `SENSOR_DATA_FEATURES` table.


In [None]:
CREATE OR REPLACE VIEW PREDICTION_RESULTS AS
SELECT
    UNIT_NUMBER,TIME_IN_CYCLES,
    RUL as ACTUAL_TIME_TO_FAIL,
    time_to_fail_model!predict(TIME_IN_CYCLES, SETTING_1, SETTING_2, T24, T30, T50, P15, P30, NF, NC, PS30, PHI, NRF, NRC, BPR, HTBLEED, UNIT_NUMBER):output_feature_0::INT AS PREDICTED_TIME_TO_FAIL
FROM
    IOTDB.PUBLIC.SENSOR_DATA_FEATURES order by 1 asc, 2 asc;


In [None]:
results_df = session.table("PREDICTION_RESULTS").to_pandas()

## Visualizing Actual vs. Predicted Time to Fail

Plot actual versus predicted time to fail as a scatter plot, with actual values on the x-axis and predicted values on the y-axis. A red dashed diagonal marks where predicted equals actual, so points closer to the line are better predictions.
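
How far the points sit from the diagonal can also be summarised numerically. The following is a minimal sketch in plain Python that computes the mean absolute error (MAE) and root mean squared error (RMSE). These metrics are not part of the original notebook; the function name `error_summary` and the sample values are hypothetical.

```python
import math


def error_summary(actual, predicted):
    """Return (MAE, RMSE) for paired actual and predicted values."""
    errors = [p - a for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
    return mae, rmse


# Illustrative values, not real model output
actual = [10, 20, 30]
predicted = [12, 18, 30]
mae, rmse = error_summary(actual, predicted)
print(f"MAE={mae:.3f} cycles, RMSE={rmse:.3f} cycles")
```

With the notebook's data, the same function could be applied to `results_df['ACTUAL_TIME_TO_FAIL']` and `results_df['PREDICTED_TIME_TO_FAIL']`.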


In [None]:
import matplotlib.pyplot as plt

df = results_df

# Plot actual vs predicted
plt.figure(figsize=(10, 6))
plt.scatter(df['ACTUAL_TIME_TO_FAIL'], df['PREDICTED_TIME_TO_FAIL'], alpha=0.5)
plt.plot([df['ACTUAL_TIME_TO_FAIL'].min(), df['ACTUAL_TIME_TO_FAIL'].max()],
         [df['ACTUAL_TIME_TO_FAIL'].min(), df['ACTUAL_TIME_TO_FAIL'].max()],
         'r--', lw=2)

plt.xlabel('Actual Time to Fail')
plt.ylabel('Predicted Time to Fail')
plt.title('Actual Time to Fail vs Predicted Time to Fail')
plt.grid(True)
plt.show()