In [None]:
# Report the version of the interpreter running this kernel. `!python --version`
# queries whichever `python` is first on PATH, which can differ from the kernel's
# own interpreter (the one Snowpark code in this notebook actually runs in).
import platform

platform.python_version()

In [None]:
import json
import os

import pandas as pd
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.session import Session

In [None]:
# Snowflake credentials live outside the notebook; point SNOWFLAKE_CREDS_PATH at your
# own creds.json (the default keeps the original author's location working).
CREDS_PATH = os.environ.get(
    "SNOWFLAKE_CREDS_PATH",
    "/Users/anshikabajpai/Desktop/github_adt_project/kidney-diagnosis-snowflake/creds.json",
)
with open(CREDS_PATH) as f:
    connection_parameters = json.load(f)

In [None]:
# Open the Snowpark session from the parameters loaded out of creds.json.
session = (
    Session.builder
    .configs(connection_parameters)
    .create()
)

In [None]:
# Provision compute and storage. CREATE OR REPLACE recreates each object on every
# run, so re-running this cell starts from an empty KIDNEY_DB.
# NOTE(review): the DATA schema is created for training/inference data, but the
# session then switches to PUBLIC, so every object created below lands in PUBLIC —
# confirm this is intended.
setup_statements = [
    "CREATE OR REPLACE WAREHOUSE HOL_WH WITH WAREHOUSE_SIZE='X-SMALL'",
    "CREATE OR REPLACE DATABASE KIDNEY_DB",
    "CREATE OR REPLACE SCHEMA DATA",  # one specific schema for training and inference data
    "use schema PUBLIC",
]
for statement in setup_statements:
    session.sql(statement).collect()

In [None]:
# Sanity check: which warehouse / database / schema / user / role is this session using?
session_context_sql = ("select current_warehouse(), current_database(), current_schema(), "
                       "current_user(), current_role()")
session.sql(session_context_sql).collect()


In [None]:
# Internal stages: raw CSVs, pickled models/scalers, sproc/UDF code, extra packages.
for stage_name in ("load_data", "models", "procedures", "custom_packages"):
    session.sql(f"create or replace stage {stage_name}").collect()

# Counter that sf_train appends to model names (LR_1, NB_2, ...); recreating it
# restarts the numbering at 1.
session.sql("create or replace sequence seq_model_01 start = 1 increment = 1").collect()

In [None]:
# Project root is configurable (KIDNEY_PROJECT_DIR) instead of a hardcoded user path;
# the default reproduces the original location.
PROJECT_DIR = os.environ.get(
    "KIDNEY_PROJECT_DIR",
    "/Users/anshikabajpai/Desktop/github_adt_project/kidney-diagnosis-snowflake",
)
session.file.put(os.path.join(PROJECT_DIR, "preprocessing", "kidney_disease_train.csv"), 'LOAD_DATA')

In [None]:
# Training log: one row per sf_train run. log_training INSERTs positionally, so the
# column order here must match the order of the values it writes.
schema_log = T.StructType([
    T.StructField("date", T.TimestampType()),
    T.StructField("class_method", T.StringType()),
    T.StructField("model_name", T.StringType()),
    T.StructField("data_training", T.StringType()),
    T.StructField("class_report", T.VariantType()),
    T.StructField("TN", T.IntegerType()),
    T.StructField("FP", T.IntegerType()),
    T.StructField("FN", T.IntegerType()),
    T.StructField("TP", T.IntegerType()),
    T.StructField("roc_auc", T.FloatType()),
    T.StructField("avg_precision", T.FloatType()),
])

log_df = session.create_dataframe([], schema=schema_log)
log_df.write.mode('overwrite').save_as_table('Models_Eval')

