## 3. Data Scientist - Create ML models with Spark

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook. The BigQuery connector
# is registered twice: as a jar on the local filesystem and as a Maven coordinate.
# NOTE(review): confirm both connector sources are really needed together.
spark = (
    SparkSession.builder
    .appName('Spark - Data Scientist Demo')
    .config('spark.jars', '/usr/lib/spark/jars/spark-bigquery-latest.jar')
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0")
    .getOrCreate()
)

In [3]:
# Look up the running application's id in the Spark conf.
# Only the last expression of a cell is echoed, so this value is not displayed.
spark.conf.get("spark.app.id")
# Ask the JVM for the Scala version string; this is what the cell prints.
spark.sparkContext._jvm.scala.util.Properties.versionString()

'version 2.12.12'

Create a Spark DataFrame from a BigQuery table

In [7]:
# Source table, written as project:dataset.table.
table = "datalake-vol2:datalake_vol2_raw.transaction_data_train"

# Read the table into a Spark DataFrame through the BigQuery data source.
df_transaction_data_train = (
    spark.read
    .format("bigquery")
    .option("table", table)
    .load()
)

In [8]:
# Expose the training DataFrame as a temporary view so it can be queried with Spark SQL.
view_name = "bank_transaction_view"
df_transaction_data_train.createOrReplaceTempView(view_name)

In [9]:
# Select every column from the temporary view registered for the training table.
# NOTE(review): the view name is hard-coded here rather than taken from `view_name`.
data = spark.sql("""
SELECT * 
FROM bank_transaction_view
""")

In [10]:
# Remove the transaction identifier column from the frame.
data = data.drop('transactionID')
# Mark the frame for caching; as the last expression, its repr is the cell output.
data.cache()

DataFrame[step: bigint, type: string, amount: double, oldbalanceOrg: double, newbalanceOrig: double, oldbalanceDest: double, newbalanceDest: double, isFraud: bigint]

Create a PySpark ML pipeline to process the data and train a Decision Tree classifier

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier


# Split the input columns by Spark dtype. The label column `isFraud` is excluded
# from both lists so it can never end up inside the feature vector.
categorical_cols = [field for (field, data_type) in data.dtypes
                    if data_type == "string" and field != 'isFraud']

index_output_cols = [x + "_Index" for x in categorical_cols]

ohe_output_cols = [x + "_OHE" for x in categorical_cols]

# Deliberately left unfitted: as a pipeline stage it is fitted by `pipeline.fit(...)`
# on whatever data the pipeline is trained on, so the category-to-index mapping is
# learned from the training rows only and never from the hold-out rows.
# handleInvalid="skip" drops rows whose category was not seen while fitting.
categorical_string_indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=index_output_cols,
    handleInvalid="skip")

# One-hot encode the category indices produced by the indexer.
ohe_encoder = OneHotEncoder(
    inputCols=index_output_cols,
    outputCols=ohe_output_cols)

numeric_cols = [field for (field, data_type) in data.dtypes
                if data_type in ("double", "int", "bigint") and field != 'isFraud']

# Feature vector layout: one-hot encoded categoricals first, then the raw numerics.
assembler_inputs = ohe_output_cols + numeric_cols

vec_assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features")

# Shallow decision tree predicting `isFraud` from the assembled feature vector.
dtc = DecisionTreeClassifier(labelCol="isFraud", featuresCol="features", maxDepth=3, maxBins=12)


# Stages run in order: index -> one-hot -> assemble -> classify.
pipeline = Pipeline(stages=[
    categorical_string_indexer,
    ohe_encoder,
    vec_assembler,
    dtc
])


Evaluate the model on a hold-out set

In [11]:
# Random 90/10 split into training and hold-out frames; the fixed seed keeps it repeatable.
(train_data, test_data) = data.randomSplit([0.9, 0.1], seed=42)

In [12]:
# Mark both splits for caching; the cell echoes the repr of the last expression.
train_data.cache()
test_data.cache()

DataFrame[step: bigint, type: string, amount: double, oldbalanceOrg: double, newbalanceOrig: double, oldbalanceDest: double, newbalanceDest: double, isFraud: bigint]

