In [None]:
!pip install -U pyspark

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import desc
from pyspark.sql.functions import *
from pyspark.sql.functions import max as sparkMax
from IPython.display import display
import pyspark.sql.functions as F

#importing the required libraries
import pandas as pd
import numpy as np
import seaborn as sns

%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt

from scipy.sparse import csr_matrix

import warnings; warnings.filterwarnings(action='ignore')

In [None]:
# Build (or fetch) the local Spark session with explicit memory and shuffle settings.
# The chain is the cell's last expression, so the session is still displayed.
(
    SparkSession.builder
    .master("local[*]")
    .appName("Pyspark")
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.sql.shuffle.partitions", "800")
    .config("spark.memory.offHeap.enabled", 'true')
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

In [None]:
#Creating a Spark session
# getOrCreate() returns the already-running session if there is one,
# otherwise it creates a new one.
mySpark = SparkSession.builder.getOrCreate()
# SparkSession's constructor expects a SparkContext, not another SparkSession,
# so reuse the session object directly instead of wrapping it.
spark = mySpark

In [None]:
#loading the dataset
# Semicolon-delimited CSV with a header row; column types are inferred by Spark.
data = (
    spark.read
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("marketing/data/bank_full_raw.csv")
)

In [None]:
#Display of column name and data type
# Prints each column of `data` with the type Spark inferred while reading the CSV.
data.printSchema()

# 1. Exploratory Data Analysis

In [None]:
# Total number of rows in the loaded dataset.
data.count()

In [None]:
# Preview the first 10 rows.
data.show(10)

In [None]:
#Removal of rows with NaN/Null values and of duplicate rows
# Spark DataFrames are immutable: dropna()/dropDuplicates() return new DataFrames,
# so the result must be assigned back for the cleanup to take effect.
data = data.dropna().dropDuplicates()
data.count()

In [None]:
#check the null/NaN count of every column
from pyspark.sql.functions import col,isnan, when, count
# isnan() is only meaningful for floating-point columns. Applying it to string
# columns relies on an implicit string->double cast, which can fail (for
# example under ANSI SQL mode), so NaN is only tested where it can occur.
float_cols = {name for name, dtype in data.dtypes if dtype in ('float', 'double')}
null_counts = []
for a in data.columns:
    is_missing = col(a).isNull()
    if a in float_cols:
        is_missing = is_missing | isnan(col(a))
    # when() yields a non-null value only for missing rows; count() counts those.
    null_counts.append(count(when(is_missing, a)).alias(a))
data.select(null_counts).show()

In [None]:
# Convert the yes/no flag columns to 1/0 integers; `data` itself is left untouched.
binary_cols = ['default', 'loan', 'housing', 'y']
# Restrict the replacement to the flag columns so that a literal 'yes'/'no'
# in any other string column is not rewritten as a side effect.
df = data.replace({'yes': '1', 'no': '0'}, subset=binary_cols)
for binary_col in binary_cols:
    # withColumn on an existing name replaces the column in place (order is kept).
    df = df.withColumn(binary_col, F.col(binary_col).cast('integer'))

In [None]:
#Display of column name and data type
# Schema of `df` after the yes/no columns were cast to integer.
df.printSchema()

In [None]:
# Row count per distinct value of the `month` column.
month_counts = data.groupBy("month").count()
month_counts.show()

In [None]:
# Row count per distinct value of the `job` column.
job_counts = data.groupBy("job").count()
job_counts.show()

In [None]:
# Row count per distinct value of the `marital` column.
marital_counts = data.groupBy("marital").count()
marital_counts.show()

In [None]:
# Row count per distinct value of the `education` column.
education_counts = data.groupBy("education").count()
education_counts.show()

In [None]:
# Row count per distinct value of the `housing` column.
housing_counts = data.groupBy("housing").count()
housing_counts.show()

In [None]:
# Row count per distinct value of the `loan` column.
loan_counts = data.groupBy("loan").count()
loan_counts.show()

In [None]:
# Row count per distinct value of the `contact` column.
contact_counts = data.groupBy("contact").count()
contact_counts.show()

In [None]:
# Row count per distinct value of the `campaign` column.
campaign_counts = data.groupBy("campaign").count()
campaign_counts.show()

In [None]:
# Row count per distinct value of the `poutcome` column.
poutcome_counts = data.groupBy("poutcome").count()
poutcome_counts.show()

In [None]:
# Row count per distinct value of the target column `y`.
y_counts = data.groupBy("y").count()
y_counts.show()

In [None]:
# Rows with poutcome == 'unknown' and y == 'yes'.
unknown_and_yes = (F.col('poutcome') == 'unknown') & (F.col('y') == 'yes')
data.filter(unknown_and_yes).show()

In [None]:
# Rows with poutcome == 'success' and y == 'yes'.
success_and_yes = (F.col('poutcome') == 'success') & (F.col('y') == 'yes')
data.filter(success_and_yes).show()

In [None]:
# Rows with poutcome == 'failure' and y == 'yes'.
failure_and_yes = (F.col('poutcome') == 'failure') & (F.col('y') == 'yes')
data.filter(failure_and_yes).show()

In [None]:
# Pair plot over the dataset, drawn with seaborn's "ticks" style.
sns.set(style="ticks")

data_pd = data.toPandas()  # collects the whole Spark DataFrame to the driver
sns.pairplot(data_pd)
plt.show()

In [None]:
# Preview `df` (show() prints the first 20 rows by default).
df.show()

In [None]:
# Bar plot of `y` against `job`, drawn on an explicit axes object.
f, ax = plt.subplots(figsize=(15, 8))
job_pd = df.toPandas()
sns.barplot(data=job_pd, x="job", y="y", ax=ax)
sns.despine(left=True, bottom=True)
ax.set(xlabel='job', ylabel='y')

In [None]:
# Bar plot of `y` against `marital`, drawn on an explicit axes object.
f, ax = plt.subplots(figsize=(15, 8))
marital_pd = df.toPandas()
sns.barplot(data=marital_pd, x="marital", y="y", ax=ax)
sns.despine(left=True, bottom=True)
ax.set(xlabel='marital', ylabel='y')

In [None]:
# Bar plot of `y` against `education`, drawn on an explicit axes object.
f, ax = plt.subplots(figsize=(15, 8))
education_pd = df.toPandas()
sns.barplot(data=education_pd, x="education", y="y", ax=ax)
sns.despine(left=True, bottom=True)
ax.set(xlabel='education', ylabel='y')

In [None]:
# Bar plot of `y` against `housing`, drawn on an explicit axes object.
f, ax = plt.subplots(figsize=(15, 8))
housing_pd = df.toPandas()
sns.barplot(data=housing_pd, x="housing", y="y", ax=ax)
sns.despine(left=True, bottom=True)
ax.set(xlabel='housing', ylabel='y')

# 2. Data Preparation

In [None]:
# Remember the current column names; they are used later to select the
# original columns back after the pipeline has added its output columns.
cols = df.columns

#### Encode categorical features and transform/merge multiple columns into a vector column

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ['job', 'marital', 'education', 'contact', 'month', 'poutcome']
numericCols = ['age', 'default', 'balance', 'housing', 'loan', 'day', 'duration', 'campaign', 'pdays', 'previous']

# Each categorical column gets an indexer followed by a one-hot encoder.
stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + 'Index'
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    encoder = OneHotEncoder(inputCols=[indexedCol], outputCols=[categoricalCol + "classVec"])
    stages.extend([stringIndexer, encoder])

# The target column `y` is indexed into `label`.
label_stringIdx = StringIndexer(inputCol='y', outputCol='label')
stages.append(label_stringIdx)

# Encoded categorical vectors plus the numeric columns are assembled into a
# single feature vector column named "Subscribed".
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="Subscribed")
stages.append(assembler)