# Inference log: one row per sf_score run. log_inference_snp appends to INFERENCE_RUNS
# (matching columns by name) and the notebook reads INFERENCE_RUNS back, so the table
# has to be created under that name for this schema to be the one actually used.
schema_inference = T.StructType([
    T.StructField("date", T.TimestampType()),
    T.StructField("model_name", T.StringType()),
    T.StructField("source_table", T.StringType()),
    T.StructField("target_table", T.StringType()),
    T.StructField("accuracy", T.FloatType()),
    T.StructField("precision", T.FloatType()),
    T.StructField("recall", T.FloatType()),
    T.StructField("f1_score", T.FloatType()),
    T.StructField("TN", T.IntegerType()),
    T.StructField("FP", T.IntegerType()),
    T.StructField("FN", T.IntegerType()),
    T.StructField("TP", T.IntegerType()),
    T.StructField("Time_Total", T.FloatType()),
    T.StructField("Time_Scoring", T.FloatType()),
])

inference_df = session.create_dataframe([], schema=schema_inference)
inference_df.write.mode('overwrite').save_as_table('INFERENCE_RUNS')

In [None]:
# Reporting views over Models_Eval. class_report is a VARIANT holding sklearn's
# classification_report dict, so per-class metrics are pulled out with path syntax.
accuracy_view_sql = "create or replace view accuracy_sum_v as select DATE, model_name, class_method, data_training,\
        class_report:accuracy as accuracy, roc_auc, avg_precision from Models_Eval;"

cmd = """create or replace view class_report_sumary_v as\
            select model_name,  data_training,\
            class_report:"0"."f1-score" neg_f1_score,\
            class_report:"0"."precision" neg_precision,\
            class_report:"0"."recall" neg_recall,\
            class_report:"1"."f1-score" pos_f1_score,\
            class_report:"1"."precision" pos_precision,\
            class_report:"1"."recall" pos_recall,\
            class_report:"accuracy" accuracy,\
            TN, FP, FN, TP  from Models_Eval"""

data_training_view_sql = "create or replace view data_training_v as select model_name, data_training from Models_Eval"

for view_ddl in (accuracy_view_sql, cmd, data_training_view_sql):
    session.sql(view_ddl).collect()


In [None]:
schema_model = T.StructType([T.StructField("model_name", T.StringType())])

# Classifier names accepted by sf_train's class_method argument.
# 'XGBoost' is currently disabled (sf_train has no branch for it).
model_catalog = [
    'Logistic Regression',
    'Naive Bayes',
    'Random Forest Classifier',
    'DecisionTreeClassifier',
    'Support Vector Classifier',
    'K_NeighborsClassifier',
]
df_models_table = session.create_dataframe([[name] for name in model_catalog], schema=schema_model)

df_models_table.write.mode("overwrite").save_as_table("MODELS")

