# Inplace Machine Learning in Snowflake 
## Demonstrated using Lending Club Data

Author: Avadhoot Agasti

Snowflake has phenomenal support for executing Python, Spark, Java etc code base. The machine learning models can be trained and executed using these languages. However, day to day execution of the models require separate environment to run the models. 

This notebook showcases a mechanism to push the model inside Snowflake UDF so that we can directly run the model as part of SQL. This is very beneficial since we don't need to maintain any other environment. 

I have used Lending Club data to demonstrate this. 

## Dataset and Use Case
Lending Club Data

The Raw data has 1.95M records and 145 columns. 

The use case is as below:
Every loan has a purpose, like major purchase, small business, vacation etc. In this dataset, there are about 121K records where purpose is not defined (other). The idea is the train a classifier to predict the purpose for these 121K records.  

### Snowflake Connection and Utility Functions

In [4]:
from configparser import ConfigParser
config = ConfigParser()
config.read('config.properties')
# get config data
account = config.get('DatabaseSection', 'account')
user = config.get('DatabaseSection', 'user')
password = config.get('DatabaseSection', 'password')

In [5]:
import snowflake.connector
from pandas import DataFrame

ctx = snowflake.connector.connect(
user=user,
password=password,
account=account
)

In [6]:
def execSQL(sql):
    ctx.cursor().execute(sql)
    
def execSQLAndGetDF(sql, max_rows = 1000000):
    try:
        cur = ctx.cursor()
        cs = cur.execute(sql)
        rows = cs.fetchmany(max_rows)
    finally:
        cs.close()
    df = DataFrame(rows)
    df.columns = [col[0] for col in cur.description]
    return df

In [7]:
execSQL("USE DATABASE LENDINGCLUBDB")
execSQL("USE SCHEMA RAWSCHEMA")
execSQL("USE WAREHOUSE LoadWH")

In [135]:
purposeDF = execSQLAndGetDF("select count(*), purpose from loan_data group by 2;")
purposeDF

Unnamed: 0,COUNT(*),PURPOSE
0,20087,small_business
1,43620,major_purchase
2,1097090,debt_consolidation
3,35,educational
4,129269,home_improvement
5,454328,credit_card
6,12356,house
7,13902,vacation
8,23526,medical
9,13255,moving


### Data Preperation

We use Snowflake SQL to perform data preperation operations. This includes min_max_normalization, one-hot-encoding etc. 

In [110]:
sql = "with aggregates as (\
  select \
  min(loan_amnt) as min_loan_amnt, max(loan_amnt) as max_loan_amnt,\
  min(funded_amnt) as min_funded_amnt, max(funded_amnt) as max_funded_amnt,\
  min(annual_inc) as min_annual_inc, max(annual_inc) as max_annual_inc \
  from loan_data) \
\
select \
(loan_amnt - min_loan_amnt)/ (max_loan_amnt - min_loan_amnt) as normalized_loan_amnt, \
(funded_amnt - min_funded_amnt)/ (max_funded_amnt - min_funded_amnt) as normalized_funded_amnt, \
(annual_inc - min_annual_inc)/ (max_annual_inc - min_annual_inc) as normalized_annual_inc, \
int_rate, installment, \
\
case when purpose='small_business' then 1 \
when purpose='major_purchase' then 2 \
when purpose='debt_consolidation' then 3 \
when purpose='educational' then 4 \
when purpose='credit_card' then 5 \
when purpose='house' then 6 \
when purpose='home_improvement' then 7 \
when purpose='moving' then 8 \
when purpose='medical' then 9 \
when purpose='vacation' then 10 \
when purpose='car' then 11 \
when purpose='renewable_energy' then 12 \
end as purpose, \
\
iff(loan_status='Does not meet the credit policy. Status:Fully Paid',1,0) as status_dcp_fp,\
iff(loan_status='Does not meet the credit policy. Status:Charged Off',1,0) as status_dcp_coff,\
iff(loan_status='Late (16-30 days)',1,0) as status_late_30,\
iff(loan_status='Charged Off',1,0) as status_coff,\
iff(loan_status='In Grace Period',1,0) as status_igp,\
iff(loan_status='Current',1,0) as status_current,\
iff(loan_status='Late (31-120 days)',1,0) as status_late_120,\
iff(loan_status='Fully Paid',1,0) as status_fp,\
iff(loan_status='Default',1,0) as status_def,\
\
iff(term='36 months',1,0) as term_med,\
iff(term='60 months',1,0) as term_long,\
\
iff(grade='A',1,0) as grade_A, \
iff(grade='B',1,0) as grade_B, \
iff(grade='C',1,0) as grade_C, \
iff(grade='D',1,0) as grade_D, \
iff(grade='E',1,0) as grade_E, \
iff(grade='F',1,0) as grade_F, \
iff(grade='G',1,0) as grade_G,\
\
iff(home_ownership='OWN',1,0) as ho_own,\
iff(home_ownership='MORTGAGE',1,0) as ho_mtg,\
iff(home_ownership='NONE',1,0) as ho_none,\
iff(home_ownership='RENT',1,0) as ho_rent,\
iff(home_ownership='ANY',1,0) as ho_any,\
iff(home_ownership='OTHER',1,0) as ho_other \
\
from aggregates, loan_data \
where purpose != 'other' \
"

