## 3. Data Scientist - Create ML models with Spark

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
import re

In [2]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session for this notebook with the BigQuery
# connector configured.
# NOTE(review): the connector is requested twice - as a "latest" jar and as a
# pinned 0.18.0 package - confirm both are really needed.
spark = (
    SparkSession.builder
    .appName('Spark - Data Scientist Demo')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0")
    .getOrCreate()
)

In [3]:
# Look up the application id of the running session. Only the last expression
# of a cell is rendered, so this value is evaluated but not displayed.
spark.conf.get("spark.app.id")
# Ask the JVM behind the SparkContext for its Scala version string; this is the
# value the cell displays.
spark.sparkContext._jvm.scala.util.Properties.versionString()

'version 2.12.10'

In [4]:
project_id = !gcloud config list --format 'value(core.project)' 2>/dev/null
bq_raw_dataset_name = project_id[0] + '-raw'
bq_raw_dataset_name = bq_raw_dataset_name.replace('-', '_')
bq_raw_table_path = project_id[0] + ':' + bq_raw_dataset_name + '.transaction_data_train' 
bq_raw_table_path

'thetraining-project:thetraining_project_raw.transaction_data_train'

#### Load Training Data using Spark

In [5]:
# Read the raw training table from BigQuery through the Spark BigQuery connector.
bq_reader = spark.read.format("bigquery").option("table", bq_raw_table_path)
data = bq_reader.load()

In [6]:
# Remove the row identifier: it is a string column and would otherwise be picked
# up by the string-typed feature selection further down.
data = data.drop('transactionID')
# Mark the frame for caching; the next cells scan it several times (filters + counts).
data.cache()

DataFrame[step: bigint, type: string, amount: double, oldbalanceOrg: double, newbalanceOrig: double, oldbalanceDest: double, newbalanceDest: double, isFraud: bigint]

### Oversampling the minority label (isFraud=1)

In [7]:
from pyspark.sql.functions import col, explode, array, lit

# Split the training frame by label and measure how many majority rows exist
# per minority row (truncated to an integer).
fraud_flag = col("isFraud")
major_df = data.where(fraud_flag == 0)
minor_df = data.where(fraud_flag == 1)

majority_rows = major_df.count()
minority_rows = minor_df.count()
ratio = int(majority_rows / minority_rows)
print("ratio: {}".format(ratio))

ratio: 780


In [8]:
# Replicate each minority row `ratio` times: attach an array holding `ratio`
# literals, explode it into one row per element, then discard the helper column.
replica_ids = array(*[lit(i) for i in range(ratio)])
oversampled_df = (
    minor_df
    .withColumn("dummy", explode(replica_ids))
    .drop('dummy')
)
# Training frame = untouched majority rows + replicated minority rows.
data = major_df.unionAll(oversampled_df)
data.show()

+----+--------+----------+-------------+--------------+--------------+--------------+-------+
|step|    type|    amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|
+----+--------+----------+-------------+--------------+--------------+--------------+-------+
| 302|TRANSFER| 419955.64|      20978.0|           0.0|       30562.0|     751695.31|      0|
| 322|TRANSFER| 169566.23|       3832.0|           0.0|    2717714.64|    2887280.87|      0|
| 540|TRANSFER|1348788.69|        675.0|           0.0|     171461.44|    1520250.13|      0|
| 349|TRANSFER| 790428.92|         66.0|           0.0|     305371.89|    1095800.81|      0|
|   8|TRANSFER|1746499.66|        209.0|           0.0|       75521.0|     2180688.8|      0|
| 156|TRANSFER|1350240.79|     112105.0|           0.0|    1972716.25|    3322957.04|      0|
| 254|TRANSFER| 166272.49|      51930.0|           0.0|    7321489.31|     7487761.8|      0|
| 476|TRANSFER| 223746.33|     160672.0|           0.0|     

#### Create a pyspark ML pipeline 

The pipeline will transform the features and train a Decision Tree classifier 

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier


# Partition the input columns by Spark SQL type; the label is never a feature.
categorical_cols = [field for (field, data_type) in data.dtypes
                    if data_type == "string" and field != 'isFraud']

