In [None]:
# Imported Libraries
import os
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import time

# Classifier Libraries
from sklearn.linear_model import LogisticRegression

# Other Libraries
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold, StratifiedKFold
import warnings
warnings.filterwarnings("ignore")

# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import *

In [None]:
# Print the version of the Snowpark client library imported by this kernel.
from snowflake.snowpark import version as v
print(v.VERSION)

In [None]:
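#### PASTE THE CONNECTION CODE BELOW PROVIDED BY INSTRUCTOR ####
# NOTE: later cells use `session` and `user_number`, which are not defined anywhere else
# in this notebook, so the pasted code is expected to create both.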

In [None]:
# Packages registered on the session so that the UDFs created below can import them.
udf_packages = ['snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools']
session.add_packages(*udf_packages)

In [None]:
# Show the warehouse, database, schema and role this session is currently using.
context_rows = session.sql("select current_warehouse(), current_database(),current_schema(), current_role()").collect()
print(context_rows)

#### Use plain SQL

In [None]:
# Express the doubled amount in plain SQL and preview the result.
amount_query = "select AMOUNT, AMOUNT*2 as DBL_AMOUNT from anomaly_base"
snowdf = session.sql(amount_query)
snowdf.show()

#### Convert the Snowpark DF to a Pandas DF

In [None]:
# Materialise the Snowpark result as a local pandas DataFrame and preview its first 10 rows.
pandas_df = snowdf.to_pandas()
pandas_df.head(10)

<div>
<img src="images/pandas_df.png" width="1000"/>
</div>

#### With Snowpark, I can now create a Snowflake native dataframe and implement my logic in Python Snowpark Dataframes rather than SQL

In [None]:
# Rebind `snowdf` to a Snowpark DataFrame over the whole anomaly_base table
# (it previously held the two-column SQL result).
snowdf = session.table("anomaly_base")

<div>
<img src="images/snowdf.png" width="1000"/>
</div>

#### Perform the same operation using Python dataframe operations that transpile down to SQL

In [None]:
# Same projection as the SQL version, built from column expressions.
doubled_amount = (F.col('AMOUNT') * 2).alias('DBL_AMOUNT')
snowdf.select(F.col('AMOUNT'), doubled_amount).toPandas().head()

In [None]:
# Same projection again, previewed with .show() instead of converting to pandas.
doubled_amount = (F.col('AMOUNT') * 2).alias('DBL_AMOUNT')
snowdf.select(F.col('AMOUNT'), doubled_amount).show()

#### I can specify predicates using the filter method or other commonly available dataframe API methods

In [None]:
# Keep only rows whose CLASS is 1 and look at (at most) five of them.
class_one_rows = snowdf.filter(F.col('CLASS') == 1).limit(5)
class_one_rows.toPandas().head()

#### I can also define my custom logic as a Python UDF that I can push down into Snowflake and then call using SQL or Python

In [None]:
def double_it(x: int) -> int:
    """Return twice the given value.

    NOTE(review): the ``int`` hints are what the UDF registration below infers
    its SQL types from -- confirm that AMOUNT is integral, otherwise values may
    be coerced before doubling.
    """
    doubled = x * 2
    return doubled

# Register double_it as a named UDF; the name is suffixed with `user_number`
# (presumably to keep each participant's UDF separate -- confirm), and
# replace=True lets the cell be re-run without a name clash.
double_udf = udf(double_it, name=f"double_it_{user_number}", replace=True)

In [None]:
# Call the registered UDF like any other column expression.
amount_col = F.col('AMOUNT')
snowdf.select(amount_col, double_udf(amount_col)).toPandas().head()

### Exploratory Data Analysis

#### Pandas provides many methods that help in EDA but is limited in scale because it doesn't scale beyond a single node

In [None]:
# Pull the whole table into a local pandas DataFrame and summarise it with pandas.
# `pandas_df` is reused by the undersampling cell further down.
pandas_df = snowdf.toPandas()
pandas_df.describe()

#### With the Snowpark DF, you are no longer limited by memory limitations as this `snowdf` exists in Snowflake and not in the memory of this notebook environment

In [None]:
# Compute the summary on the Snowpark DataFrame; only the describe() result is converted to pandas.
snowdf.describe().toPandas()

#### Now let's take a look at our credit transaction data set...

In [None]:
# Each .count() is a separate query, so count the whole table once and reuse it
# instead of re-counting it for every percentage.
total_rows = snowdf.count()
non_fraud_rows = snowdf.filter(F.col('CLASS')==0).count()
fraud_rows = snowdf.filter(F.col('CLASS')==1).count()

print('No Frauds', (non_fraud_rows / total_rows * 100), '% of the dataset')
print('Frauds', (fraud_rows / total_rows * 100), '% of the dataset')

Notice how imbalanced our original dataset is! Most of the transactions are non-fraud. If we use this dataframe as the base for our predictive models and analysis, we might get a lot of errors and our algorithms will probably overfit, since they will "assume" that most transactions are not fraud. But we don't want our model to assume; we want our model to detect patterns that give signs of fraud!

#### Since our classes are highly skewed, we should undersample the majority class so that both classes are equally represented (a balanced, 50/50 distribution).

In [None]:
# Drop incomplete rows, then shuffle with a fixed seed so that the non-fraud rows
# picked below are the same on every run.
pandas_df = pandas_df.dropna()
df = pandas_df.sample(frac=1, random_state=42)

# Keep every fraud row and an equally sized slice of the shuffled non-fraud rows.
# The size is taken from the data rather than hard-coded, so the classes stay
# balanced even if dropna() removed some fraud rows.
fraud_df = df.loc[df['CLASS'] == 1]
non_fraud_df = df.loc[df['CLASS'] == 0].iloc[:len(fraud_df)]

normal_distributed_df = pd.concat([fraud_df, non_fraud_df])

# Shuffle again so the two classes are interleaved, and leave TIME and AMOUNT out
# of the modelling frame. drop() returns a new frame, so re-running this cell is safe.
new_df = (
    normal_distributed_df
    .sample(frac=1, random_state=42)
    .drop(['TIME', 'AMOUNT'], axis=1)
)
new_df.head()

#### Now that we have our dataframe correctly balanced, we can go and train a logistic regression classifier

## Train a Logistic Regression Model

#### Lets split up the features and the label first

In [None]:
# The label is the CLASS column; the features are every other column.
y = new_df['CLASS']
X = new_df.drop(columns=['CLASS'])

#### And split our training and test sets

In [None]:
# Hold out 20% of the balanced sample for testing; random_state fixes the split.
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#### Let's create a test dataset that includes the CLASS variable, which we can use to test our model

In [None]:
# Build the labelled test frame as a NEW DataFrame. A plain `combined_df = X_test`
# only aliases X_test, so adding CLASS would also insert the label into X_test.
combined_df = X_test.assign(CLASS=y_test.values)

In [None]:
# Display the held-out feature rows together with their CLASS label.
combined_df

#### Save this test data into Snowflake where we want to run inference

In [None]:
%%time
# First create the table
list_val = [i+ " number" for i in combined_df.columns]
query = f""" create or replace table ANOMALY_TEST_{user_number} ({','.join(map(str, list_val))})"""
session.sql(query).collect()

combined_df.columns = [i.upper() for i in combined_df.columns]

# Write the data from the DataFrame to the table named "TRIPS_FORECAST".
session.write_pandas(combined_df, table_name=f"ANOMALY_TEST_{user_number}", auto_create_table=True)

In [None]:
# Read the saved test table back from Snowflake and preview it.
test_table_name = f"ANOMALY_TEST_{user_number}"
session.table(test_table_name).toPandas().head()

#### Turn the values into an array for feeding the classification algorithms.

In [None]:
# Remember the feature names before the frames are replaced by bare arrays;
# the inference UDF uses `features` to rebuild a one-row frame.
features = X_train.columns.tolist()

X_train, X_test = X_train.values, X_test.values
y_train, y_test = y_train.values, y_test.values

#### Train a model using the LogisticRegression algorithm provided by Scikit-Learn

In [None]:
from sklearn.model_selection import cross_val_score

# Fit on the training split only. Fitting on the full X, y would let the model see
# the held-out rows that are later scored as "test" data (label leakage).
# The training array is wrapped in a DataFrame so the model is fitted with feature
# names, matching the named one-row frame that the inference UDF passes to predict().
X_train_named = pd.DataFrame(X_train, columns=features)
clf = LogisticRegression(random_state=0).fit(X_train_named, y_train)

# cross_val_score refits clones of the estimator on each fold of the training data.
training_score = cross_val_score(clf, X_train, y_train, cv=5)

# Scale to percent before rounding to avoid float artefacts such as 56.99999999999999.
print("Classifiers: ", clf.__class__.__name__, "Has a training score of", round(training_score.mean() * 100, 2), "% accuracy score")

#### Create an inference UDF using Snowpark

In [None]:
import pandas as pd
def predict_anomaly_lr(V1: float, V2: float, V3: float, V4: float, 
                    V5: float, V6: float, V7: float, V8: float, 
                    V9: float, V10: float, V11: float, V12: float, 
                    V13: float, V14: float, V15: float, V16: float, 
                    V17: float, V18: float, V19: float, V20: float, 
                    V21: float, V22: float, V23: float, V24: float,
                    V25: float, V26: float, V27: float, V28: float) -> int:
    """Predict the class of one transaction from its 28 feature values.

    The type hints are what the UDF registration infers the SQL argument and
    return types from. Relies on the notebook-level ``clf`` (fitted classifier)
    and ``features`` (ordered feature names).

    Returns:
        The predicted class label as a built-in ``int``.
    """
    # locals() must be read before any other local name is bound, so that it
    # contains exactly the 28 arguments keyed by their names.
    feature_values = dict(locals())
    row = pd.DataFrame([feature_values], columns=features)
    # predict() yields a NumPy scalar; cast it so the value matches the declared
    # `-> int` return type.
    return int(clf.predict(row)[0])

#### And push this inference function into Snowflake

In [None]:
# Register the inference function as a named UDF (name suffixed with `user_number`);
# replace=True lets the cell be re-run after retraining without a name clash.
predict_anomaly_udf = udf(predict_anomaly_lr, name=f"model_{user_number}", replace=True)

#### Now we run inference against the test data we previously saved into the `ANOMALY_TEST_<user_number>` table in Snowflake

In [None]:
test_snowdf = session.table(f"ANOMALY_TEST_{user_number}")

# Every column except the label is a model input, in table column order.
inputs = test_snowdf.drop(F.col('CLASS'))
feature_cols = [inputs[name] for name in inputs.columns]

# Score the saved test rows: keep the inputs, add the UDF's prediction, and carry
# the true label alongside it for comparison. Only 50 rows are kept.
prediction_snowdf = test_snowdf.select(
    *feature_cols,
    predict_anomaly_udf(*feature_cols).alias('PREDICTION'),
    F.col('CLASS').alias('ACTUAL_LABEL'),
).limit(50)

#### Notice that this 👆🏼 inference is being run in Snowflake as a UDF and NOT in this notebook. This makes it a scalable inference pipeline which can automatically parallelize the inference as you increase the size of the warehouse in Snowflake.

In [None]:
# Bring the scored rows into pandas for plotting.
# Note: this rebinds `df`, which earlier held the shuffled training data.
df = prediction_snowdf.toPandas()
df.head(10)

#### Finally, let's plot the actual labels and what our model predicted

In [None]:
# Label each series where it is drawn, so the legend does not depend on the
# order in which the artists were added to the axes.
fig, ax = plt.subplots()
sns.scatterplot(data=df, x=df.index, y="ACTUAL_LABEL", label="ACTUAL_LABEL", ax=ax)
sns.scatterplot(data=df, x=df.index, y="PREDICTION", label="PREDICTION", ax=ax)

ax.set(
    xlabel="Row index",
    ylabel="Class (0 = non-fraud, 1 = fraud)",
    title="Actual label vs. model prediction",
)
ax.legend();

#### Save the predictions into Snowflake

In [None]:
%%time
# First create the table
list_val = [i+ " number" for i in combined_df.columns]
query = f""" create or replace table ANOMALY_PREDICTION_{user_number} ({','.join(map(str, list_val))})"""
session.sql(query).collect()

prediction_snowdf.write.mode("overwrite").saveAsTable(f"ANOMALY_PREDICTION_{user_number}") #push to snowflake

<h2> References: </h2>
<ul>
    <li>Adapted from this <a href="https://www.kaggle.com/janiobachmann/credit-fraud-dealing-with-imbalanced-datasets">Kaggle notebook</a></li>
<li>Hands on Machine Learning with Scikit-Learn & TensorFlow by Aurélien Géron (O'Reilly). CopyRight 2017 Aurélien Géron  </li>
<li><a href="https://www.youtube.com/watch?v=DQC_YE3I5ig&t=794s" > Machine Learning - Over-& Undersampling - Python/ Scikit/ Scikit-Imblearn </a>by Coding-Maniac</li>
<li><a href="https://www.kaggle.com/lane203j/auprc-5-fold-c-v-and-resampling-methods"> auprc, 5-fold c-v, and resampling methods
</a> by Jeremy Lane (Kaggle Notebook) </li>
</ul>