sql = sql + " limit 100;"

df = execSQLAndGetDF(sql)


In [111]:
#Separate label
train_labels = df[["PURPOSE"]].copy()
train_data = df.drop(["PURPOSE"], axis=1) # drop labels for training set

In [139]:
train_data.iloc[:5]

Unnamed: 0,NORMALIZED_LOAN_AMNT,NORMALIZED_FUNDED_AMNT,NORMALIZED_ANNUAL_INC,INT_RATE,INSTALLMENT,STATUS_DCP_FP,STATUS_DCP_COFF,STATUS_LATE_30,STATUS_COFF,STATUS_IGP,...,GRADE_D,GRADE_E,GRADE_F,GRADE_G,HO_OWN,HO_MTG,HO_NONE,HO_RENT,HO_ANY,HO_OTHER
0,0.358974,0.358974,0.00071,11.49,494.57,0,0,0,1,0,...,0,0,0,0,0,0,0,1,0,0
1,0.128205,0.128205,0.000646,9.49,192.17,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
2,0.179487,0.179487,0.000601,9.49,256.23,0,0,0,1,0,...,0,0,0,0,0,1,0,0,0,0
3,0.589744,0.589744,0.001055,8.59,493.44,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
4,0.064103,0.064103,0.000483,13.49,118.76,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0


In [138]:
train_labels.iloc[:5]

Unnamed: 0,PURPOSE
0,3
1,5
2,3
3,3
4,3


### Model Training in Scikit Learn

The data scientists like to use SKLearn to train the models. Feature engineering, model creation, scoring etc are complicated activities and we want to continue leverage the best tool to do this. So we won't disturb that process. 

In [114]:
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold

from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

In [115]:
clf_dtree = DecisionTreeClassifier()
clf_dtree.fit(train_data, train_labels)
cross_val_score(clf_dtree, train_data, train_labels, cv=10)



array([0.4       , 0.5       , 0.41666667, 0.6       , 0.5       ,
       0.77777778, 0.44444444, 0.25      , 0.25      , 0.42857143])

The accuracy is not that great but thats not the purpose of this notebook. We can spend more time improving the accuracy. 

In [116]:
predict_df = clf_dtree.predict(train_data)

In [117]:
predict_df

array([ 3,  5,  3,  3,  3,  3,  3,  3,  3,  3,  5,  3,  3,  3,  3,  3,  3,
        3,  2,  3,  6,  5,  3,  3,  5,  3,  3,  3,  7,  3,  7,  5,  5,  3,
        3,  3,  3,  3,  7,  3,  3,  2, 10,  3,  3,  5,  3,  5,  3,  3,  3,
        3,  7,  7,  3,  3,  3, 11,  7,  3,  7, 11,  3,  3,  5,  3,  3,  3,
        2,  3,  3,  3,  3,  3,  3, 11,  3,  3,  5,  7,  3,  3,  3,  3,  3,
        3,  5,  3,  7,  5,  3,  5,  3,  3,  5,  3,  3,  5,  3,  9])

### Porting the model to Snowflake
We use multiple steps to port the model to Snowflake


Step 1: Create equivalent model in JavaScript. We use porter project to achieve this. 
https://github.com/nok/sklearn-porter/blob/release/1.0.0/examples/basics/index.pct.ipynb


Step 2: Decorate the generated Javascript to create Snowflake UDF from it


Step 3: Start using UDF to predict the class

In [118]:
from sklearn_porter import Porter

In [119]:
porter = Porter(clf_dtree, language='JS')
ml_class = porter.export(embed_data=True, template='attached')
print(ml_class)