numeric_cols = [field for (field, data_type) in data.dtypes
                if data_type in ("double", "int", "bigint") and field != 'isFraud']

# Derived column names, one per categorical input.
index_output_cols = [x + "_Index" for x in categorical_cols]
ohe_output_cols = [x + "_OHE" for x in categorical_cols]

# One indexer/encoder pair per categorical column, so the stages always produce
# exactly the "<col>_OHE" columns the assembler is told to read below.
# The indexers are fitted on the full training frame up front (as before).
string_indexers = [
    StringIndexer(inputCol=source_col, outputCol=index_col).fit(data)
    for source_col, index_col in zip(categorical_cols, index_output_cols)
]

one_hot_indexer = [
    OneHotEncoder(inputCol=index_col, outputCol=ohe_col)
    for index_col, ohe_col in zip(index_output_cols, ohe_output_cols)
]

# Feature vector layout: one-hot encoded categoricals first, then raw numerics.
assembler_inputs = ohe_output_cols + numeric_cols

vec_assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features")

# Decision tree classifier (the model this notebook trains)
dtc = DecisionTreeClassifier(labelCol="isFraud", featuresCol="features", maxDepth=3, maxBins=12)
# Logistic regression alternative
# dtc = LogisticRegression(labelCol="isFraud", featuresCol="features", maxIter=10, )

# Stage order: index strings -> one-hot encode -> assemble vector -> classify.
pipeline = Pipeline(stages=[
    *string_indexers,
    *one_hot_indexer,
    vec_assembler,
    dtc
])

## Hyperparameter tuning

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Candidate tree settings: 4 depths x 3 bin counts = 12 parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dtc.maxDepth, [3, 6, 10, 11])
    .addGrid(dtc.maxBins, [12, 24, 200])
    .build()
)
# Logistic regression alternative (requires a LogisticRegression estimator bound
# to `lr` and used as the last pipeline stage):
# paramGrid = ParamGridBuilder() \
#     .addGrid(lr.regParam, [0.1, 0.01]) \
#     .build()

# Fixed seed so the random fold assignment is repeatable between runs.
CV_SEED = 42

# NOTE(review): `data` is the oversampled frame at this point, so copies of the
# same minority row can land in both the training and the validation fold;
# the cross-validated metric is likely optimistic - confirm this is acceptable.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="isFraud"),
                          numFolds=2,
                          seed=CV_SEED)

# Run cross-validation, and choose the best set of parameters.
# `model` is the cross-validation result here; the winning pipeline is `model.bestModel`.
model = crossval.fit(data)

#### Train the model 

In [10]:
# Fit all pipeline stages on the oversampled training frame. This rebinds `model`,
# replacing any value left by the hyperparameter-tuning cell.
model = pipeline.fit(data)

#### Persist the model to GCS 

In [12]:
from pyspark.ml import Pipeline, PipelineModel

# Bucket "<project>-data" holds the persisted model under the "model/" prefix.
gcs_bucket = project_id[0] + '-data'
model_path = f'gs://{gcs_bucket}/model/'
# If `model` came from the cross-validation cell, unwrap the winner first:
# model = model.bestModel
# Replace whatever was previously stored at the same location.
model_writer = model.write().overwrite()
model_writer.save(model_path)

#### Predict on test data
**TODO** 

* Provide path_to_predict_csv

In [13]:
# The test file lives in the project's "<project>-data" bucket - the same bucket
# this notebook uses for the saved model and the BigQuery staging files - so
# derive the path from the active project instead of hard-coding one project.
path_to_predict_csv = "gs://" + project_id[0] + "-data/transaction_data_test.csv"
df_transaction_data_predict_from_csv = (
    spark.read
    .option("inferSchema", "true")  # derive column types from the file contents
    .option("header", "true")       # first line carries the column names
    .csv(path_to_predict_csv)
)
df_transaction_data_predict_from_csv.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- transactionID: string (nullable = true)



Load the saved model 

