## 4. Data Scientist - Create ML models with Spark

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Create a Spark DataFrame from the Hive table `bank_demo_db.bank_marketing`

In [None]:
# Build a (lazy) DataFrame over the full Hive table via Spark SQL.
bank_marketing_sql = """
SELECT * 
FROM bank_demo_db.bank_marketing
"""
data = spark.sql(bank_marketing_sql)

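Cache the DataFrame so that the repeated actions below reuse it instead of re-reading the Hive table (Spark's cache may spill to disk if memory runs short).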

In [None]:
# Mark the DataFrame for caching. Caching is lazy: the data is materialized by the
# next action (the groupBy/count below), and later actions then reuse it.
data.cache()

DataFrame[Age: int, Job: string, MaritalStatus: string, Education: string, Default: string, Balance: int, Housing: string, Loan: string, Contact: string, Day: int, Month: string, Duration: int, Campaign: int, PDays: int, Previous: int, POutcome: string, Deposit: int]

In [None]:
# Class balance of the full dataset. groupBy() gives no ordering guarantee,
# so sort by the label value to get a stable, readable table.
data.groupBy("Deposit").count().orderBy("Deposit").show()

+-------+-----+
|Deposit|count|
+-------+-----+
|      1|39922|
|      2| 5289|
+-------+-----+



### Split training and test data

In [None]:
# 70/30 train/test split; the fixed seed makes the split repeatable for the same input.
train_data, test_data = data.randomSplit(weights=[0.7, 0.3], seed=42)

In [None]:
# Class balance of the training split (should mirror the full dataset).
# Sorted by label because groupBy() output order is not guaranteed.
train_data.groupBy("Deposit").count().orderBy("Deposit").show()

+-------+-----+
|Deposit|count|
+-------+-----+
|      1|28004|
|      2| 3709|
+-------+-----+



In [None]:
# Number of rows in the training split.
train_data.count()

31713

In [None]:
# Number of rows in the test split.
test_data.count()

13498

## Create Spark ML Pipeline

Train a RandomForestClassifier model

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import RandomForestClassifier

# Baseline pipeline: index and one-hot encode the string columns, append the
# numeric columns, assemble one feature vector and fit a random forest.

# Every string column except the label is treated as a categorical feature.
categorical_cols = [field for (field, data_type) in train_data.dtypes
                    if data_type == "string" and field != 'Deposit']

index_output_cols = [x + "_Index" for x in categorical_cols]

ohe_output_cols = [x + "_OHE" for x in categorical_cols]

# NOTE: handleInvalid="skip" drops rows whose category was not seen during fit,
# so predictions on test_data may contain fewer rows than test_data itself.
categorical_string_indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=index_output_cols,
    handleInvalid="skip")

ohe_encoder = OneHotEncoder(
    inputCols=index_output_cols,
    outputCols=ohe_output_cols)

# Numeric columns (except the label) are passed to the assembler unchanged.
numeric_cols = [field for (field, data_type) in train_data.dtypes
                if data_type in ("double", "int", "bigint") and field != 'Deposit']

assembler_inputs = ohe_output_cols + numeric_cols

vec_assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features")

# Deposit holds the values 1 and 2. StringIndexer casts it to a string, and
# alphabetical ordering pins "1" -> 0.0 and "2" -> 1.0 regardless of class
# frequencies. The default frequency ordering would flip the mapping if class
# counts changed, silently changing which class the metrics treat as positive.
label_string_indexer = StringIndexer(
    inputCol="Deposit",
    outputCol="label",
    stringOrderType="alphabetAsc")

# Train a RandomForestClassifier model. An explicit seed (matching randomSplit)
# keeps the forest reproducible; the default seed is not guaranteed to be
# stable across kernel restarts.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=42)

pipeline = Pipeline(stages=[
    categorical_string_indexer,
    ohe_encoder,
    vec_assembler,
    label_string_indexer,
    rf
])

# Train model on training data
pipeline_model = pipeline.fit(train_data)

# Make predictions on test.
predictions = pipeline_model.transform(test_data)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(42,[10,12,15,16,...|
|       0.0|  0.0|(42,[10,12,16,18,...|
|       0.0|  1.0|(42,[10,12,16,18,...|
|       0.0|  1.0|(42,[10,12,16,18,...|
|       0.0|  0.0|(42,[10,12,15,16,...|
+----------+-----+--------------------+
only showing top 5 rows



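The dataset is imbalanced (39,922 rows with Deposit = 1 vs. 5,289 with Deposit = 2), so accuracy alone is misleading. Instead we use AUC, the Area Under the ROC Curve, which measures how well the model ranks positive examples above negative ones across all decision thresholds. [Learn more about AUC here.](https://developers.google.com/machine-learning/crash-course/classification/roc-and-auc#AUC)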

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC computed from the model's score column (rawPrediction by default),
# not from the thresholded 0/1 predictions.
binaryEvaluator = BinaryClassificationEvaluator(labelCol="label",
                                                metricName="areaUnderROC")
auc = binaryEvaluator.evaluate(predictions)
print(auc)

0.8867264917867028


In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Collect (label, prediction) pairs to the driver and compute hard-label
# metrics for the positive class (label 1.0).
predictions_np = np.array(predictions.select("label", "prediction").collect())
y_true = predictions_np[:, 0]
y_pred = predictions_np[:, 1]

np_acc = accuracy_score(y_true, y_pred)
np_f1 = f1_score(y_true, y_pred)
np_precision = precision_score(y_true, y_pred)
np_recall = recall_score(y_true, y_pred)
# roc_auc_score is deliberately not applied to y_pred: on 0/1 predictions it
# reflects a single threshold, not a ranking AUC. The threshold-free AUC is
# the `auc` value from BinaryClassificationEvaluator.

# Accuracy is dominated by the majority class here; read it alongside f1/recall.
print("accuracy:", np_acc)
print("f1:", np_f1)
print("precision:", np_precision)
print("recall:", np_recall)

f1: 0.08268424206111442
precision: 0.7752808988764045
recall: 0.043670886075949364


In [None]:
# sklearn builds the confusion matrix; pandas labels it for display.
from sklearn.metrics import confusion_matrix
import pandas as pd

# labels=[1, 0] puts the positive class (label 1.0) in the first row/column.
actual_labels = predictions_np[:, 0]
predicted_labels = predictions_np[:, 1]
confusion_matrix_scores = confusion_matrix(actual_labels, predicted_labels, labels=[1, 0])

# Show the 2x2 counts as a labelled table (rows = actual, columns = predicted).
row_names = ["Actually True", "Actually Not True"]
col_names = ["Predicted True", "Predicted Not True"]
df = pd.DataFrame(confusion_matrix_scores, index=row_names, columns=col_names)

df.head()

Unnamed: 0,Predicted True,Predicted Not True
Actually True,69,1511
Actually Not True,20,11898


## Improve model using XGBoost

Train a model using XGBoost (GPU-accelerated via the `gpu_hist` tree method)

In [None]:
# Optional: stop the current SparkSession before building the GPU-configured one below.
# NOTE(review): SparkSession.builder.getOrCreate() returns an already-running session if one
# exists; static settings passed to the builder (e.g. spark.jars.packages, executor/GPU
# resources) may then not take effect. Uncomment if the new configuration must apply.
# spark.stop()

In [None]:
from pyspark.sql import SparkSession

warehouse_location = 'gs://dataproc-datalake-demo/hive-warehouse'
service_endpoint = 'thrift://hive-cluster-m.us-central1-f:9083'

# Builder settings for a Hive-enabled session sized for GPU XGBoost training.
# NOTE: getOrCreate() reuses an already-running session if there is one, in
# which case static settings (packages, executor/GPU resources) may not apply.
gpu_session_conf = {
    'spark.jars.packages': 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1',
    "spark.rapids.memory.gpu.pooling.enabled": "false",
    "spark.executor.instances": "4",
    "spark.executor.cores": "2",
    "spark.task.cpus": "2",
    "spark.task.resource.gpu.amount": "1",
    "hive.metastore.uris": service_endpoint,
    "spark.sql.warehouse.dir": warehouse_location,
}

session_builder = SparkSession.builder.appName('Hive and XGBoost - GPU')
for conf_key, conf_value in gpu_session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.enableHiveSupport().getOrCreate()

In [None]:
# Show the id of the active Spark application (useful for locating its logs / UI).
spark.conf.get("spark.app.id")

'application_1599581036896_0008'

In [None]:
# Re-read the Hive table in the new session and recreate the same 70/30 split.
bank_marketing_sql = """
SELECT * 
FROM bank_demo_db.bank_marketing
"""
data = spark.sql(bank_marketing_sql)

(train_data, test_data) = data.randomSplit([0.7, 0.3], seed=42)

# Cache each split (used repeatedly for fit/transform) and preview a few rows.
for split_df in (train_data, test_data):
    split_df.cache()
    split_df.show(3)

+---+-------+-------------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+-------+
|Age|    Job|MaritalStatus|Education|Default|Balance|Housing|Loan| Contact|Day|Month|Duration|Campaign|PDays|Previous|POutcome|Deposit|
+---+-------+-------------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+-------+
| 18|student|       single|  primary|     no|    608|     no|  no|cellular| 12|  aug|     267|       1|   -1|       0| unknown|      2|
| 18|student|       single|  primary|     no|    608|     no|  no|cellular| 13|  nov|     210|       1|   93|       1| success|      2|
| 18|student|       single|secondary|     no|      5|     no|  no|cellular| 24|  aug|     143|       2|   -1|       0| unknown|      1|
+---+-------+-------------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+-------+
only showing top 3 rows

+---+-------+----------

## Create ML Pipeline with XGBoost model

In [None]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier

# GPU pipeline: string columns are only index-encoded (no one-hot encoding) and
# passed, together with the numeric columns, directly to XGBoost as feature columns.

# Every string column except the label is treated as a categorical feature.
categorical_cols = [field for (field, data_type) in train_data.dtypes
                    if data_type == "string" and field != 'Deposit']

string_index_output_cols = [x + "_Index" for x in categorical_cols]

# NOTE: handleInvalid="skip" drops rows with categories unseen during fit.
categorical_string_indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=string_index_output_cols,
    handleInvalid="skip")

numeric_cols = [field for (field, data_type) in train_data.dtypes
                if data_type in ("double", "int", "bigint") and field != 'Deposit']

features = string_index_output_cols + numeric_cols

# Deposit holds the values 1 and 2. Alphabetical ordering pins "1" -> 0.0 and
# "2" -> 1.0 independent of class frequencies, so the positive label is stable.
label_string_indexer = StringIndexer(
    inputCol="Deposit",
    outputCol="label",
    stringOrderType="alphabetAsc")

params = {
    'treeMethod': 'gpu_hist',
    'maxDepth': 10,
    'maxLeaves': 256,
    'growPolicy': 'depthwise',
    'objective': 'binary:logistic',
    'numRound': 100,
    'numWorkers': 2
}

# For GPU you must use .setFeaturesCols(features) and pass in the list of columns that are the features
xgbc = XGBoostClassifier(**params).setLabelCol("label").setFeaturesCols(features)

# For CPU training you must use .setFeaturesCol('features') which
# expects the features to be vectorised into one column first
# xgbc = XGBoostClassifier(**params).setLabelCol('label').setFeaturesCol('features')

pipeline = Pipeline(stages=[
    categorical_string_indexer,
    label_string_indexer,
    xgbc
])

In [None]:
%%time
# Fit all pipeline stages (string indexers + XGBoost) on the training split only;
# %%time reports how long training takes.
pipeline_model = pipeline.fit(train_data)

CPU times: user 74.4 ms, sys: 18.3 ms, total: 92.7 ms
Wall time: 20.1 s


In [None]:
# Score the held-out split with the fitted pipeline and preview a few rows.
predictions = pipeline_model.transform(test_data)
preview_cols = ["prediction", "label"]
predictions.select(*preview_cols).show(5)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       0.0|  0.0|
+----------+-----+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC computed from the model's score column (rawPrediction by default),
# not from the thresholded 0/1 predictions.
binaryEvaluator = BinaryClassificationEvaluator(labelCol="label",
                                                metricName="areaUnderROC")
auc = binaryEvaluator.evaluate(predictions)
print(auc)

0.9261211368401443


### View model stats using Numpy and Scikit-learn

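Spark's `BinaryClassificationEvaluator` only reports `areaUnderROC` and `areaUnderPR`. Here the predictions are collected and `sklearn.metrics` computes precision, recall and F1. (Spark's `MulticlassClassificationEvaluator` could also compute these per label with the `precisionByLabel`, `recallByLabel` and `fMeasureByLabel` metrics.)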

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Collect (label, prediction) pairs to the driver and compute hard-label
# metrics for the positive class (label 1.0).
predictions_np = np.array(predictions.select("label", "prediction").collect())
y_true = predictions_np[:, 0]
y_pred = predictions_np[:, 1]

np_acc = accuracy_score(y_true, y_pred)
np_f1 = f1_score(y_true, y_pred)
np_precision = precision_score(y_true, y_pred)
np_recall = recall_score(y_true, y_pred)
# roc_auc_score is deliberately not applied to y_pred: on 0/1 predictions it
# reflects a single threshold, not a ranking AUC. The threshold-free AUC is
# the `auc` value from BinaryClassificationEvaluator.

# Accuracy is dominated by the majority class here; read it alongside f1/recall.
print("accuracy:", np_acc)
print("f1:", np_f1)
print("precision:", np_precision)
print("recall:", np_recall)

f1: 0.5221674876847291
precision: 0.5879556259904913
recall: 0.46962025316455697


In [None]:
# sklearn builds the confusion matrix; pandas labels it for display.
from sklearn.metrics import confusion_matrix
import pandas as pd

# labels=[1, 0] puts the positive class (label 1.0) in the first row/column.
actual_labels = predictions_np[:, 0]
predicted_labels = predictions_np[:, 1]
confusion_matrix_scores = confusion_matrix(actual_labels, predicted_labels, labels=[1, 0])

# Show the 2x2 counts as a labelled table (rows = actual, columns = predicted).
row_names = ["Actually True", "Actually Not True"]
col_names = ["Predicted True", "Predicted Not True"]
df = pd.DataFrame(confusion_matrix_scores, index=row_names, columns=col_names)

df.head()

Unnamed: 0,Predicted True,Predicted Not True
Actually True,742,838
Actually Not True,520,11398


### Save model_pipeline

In [None]:
from pyspark.ml import Pipeline, PipelineModel

# Persist the whole fitted pipeline (indexers + model) so it can be reloaded
# for scoring; overwrite() replaces anything already stored at this path.
model_path = 'gs://dataproc-datalake-examples/xgboost/pipeline_model/bank-marketing'
pipeline_writer = pipeline_model.write().overwrite()
pipeline_writer.save(model_path)

In [None]:
# Reload the saved pipeline from model_path to check that it round-trips.
loaded_pipeline_model = PipelineModel.load(model_path)

In [None]:
# Re-score the test split with the reloaded pipeline. This replaces `predictions`
# with the reloaded model's output (all input, index and prediction columns).
predictions = loaded_pipeline_model.transform(test_data)
predictions.show(n=5)

+---+-------+---+--------+--------+-----+--------+-------+-----------+---------+-------------------+--------------+-------------+-------------+---------------+----------+-------------+-----+--------------------+--------------------+----------+
|Age|Balance|Day|Duration|Campaign|PDays|Previous|Deposit|Month_Index|Job_Index|MaritalStatus_Index|POutcome_Index|Housing_Index|Contact_Index|Education_Index|Loan_Index|Default_Index|label|       rawPrediction|         probability|prediction|
+---+-------+---+--------+--------+-----+--------+-------+-----------+---------+-------------------+--------------+-------------+-------------+---------------+----------+-------------+-----+--------------------+--------------------+----------+
| 18|   1944| 10|     122|       3|   -1|       0|      1|        2.0|     10.0|                1.0|           0.0|          1.0|          2.0|            2.0|       0.0|          0.0|  0.0|[6.82134294509887...|[0.99891093163751...|       0.0|
| 18|     35| 21|     10

### Save prediction results to a new table

In [None]:
# Write every column of `predictions` (inputs, *_Index features, label, rawPrediction,
# probability, prediction) to a Parquet-format table in bank_demo_db, replacing it if present.
predictions.write.mode('overwrite').format("parquet").saveAsTable("bank_demo_db.bank_marketing_predictions")

In [None]:
# Confirm the predictions table now exists (up to 10 rows, untruncated column values).
spark.sql("SHOW TABLES in bank_demo_db").show(10, False)

+------------+--------------------------+-----------+
|database    |tableName                 |isTemporary|
+------------+--------------------------+-----------+
|bank_demo_db|bank_marketing            |false      |
|bank_demo_db|bank_marketing_predictions|false      |
+------------+--------------------------+-----------+

