# Model Development

### Objective: create a Classifier to predict Marketing Interaction Outcomes

- Train and validate initial model
- Create SparkML Pipeline and save it to Object Store

In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import os
import sys
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this notebook.
# STORAGE (object-store root) and HADOOP_USER_NAME must be set in the environment;
# a missing variable raises KeyError here.
# The data set is small, so Spark local mode also works: add `.master("local[*]")`
# to the builder chain below.
spark = (
    SparkSession.builder
    .appName("ModelDevelopment")
    .config("spark.authenticate", "true")
    # Declares STORAGE as a Hadoop filesystem this application will access.
    .config("spark.yarn.access.hadoopFileSystems", os.environ["STORAGE"])
    .config("spark.hadoop.yarn.resourcemanager.principal", os.environ["HADOOP_USER_NAME"])
    .config("spark.executor.memory", "6g")
    .config("spark.executor.cores", "5")
    .getOrCreate()
)

print("Spark Version: {}".format(spark.version))

Spark Version: 2.4.0.7.1.2.0-96


In [4]:
# Peek at the first rows of the source table.
# NOTE: the output contains customer names/addresses -- clear it before sharing the notebook.
preview_df = spark.sql("SELECT * FROM DEFAULT.CUSTOMER_INTERACTIONS_CICD LIMIT 10")
preview_df.show()

+---------------+--------------------+----------------+--------+------------+--------------------+-------+-------+-------------+---------+---------+-----------+-------+---------------+----------+---------+--------------------+--------------------+
|           NAME|      STREET_ADDRESS|            CITY|POSTCODE|PHONE_NUMBER|                 JOB|RECENCY|HISTORY|USED_DISCOUNT|USED_BOGO| ZIP_CODE|IS_REFERRAL|CHANNEL|          OFFER|CONVERSION|    SCORE|            BATCH_ID|           BATCH_TMS|
+---------------+--------------------+----------------+--------+------------+--------------------+-------+-------+-------------+---------+---------+-----------+-------+---------------+----------+---------+--------------------+--------------------+
|   George Hicks|80552 Washington ...|     North David|   15923|        null|Radiographer, the...|     10|    142|            1|        0|Surburban|          0|  Phone|Buy One Get One|         0|1.0767447|79a030e2-17e2-11e...|2020-10-26 23:25:...|
|   Mega

In [5]:
# Full historical interactions table; the modelling frame is derived from it below.
history_query = "SELECT * FROM DEFAULT.CUSTOMER_INTERACTIONS_CICD"
hist_DF = spark.sql(history_query)

In [6]:
# (column name, Spark type) pairs for every column of the history table.
[(field.name, field.dataType.simpleString()) for field in hist_DF.schema.fields]

[('NAME', 'string'),
 ('STREET_ADDRESS', 'string'),
 ('CITY', 'string'),
 ('POSTCODE', 'int'),
 ('PHONE_NUMBER', 'int'),
 ('JOB', 'string'),
 ('RECENCY', 'int'),
 ('HISTORY', 'int'),
 ('USED_DISCOUNT', 'int'),
 ('USED_BOGO', 'int'),
 ('ZIP_CODE', 'string'),
 ('IS_REFERRAL', 'int'),
 ('CHANNEL', 'string'),
 ('OFFER', 'string'),
 ('CONVERSION', 'int'),
 ('SCORE', 'float'),
 ('BATCH_ID', 'string'),
 ('BATCH_TMS', 'timestamp')]

In [7]:
# Keep only the modelling columns: candidate features plus the CONVERSION target.
modelling_cols = ["RECENCY", "HISTORY", "USED_DISCOUNT", "USED_BOGO", "ZIP_CODE",
                  "IS_REFERRAL", "CHANNEL", "OFFER", "SCORE", "CONVERSION"]
df = hist_DF.select(*modelling_cols)