In [None]:
def copy_into(session: Session, file_name: str, table_name: str) -> str:
    """Bulk-load a staged kidney-disease CSV into ``table_name`` with COPY INTO.

    Also registered as the permanent stored procedure ``copy_into``, which is why
    the types module is imported inside the function.

    Args:
        session: Active Snowpark session.
        file_name: Stage path of the CSV, e.g. ``"@LOAD_DATA/kidney_disease_train.csv"``.
        table_name: Target table, created/loaded in the session's current schema.

    Returns:
        String rendering of the per-file load results reported by COPY INTO.
    """
    import snowflake.snowpark.types as T

    # The header row is skipped, so columns are mapped by position: this list must
    # follow the CSV's column order.
    # NOTE(review): dm and cad are typed StringType while the other yes/no flags are
    # IntegerType; sf_train scales every feature column — confirm these hold numeric text.
    schema_kidney = T.StructType([
        # T.StructField("id", T.DoubleType()),
        T.StructField("age", T.DoubleType()),
        T.StructField("bp", T.DoubleType()),
        T.StructField("sg", T.DoubleType()),
        T.StructField("al", T.DoubleType()),
        T.StructField("su", T.DoubleType()),
        T.StructField("rbc", T.IntegerType()),
        T.StructField("pc", T.IntegerType()),
        T.StructField("pcc", T.IntegerType()),
        T.StructField("ba", T.IntegerType()),
        T.StructField("bgr", T.DoubleType()),
        T.StructField("bu", T.DoubleType()),
        T.StructField("sc", T.DoubleType()),
        T.StructField("sod", T.DoubleType()),
        T.StructField("pot", T.DoubleType()),
        T.StructField("hemo", T.DoubleType()),
        T.StructField("pcv", T.DoubleType()),
        T.StructField("wc", T.DoubleType()),
        T.StructField("rc", T.DoubleType()),
        T.StructField("htn", T.IntegerType()),
        T.StructField("dm", T.StringType()),
        T.StructField("cad", T.StringType()),
        T.StructField("appet", T.IntegerType()),
        T.StructField("pe", T.IntegerType()),
        T.StructField("ane", T.IntegerType()),
        T.StructField("classification", T.StringType()),
    ])

    # ON_ERROR=CONTINUE skips rows that fail to parse instead of aborting the load.
    load_results = (
        session.read
        .option("FIELD_DELIMITER", ',')
        .option("SKIP_HEADER", 1)
        .option("ON_ERROR", "CONTINUE")
        .schema(schema_kidney)
        .csv(file_name)
        .copy_into_table(table_name)
    )

    # copy_into_table returns a list of Rows; stringify it so the value matches the
    # declared ``-> str`` return type the stored procedure is registered with.
    return str(load_results)

In [None]:
# Load the staged training CSV into TEST_TABLE (run locally through the client session).
copy_into(session, file_name="@LOAD_DATA/kidney_disease_train.csv", table_name="TEST_TABLE")

In [None]:
# session.sql("DROP TABLE IF EXISTS DEFAULT").collect()

In [None]:
session.use_warehouse("HOL_WH")

# Publish copy_into as a permanent stored procedure so it can be run server-side
# with session.call("copy_into", <stage file>, <table>).
copy_into_registration = dict(
    name="copy_into",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@procedures",
    replace=True,
)
session.sproc.register(func=copy_into, **copy_into_registration)

In [None]:
import cloudpickle
import os

def save_file_to_stage(session, obj, stage, name):
    """Serialize ``obj`` with cloudpickle and upload it to ``stage`` under ``name``.

    The object is written to ``/tmp/<name>`` first and then PUT to the stage,
    overwriting any existing file of that name.
    """
    scratch_dir = '/tmp'
    os.makedirs(scratch_dir, exist_ok=True)
    local_path = os.path.join(scratch_dir, name)

    with open(local_path, 'wb') as handle:
        cloudpickle.dump(obj, handle)

    # auto_compress=False keeps the staged file named exactly ``name`` (no .gz),
    # which is the name sf_score downloads the scaler by.
    session.file.put(local_path, stage, overwrite=True, auto_compress=False)

In [None]:
# Function to deploy a trained model as a UDF, used by the training stored procedure
def create_udf(snf_session, udf_name, model, input_cols, stage_loc, py_packages):
    """Register ``model`` as the permanent vectorized UDF ``udf_name``.

    The UDF receives one OBJECT per row ({feature_name: value}), rebuilds a pandas
    frame restricted to ``input_cols`` (in that order) and returns
    ``model.predict`` on it. ``model`` and ``input_cols`` are captured by the closure.
    """
    def predict(ds: T.PandasSeries[dict]) -> T.PandasSeries[float]:
        features = pd.json_normalize(ds)[input_cols]
        return model.predict(features)

    F.udf(
        predict,
        name=udf_name,
        is_permanent=True,
        stage_location=stage_loc,
        max_batch_size=1000,
        packages=py_packages,
        replace=True,
        session=snf_session,
    )