In [None]:
# Fit every pipeline stage on the training split, producing the fitted pipeline model.
model = pipeline.fit(train_data)

In [None]:
# Run the fitted pipeline over the hold-out split.
predictions = model.transform(test_data)

In [None]:
# Preview the first five rows of the transformed hold-out frame.
predictions.show(5)

In [None]:
# Show the predicted class next to the true label for the first five rows.
predictions.select("prediction", "isFraud").show(5)

### Evaluate the model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the hold-out predictions, with `isFraud` as the label.
binaryEvaluator = BinaryClassificationEvaluator(labelCol="isFraud", metricName="areaUnderROC")

auc = binaryEvaluator.evaluate(predictions)
print(auc)

In [None]:
# Collect the (isFraud, prediction) pairs to the driver and wrap them in a NumPy array.
tests_np = np.array((predictions.select("isFraud","prediction").collect()))
# Echo the array as the cell output.
tests_np

In [None]:
# Collect the (isFraud, prediction) pairs to the driver: column 0 is the true
# label, column 1 is the predicted class.
tests_np = np.array((predictions.select("isFraud","prediction").collect()))
y_true, y_pred = tests_np[:, 0], tests_np[:, 1]

np_acc = accuracy_score(y_true, y_pred)
np_f1 = f1_score(y_true, y_pred)
np_precision = precision_score(y_true, y_pred)
np_recall = recall_score(y_true, y_pred)
# Computed from the hard class predictions, not from scores or probabilities.
np_auc = roc_auc_score(y_true, y_pred)

# Report every metric computed above; accuracy and AUC were previously
# calculated but never shown.
print("accuracy:", np_acc)
print("f1:", np_f1)
print("precision:", np_precision)
print("recall:", np_recall)
print("auc (hard labels):", np_auc)

In [None]:
# confusion_matrix computes the counts; pandas is used to label them for display.
from sklearn.metrics import confusion_matrix
import pandas as pd

# Column 0 of tests_np is passed as the true labels, column 1 as the predictions.
# labels=[1, 0] puts the positive class in the first row and first column.
actual_labels = tests_np[:, 0]
predicted_labels = tests_np[:, 1]
confusion_matrix_scores = confusion_matrix(actual_labels, predicted_labels, labels=[1, 0])

# Wrap the counts in a labelled table: rows are actual classes, columns are predictions.
df = pd.DataFrame(
    confusion_matrix_scores,
    index=["Actually True", "Actually Not True"],
    columns=["Predicted True", "Predicted Not True"],
)

df.head()

### Save and load model_pipeline

In [4]:
from pyspark.ml import Pipeline, PipelineModel

# Save the fitted pipeline under its own prefix instead of the bucket root.
# overwrite() replaces whatever already exists at the target path, and this
# bucket also holds the input datasets and the BigQuery temporary files, so
# overwriting the root would put all of them at risk.
model_path = 'gs://datalake-vol2-data/models/fraud_decision_tree_pipeline'

model.write().overwrite().save(model_path)

In [5]:
# Reload the persisted pipeline model from the path it was saved to.
loaded_pipeline_model = PipelineModel.load(model_path)

In [13]:
# Score the hold-out split with the reloaded pipeline model.

tests = loaded_pipeline_model.transform(test_data)

# Preview the first five scored rows.
tests.show(5)