In [8]:
# Rename the target to lowercase "label" (the column name used by the classifier and
# evaluators below). Spark ML looks fields up case-sensitively: a column named "LABEL"
# is NOT found when "label" is requested.
df = df.withColumnRenamed("CONVERSION","label")

In [9]:
# Split columns by Spark type: string columns get one-hot encoded, integer columns are
# fed to the assembler as-is.
# NOTE(review): only types starting with 'in' (here: 'int') count as numeric, so the
# float SCORE column selected above never becomes a feature -- confirm that is intended.
cat_cols = [name for name, dtype in df.dtypes if dtype.startswith('string')]
num_cols = [name for name, dtype in df.dtypes if dtype.startswith('in')]

In [11]:
# The int target was picked up as a numeric column above; drop it from the features.
# Filtering (rather than list.remove) keeps this cell safe to re-run.
num_cols = [c for c in num_cols if c != 'label']

In [12]:
#df.groupby("label").count().show()

In [13]:
# 80/20 train/test split; the fixed seed keeps the split reproducible.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=1)

#### Creating Pipeline
##### Note: the pipeline built below ends with the RandomForest classifier stage, so it is not split into separate feature-engineering and classifier jobs.

In [14]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [20]:
def make_pipeline(categorical_cols=None, numeric_cols=None, seed=None):
    """Build the (unfitted) feature-engineering + RandomForest pipeline.

    Each categorical column is string-indexed and one-hot encoded. The encoded
    vectors plus the numeric columns are assembled into ``features``, which feeds
    a ``RandomForestClassifier`` trained on the ``label`` column.

    Args:
        categorical_cols: string columns to encode. Defaults to the notebook-level
            ``cat_cols`` (looked up at call time, as before).
        numeric_cols: columns passed to the assembler unchanged. Defaults to the
            notebook-level ``num_cols``.
        seed: optional seed for the random forest; ``None`` leaves Spark's default.

    Returns:
        pyspark.ml.Pipeline whose final stage is the RandomForestClassifier.
    """
    if categorical_cols is None:
        categorical_cols = cat_cols
    if numeric_cols is None:
        numeric_cols = num_cols

    stages = []
    for col_name in categorical_cols:
        # handleInvalid="keep" maps categories not seen during fit (e.g. values that
        # only occur in the test split or in new scoring data) to an extra index
        # instead of raising at transform time.
        indexer = StringIndexer(inputCol=col_name,
                                outputCol=col_name + '_StringIndex',
                                handleInvalid="keep")
        encoder = OneHotEncoderEstimator(inputCols=[indexer.getOutputCol()],
                                         outputCols=[col_name + '_ClassVect'])
        stages += [indexer, encoder]

    # Combine the one-hot vectors and the raw numeric columns into one feature vector.
    assembler_inputs = [c + "_ClassVect" for c in categorical_cols] + list(numeric_cols)
    stages.append(VectorAssembler(inputCols=assembler_inputs, outputCol="features"))

    rf_params = {"labelCol": "label"}
    if seed is not None:
        # Only pass seed when given: an explicit None would be stored as the param value.
        rf_params["seed"] = seed
    stages.append(RandomForestClassifier(**rf_params))

    return Pipeline(stages=stages)

In [21]:
# Unfitted pipeline: indexers/encoders, assembler, and the RandomForest as the last stage.
pipeline = make_pipeline()

In [22]:
# Hyperparameter grid for the forest inside the pipeline.
# The grid must use the Params of the *instance* in the pipeline. Class-level attributes
# such as RandomForestClassifier.numTrees belong to a placeholder parent, so they never
# match the pipeline's stage when the pipeline is copied with a param map -- every grid
# point would silently train the same default model.
rf = pipeline.getStages()[-1]  # make_pipeline appends the classifier as the last stage
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20, 30])
             .addGrid(rf.maxDepth, [5, 10])
             .build())