In [None]:
def log_training(session, class_method, model_name, clone_table_name, class_report, TN, FP, FN, TP,
                     auc, ave_precision):
    """Append one row describing a training run to the Models_Eval table.

    Every value is inlined into the INSERT as an escaped SQL string literal and
    Snowflake casts it to the column type; ``class_report`` (a dict) is stored as
    VARIANT via PARSE_JSON. Values are written positionally in the table's order:
    date, class_method, model_name, data_training, class_report, TN, FP, FN, TP,
    roc_auc, avg_precision.
    """
    import datetime
    import json

    def _sql_literal(value):
        # Quote str(value) as a Snowflake single-quoted literal: backslash is an
        # escape character there and an embedded quote must be doubled, otherwise a
        # name or JSON payload containing either breaks (or alters) the statement.
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return "'" + text + "'"

    dt_str = str(datetime.datetime.now())

    row_sql = ", ".join([
        _sql_literal(dt_str),
        _sql_literal(class_method),
        _sql_literal(model_name),
        _sql_literal(clone_table_name),
        "PARSE_JSON(" + _sql_literal(json.dumps(class_report)) + ")",
        *(_sql_literal(v) for v in (TN, FP, FN, TP, auc, ave_precision)),
    ])
    cmd = "INSERT INTO models_eval (select " + row_sql + ")"

    session.sql(cmd).collect()

def log_inference_snp(model_name, source_table, target_table, metrics_df, time_total, time_inference):
    """Append one audit row for a scoring run to the INFERENCE_RUNS table.

    ``metrics_df`` is the one-row metrics frame from metrics_score_snp; run context
    (timestamp, model, source/target tables, timings) is added as extra columns and
    the row is appended with columns matched by name.
    """
    run_context = {
        "date": F.current_timestamp(),
        "model_name": F.lit(model_name),
        "source_table": F.lit(source_table),
        "target_table": F.lit(target_table),
        "Time_Total": F.lit(time_total),
        "Time_Scoring": F.lit(time_inference),
    }
    audit_row = metrics_df.with_columns(list(run_context.keys()), list(run_context.values()))

    audit_row.write.save_as_table("INFERENCE_RUNS", mode="append", column_order="name")


