#  Credit Scoring with Snowpark for Python
Author: Zohar Nissare-Houssen

In this notebook, we are going to use the [Snowpark](https://dhttps://docs.snowflake.com/en/developer-guide/snowpark/reference/python/index.html) Python API and Python UDFs to run through a credit card scoring demo.

In this scenario, Snowbank wants to use their existing credit files to analyze the current credit standings on whether the loans are being paid without any issues, and/or if there are any delays/default. 

Based on the current credit standing, Snowbank wants to build a machine learning credit scoring algorithm based on the dataset to be able to automate an assessment on whether a loan should be approved or declined.

## Prerequisite

Please run the Credit Scoring Demo Setup Notebook prior to running this demo.

This version requires Snowpark **0.6.0** or higher

## 1. Data Exploration

In this section, we will explore the dataset for the existing credits on file.

### 1.1 Opening a Snowflake Session

In [1]:
from snowflake.snowpark import *
from snowflake.snowpark import version
from snowflake.snowpark.functions import *

import json
import pandas as pd

  warn_incompatible_dep(


In [3]:
with open('creds.json') as f:
    connection_parameters = json.load(f)    

session = Session.builder.configs(connection_parameters).create()

print(session.sql("select current_warehouse(), current_database(), current_schema(), current_user(), current_role()").collect())

# Print the current version of the Snowpark library
print(version.VERSION)

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='CREDIT_BANK', CURRENT_SCHEMA()='PUBLIC', CURRENT_USER()='ANAMHULUBA', CURRENT_ROLE()='ACCOUNTADMIN')]
(0, 7, 0)


### 1.2 The Data

In [7]:
credit_df = session.table("CREDIT_FILES")
credit_df.count()

2940

In [10]:
credit_df.describe().show()
session.sql("SELECT * FROM CREDIT_FILES").collect()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SUMMARY"  |"CREDIT_REQUEST_ID"  |"CREDIT_AMOUNT"  |"CREDIT_DURATION"  |"PURPOSE"  |"INSTALLMENT_COMMITMENT"  |"OTHER_PARTIES"  |"CREDIT_STANDING"  |"CREDIT_SCORE"  |"CHECKING_BALANCE"  |"SAVINGS_BALANCE"  |"EXISTING_CREDITS"  |"ASSETS"  |"HOUSING"  |"QUALIFICATION"  |"JOB_HISTORY"  |"AGE"         |"SEX"  |"MARITAL_STATUS"  |"NUM_DEPENDENTS"  |"RESIDENCE_SINCE"  |"OTHER_PAYMENT_PLANS"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