In [27]:
# 5-fold cross-validation over paramGrid, scored by ROC AUC.
# seed fixes the fold assignment so repeated runs are comparable (matching the fixed
# seed used for the train/test split).
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC"),
                          numFolds=5,
                          seed=1)

In [26]:
# Fit every param map on every fold and keep the best-scoring configuration
# (available afterwards as cvModel.bestModel).
cvModel = crossval.fit(dataset=train)

IllegalArgumentException: 'Field "label" does not exist.\nAvailable fields: RECENCY, HISTORY, USED_DISCOUNT, USED_BOGO, ZIP_CODE, IS_REFERRAL, CHANNEL, OFFER, SCORE, LABEL, CrossValidator_97568ecf7a54_rand, ZIP_CODE_StringIndex, ZIP_CODE_ClassVect, CHANNEL_StringIndex, CHANNEL_ClassVect, OFFER_StringIndex, OFFER_ClassVect, features, rawPrediction, probability, prediction'

In [None]:
# Mean cross-validated metric, one entry per param map in paramGrid.
avg_metric_per_param_map = cvModel.avgMetrics
print(avg_metric_per_param_map)

In [None]:
# Model chosen by cross-validation; it is saved to object storage further down.
bestModel = cvModel.bestModel
print(str(bestModel))

#### Test Set Evaluation

In [None]:
#Evaluating model with the held out test set:
# Score the held-out test set with the best model selected by cross-validation.
prediction = cvModel.bestModel.transform(test)

In [None]:
# Narrow the scored frame to the true label and the hard 0/1 prediction.
predictionAndTarget = prediction.select(["label", "prediction"])

In [None]:
# Create both evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC')

In [None]:
# Get metrics
acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(predictionAndTarget)

In [None]:
# Collect label and prediction in ONE Spark job so the two lists are row-aligned by
# construction. Two separate collect() calls are two jobs, each recomputing the test set.
# Unwrap the Row objects into plain ints for scikit-learn.
label_pred_rows = predictionAndTarget.collect()
y_true = [int(row["label"]) for row in label_pred_rows]
y_pred = [int(row["prediction"]) for row in label_pred_rows]

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, plot_confusion_matrix, roc_curve
# NOTE: plot_confusion_matrix and roc_curve are unused here; plot_confusion_matrix was
# removed in scikit-learn 1.2, so this import fails on newer versions.
# labels=[0, 1] pins the matrix to 2x2 (rows = true class, columns = predicted class)
# even if one class never occurs in y_true/y_pred, so the fixed 2x2 table plotted next
# always fits.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
print(classification_report(y_true, y_pred))

In [None]:
from utils.cm import plot_conf_matrix

In [None]:
# Transposed confusion matrix: rows are predicted classes, columns are true classes.
cm_table = pd.DataFrame(cm.T,
                        columns=['neg', 'pos'],
                        index=['pred_neg', 'pred_pos'])
plot_conf_matrix(cm_table)

In [None]:
from plot_metric.functions import BinaryClassification
# Visualisation with plot_metric.
# The ROC curve needs a continuous score, not the hard 0/1 prediction (which yields a
# single operating point). Use P(class 1) from Spark's probability vector, collected in
# the same job as the label so both lists stay row-aligned.
roc_rows = prediction.select("label", "probability").collect()
roc_true = [int(row["label"]) for row in roc_rows]
roc_score = [float(row["probability"][1]) for row in roc_rows]
bc = BinaryClassification(roc_true, roc_score, labels=["Class 1", "Class 2"])

# Figures
plt.figure(figsize=(5,5))
bc.plot_roc_curve()
plt.show()

In [None]:
# Summary of the held-out test-set metrics computed above (rendered as a table).
pd.Series({
    "accuracy": acc,
    "f1": f1,
    "weightedPrecision": weightedPrecision,
    "weightedRecall": weightedRecall,
    "areaUnderROC": auc,
}, name="test_set")

#### Saving Pipeline and Random Forest Model to Object Store

In [None]:
import datetime
import os, time

