In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf

def create_session_object():
    """Create a Snowpark session and print its active warehouse/database/schema.

    Connection settings are read from environment variables so that the
    password is never stored in the notebook:

    * ``SNOWFLAKE_PASSWORD`` -- if unset or empty, the password is prompted
      for interactively with ``getpass``.
    * ``SNOWFLAKE_ACCOUNT``, ``SNOWFLAKE_USER``, ``SNOWFLAKE_ROLE``,
      ``SNOWFLAKE_WAREHOUSE``, ``SNOWFLAKE_DATABASE``, ``SNOWFLAKE_SCHEMA`` --
      optional overrides; each falls back to the value this notebook
      previously hard-coded.

    Returns:
        The session built from those settings.
    """
    import getpass
    import os

    # Secret: never fall back to a literal; ask the user when the env var is absent.
    password = os.environ.get("SNOWFLAKE_PASSWORD") or getpass.getpass("Snowflake password: ")

    connection_parameters = {
        "account": os.environ.get("SNOWFLAKE_ACCOUNT", "ijb12818.us-east-1"),
        "user": os.environ.get("SNOWFLAKE_USER", "fengliplatform"),
        "password": password,
        "role": os.environ.get("SNOWFLAKE_ROLE", "sysadmin"),
        "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "compute_WH"),
        "database": os.environ.get("SNOWFLAKE_DATABASE", "fengdb"),
        "schema": os.environ.get("SNOWFLAKE_SCHEMA", "public"),
    }
    session = Session.builder.configs(connection_parameters).create()
    # Sanity check: show which warehouse/database/schema the session landed in.
    print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())
    return session

# Open the Snowflake session that the cells below use for tables, procedures and UDFs.
session = create_session_object()

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='FENGDB', CURRENT_SCHEMA()='PUBLIC')]


In [2]:
from sklearn import datasets
import pandas as pd

# Load the iris dataset and split it into a feature frame and a one-column label frame.
iris = datasets.load_iris()
iris_feature_pd_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_target_pd_df = pd.DataFrame(iris.target, columns=['species'])

In [3]:
from sklearn.model_selection import train_test_split
# Hold out 33% of the rows for testing; the fixed random_state keeps the split repeatable.
X, y = iris_feature_pd_df, iris_target_pd_df
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.33,
    random_state=101,
)

In [17]:
# Re-attach the label column to each split so features and target travel together.
iris_train_pd_df = pd.concat((X_train, y_train), axis='columns')
iris_test_pd_df = pd.concat((X_test, y_test), axis='columns')

In [18]:
# Push the training split to Snowflake, replacing any existing table of that name.
iris_train_sp_df = session.create_dataframe(iris_train_pd_df)
iris_train_sp_df.write.save_as_table('iris_train_table', mode='overwrite')

# Same for the held-out test split.
iris_test_sp_df = session.create_dataframe(iris_test_pd_df)
iris_test_sp_df.write.save_as_table('iris_test_table', mode='overwrite')

In [44]:
# Pull the training table back into pandas and separate the label from the features.
train_table = session.table('iris_train_table').to_pandas()
train_target_table = train_table['species']
train_feature_table = train_table.drop(columns='species')

In [None]:
from sklearn.linear_model import LogisticRegression
# Fit a default-configured logistic regression locally on the data read back from Snowflake.
model = LogisticRegression()
model.fit(X=train_feature_table, y=train_target_table)

In [54]:
# Local predictions on the held-out features (label column removed first).
model.predict(iris_test_pd_df.drop(columns='species'))

array([0, 0, 0, 2, 1, 2, 1, 1, 2, 0, 2, 0, 0, 2, 2, 1, 1, 1, 0, 2, 1, 0,
       1, 1, 1, 1, 1, 2, 0, 0, 2, 1, 2, 1, 2, 1, 1, 1, 1, 2, 0, 0, 0, 2,
       1, 0, 2, 1, 0, 1], dtype=int8)

In [15]:
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.types import StringType

def train_sproc(session: snowflake.snowpark.Session,
    train_table_name: str,
    target_col: str) -> str:
    """Train a logistic-regression model on a table and upload it to a stage.

    Args:
        session: Snowpark session used to read the table and upload the file.
        train_table_name: Name of the table holding features plus the target.
        target_col: Column of that table used as the label; every other
            column is used as a feature.

    Returns:
        The string ``'Success'`` once the model file has been uploaded.
    """
    import joblib
    from sklearn.linear_model import LogisticRegression

    # One definition of the artifact path, shared by the dump and the upload.
    model_path = '/tmp/predict_iris_type_model4.joblib'

    train_table = session.table(train_table_name).to_pandas()
    train_target_table = train_table[target_col]
    train_feature_table = train_table.drop(columns=target_col)

    model = LogisticRegression()
    model.fit(train_feature_table, train_target_table)

    # Serialize the fitted model to local disk ...
    joblib.dump(model, model_path)

    # ... then upload it uncompressed into my_internal_stage, replacing any
    # previous copy, so the file name on the stage matches model_path's basename.
    session.file.put(
        model_path, "@my_internal_stage", auto_compress=False, overwrite=True
    )
    return 'Success'