+----+-------+--------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+------------------+--------------------+----------+
|step|   type|  amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|type_Index|     type_OHE|            features|     rawPrediction|         probability|prediction|
+----+-------+--------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+------------------+--------------------+----------+
|   1|CASH_IN| 5763.99|   1276098.29|    1281862.28|      24632.95|      46462.23|      0|       2.0|(4,[2],[1.0])|[0.0,0.0,1.0,0.0,...|[4195304.0,2525.0]|[0.99939849860487...|       0.0|
|   1|CASH_IN| 6287.28|   4817829.42|    4824116.71|       11274.0|           0.0|      0|       2.0|(4,[2],[1.0])|[0.0,0.0,1.0,0.0,...|[4195304.0,2525.0]|[0.99939849860487...|       0.0|
|   1|CASH_IN|27363.26|   2311674.95|    2339038.21| 1.91692

In [None]:
bq_table_path = 'datalake_vol2_annotated.transaction_data_test'
schema_inline = tests.schema.simpleString().replace('struct<', '').replace('>', '').replace('int', 'int64').replace('double', 'float64').replace('bigint64', 'int64').replace('vector', 'STRING')

!bq mk --table \
{bq_table_path} \
{schema_inline}

In [None]:
# Write the scored hold-out rows to BigQuery, replacing the table's current
# contents. The named GCS bucket is passed as the connector's temporary bucket.
(
    tests.write
    .format("bigquery")
    .option("table", 'datalake-vol2:datalake_vol2_annotated.transaction_data_test')
    .option("temporaryGcsBucket", "datalake-vol2-data")
    .mode('overwrite')
    .save()
)

In [None]:
%%bigquery
-- List the tables registered in the datalake_vol2_annotated dataset.
SELECT * FROM datalake_vol2_annotated.INFORMATION_SCHEMA.TABLES;

### Predict Results

In [14]:
# CSV file in GCS with the transactions to score.
path_to_predict_csv = "gs://datalake-vol2-data/dataset/transaction_data_test.csv"

# The first line is the header; column types are inferred from the data.
df_transaction_data_predict_from_csv = (
    spark.read
    .option("inferSchema" , "true")
    .option("header" , "true")
    .csv(path_to_predict_csv)
)

df_transaction_data_predict_from_csv.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- transactionID: string (nullable = true)



In [15]:
# Score the CSV-loaded transactions with the reloaded pipeline model.
# NOTE: this rebinds `predictions`, which earlier held the hold-out predictions.
predictions = loaded_pipeline_model.transform(df_transaction_data_predict_from_csv)

# Show the predicted class next to the label column for the first five rows.
predictions.select("prediction", "isFraud").show(5)

+----------+-------+
|prediction|isFraud|
+----------+-------+
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
+----------+-------+
only showing top 5 rows



In [16]:
# Preview the first five rows with all columns, including the intermediate pipeline outputs.
predictions.show(5)

+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+----------+---------+--------------------+------------------+--------------------+----------+
|step| type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|       transactionID|type_Index| type_OHE|            features|     rawPrediction|         probability|prediction|
+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+----------+---------+--------------------+------------------+--------------------+----------+
| 310|DEBIT|3334.31|     102439.0|      99104.69|     962290.61|     965624.92|      0|818ca43a-9ac9-484...|       4.0|(4,[],[])|[0.0,0.0,0.0,0.0,...|[4195304.0,2525.0]|[0.99939849860487...|       0.0|
| 351|DEBIT|5268.56|        468.0|           0.0|    1672373.71|    1677642.27|      0|70948920-5fa6-401...|       4.0|(4,[],[])|(10,[4,5,6,8,9],[...|[4195304.0,2525.0]|[0.99939849860487...|  

### Join Data

In [17]:
# CSV file in GCS that is joined onto the predictions.
path_to_join_csv = "gs://datalake-vol2-data/dataset/transaction_data_join.csv"

# The first line is the header; column types are inferred from the data.
df_transaction_data_join_from_csv = (
    spark.read
    .option("inferSchema" , "true")
    .option("header" , "true")
    .csv(path_to_join_csv)
)

df_transaction_data_join_from_csv.printSchema()

root
 |-- nameOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- transactionID: string (nullable = true)



In [26]:
# Inner join on the shared `transactionID` column: only predictions with a
# matching row in the join file are kept.
result = predictions.join(
    df_transaction_data_join_from_csv,
    on="transactionID",
    how="inner",
)

In [27]:
# Preview the first five joined rows.
result.show(5)

+--------------------+----+--------+---------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+------------------+--------------------+----------+----------+-----------+
|       transactionID|step|    type|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|type_Index|     type_OHE|            features|     rawPrediction|         probability|prediction|  nameOrig|   nameDest|
+--------------------+----+--------+---------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+------------------+--------------------+----------+----------+-----------+
|000013e5-f6a2-4b9...| 398| PAYMENT| 35466.89|      82453.0|      46986.11|           0.0|           0.0|      0|       1.0|(4,[1],[1.0])|(10,[1,4,5,6,7],[...|[4195304.0,2525.0]|[0.99939849860487...|       0.0|C976174798| M546696585|
|00002da3-5819-4ac...| 260| PAYMENT|  3980.21|        908.0|    

In [22]:
# Count the rows of the joined frame.
result.count()

1271928

### Store result to Enriched Zone

In [29]:
bq_result_table_path = 'datalake-vol2:datalake_vol2_enriched.transaction_analysis_result'
schema_inline = result.schema.simpleString().replace('struct<', '').replace('>', '').replace('int', 'int64').replace('bigint64', 'int64').replace('double', 'float64').replace('vector', 'STRING')

!bq mk --table \
{bq_result_table_path} \
{schema_inline}

Table 'datalake-vol2:datalake_vol2_enriched.transaction_analysis_result' successfully created.


In [30]:
# Write the joined result to the enriched-zone table, replacing its current
# contents. The named GCS bucket is passed as the connector's temporary bucket.
(
    result.write
    .format("bigquery")
    .option("table", bq_result_table_path)
    .option("temporaryGcsBucket", "datalake-vol2-data")
    .mode('overwrite')
    .save()
)

In [31]:
%%bigquery
-- List the tables registered in the datalake_vol2_enriched dataset.
SELECT * FROM datalake_vol2_enriched.INFORMATION_SCHEMA.TABLES;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 445.63query/s]                          
Downloading: 100%|██████████| 2/2 [00:02<00:00,  1.23s/rows]


Unnamed: 0,table_catalog,table_schema,table_name,table_type,is_insertable_into,is_typed,creation_time
0,datalake-vol2,datalake_vol2_enriched,bank_result,BASE TABLE,YES,NO,2021-01-14 12:51:00.955000+00:00
1,datalake-vol2,datalake_vol2_enriched,transaction_analysis_result,BASE TABLE,YES,NO,2021-01-27 10:59:46.736000+00:00


In [33]:
%%bigquery 
-- Preview five rows of the stored analysis result.
SELECT * FROM `datalake-vol2.datalake_vol2_enriched.transaction_analysis_result`
LIMIT 5

Query complete after 0.00s: 100%|██████████| 2/2 [00:00<00:00, 702.68query/s]                         
Downloading: 100%|██████████| 5/5 [00:02<00:00,  2.06rows/s]


Unnamed: 0,transactionID,step,type,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,type_Index,type_OHE,features,rawPrediction,probability,prediction,nameOrig,nameDest
0,00179de5-2372-4ddf-a09f-9e8f8610d0c5,525,DEBIT,1767.85,603.0,0.0,141760.64,143528.49,0,4.0,"{'type': 0, 'size': 4, 'indices': {'list': []}...","{'type': 0, 'size': 10, 'indices': {'list': [{...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...",0.0,C855266015,C205529901
1,007953a2-964e-4b2c-8087-6520bae7c9d6,137,DEBIT,4771.72,0.0,0.0,4441936.36,4446708.08,0,4.0,"{'type': 0, 'size': 4, 'indices': {'list': []}...","{'type': 0, 'size': 10, 'indices': {'list': [{...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...",0.0,C437172340,C913843316
2,01d71f63-9714-4ee1-b7b4-21737d152ebd,181,DEBIT,615.78,0.0,0.0,66308.88,66924.66,0,4.0,"{'type': 0, 'size': 4, 'indices': {'list': []}...","{'type': 0, 'size': 10, 'indices': {'list': [{...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...",0.0,C1346040274,C836517205
3,02350904-81a3-466c-afb7-9eef28a1c618,226,DEBIT,2654.77,0.0,0.0,248858.36,251513.13,0,4.0,"{'type': 0, 'size': 4, 'indices': {'list': []}...","{'type': 0, 'size': 10, 'indices': {'list': [{...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...",0.0,C277389310,C69054979
4,024dea1d-50a1-4533-ae0a-6f693edb5f94,184,DEBIT,22780.0,10314.0,0.0,1947504.62,1970284.62,0,4.0,"{'type': 0, 'size': 4, 'indices': {'list': []}...","{'type': 0, 'size': 10, 'indices': {'list': [{...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...",0.0,C2094647121,C1347177629