#### Notice that the DataFrame now has 2 new columns at the beginning - "label" and "Subscribed"

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
# Always start from the original input columns. On a first run this is a no-op;
# on a re-run it drops the previously added 'label'/'Subscribed' columns, which
# would otherwise make the fit fail because the output column already exists.
pipelineInput = df.select(cols)
pipelineModel = pipeline.fit(pipelineInput)
# Keep only the label, the assembled feature vector and the original columns.
selectedCols = ['label', 'Subscribed'] + cols
df = pipelineModel.transform(pipelineInput).select(selectedCols)
df.printSchema()

In [None]:
# Preview the first five rows, transposed so each column reads as a row.
preview_rows = df.take(5)
pd.DataFrame(preview_rows, columns=df.columns).T

#### 80/20 train/test data split 

In [None]:
# Seeded 80/20 random split, followed by the size of each part.
train, test = df.randomSplit([0.8, 0.2], seed=2022)
n_train = train.count()
n_test = test.count()
print("Training Dataset Count: " + str(n_train))
print("Test Dataset Count: " + str(n_test))

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled feature vector, capped at 15 iterations.
lr = LogisticRegression(maxIter=15, labelCol='label', featuresCol='Subscribed')
lrModel = lr.fit(train)

In [None]:
# Plot the fitted coefficients in ascending order.
beta = np.sort(lrModel.coefficients)
fig, ax = plt.subplots()
ax.plot(beta)
ax.set_ylabel('Beta-Coefficients')
plt.show()