In [None]:
def sf_train(session: Session, class_method: str, table_name: str,
             stage: str, model_name: str, keep_data_clone: bool) -> dict:
    """
    Train a binary classifier on a Snowflake table with scikit-learn, deploy it as
    a UDF and log its metrics to Models_Eval.

    Works locally and as the registered ``sf_train`` stored procedure
    (no snowflake.snowpark.ml dependency).

    Args:
        session: Active Snowpark session.
        class_method: One of "Logistic Regression", "Naive Bayes",
            "Random Forest Classifier", "K_NeighborsClassifier",
            "Support Vector Classifier", "DecisionTreeClassifier".
        table_name: Training table; must contain a CLASSIFICATION column whose
            values cast to int (the metrics below assume labels 0/1).
        stage: Stage receiving the pickled scaler and model, e.g. "@models".
        model_name: Prefix for the model; "_<sequence number>" is appended.
        keep_data_clone: If True, clone ``table_name`` and train on the clone so
            the exact training data is kept alongside the model.

    Returns:
        dict with the final model name, classifier, confusion matrix, accuracy (%),
        classification report, ROC (FPR/TPR) and precision-recall curve points,
        ROC AUC and average precision.

    Raises:
        ValueError: if ``class_method`` is unknown or CLASSIFICATION is missing.
    """

    # --- Imports (function-local so the stored procedure is self-contained) ---
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression
    from sklearn.naive_bayes import GaussianNB
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.svm import SVC
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.metrics import (
        confusion_matrix, classification_report,
        roc_auc_score, average_precision_score,
        precision_recall_curve, roc_curve
    )
    import pandas as pd

    # --- Helpers defined in earlier cells ---
    # save_file_to_stage(session, model_obj, stage, file_name)
    # create_udf(session, model_name, model_obj, input_cols, stage_loc, py_packages)
    # log_training(session, class_method, model_name, data_table, metrics...)

    # --- Step 0: Validate the classifier before any side effects ---
    # (previously an unknown name was only rejected after consuming a sequence
    # value, cloning the table and uploading a scaler)
    classifier_factories = {
        "Logistic Regression": lambda: LogisticRegression(),
        "Naive Bayes": lambda: GaussianNB(),
        "Random Forest Classifier": lambda: RandomForestClassifier(n_estimators=50, random_state=42, max_depth=8),
        "K_NeighborsClassifier": lambda: KNeighborsClassifier(n_neighbors=5),
        # random_state seeds the internal CV behind probability=True so predict_proba,
        # and therefore the logged AUC, is reproducible.
        "Support Vector Classifier": lambda: SVC(kernel='rbf', C=2, probability=True, random_state=42),
        "DecisionTreeClassifier": lambda: DecisionTreeClassifier(criterion='entropy', random_state=42, max_depth=6),
    }
    if class_method not in classifier_factories:
        raise ValueError(f"Unknown classifier: {class_method}")

    # --- Step 1: Unique model name ---
    seq = str(session.sql("select seq_model_01.nextval").collect()[0][0])
    model_name = model_name + "_" + seq

    # --- Step 2: Clone table if required ---
    if keep_data_clone:
        clone_table_name = f"{table_name}_CLONE_{model_name}"
        session.sql(f"create TABLE {clone_table_name} clone {table_name}").collect()
    else:
        clone_table_name = table_name

    # --- Step 3: Load data ---
    df_sf = session.table(clone_table_name)
    cols = df_sf.columns
    target_matches = [c for c in cols if c.upper() == "CLASSIFICATION"]
    if not target_matches:
        raise ValueError("Target column 'CLASSIFICATION' not found in table")
    target_col = target_matches[0]
    feature_cols = [c for c in cols if c != target_col]

    pdf = df_sf.to_pandas()
    X = pdf[feature_cols]
    y = pdf[target_col].astype(int)

    # --- Step 4: Preprocessing (scaling); the fitted scaler is staged for sf_score ---
    scaler = StandardScaler()
    X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=feature_cols)
    save_file_to_stage(session, scaler, stage, model_name + ".scaler")

    # --- Step 5: Build and train the classifier on the entire dataset ---
    model = classifier_factories[class_method]()
    model.fit(X_scaled, y)

    # --- Step 6: Predictions on the training data ---
    # NOTE: there is no hold-out split here, so the logged metrics are training
    # (optimistic) metrics.
    y_pred = model.predict(X_scaled)
    # ROC / PR curves, AUC and average precision need continuous scores; computed
    # from hard 0/1 predictions each curve collapses to a single operating point.
    if hasattr(model, "predict_proba"):
        y_score = model.predict_proba(X_scaled)[:, 1]
    else:
        y_score = y_pred

    # --- Step 7: Save model and deploy it as a UDF ---
    save_file_to_stage(session, model, stage, model_name)
    input_cols = X_scaled.columns
    stage_loc = 'procedures'
    py_packages = ['pandas==1.5.3', 'numpy==1.23.5', 'scikit-learn==1.2.2']
    create_udf(session, model_name, model, input_cols, stage_loc, py_packages)

    # --- Step 8: Metrics ---
    # labels=[0, 1] keeps the matrix 2x2 even if one class is absent, so the
    # four-way unpack below cannot fail.
    cnf_matrix = confusion_matrix(y, y_pred, labels=[0, 1])
    TN, FP, FN, TP = cnf_matrix.ravel()
    class_report = classification_report(y, y_pred, output_dict=True)
    acc_score = class_report["accuracy"] * 100
    fpr, tpr, _ = roc_curve(y, y_score)
    auc = roc_auc_score(y, y_score)
    p, r, _ = precision_recall_curve(y, y_score)
    ave_precision = average_precision_score(y, y_score)

    # Log training results in Snowflake
    log_training(session, class_method, model_name, clone_table_name, class_report,
                 TN, FP, FN, TP, auc, ave_precision)

    # --- Step 9: Return summary ---
    return {
        "Model": model_name,
        "Classifier": class_method,
        "Confusion_Matrix": cnf_matrix.tolist(),
        "Accuracy(%)": acc_score,
        "Classification_Report": class_report,
        "FPR": fpr.tolist(),
        "TPR": tpr.tolist(),
        "Precision": p.tolist(),
        "Recall": r.tolist(),
        "AUC": auc,
        "Average_Precision": ave_precision
    }


