In [1]:
#import all PySpark related libraries

from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator


import pandas as pd
from pandas.tools.plotting import scatter_matrix

In [3]:
# Build a Spark session named 'bin-class-spark', or reuse an existing one
# (getOrCreate returns the active session if there already is one).
spark = (SparkSession.builder
         .appName('bin-class-spark')
         .getOrCreate())

In [4]:
# Load the bank CSV into a Spark DataFrame.
# Upload the CSV to DBFS first so it can be referenced by the path below.
#   header=True      -> column names are taken from the first row
#   inferSchema=True -> Spark infers column types instead of reading everything as strings
df = spark.read.csv(
    '/FileStore/tables/bank.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Show each column name with the type Spark inferred for it.
df.printSchema()

In [6]:
# Names of the columns whose inferred Spark type is exactly 'int'.
# df.dtypes yields (column_name, type_string) pairs.
numeric_features = [col_name for col_name, col_type in df.dtypes if col_type == 'int']

In [7]:
# Display the integer-typed column names (the last expression is the cell output).
numeric_features

In [8]:
# Bring the numeric columns into a pandas DataFrame for plotting.
# toPandas() collects every row onto the Spark driver, so this is memory
# intensive for large datasets.
numeric_data = df.select(numeric_features).toPandas()

# Pairwise scatter-plot grid of the numeric columns.
# pd.scatter_matrix was deprecated in pandas 0.20 and no longer exists in
# current pandas; pandas.plotting.scatter_matrix is the supported entry point.
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8))

# Number of columns in numeric_data, i.e. the side length of the axes grid.
num_cols = len(numeric_data.columns)

# For each column index i: rotate the y label of the left-most axes in row i
# and the x label of the bottom-most axes in column i, and remove their tick
# marks to reduce clutter.
for i in range(num_cols):
    v = axs[i, 0]  # left-most axes of row i
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[num_cols - 1, i]  # bottom-most axes of column i
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

In [9]:
# Columns used for modelling: the predictors plus the 'deposit' target.
features = ['age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit']

# Narrow the DataFrame to exactly those columns.
df = df.select(features)

# List of column names, captured AFTER narrowing. The pipeline cell later
# selects ['label', 'features'] + cols from the transformed DataFrame, which
# only contains these columns plus derived ones. Reading df.columns before the
# select would include any CSV columns not listed in `features`, and that
# later select would fail on them.
cols = df.columns

In [10]:
# String-valued predictor columns that need indexing + one-hot encoding.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']

# Ordered list of pipeline stages, filled in below.
stages = []

# For each categorical column, add a StringIndexer (-> '<col>Index') followed
# by a one-hot encoder over that index (-> '<col>classVec').
# NOTE(review): OneHotEncoderEstimator is not available in newer Spark
# releases (renamed OneHotEncoder) -- confirm the target Spark version.
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages.extend([stringIndexer, encoder])

# Index the string target 'deposit' into a numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

# Assemble the one-hot vectors and the raw numeric columns into one 'features' vector.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [11]:
# Chain all stages into one Pipeline and fit it on the DataFrame.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)

# Keep only the model inputs ('label', 'features') followed by the original
# columns, then print the resulting schema.
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(df).select(selectedCols)
df.printSchema()

In [12]:
# Preview the first five rows of the transformed DataFrame.
df.show(n=5)

In [13]:
# Random 70/30 train/test split; the fixed seed keeps the split reproducible.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=2019)

# Report how many records landed in each split.
print("Training Dataset Count: %d" % train.count())
print("Test Dataset Count: %d" % test.count())

In [14]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted tree classifier with 10 boosting iterations. No
# featuresCol/labelCol is passed, so the estimator's defaults are used
# (presumably 'features' / 'label', matching the pipeline outputs -- confirm).
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)

# Score the held-out split and preview ten predictions.
predictions = gbtModel.transform(test)
predictions.select(
    'age', 'job', 'label', 'rawPrediction', 'prediction', 'probability'
).show(10)

In [15]:
# Area under the ROC curve on the test predictions. The metric is supplied as
# a param-map override to evaluate() rather than set on the evaluator.
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: %s" % evaluator.evaluate(
    predictions, {evaluator.metricName: "areaUnderROC"}))

In [16]:
# Tune the GBT with 5-fold cross-validation over a small hyper-parameter grid.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

# 3 x 2 x 2 = 12 parameter combinations, each fitted on 5 folds (60 fits),
# with up to 20 boosted trees per fit. Expect several minutes (the original
# estimate was about 6); actual time depends on the cluster.
# seed fixes the fold assignment so results are reproducible, consistent with
# the seeded train/test split.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=2019)

# Fit on the training split, score the held-out test split with the resulting
# model, and output the evaluator's default metric (areaUnderROC for
# BinaryClassificationEvaluator) as the cell result.
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)