In [None]:
# Training-set ROC curve and AUC taken from the model's training summary.
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x-axis and TPR on the y-axis; the axis labels must
# match the plotted series.
plt.plot(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
# Training-set precision/recall curve from the model's training summary.
pr = trainingSummary.pr.toPandas()
fig, ax = plt.subplots()
ax.plot(pr['recall'], pr['precision'])
ax.set_ylabel('Precision')
ax.set_xlabel('Recall')
plt.show()

In [None]:
# Score the test set and show the rows predicted as class 1.0.
predictions = lrModel.transform(test)
is_positive = F.col('prediction') == 1.0
predictions.filter(is_positive).show()

In [None]:
# Number of test rows predicted as class 1.0.
predictions.where(F.col('prediction') == 1.0).count()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator with default settings, applied to the current `predictions`.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

## Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Depth-3 decision tree on the same features; show test rows predicted as 1.0.
dt = DecisionTreeClassifier(maxDepth=3, labelCol='label', featuresCol='Subscribed')
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
is_positive = F.col('prediction') == 1.0
predictions.filter(is_positive).show()

In [None]:
# Number of test rows predicted as class 1.0.
predictions.where(F.col('prediction') == 1.0).count()

In [None]:
# Evaluator with default settings, applied to the current `predictions`.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

## Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random forests are randomized, so the seed is pinned (same value as the
# train/test split) to keep the fitted model and its metrics reproducible.
rf = RandomForestClassifier(featuresCol = 'Subscribed', labelCol = 'label', seed = 2022)
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
# Show the test rows predicted as class 1.0.
predictions.filter((F.col('prediction') == 1.0))\
    .show()

In [None]:
# Evaluator with default settings, applied to the current `predictions`.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print("Test Area Under ROC: " + str(test_auc))

## Gradient-Boosted Tree Classifier

In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees with 10 iterations; show test rows predicted as 1.0.
gbt = GBTClassifier(maxIter=10, labelCol='label', featuresCol='Subscribed')
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
is_positive = F.col('prediction') == 1.0
predictions.filter(is_positive).show()

In [None]:
# Number of test rows predicted as class 1.0.
predictions.where(F.col('prediction') == 1.0).count()

In [None]:
# Evaluator with default settings, applied to the current `predictions`.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print("Test Area Under ROC: " + str(test_auc))

## Tuning the Logistic Regression model with ParamGridBuilder and CrossValidator, because Logistic Regression produced the best results

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# NOTE(review): every grid axis holds a single value, so cross-validation scores
# exactly one parameter combination and performs no actual tuning; with
# regParam = 0 the elasticNetParam setting has no regularization to mix.
# Add more candidate values per axis to search - confirm intent.
# Float literals are used because regParam/elasticNetParam are float parameters.
paramGrid = (ParamGridBuilder().addGrid(lr.maxIter, [500])
                                .addGrid(lr.regParam, [0.0])
                                .addGrid(lr.elasticNetParam, [1.0])
                                .build())
# Pin the fold assignment (same seed as the train/test split) so the
# cross-validation metrics are reproducible between runs.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=2022)
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

In [None]:
# Preview the current `predictions` DataFrame.
predictions.show()

In [None]:
# Number of rows in the current `predictions` DataFrame.
predictions.count()