In [None]:
# Run timestamp used to version the saved artifacts; the string form is DDMMYYYYHHMMSS.
run_time_suffix = datetime.datetime.now()
run_time_suffix_string = format(run_time_suffix, "%d%m%Y%H%M%S")

In [None]:
# Persist the best fitted model and the pipeline definition under timestamped
# directories in object storage, overwriting any existing path.
# NOTE: `pipeline` is the unfitted template from make_pipeline; the tuned
# hyperparameters only live in bestModel.
storage_root = os.environ["STORAGE"]
model_path = storage_root + "/testpysparkmodels/" + "{}".format(run_time_suffix_string)
pipeline_path = storage_root + "/testpysparkpipelines/" + "{}".format(run_time_suffix_string)
bestModel.write().overwrite().save(model_path)
pipeline.write().overwrite().save(pipeline_path)

In [None]:
#print("s3a://demo-aws-1/datalake/pdefusco/bestLR_{}".format(run_time_suffix))

In [None]:
# Release the Spark session. The cells below only use bestModel/pipeline via type()
# and str(); they do not run Spark jobs.
spark.stop() 

#### Writing Model and Pipeline Metadata to the SQLite tables (`models`, `pipelines`)

In [None]:
# Show the working directory: the relative 'models.db' opened below lives here.
!pwd

In [None]:
import sqlite3
# Open (creating it if absent) the local SQLite metadata store, plus a cursor on it.
conn = sqlite3.connect(database='models.db')
c = conn.cursor()

In [None]:
# Inspect which model class cross-validation returned.
bestModel.__class__

In [None]:
import re
# One row for the `models` table: the first two '_'-separated parts of the model's string
# form (for Spark ML objects, presumably class name and uid suffix -- confirm), the run
# timestamp, and the storage path.
# The path must use run_time_suffix_string, the exact suffix the model was saved under.
# Formatting the datetime object yields "YYYY-MM-DD HH:MM:SS.ffffff", a path that was
# never written.
model_uid_parts = str(bestModel).split("_")
models_insert = [(model_uid_parts[0],
                  model_uid_parts[1],
                  str(run_time_suffix),
                  os.environ["STORAGE"] + "/testpysparkmodels/" + "{}".format(run_time_suffix_string))
                 ]

In [None]:
# One row for the `pipelines` table: the first two '_'-separated parts of the pipeline's
# string form (presumably class name and uid suffix -- confirm), the run timestamp, and
# the storage path.
# The path must use run_time_suffix_string, the exact suffix the pipeline was saved
# under. Formatting the datetime object yields a different, never-written path.
pipeline_uid_parts = str(pipeline).split("_")
pipelines_insert = [(pipeline_uid_parts[0],
                     pipeline_uid_parts[1],
                     str(run_time_suffix),
                     os.environ["STORAGE"] + "/testpysparkpipelines/" + "{}".format(run_time_suffix_string))
                    ]

In [None]:
# Insert the pipeline metadata row. The models row is inserted by the following cell;
# inserting it here as well produced a duplicate row in `models` on every run.
c.executemany('INSERT INTO pipelines VALUES (?,?,?,?)', pipelines_insert)

In [None]:
# Insert the model metadata row(s) prepared above.
insert_models_sql = 'INSERT INTO models VALUES (?,?,?,?)'
c.executemany(insert_models_sql, models_insert)

In [None]:
# Persist the inserts, then release the connection.
conn.commit()
conn.close()

In [None]:
# Check the pipeline and model rows were written to SQLite by printing both tables.
conn = sqlite3.connect('models.db')
c = conn.cursor()
for query in ("select * from pipelines", "select * from models"):
    for row in c.execute(query):
        print(row)

In [None]:
# Print only the models table.
for model_row in c.execute("select * from models"):
    print(model_row)

In [None]:
# Only SELECTs ran since reconnecting, so commit() has nothing to persist; close() releases the connection.
conn.commit()
conn.close()