In [None]:
# %%time

ret = sf_train(
    session,
    class_method="Logistic Regression",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="LR",
    keep_data_clone=False,
)
print(ret)

In [None]:
%%time

ret = sf_train(
    session,
    class_method="Naive Bayes",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="NB",
    keep_data_clone=False,
)
print(ret)

In [None]:
%%time

ret = sf_train(
    session,
    class_method="Random Forest Classifier",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="RF",
    keep_data_clone=False,
)
print(ret)

In [None]:
%%time

ret = sf_train(
    session,
    class_method="K_NeighborsClassifier",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="KNN",
    keep_data_clone=False,
)
print(ret)

In [None]:
%%time

ret = sf_train(
    session,
    class_method="Support Vector Classifier",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="SVC",
    keep_data_clone=False,
)
print(ret)

In [None]:
%%time

ret = sf_train(
    session,
    class_method="DecisionTreeClassifier",
    table_name="TEST_TABLE",
    stage="@models",
    model_name="DT",
    keep_data_clone=False,
)
print(ret)

In [None]:
# One row per training run, written by log_training.
training_log_df = session.table("MODELS_EVAL")
training_log_df.to_pandas()

In [None]:
#For the last model, display ROC curve
from matplotlib import pyplot as plt

fig, ax = plt.subplots()
ax.plot(ret["FPR"], ret["TPR"])
ax.set_xlabel('fpr')
ax.set_ylabel('tpr')
ax.set_title(f'ROC Curve\nROC AUC={ret["AUC"]:.3f}');

In [None]:
# Full result dict of the most recent sf_train call (the DecisionTreeClassifier cell).
ret

In [None]:
# Precision-recall curve for the most recently trained model (`ret` from the last sf_train call).
fig, ax = plt.subplots()
ax.plot(ret["Recall"], ret["Precision"])
ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title(f'Precision-Recall Curve\nAP={ret["Average_Precision"]:.3f}');

In [None]:
# Server-side dependencies for the sf_train stored procedure.
# NOTE(review): sqlalchemy, tqdm and colorlog are not imported by sf_train or the
# helpers it calls — candidates for removal.
sf_train_packages = [
    'snowflake-snowpark-python',
    'scikit-learn==1.2.2',
    'cloudpickle==3.0.0',
    'sqlalchemy==1.4.39',
    'tqdm==4.64.1',
    'colorlog==5.0.1',
    'numpy==1.23.5',
    'pandas==1.5.3',
]
session.sproc.register(
    func=sf_train,
    name="sf_train",
    packages=sf_train_packages,
    is_permanent=True,
    stage_location="@procedures",
    replace=True,
)

In [None]:
# Train a decision tree inside Snowflake via the stored procedure, keeping a clone
# of the training table (keep_data_clone=True). Arguments are positional:
# class_method, table_name, stage, model_name, keep_data_clone.
dt_sproc_args = ("DecisionTreeClassifier", "TEST_TABLE", "@models", "DT", True)
session.call("sf_train", *dt_sproc_args)

In [None]:
# Files sf_train has uploaded: one pickled scaler and one pickled model per run.
models_stage_listing = session.sql("ls @models").collect()
models_stage_listing


