# In [1]:
import logging, sys, os
from snowflake.snowpark.session import Session
import json

# Send log records of level INFO and above to stdout instead of the
# default stderr stream.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Root logger (no name given), used by the module-level helper functions.
log = logging.getLogger()

def create_stage(session, stage_name="insight_exp"):
    """
    Makes sure the named Snowflake stage exists and returns its location.
    :param:
    session: object exposing sql(<statement>).collect(), e.g. a Snowpark session
    stage_name: name of the stage to create
    :returns:
    stage location in the form "@<stage_name>"
    :raises:
    Exception: whatever the SQL call raised, re-raised after being logged
    """
    try:
        # "if not exists" instead of "or replace": replacing the stage would
        # delete files already uploaded to it, including the artifacts of a
        # stored procedure that was registered earlier and is not re-created.
        session.sql(f"create stage if not exists {stage_name}").collect()
        return f"@{stage_name}"
    except Exception as ex:
        print("Error while creating snowflake stage", ex)
        # The logger needs a %s placeholder for the extra argument; without
        # it the record fails to format and the error text is lost.
        log.error("Error while creating snowflake stage: %s", ex)
        # Bare raise keeps the original traceback intact.
        raise


# Method to be registered as SF Stored Procedure 
def run_experiment(session: Session, exp_data: dict) -> list:
    """
    Runs one experiment end to end: cleans the dataset, preprocesses the
    features, trains the first configured algorithm, predicts on a held-out
    split and registers the resulting model through fosforml.
    :param:
    session: Snowpark session used to read the dataset and register the model
    exp_data: experiment details; keys read here are dataset_name,
              target_column, algorithms, algorithm_type, experiment_name,
              experiment_id, project_id, monitor_run_id and description
    :returns:
    single-element list holding a dict with the execution logs, the
    experiment details, the run status and the registered model name
    :raises:
    RuntimeError: when any step fails; the message carries the accumulated
                  execution logs and the original error is chained as cause
    """
    # Imports are kept inside the function body.
    # NOTE(review): presumably so they resolve where the stored procedure
    # executes rather than where it is registered - confirm.
    from snowflake.ml.modeling.pipeline import Pipeline
    from snowflake.ml.modeling.preprocessing import MinMaxScaler, LabelEncoder, OneHotEncoder
    from snowflake.snowpark.types import StringType
    from fosforml import register_model
    import importlib, os

    # variable for holding logs
    logs = []

    # function for accumulating logs
    def log_message(level: str, message: str):
        logs.append(f"{level}: {message}")

    # Functions used in stored proc
    def apply_data_cleansing(df):
        """
        Method handles null values in snowpark dataframe.
        :param:
        df: input dataframe
        :returns:
        df_cleaned: dataframe after null handling
        """
        # fillna - "UNKNOWN" for string columns and 0 for every other column
        fill_values = {
            field.name: "UNKNOWN" if isinstance(field.datatype, StringType) else 0
            for field in df.schema.fields
        }
        return df.fillna(fill_values)

    def get_feature_columns(df):
        """
        Identifies the numerical and categorical features in dataset.
        Identifies features for label encoding and one hot encoding
        :param:
        df: input dataframe
        :returns:
        categorical_features: list of non-numerical columns
        numerical_features: list of numerical feature columns
        le_column_features: list of label encoder columns
        oh_column_features: list of one hot encoder columns
        """
        # Work on a copy so the list handed out by the dataframe is untouched.
        features = list(df.columns)
        features.remove(exp_data.get("target_column"))

        df_schema = session.sql(f"DESCRIBE TABLE {exp_data.get('dataset_name')}").collect()
        categorical_types = ['VARCHAR', 'CHAR', 'STRING', 'TEXT', 'BOOL']
        # Substring match, so e.g. "VARCHAR(16777216)" is picked up as well.
        # NOTE(review): rows come from DESCRIBE TABLE, so the target column is
        # included here when it is categorical - confirm this is intended.
        categorical_features = [
            row['name']
            for row in df_schema
            if any(typ in row['type'] for typ in categorical_types)
        ]
        # Keep the dataframe column order instead of an arbitrary set order.
        categorical_lookup = set(categorical_features)
        numerical_features = [name for name in features if name not in categorical_lookup]
        log_message("INFO",f"numerical_features:  {numerical_features}")
        log_message("INFO",f"categorical_features: {categorical_features}")

        # identify columns for labelencoding and onehotencoding
        le_column_features = list(categorical_features)
        oh_column_features = []
        if categorical_features:
            log_message("INFO",f"{categorical_features} columns are non numeric in feature dataset, encoding required.")
            # Columns with fewer than 10 distinct values are also one hot encoded.
            for column in categorical_features:
                if df.select(df[column]).distinct().count() < 10:
                    oh_column_features.append(column)
            log_message("INFO",f"Columns identified to be encoded with label encoder: {le_column_features}")
            log_message("INFO",f"Columns identified to be encoded with one hot encoder: {oh_column_features}")
        return categorical_features, numerical_features, le_column_features, oh_column_features

    def create_and_run_preprocessing(df, categorical_features, numerical_features, le_column_features, oh_column_features):
        """
        Based on different features column input creates preprocessing steps and run it on input dataframe
        :param:
        df: input dataframe
        categorical_features: list of non-numerical feature columns (not used here)
        numerical_features: list of numerical feature columns
        le_column_features: list of feature label encoder columns
        oh_column_features: list of feature one hot encoder columns
        :returns:
        df_train: preprocessed train dataset
        df_test: preprocessed test dataset
        """
        # pipeline steps: label encoders first, then one hot encoder, then scaler
        log_message("INFO","Setting up preprocessing pipeline based on dataset")
        steps = [
            (f'le_{column}', LabelEncoder(input_cols=column, output_cols=column))
            for column in le_column_features
        ]
        if oh_column_features:
            steps.append(('oh_enc', OneHotEncoder(input_cols=oh_column_features,
                                                  output_cols=oh_column_features,
                                                  handle_unknown='ignore')))
        if numerical_features:
            steps.append(('scaler', MinMaxScaler(input_cols=numerical_features,
                                                 output_cols=numerical_features)))
        log_message("INFO",f"Selected preprocesing steps: \n{steps}")

        # Run preprocessing pipeline steps
        if steps:
            log_message("INFO","Running data preprocessing pipeline")
            df = Pipeline(steps=steps).fit(df).transform(df)
        else:
            log_message("INFO","No preprocessing steps required, dataset left unchanged")
        # df.show() only prints and returns None, so log the column names instead.
        log_message("INFO",f"Transformed dataset columns: \n {df.columns}")

        # Replace '.' in column names once, before splitting, so both splits
        # share the same renamed columns.
        for col_name in df.schema.names:
            new_col = col_name.replace('.', '_')
            if new_col != col_name:
                df = df.withColumnRenamed(col_name, new_col)

        df_train, df_test = df.random_split(weights=[0.8, 0.2], seed=0)
        return df_train, df_test

    def run_estimator(df_train, df_test, input_cols):
        """
        trains on df_train and creates model object for given algorithm/estimator.
        runs prediction function of model object on test dataset
        Only the first entry of exp_data["algorithms"] is used.
        :param:
        df_train: input training dataframe
        df_test: input test dataframe
        input_cols: list of input feature names
        :returns:
        model: trained model object
        df_pred: output predict dataframe
        estimator_name: class name of the algorithm that was trained
        """
        algorithm = next(iter(exp_data.get("algorithms") or []), None)
        if algorithm is None:
            raise ValueError("No algorithm provided in experiment details")

        # "package.module.ClassName" -> module path and class name
        module_path, estimator_name = algorithm["algorithm_name"].rsplit('.', 1)
        module = importlib.import_module(module_path)
        log_message("INFO",f"----Creating {estimator_name} algorithm pipeline----")
        estimator_cls = getattr(module, estimator_name)
        pipe = Pipeline(steps=[("algorithm", estimator_cls(input_cols=input_cols,
                                                           label_cols=[exp_data.get("target_column")],
                                                           output_cols=['PREDICTIONS']))])

        # Fit the pipeline
        log_message("INFO",f"Running model pipeline {estimator_name}")
        model = pipe.fit(df_train)

        # Test the model
        log_message("INFO","Running prediction on model with test dataset")
        df_pred = model.predict(df_test)
        return model, df_pred, estimator_name

    def register_model_using_fosforml(model, df_pred):
        """
        Registers the trained model through fosforml.
        :param:
        model: trained model object
        df_pred: prediction dataframe passed along with the model
        :returns:
        model_name: name reported back by the registration call
        :raises:
        RuntimeError: when the response signals a failed registration
        """
        response = register_model(
            model_obj=model,
            session=session,
            name=exp_data.get("experiment_name"),
            snowflake_df=df_pred,
            dataset_name=exp_data.get("dataset_name"),
            dataset_source="SnowflakeDataset",
            description=exp_data.get("description","This is an experiment generated model"),
            flavour="snowflake",
            model_type=exp_data.get("algorithm_type").lower(),
            conda_dependencies=["xgboost","scikit-learn"],
            prediction_column="PREDICTIONS",
            source="Experiment"
        )
        # NOTE(review): the model name is taken as the second space-separated
        # token of the response - confirm against the fosforml response format.
        response_parts = str(response).split(" ")
        if len(response_parts) < 2 or response_parts[1].startswith("'EXPERIMENT"):
            log_message("INFO", f"Model registration failed, {response}")
            raise RuntimeError(f"Model registration failed, {response}")
        return response_parts[1]

    try:
        # loading experiment details and setting up environment
        log_message("INFO","loading experiment details ans setting up environment")
        env_settings = {
            "RUN_ID": exp_data.get("monitor_run_id"),
            "run_id": exp_data.get("monitor_run_id"),
            "algorithm_type": exp_data.get("algorithm_type"),
            "experiment_name": exp_data.get("experiment_name", ""),
            "experiment_id": exp_data.get("experiment_id"),
            "PROJECT_ID": exp_data.get("project_id"),
        }
        # os.environ only accepts strings; report absent values by name
        # instead of failing with an opaque TypeError.
        missing = [key for key, value in env_settings.items() if value is None]
        if missing:
            raise ValueError(f"Missing experiment details for environment variables: {missing}")
        for key, value in env_settings.items():
            os.environ[key] = str(value)

        # Reading dataset
        log_message("INFO","Reading dataset features")
        data = session.table(exp_data.get("dataset_name"))

        # fillna
        log_message("INFO","Handling nulls in dataset")
        data = apply_data_cleansing(data)

        # Identify feature columns
        log_message("INFO","Identifying feature columns")
        categorical_features, numerical_features, le_column_features, oh_column_features = get_feature_columns(data)

        # Based on feature, do preprocessing
        data_train, data_test = create_and_run_preprocessing(data, categorical_features, numerical_features, le_column_features, oh_column_features)

        # Run model training and prediction
        input_cols = list(data_train.columns)
        input_cols.remove(exp_data.get("target_column"))
        model, data_pred, estimator = run_estimator(data_train, data_test, input_cols)

        os.environ["algorithm"] = estimator

        # Register model on snowflake registry and generate metrics
        log_message("INFO", "leveraging fosforml for registering model and evaluating metrices")
        model_name = register_model_using_fosforml(model, data_pred)
        log_message("INFO", f"Model registered on snowflake registry by name: {model_name}")

        return [{"Execution Logs:": "\n".join(logs),
                 "experiment_name": exp_data.get("experiment_name"),
                 "algorithm_type": exp_data.get("algorithm_type"),
                 "algorithm": estimator,
                 "dataset_name": exp_data.get("dataset_name"),
                 "target_column": exp_data.get("target_column"),
                 "run_status": "SUCCESS",
                 "registry_model_name": model_name}]
    except Exception as ex:
        log_message("ERROR", f"{repr(ex)}")
        # An exception instance is not callable; raise a new error carrying
        # the logs and chain the original one as its cause.
        raise RuntimeError("Execution Logs: " + "\n".join(logs)) from ex
        
        