# Register train_sproc as the permanent stored procedure 'sf_train_sproc',
# replacing any existing definition and staging its code in my_internal_stage.
session.sproc.register(
    func=train_sproc,
    name='sf_train_sproc',
    return_type=StringType(),
    packages=['snowflake-snowpark-python', 'scikit-learn', 'joblib'],
    is_permanent=True,
    stage_location='@my_internal_stage',
    replace=True,
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x298a99e8cd0>

In [43]:
# Invoke the stored procedure registered as 'sf_train_sproc', passing the
# training table name and the target column name.
session.call('sf_train_sproc', 'iris_train_table', 'species')

'Success'

In [19]:
# Start from a clean slate so earlier imports/packages on this session are dropped.
session.clear_imports()
session.clear_packages()

# Register the uploaded model file as an import for the UDF.
session.add_import("@my_internal_stage/predict_iris_type_model4.joblib")

# Pin the package dependencies.
# Make sure the local dev environment has these same supported versions.
session.add_packages(
    "joblib==1.1.1",
    "scikit-learn==1.1.3",
    "pandas==1.4.4",
)

In [57]:
from snowflake.snowpark.types import PandasDataFrame, PandasSeries

def predict_udf(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[int]:
    """Predict a class for each row of a four-float-column batch.

    Loads 'predict_iris_type_model4.joblib' from the directory reported by
    the ``snowflake_import_directory`` interpreter option and returns
    ``model.predict(df)`` for the incoming frame.
    """
    import joblib, sys, os
    import pandas as pd

    # Imported files are exposed under this directory at run time.
    model_path = os.path.join(
        sys._xoptions.get("snowflake_import_directory"),
        'predict_iris_type_model4.joblib',
    )
    with open(model_path, 'rb') as model_fh:
        classifier = joblib.load(model_fh)

    return classifier.predict(df)

# Register predict_udf as the permanent UDF 'sf_predict_udf', replacing any
# existing definition and staging its code in my_internal_stage.
session.udf.register(
    func=predict_udf,
    name='sf_predict_udf',
    is_permanent=True,
    stage_location='@my_internal_stage',
    replace=True,
)

<snowflake.snowpark.udf.UserDefinedFunction at 0x298aa6bba00>

In [None]:
# SQL way: call the registered UDF from a plain SQL query, passing the four
# feature columns of the test table, and collect the resulting rows.
session.sql(f'''
    select sf_predict_udf("sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)") as predict
    from IRIS_TEST_TABLE
''').collect()

In [58]:
# Score the test table with the UDF, ordered by the feature columns so the
# rows can be lined up with a separately fetched label query.
_prediction_rows = session.sql(f'''
    select sf_predict_udf("sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)") as predict
    from IRIS_TEST_TABLE order by "sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"
''').collect()
prediction_pd_df = pd.DataFrame(_prediction_rows)

In [59]:
# Display the predictions returned by the UDF query.
prediction_pd_df

Unnamed: 0,PREDICT
0,0
1,0
2,0
3,0
4,0
5,0
6,0
7,1
8,0
9,0


In [60]:
# Fetch the stored labels using the same feature-column ordering as the
# prediction query, so both frames list rows in matching order.
_truth_rows = session.sql(f'''
    select "species" from IRIS_TEST_TABLE 
        order by "sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"
''').collect()
truth_pd_df = pd.DataFrame(_truth_rows)

In [61]:
# Display the ground-truth labels read from the test table.
truth_pd_df['species']

0     0
1     0
2     0
3     0
4     0
5     0
6     0
7     1
8     0
9     0
10    1
11    0
12    0
13    0
14    0
15    0
16    1
17    1
18    1
19    1
20    0
21    1
22    1
23    1
24    1
25    1
26    1
27    1
28    1
29    2
30    1
31    2
32    2
33    1
34    1
35    2
36    2
37    1
38    2
39    2
40    2
41    1
42    1
43    1
44    2
45    1
46    2
47    2
48    2
49    2
Name: species, dtype: int64

In [None]:
# Display the predicted labels for comparison with the ground-truth column.
prediction_pd_df['PREDICT']

In [62]:
from sklearn.metrics import accuracy_score
# Fraction of test rows where the UDF's prediction equals the stored label.
score = accuracy_score(
    y_true=truth_pd_df['species'],
    y_pred=prediction_pd_df['PREDICT'],
)

In [63]:
# Display the accuracy computed from the truth and prediction columns.
score

0.98

In [None]:
# DataFrame way: call the permanent UDF by its registered name on the test table.
# The UDF takes the four feature columns as separate float arguments, so each
# column is passed explicitly (double-quoted because the names contain spaces,
# parentheses and lowercase letters).
import snowflake.snowpark.functions as F

feature_cols = [
    "sepal length (cm)",
    "sepal width (cm)",
    "petal length (cm)",
    "petal width (cm)",
]
iris_test_table_sp_df = session.table('iris_test_table')
iris_test_table_sp_df.with_column(
    'prediction',
    F.call_udf('sf_predict_udf', *[F.col(f'"{name}"') for name in feature_cols]),
).show(20)