var DecisionTreeClassifier = function() {

    var findMax = function(nums) {
        var index = 0;
        for (var i = 0; i < nums.length; i++) {
            index = nums[i] > nums[index] ? i : index;
        }
        return index;
    };

    this.predict = function(features) {
        var classes = new Array(8);
            
        if (features[0] <= 0.053846001625061035) {
            if (features[8] <= 0.5) {
                classes[0] = 0; 
                classes[1] = 0; 
                classes[2] = 0; 
                classes[3] = 0; 
                classes[4] = 0; 
                classes[5] = 0; 
                classes[6] = 0; 
                classes[7] = 3; 
            } else {
                classes[0] = 1; 
                classes[1] = 0; 
                classes[2] = 0; 
                classes[3] = 0; 
                classes[4] = 0; 
                classes[5] = 0; 
                classes[6] = 0; 
                classes[7] = 0; 
            }
        } else 

### Decorate the generated code to create UDF

In [120]:
udf_name = "lending_club_purpose_classifier"
udf_header = "CREATE OR REPLACE FUNCTION " + udf_name + " (INPUT_ARRAY VARCHAR) \
              returns varchar \
              language javascript \
              as $$ "

udf_footer_js = " \
                else \
                { \
                        var features = INPUT_ARRAY.split(',').map(function(item) { \
                                                return parseFloat(item, 10); \
                                                }); \
                        var clf = new DecisionTreeClassifier(); \
                        var prediction = clf.predict(features); \
                }"

udf_footer =  "$$;"
#udf = udf_header + ml_class + udf_footer;

In [121]:
f = open('udf_code.js', 'w+')
#ml_class + udf_footer_js
f.write(ml_class+udf_footer_js)
f.close()

In [122]:
import requests

In [123]:
js_file = 'udf_code.js'
with open(js_file, 'r') as c:
    js = c.read()

# Pack it, ship it    
payload = {'input': js}
url = 'https://javascript-minifier.com/raw'
print("Requesting mini-me of {}. . .".format(c.name))
r = requests.post(url, payload)

# Write out minified version
minified = js_file.rstrip('.js')+'.min.js'
with open(minified, 'w') as m:
    m.write(r.text)

print("Minification complete. See {}".format(m.name))

Requesting mini-me of udf_code.js. . .
Minification complete. See udf_code.min.js


In [128]:
print(minified)

udf_code.min.js


### Small Change in the Generated Code
At this step, I made a small change in the generated code to add following line 

return (predictions);

In [129]:
with open(minified, 'r') as c:
    udf_code_min = c.read()

udf = udf_header + udf_code_min + udf_footer;


In [130]:
execSQL(udf)

### Predict using UDF

In [140]:
predict_sql = "with aggregates as (\
  select \
  min(loan_amnt) as min_loan_amnt, max(loan_amnt) as max_loan_amnt,\
  min(funded_amnt) as min_funded_amnt, max(funded_amnt) as max_funded_amnt,\
  min(annual_inc) as min_annual_inc, max(annual_inc) as max_annual_inc \
  from loan_data) \
\
select \
lending_club_purpose_classifier( \
cast((loan_amnt - min_loan_amnt)/ (max_loan_amnt - min_loan_amnt) as string) || ',' || \
cast((funded_amnt - min_funded_amnt)/ (max_funded_amnt - min_funded_amnt) as string) || ',' || \
cast((annual_inc - min_annual_inc)/ (max_annual_inc - min_annual_inc) as string) || ',' || \
cast(int_rate as string) || ',' || \
cast(installment as string) || ',' || \
iff(loan_status='Does not meet the credit policy. Status:Fully Paid','1','0') || ',' || \
iff(loan_status='Does not meet the credit policy. Status:Charged Off','1','0') || ',' || \
iff(loan_status='Late (16-30 days)','1','0') || ',' || \
iff(loan_status='Charged Off','1','0') || ',' || \
iff(loan_status='In Grace Period','1','0') || ',' || \
iff(loan_status='Current','1','0') || ',' || \
iff(loan_status='Late (31-120 days)','1','0') || ',' || \
iff(loan_status='Fully Paid','1','0') || ',' || \
iff(loan_status='Default','1','0') || ',' || \
\
iff(term='36 months','1','0') || ',' || \
iff(term='60 months','1','0') || ',' || \
\
iff(grade='C','1','0') || ',' || \
iff(grade='D','1','0') || ',' || \
iff(grade='E','1','0') || ',' || \
iff(grade='F','1','0') || ',' || \
iff(grade='G','1','0') || ',' || \
\
iff(home_ownership='OWN','1','0') || ',' || \
iff(home_ownership='MORTGAGE','1','0') || ',' || \
iff(home_ownership='NE','1','0') || ',' || \
iff(home_ownership='RENT','1','0') || ',' || \
iff(home_ownership='ANY','1','0') || ',' || \
iff(home_ownership='OTHER','1','0') \
\
) as predicted_purpose \
from aggregates, loan_data \
where purpose = 'other' \
"

predict_sql = predict_sql + " limit 100;"
df = execSQLAndGetDF(predict_sql)


In [142]:
df.iloc[50:55]

Unnamed: 0,PREDICTED_PURPOSE
50,7
51,1
52,7
53,0
54,1


In [134]:
#df.groupby('PREDICTED_PURPOSE').count()
df.PREDICTED_PURPOSE.unique()




array(['0', '7', '1', '4', '5', '2', '6'], dtype=object)

## Summary
We demonstrated how Scikit Learn models can be converted into Snowflake UDF to execute ML within Snowflake



## References

Porter: 
https://github.com/nok/sklearn-porter/blob/release/1.0.0/examples/basics/index.pct.ipynb