def create_sproc(session, stage, func_name="run_experiment"):
    """
    Registers run_experiment as a permanent Snowflake stored procedure.
    :param:
    session: Snowpark session used for the registration
    stage: stage location passed as stage_location for the procedure
    func_name: name given to the stored procedure in Snowflake
    :raises:
    Exception: whatever the registration raised, re-raised after being logged
    """
    try:
        # NOTE(review): presumably allows packages that are not available in
        # the Snowflake channel to be uploaded - confirm against Snowpark docs.
        session.custom_package_usage_config = {
            "enabled": True,
            "force_push": True,
        }

        session.sproc.register(
            func=run_experiment,
            # Honour the caller-supplied name; it used to be hard-coded.
            name=func_name,
            imports=[("notebooks_api/experiments/recipe/snowpark_recipe.py",
                      "notebooks_api.experiments.recipe.snowpark_recipe")],
            packages=["fosforml"],
            is_permanent=True,
            stage_location=stage,
            # An already registered procedure with this name is left as is.
            if_not_exists=True,
        )
    except Exception as ex:
        print("Error while creating snowflake stored procedure", ex)
        # The logger needs a %s placeholder for the extra argument; without
        # it the record fails to format and the error text is lost.
        log.error("Error while creating snowflake stored procedure: %s", ex)
        # Bare raise keeps the original traceback intact.
        raise

    
def initiate_sproc_process(session, sproc_name="run_experiment"):
    """
    Creates the stage and then registers the experiment stored procedure.
    :param:
    session: Snowpark session handed to create_stage and create_sproc
    sproc_name: name under which the stored procedure should be registered
    :returns:
    the string "Success" once both steps finished without raising
    """
    stage = create_stage(session)
    log.info("Creating stored procedure...")
    # Forward the requested name; it was previously accepted but never used.
    create_sproc(session, stage, sproc_name)
    log.info("Stored procedure has been created successfully!")
    return "Success"