In [None]:
# Function that calculates the metrics using Snowpark ie SQL
def metrics_score_snp(df, y_true, y_pred):
    """Compute binary-classification metrics inside Snowflake.

    Args:
        df: Snowpark DataFrame holding the label and prediction columns.
        y_true: Name of the 0/1 ground-truth column.
        y_pred: Name of the 0/1 prediction column.

    Returns:
        Lazy one-row Snowpark DataFrame with columns
        ACCURACY, PRECISION, RECALL, F1_SCORE, TN, FP, FN, TP.
        Confusion-matrix cells with no rows are reported as 0 (not NULL), and a
        ratio whose denominator is 0 is reported as 0 instead of raising a
        division-by-zero error.
    """
    truth = F.col(y_true)
    pred = F.col(y_pred)

    cell_type = (F.when((truth == 0) & (pred == 0), "tn")
                  .when((truth == 0) & (pred == 1), "fp")
                  .when((truth == 1) & (pred == 0), "fn")
                  .when((truth == 1) & (pred == 1), "tp"))

    counts = (
        df.group_by([y_true, y_pred]).count()
        .with_column("type", cell_type)
        .select(["TYPE", "COUNT"])
        .pivot("TYPE", ['tn', 'tp', 'fn', 'fp']).sum("COUNT")
        # A cell with no rows (e.g. no false positives) pivots to NULL, which would
        # null out every metric derived from it; report it as 0 instead.
        .select([F.coalesce(F.col(f"'{cell}'"), F.lit(0)).as_(cell)
                 for cell in ("tp", "tn", "fn", "fp")])
    )

    tp, tn, fn, fp = F.col("tp"), F.col("tn"), F.col("fn"), F.col("fp")
    precision, recall = F.col("precision"), F.col("recall")

    # div0 yields 0 where plain "/" would raise, e.g. TP + FP = 0 for a model that
    # never predicts the positive class.
    return (
        counts
        .with_columns(["accuracy", "precision", "recall"],
                      [F.div0(tp + tn, tp + tn + fn + fp),
                       F.div0(tp, tp + fp),
                       F.div0(tp, tp + fn)])
        .with_column("f1_score", F.div0(F.lit(2) * precision * recall, precision + recall))
        .select(["ACCURACY", "PRECISION", "RECALL", "F1_SCORE", "TN", "FP", "FN", "TP"])
    )

def sf_score(session: Session, table_name: str, table_target: str, stage: str,
        model_name: str) -> dict:
    """Score ``table_name`` with the deployed model UDF ``model_name`` and log the run.

    Pulls the table once, scales its features with the scaler sf_train staged as
    ``<stage>/<model_name>.scaler``, calls the ``model_name`` UDF on every row,
    overwrites ``table_target`` with the scaled features, CLASSIFICATION and
    PREDICTED, and appends metrics plus timings to INFERENCE_RUNS.

    Args:
        session: Active Snowpark session.
        table_name: Table to score; must contain CLASSIFICATION and every feature
            the scaler was fitted on.
        table_target: Table to (over)write with the scored rows.
        stage: Stage holding the scaler, e.g. "@models".
        model_name: Name of the model UDF (and scaler file prefix), e.g. "LR_1".

    Returns:
        dict with ACCURACY, PRECISION, RECALL, F1_SCORE, TN, FP, FN, TP.
    """
    import joblib
    import time

    t0 = time.time()

    # Pull features *and* labels in a single query: two separate to_pandas() calls
    # on an unordered table are not guaranteed to return rows in the same order,
    # which would pair features with the wrong labels.
    source_pd = session.table(table_name).to_pandas()
    y_true_pd = source_pd[["CLASSIFICATION"]].reset_index(drop=True)

    #### Preprocessing: reuse the scaler fitted at training time ####
    session.file.get(stage + "/" + model_name + ".scaler", '/tmp/')
    # NOTE: joblib.load unpickles arbitrary objects; only load files sf_train wrote.
    scaler = joblib.load("/tmp/" + model_name + ".scaler")
    feature_names = list(scaler.feature_names_in_)

    # Reorder columns to exactly match the scaler's feature order
    features_pd = source_pd.drop(columns=["CLASSIFICATION"])[feature_names]
    df_scaled_pd = pd.DataFrame(scaler.transform(features_pd), columns=feature_names)

    ################
    ## Predicting ##
    ################

    # The UDF takes one OBJECT per row: {feature_name: scaled_value}.
    key_vals = []
    for col in feature_names:
        key_vals.extend([F.lit(col), F.col(col)])

    t1 = time.time()

    results_df = (
        session.create_dataframe(pd.concat([df_scaled_pd, y_true_pd], axis=1))
        .with_column("PREDICTED", F.call_udf(model_name, F.object_construct(*key_vals)))
    )
    # Snowpark is lazy: the UDF only runs when results are materialized, so the
    # write is what the scoring timer has to cover.
    results_df.write.mode("overwrite").save_as_table(table_target)

    t2 = time.time()

    # Compute metrics from the persisted table so the UDF is not re-executed for
    # the log write and the final collect.
    df_metrics = metrics_score_snp(session.table(table_target), y_true="CLASSIFICATION", y_pred="PREDICTED")

    t3 = time.time()
    time_total = t3 - t0
    time_inference = t2 - t1

    # Write inference results into the INFERENCE_RUNS table
    log_inference_snp(model_name, table_name, table_target, df_metrics, time_total, time_inference)

    return df_metrics.collect()[0].as_dict()  # accuracy, precision, recall, f1_score, TN, FP, FN, TP