[Row(CREDIT_REQUEST_ID=147, CREDIT_AMOUNT=8600, CREDIT_DURATION=6, PURPOSE='Vehicle', INSTALLMENT_COMMITMENT=1, OTHER_PARTIES='None', CREDIT_STANDING='good', CREDIT_SCORE=466, CHECKING_BALANCE=-728.12, SAVINGS_BALANCE=17.0, EXISTING_CREDITS=2, ASSETS='None', HOUSING='own', QUALIFICATION='skilled', JOB_HISTORY=15, AGE=39, SEX='F', MARITAL_STATUS='Married', NUM_DEPENDENTS=1, RESIDENCE_SINCE=4, OTHER_PAYMENT_PLANS='none'),
 Row(CREDIT_REQUEST_ID=248, CREDIT_AMOUNT=12040, CREDIT_DURATION=6, PURPOSE='Vehicle', INSTALLMENT_COMMITMENT=4, OTHER_PARTIES='None', CREDIT_STANDING='good', CREDIT_SCORE=202, CHECKING_BALANCE=0.0, SAVINGS_BALANCE=2443.0, EXISTING_CREDITS=1, ASSETS='None', HOUSING='rent', QUALIFICATION='skilled', JOB_HISTORY=1, AGE=35, SEX='M', MARITAL_STATUS='Single', NUM_DEPENDENTS=1, RESIDENCE_SINCE=1, OTHER_PAYMENT_PLANS='bank'),
 Row(CREDIT_REQUEST_ID=112, CREDIT_AMOUNT=3920, CREDIT_DURATION=15, PURPOSE='Tuition', INSTALLMENT_COMMITMENT=4, OTHER_PARTIES='None', CREDIT_STANDING='go

In [15]:
dff = session.sql("SELECT * FROM CREDIT_FILES").collect()

In [17]:
credit_df.toPandas()

SnowparkFetchDataException: (1406): Failed to fetch a Pandas Dataframe. The error is: to_pandas() did not return a Pandas DataFrame. If you use session.sql(...).to_pandas(), the input query can only be a SELECT statement. Or you can use session.sql(...).collect() to get a list of Row objects for a non-SELECT statement, then convert it to a Pandas DataFrame.

### 1.3  Visualizing the Numeric Features

From this visualization, we can see a few interesting characteristics:

* Most of the credit requests are for small amounts (< 50k)
* Most of the credit terms are 20 months or less.
* Most of the applicants have a very good credit score.
* Most of the applicants do not have a lot of balance in either credits or savings with Snowbank.
* Most of the applicants are less than 40 years old.

In [None]:
credit_df.toPandas().hist(figsize=(15,15))

### 1.4  Visualizing the Categorical Features

From this visualization, we can see a few interesting characteristics:

* Most of the popular credit requests are related to either a vehicle purchase or consumer goods.
* The vast majority of loans do not have guarantors, nor co-applicants.
* Most of credit in file is in good standing.
* The majority of the applicants are male, foreign workers, and skilled who own their own house/apartment.
* Higher amounts of loans (which threshold varies per category of loan) have a higher chance of defaulting.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

sns.set(style="darkgrid")


fig, axs = plt.subplots(5, 2, figsize=(15, 30))
df = credit_df.toPandas()
sns.countplot(data=df, y="PURPOSE", ax=axs[0,0])
sns.countplot(data=df, x="OTHER_PARTIES", ax=axs[0,1])
sns.countplot(data=df, x="CREDIT_STANDING", ax=axs[1,0])
sns.countplot(data=df, x="ASSETS", ax=axs[1,1])
sns.countplot(data=df, x="HOUSING", ax=axs[2,0])
sns.countplot(data=df, x="QUALIFICATION", ax=axs[2,1])
sns.countplot(data=df, x="SEX", ax=axs[3,0])
sns.countplot(data=df, x="MARITAL_STATUS", ax=axs[3,1])
sns.countplot(data=df, x="OTHER_PAYMENT_PLANS", ax=axs[4,0])
sns.stripplot(y="PURPOSE", x="CREDIT_AMOUNT", data=df, hue='CREDIT_STANDING', jitter=True, ax=axs[4,1])
plt.show()

### 1.5 Running queries through Snowpark API

We can use the Snowpark API to run queries to get various insights. For example, let's try to determine the range of f loans per different category. We can check the Snowflake query history and review how the Snowpark API has been pushed down as SQL.

In [None]:
df_loan_status = credit_df.select(col("PURPOSE"),col("CREDIT_AMOUNT"))\
                          .groupBy(col("PURPOSE"))\
                          .agg([min(col("CREDIT_AMOUNT")).as_("MIN_CREDIT_AMOUNT"), max(col("CREDIT_AMOUNT")).as_("MAX_CREDIT_AMOUNT"), median(col("CREDIT_AMOUNT")).as_("MED_CREDIT_AMOUNT"),avg(col("CREDIT_AMOUNT")).as_("AVG_CREDIT_AMOUNT")])\
                          .sort(col("PURPOSE"))
df_loan_status.toPandas()

## 2. Data Transformation and Encoding


For the current use case, in order to prepare the data for machine learning, we need to encode the categorical values into numerical. 

In order to achieve this, we can leverage Snowflake compute by defining Python UDFs in order to perform the encoding.

### 2.1 Encoding Categorical Values

In [None]:
session.sql("create or replace stage fnstage encryption = (type = 'SNOWFLAKE_SSE')").collect()

In [None]:
@udf(name="PURPOSE_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Purpose_Encode(x : str) -> int:
  if x == "Consumer Goods":
    return 1
  elif x == "Vehicle":
    return 2
  elif x == "Tuition":
    return 3
  elif x == "Business":
    return 4
  elif x == "Repairs":
    return 5
  else:
    return 0

@udf(name="OTHER_PARTIES_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Other_Parties_Encode(x : str) -> int:
  if x == "Guarantor":
    return 1
  elif x == "Co-Applicant":
    return 2
  else:
    return 0

@udf(name="CREDIT_STANDING_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Credit_Standing_Encode(x : str) -> int:
  if x == "good":
    return 1
  else:
    return 0

@udf(name="ASSETS_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Assets_Encode(x : str) -> int:
  if x == "Vehicle":
    return 1
  elif x == "Investments":
    return 2
  elif x == "Home":
    return 3
  else:
    return 0

@udf(name="HOUSING_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Housing_Encode(x : str) -> int:
  if x == "rent":
    return 1
  elif x == "own":
    return 2
  else:
    return 0

@udf(name="QUALIFICATION_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Qualification_Encode(x : str) -> int:
  if x == "unskilled":
    return 1
  elif x == "skilled":
    return 2
  elif x == "highly skilled":
    return 3
  else:
    return 0

@udf(name="SEX_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Sex_Encode(x : str) -> int:
  if x == "M":
    return 1
  else:
    return 0

@udf(name="MARITAL_STATUS_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Marital_Status_Encode(x : str) -> int:
 if x == "Married":
    return 1
 elif x == "Single":
    return 2
 else:
    return 0


@udf(name="OTHER_PAYMENT_PLANS_Encode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Other_Payment_Plans_Encode(x : str) -> int:
  if x == "bank":
    return 1
  elif x == "stores":
    return 2
  else:
    return 0

@udf(name="CREDIT_SCORE_Decode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Credit_Score_Decode(x : int) -> str:
  if x == 1:
    return "Approved"
  else:
    return "Denied"

We can invoke the UDF through the Snowpark API as follows:

In [None]:
df=credit_df.select(col("PURPOSE"), \
                    Purpose_Encode(col("PURPOSE")).as_("PURPOSE_ENCODED")) \
                    .groupBy(col("PURPOSE"),col("PURPOSE_ENCODED")) \
                    .agg((count("PURPOSE")).alias("COUNT"))

df.toPandas()

We could use the UDFs just created within Snowflake through SQL as follows
>```sql
select 'Tuition' as PURPOSE, PURPOSE_Encode('Tuition') as PURPOSE_CODE;
>```

### 2.2 Decoding an integer value using a vectorized UDF

We also want to create a decoding function mapping the output of our machine learning model to a string, on whether a credit request has been approved or denied. For this, we want to demonstrate with a simple example the use of a vectorized UDF. 

Compared to the default row-by-row processing pattern of a normal UDF, which sometimes is inefficient, a vectorized UDF allows vectorized operations on a dataframe, with the input as a Pandas DataFrame or Pandas Series. In a vectorized UDF, you can operate on a batches of rows by handling Pandas DataFrame or Pandas Series.

As you can see in the example below, we pass in as a parameter a Pandas series corresponding to the value of the columns. 

In [None]:
from snowflake.snowpark.types import PandasSeries, PandasDataFrame 

@udf(name="CREDIT_SCORE_Decode", is_permanent=True, stage_location="@fnstage", replace=True, session=session)
def Credit_Score_Decode(series: PandasSeries[int])-> PandasSeries[str]:
    return series.apply(lambda x: "Approved" if (x == 1) else "Denied")

Let's try to test the UDF previously created by passing a Pandas series with 3 values:

In [None]:
df1 = session.createDataFrame([1,0,1]).to_df("a")

df1.select(Credit_Score_Decode("a")).collect()

### 2.3 Preparing the Feature Matrix for ML

In this section, we are going to leverage the Snowpark Python API, along with the Python UDFs that we just created previously in order to prepare a feature matrix for a Random Forest Classifier Model.

In [None]:
feature_matrix = credit_df.select(
                                   Purpose_Encode(col("PURPOSE")).as_("PURPOSE_CODE"), 
                                   Qualification_Encode(col("QUALIFICATION")).as_("QUALIFICATION_CODE"), 
                                   Other_Parties_Encode(col("OTHER_PARTIES")).as_("OTHER_PARTIES_CODE"),
                                   Other_Payment_Plans_Encode(col("OTHER_PAYMENT_PLANS")).as_("OTHER_PAYMENT_PLANS_CODE"),
                                   Housing_Encode(col("HOUSING")).as_("HOUSING_CODE"),
                                   Assets_Encode(col("ASSETS")).as_("ASSETS_CODE"),
                                   Sex_Encode(col("SEX")).as_("SEX_CODE"),
                                   Marital_Status_Encode(col("MARITAL_STATUS")).as_("MARITAL_STATUS_CODE"),
                                   Credit_Standing_Encode(col("CREDIT_STANDING")).as_("CREDIT_STANDING_CODE"),
                                   col("CHECKING_BALANCE"),
                                   col("SAVINGS_BALANCE"),
                                   col("AGE"),
                                   col("JOB_HISTORY"),
                                   col("CREDIT_SCORE"),
                                   col("CREDIT_DURATION"), 
                                   col("CREDIT_AMOUNT"),
                                   col("RESIDENCE_SINCE"),
                                   col("INSTALLMENT_COMMITMENT"),
                                   col("NUM_DEPENDENTS"),
                                   col("EXISTING_CREDITS")
                                 )                                  

Now that the feature matrix has been defined, we will convert it into a Pandas Dataframe.

In [None]:
df = feature_matrix.toPandas().astype(int)

In [None]:
df.info()

This is what the data looks like:

In [None]:
df.head()

## 3. Random Forest Model Training

We are going to leverage the Random Forest Classifier Model available as part of the scikit-learn popular ML Library available in Python.

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(df.drop('CREDIT_STANDING_CODE',axis=1), 
                                                    df['CREDIT_STANDING_CODE'], test_size=0.30)

In [None]:
from sklearn.ensemble import RandomForestClassifier

rfc = RandomForestClassifier(n_estimators=100)
rfc.fit(X_train, y_train)

## 4. Testing the Model

In [None]:
rfc_pred = rfc.predict(X_test)

In [None]:
from sklearn.metrics import classification_report,confusion_matrix
print(classification_report(y_test,rfc_pred))

In [None]:
print(confusion_matrix(y_test,rfc_pred))

As we are seeing a good performance for the model in terms of precision for predicting 0 -> Loan Denial, Snowbank wants to operationalize the model within their Snowflake environment.

## 5. Export the Model within a Python UDF for Scoring

In [None]:
from joblib import dump, load
dump(rfc, './credit_score.joblib')

In [None]:
session.file.put("credit_score.joblib", "@fnstage", auto_compress=False, overwrite=True)

In [None]:
session.clear_imports()
session.add_import("@fnstage/credit_score.joblib")
#session.add_packages("joblib", "pandas", "scikit-learn")

In [None]:
@udf(name = 'credit_score', is_permanent = True, replace = True, stage_location = '@FNSTAGE', session = session, 
     packages=["joblib","pandas","scikit-learn"])
def credit_score(arg: list) -> int:
    import joblib
    import sys
    import pandas as pd

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    pipe_file = import_dir + 'credit_score.joblib'
        
    pipeline = joblib.load(pipe_file)
    row = pd.DataFrame([arg], columns=[ 'PURPOSE_CODE',
                                        'QUALIFICATION_CODE',
                                        'OTHER_PARTIES_CODE',
                                        'OTHER_PAYMENT_PLANS_CODE',
                                        'HOUSING_CODE',
                                        'ASSETS_CODE',
                                        'SEX_CODE',
                                        'MARITAL_STATUS_CODE',
                                        'CHECKING_BALANCE',
                                        'SAVINGS_BALANCE',
                                        'AGE',
                                        'JOB_HISTORY',
                                        'CREDIT_SCORE',
                                        'CREDIT_DURATION',
                                        'CREDIT_AMOUNT',
                                        'RESIDENCE_SINCE',
                                        'INSTALLMENT_COMMITMENT',
                                        'NUM_DEPENDENTS',
                                        'EXISTING_CREDITS'])           
    return pipeline.predict(row)[0]

## 6. Performing Inference in Snowflake

In the example below, we want to process an existing batch of 60 credit pending requests and provide an assessment on whether the loan should be approved or denied. The data looks like as follows:

In [None]:
df_cred_req = session.table("CREDIT_REQUESTS")

In [None]:
df_cred_req.toPandas()

In [None]:
df.info()

### 6.1 Develop Stored Procedure for scoring

As the bank receives the credit requests in near real-time, we want to write a stored procedure which could be called through a task to score micro-batches of requests as they come in. 

The Python stored procedure will first build the input features for the model using the Snowpark API and invoke the Python UDF we built earlier for scoring.

In [None]:
session.add_packages('snowflake-snowpark-python')

@sproc(name="process_credit_requests", replace=True, is_permanent=True, stage_location="@fnstage")
def process_credit_requests_fn (session: snowflake.snowpark.Session, credit_requests: str, credit_assessment: str) -> int:
    
    #Build the input features for the model using the Snowpark API as well as the Python UDFs for encoding.
    df_cred_req = session.table(credit_requests).select( 
                            col("CREDIT_REQUEST_ID"), col("PURPOSE"), 
                            Purpose_Encode(col("PURPOSE")).as_("PURPOSE_CODE"),
                            Qualification_Encode(col("QUALIFICATION")).as_("QUALIFICATION_CODE"),
                            Other_Parties_Encode(col("OTHER_PARTIES")).as_("OTHER_PARTIES_CODE"),
                            Other_Payment_Plans_Encode(col("OTHER_PAYMENT_PLANS")).as_("OTHER_PAYMENT_PLANS_CODE"),
                            Housing_Encode(col("HOUSING")).as_("HOUSING_CODE"),
                            Assets_Encode(col("ASSETS")).as_("ASSETS_CODE"),
                            Sex_Encode(col("SEX")).as_("SEX_CODE"),
                            Marital_Status_Encode(col("MARITAL_STATUS")).as_("MARITAL_STATUS_CODE"),
                            col("CHECKING_BALANCE"),
                            col("SAVINGS_BALANCE"),
                            col("AGE"),
                            col("JOB_HISTORY"),
                            col("CREDIT_SCORE"),
                            col("CREDIT_DURATION"), 
                            col("CREDIT_AMOUNT"), 
                            col("RESIDENCE_SINCE"),
                            col("INSTALLMENT_COMMITMENT"),
                            col("NUM_DEPENDENTS"),
                            col("EXISTING_CREDITS")
                         )
    
    #Call the UDF to score the existing credit requests read previously    
    input_features = [ 'PURPOSE_CODE',
                   'QUALIFICATION_CODE',
                   'OTHER_PARTIES_CODE',
                   'OTHER_PAYMENT_PLANS_CODE',
                   'HOUSING_CODE',
                   'ASSETS_CODE',
                   'SEX_CODE',
                   'MARITAL_STATUS_CODE',
                   'CHECKING_BALANCE',
                   'SAVINGS_BALANCE',
                   'AGE',
                   'JOB_HISTORY',
                   'CREDIT_SCORE',
                   'CREDIT_DURATION',
                   'CREDIT_AMOUNT',
                   'RESIDENCE_SINCE',
                   'INSTALLMENT_COMMITMENT',
                   'NUM_DEPENDENTS',
                   'EXISTING_CREDITS']           

    df_assessment = df_cred_req.select(col("CREDIT_REQUEST_ID"), col("PURPOSE"), col("CREDIT_AMOUNT"), col("CREDIT_DURATION"),
                    call_udf("credit_score_decode",(call_udf("credit_score", array_construct(*input_features)))).as_("CREDIT_STATUS"))
    
    df_assessment.write.mode("overwrite").saveAsTable(credit_assessment)
    
    #The stored procedure will return the total number of credit request assessed.
    return df_assessment.count()

### 6.2 Invoking the Stored Procedure for scoring

In [None]:
session.call("process_credit_requests", "credit_requests", "credit_assessments")

We could run this query directy from Snowflake as follows

>```sql
CALL process_credit_requests ('credit_requests', 'credit_assessments');
>```

Let's now take a look at the credit_assessments table:

In [None]:
session.table("credit_assessments").toPandas()