In [14]:
# Read the fitted pipeline back from the GCS location it was saved to above.
loaded_pipeline_model = PipelineModel.load(model_path)

In [15]:
# Run the reloaded pipeline over the test rows; the result keeps the input
# columns and adds the intermediate feature columns plus the prediction columns.
predictions = loaded_pipeline_model.transform(df_transaction_data_predict_from_csv)

In [16]:
# Preview the first five scored rows with every column.
predictions.show(5)

+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+----------+---------+--------------------+--------------------+--------------------+----------+
|step| type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|       transactionID|type_Index| type_OHE|            features|       rawPrediction|         probability|prediction|
+----+-----+-------+-------------+--------------+--------------+--------------+-------+--------------------+----------+---------+--------------------+--------------------+--------------------+----------+
| 310|DEBIT|3334.31|     102439.0|      99104.69|     962290.61|     965624.92|      0|818ca43a-9ac9-484...|       4.0|(4,[],[])|[0.0,0.0,0.0,0.0,...|  [1420940.0,2340.0]|[0.99835591029172...|       0.0|
| 351|DEBIT|5268.56|        468.0|           0.0|    1672373.71|    1677642.27|      0|70948920-5fa6-401...|       4.0|(4,[],[])|(10,[4,5,6,8,9],[...|[3265139.0,408720.0]|[0.8887491327

In [17]:
# Show the predicted class next to the true label for the first five rows.
predictions.select("prediction", "isFraud").show(5)

+----------+-------+
|prediction|isFraud|
+----------+-------+
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
|       0.0|      0|
+----------+-------+
only showing top 5 rows



### Evaluate the model

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test predictions with Spark's binary evaluator, using "isFraud" as the label.
binaryEvaluator = BinaryClassificationEvaluator(labelCol="isFraud")

# Per-call parameter override selecting the ROC-AUC metric.
roc_metric_override = {binaryEvaluator.metricName: "areaUnderROC"}
auc = binaryEvaluator.evaluate(predictions, roc_metric_override)
print(auc)

0.8908125351606421


In [19]:
# Pull (label, prediction) pairs to the driver and stack them into a 2-column
# array: column 0 is the true label, column 1 the predicted class.
label_prediction_rows = predictions.select("isFraud","prediction").collect()
tests_np = np.array(label_prediction_rows)
tests_np

array([[0., 0.],
       [0., 0.],
       [0., 0.],
       ...,
       [0., 0.],
       [0., 0.],
       [0., 0.]])

In [20]:
# Reuse the (label, prediction) array collected in the previous cell instead of
# pulling every test row to the driver a second time.
y_true = tests_np[:, 0]
y_pred = tests_np[:, 1]

np_acc = accuracy_score(y_true, y_pred)
np_f1 = f1_score(y_true, y_pred)
np_precision = precision_score(y_true, y_pred)
np_recall = recall_score(y_true, y_pred)
# Computed from the hard 0/1 predictions rather than from scores, so it is not
# expected to match the areaUnderROC printed by the Spark evaluator cell.
np_auc = roc_auc_score(y_true, y_pred)

# Report every metric that was computed (accuracy and AUC were previously
# calculated but never shown).
print("accuracy:", np_acc)
print("f1:", np_f1)
print("precision:", np_precision)
print("recall:", np_recall)
print("roc_auc (hard labels):", np_auc)

f1: 0.03849006322828806
precision: 0.0196647614154518
recall: 0.9016489988221437


#### Create confusion matrix

In [21]:
# import package that will generate the confusion matrix scores
from sklearn.metrics import confusion_matrix
# import packages that will help display the scores
import pandas as pd

# Column 0 of tests_np holds the true labels, column 1 the predicted classes.
actual_labels = tests_np[:,0]
predicted_labels = tests_np[:,1]

# labels=[1, 0] puts the positive (fraud) class first on both axes.
confusion_matrix_scores = confusion_matrix(actual_labels, predicted_labels, labels=[1, 0])

# Wrap the raw counts in a labelled table: rows are actual classes,
# columns are predicted classes.
row_names = ["Actual Positive(1)", "Actual Negative(0)"]
column_names = ["Predicted Positive(1)", "Predicted Negative(0)"]
df = pd.DataFrame(confusion_matrix_scores, index=row_names, columns=column_names)

df.head()

Unnamed: 0,Predicted Positive(1),Predicted Negative(0)
Actual Positive(1),1531,167
Actual Negative(0),76324,1193906


In [22]:
# "<dataset>.<table>" reference for the predictions table; hyphens coming from
# the project id are replaced by underscores.
bq_annotated_table_name = 'transaction_data_predictions'
bq_annotated_table_path = (project_id[0] + '_annotated.' + bq_annotated_table_name).replace('-', '_')
bq_annotated_table_path

'thetraining_project_annotated.transaction_data_predictions'

#### Persist predictions as an annotated dataset

In [37]:
schema_inline = predictions.schema.simpleString().replace('struct<', '').replace('>', '').replace('int', 'int64').replace('double', 'float64').replace('bigint64', 'int64').replace('vector', 'STRING')

!bq mk --table \
{bq_annotated_table_path} \
{schema_inline}

BigQuery error in mk operation: Table 'thetraining-
project:thetraining_project_annotated.transaction_data_predictions' could not be
created; a table with this name already exists.


In [38]:
# Display the inline BigQuery schema string built for the predictions table.
schema_inline

'step:int64,type:string,amount:float64,oldbalanceOrg:float64,newbalanceOrig:float64,oldbalanceDest:float64,newbalanceDest:float64,isFraud:int64,transactionID:string,type_Index:float64,type_OHE:STRING,features:STRING,rawPrediction:STRING,probability:STRING,prediction:float64'

In [24]:
# Destination in "<project>:<dataset>.<table>" form and the bucket the
# connector stages files in before loading them into BigQuery.
annotated_table_ref = project_id[0]  + ':' + bq_annotated_table_path
staging_bucket = project_id[0]  + '-data'

# Replace the table contents with the current predictions.
(
    predictions.write
    .format("bigquery")
    .option("table", annotated_table_ref)
    .option("temporaryGcsBucket", staging_bucket)
    .mode('overwrite')
    .save()
)

In [25]:
# Annotated dataset name: "<project>_annotated" with hyphens replaced by underscores.
annotated_dataset_name = (project_id[0] +  '_annotated').replace('-', '_')
annotated_dataset_name

'thetraining_project_annotated'

**TODO** 
* Add annotated_dataset_name in the FROM clause below

In [26]:
%%bigquery
-- List the tables in the annotated dataset to confirm the predictions table exists.
SELECT * FROM thetraining_project_annotated.INFORMATION_SCHEMA.TABLES;

Unnamed: 0,table_catalog,table_schema,table_name,table_type,is_insertable_into,is_typed,creation_time
0,thetraining-project,thetraining_project_annotated,transaction_data_workflow,BASE TABLE,YES,NO,2021-03-15 11:41:26.369000+00:00
1,thetraining-project,thetraining_project_annotated,transaction_data_predictions,BASE TABLE,YES,NO,2021-03-03 09:56:08.781000+00:00


#### Join business data to enrich the dataset

**TODO** 
* Provide the path to the join csv

In [27]:
# The join file lives in the project's "<project>-data" bucket - the same bucket
# this notebook uses for the saved model and the BigQuery staging files - so
# derive the path from the active project instead of hard-coding one project.
path_to_join_csv = "gs://" + project_id[0] + "-data/transaction_data_join.csv"
df_transaction_data_join_from_csv = (
    spark.read
    .option("inferSchema", "true")  # derive column types from the file contents
    .option("header", "true")       # first line carries the column names
    .csv(path_to_join_csv)
)
df_transaction_data_join_from_csv.printSchema()

root
 |-- nameOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- transactionID: string (nullable = true)



**TODO** (Challenge 2)
* Join the 2 spark dataframes (predictions & df_transaction_data_join_from_csv) on transactionID field 

In [28]:
# Attach the business columns (nameOrig, nameDest) to each scored row by joining
# on the shared transactionID column; no join type is given, so Spark's default applies.
joined_result = predictions.join(df_transaction_data_join_from_csv, "transactionID")

In [29]:
# Preview the first five enriched rows.
joined_result.show(5)

+--------------------+----+--------+---------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+--------------------+--------------------+----------+-----------+-----------+
|       transactionID|step|    type|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|type_Index|     type_OHE|            features|       rawPrediction|         probability|prediction|   nameOrig|   nameDest|
+--------------------+----+--------+---------+-------------+--------------+--------------+--------------+-------+----------+-------------+--------------------+--------------------+--------------------+----------+-----------+-----------+
|0013918d-0cdc-4e2...| 306|   DEBIT|  1244.34|      30927.0|      29682.66|    3248801.84|    3250046.18|      0|       4.0|    (4,[],[])|[0.0,0.0,0.0,0.0,...|[3265139.0,408720.0]|[0.88874913272392...|       0.0| C831886474|C1847064333|
|00162366-932d-4fc...| 204|CASH_OUT|203860.23|      

In [30]:
# Number of rows that survived the join.
joined_result.count()

1271928

#### Persist result as an enriched dataset

In [31]:
bq_enriched_table_name = 'transaction_analysis_enriched'
# "<dataset>.<table>" part, with hyphens from the project id replaced by underscores.
enriched_dataset_and_table = (project_id[0] +  '_enriched.' + bq_enriched_table_name).replace('-', '_')
# Prefix the unmodified project id to get "<project>:<dataset>.<table>".
bq_enriched_table_path = project_id[0] + ':' + enriched_dataset_and_table
bq_enriched_table_path

'thetraining-project:thetraining_project_enriched.transaction_analysis_enriched'

In [32]:
schema_inline = joined_result.schema.simpleString().replace('struct<', '').replace('>', '').replace('int', 'int64').replace('bigint64', 'int64').replace('double', 'float64').replace('vector', 'STRING')

!bq mk --table \
{bq_enriched_table_path} \
{schema_inline}

BigQuery error in mk operation: Table 'thetraining-
project:thetraining_project_enriched.transaction_analysis_enriched' could not be
created; a table with this name already exists.


In [33]:
# Display the inline BigQuery schema string built for the enriched table.
schema_inline

'transactionID:string,step:int64,type:string,amount:float64,oldbalanceOrg:float64,newbalanceOrig:float64,oldbalanceDest:float64,newbalanceDest:float64,isFraud:int64,type_Index:float64,type_OHE:STRING,features:STRING,rawPrediction:STRING,probability:STRING,prediction:float64,nameOrig:string,nameDest:string'

In [34]:
# Bucket the connector stages files in before loading them into BigQuery.
staging_bucket = project_id[0]  + '-data'

# Replace the enriched table contents with the joined result.
(
    joined_result.write
    .format("bigquery")
    .option("table", bq_enriched_table_path)
    .option("temporaryGcsBucket", staging_bucket)
    .mode('overwrite')
    .save()
)

In [35]:
# Enriched dataset name: "<project>_enriched" with hyphens replaced by underscores.
enriched_dataset_name = (project_id[0] +  '_enriched').replace('-', '_')
enriched_dataset_name

'thetraining_project_enriched'

**TODO**
* Provide the enriched_dataset_name in the FROM clause

In [36]:
%%bigquery
-- List the tables in the enriched dataset to confirm the enriched table exists.
SELECT * FROM thetraining_project_enriched.INFORMATION_SCHEMA.TABLES;

Unnamed: 0,table_catalog,table_schema,table_name,table_type,is_insertable_into,is_typed,creation_time
0,thetraining-project,thetraining_project_enriched,transaction_analysis_enriched,BASE TABLE,YES,NO,2021-03-03 12:09:03.854000+00:00


**TODO**
* Query the enriched table

**TODO** (Optional: Challenge 3)
* Improve the ML pipeline
    * Try out different ML models [[doc]](https://spark.apache.org/docs/latest/ml-pipeline.html)
    * Explore hyperparameter tuning 
    * How would you split the data when there is class imbalance? 