In [None]:
# Client-side scoring of the training table with LR_1 (presumably the first logistic
# regression trained above, since the sequence starts at 1).
sf_score(session, model_name="LR_1", stage="@models",
         table_name="TEST_TABLE", table_target="RESULTS_TABLE")


In [None]:
# Server-side dependencies for the sf_score stored procedure (joblib, used to load
# the scaler, is not listed explicitly).
sf_score_packages = [
    'snowflake-snowpark-python',
    'scikit-learn==1.2.2',
    'cloudpickle==3.0.0',
    'sqlalchemy==1.4.39',
    'tqdm==4.64.1',
    'colorlog==5.0.1',
    'numpy==1.23.5',
    'pandas==1.5.3',
]
session.sproc.register(
    func=sf_score,
    name="sf_score",
    packages=sf_score_packages,
    is_permanent=True,
    stage_location="@procedures",
    replace=True,
)

In [None]:
# One row per scoring run, appended by log_inference_snp.
inference_runs_df = session.table("INFERENCE_RUNS")
inference_runs_df.to_pandas()

In [None]:
training_runs_df = session.table("MODELS_EVAL")
training_runs_df.to_pandas()

**Scoring the held-out test data (`kidney_disease_test.csv`) with the trained `LR_1` model**

In [None]:
# Project root is configurable (KIDNEY_PROJECT_DIR) instead of a hardcoded user path;
# the default reproduces the original location.
PROJECT_DIR = os.environ.get(
    "KIDNEY_PROJECT_DIR",
    "/Users/anshikabajpai/Desktop/github_adt_project/kidney-diagnosis-snowflake",
)
session.file.put(os.path.join(PROJECT_DIR, "preprocessing", "kidney_disease_test.csv"), 'LOAD_DATA')

In [None]:
# Load the test CSV into TEST2 server-side through the copy_into stored procedure.
test_load_status = session.call("copy_into", "@LOAD_DATA/kidney_disease_test.csv", "TEST2")
print(test_load_status)

In [None]:
models_stage_listing = session.sql("ls @models").collect()
models_stage_listing


In [None]:
# Score the held-out test table with LR_1; scored rows go to RESULTS2.
sf_score(session, model_name="LR_1", stage="@models",
         table_name="TEST2", table_target="RESULTS2")


In [None]:

# First scored test rows: scaled features, CLASSIFICATION and the PREDICTED label.
results2_df = session.table("RESULTS2")
results2_df.limit(2).to_pandas()
