# Bank direct marketing prediction using Logistic Regression

The data is related to the direct marketing campaigns of a Portuguese banking institution. The marketing campaigns were based on phone calls. Often, more than one contact with the same client was required in order to assess whether the product (a bank term deposit) would be subscribed to or not.

## Objective

The classification goal is to predict whether the client will subscribe to a term deposit (variable result).


Input data variables are as follows:
## Bank client data:

1 - age (numeric) 

2 - job : type of job 

3 - marital : marital status 

5 - default: has credit in default? 

6 - housing: has housing loan? 

7 - loan: has personal loan? 

## Related to the last contact of the current campaign:

8 - contact: contact communication type 

9 - month: last contact month of year

10 - day_of_week: last contact day of the week

11 - duration: last contact duration, in seconds (numeric). Important note: this attribute strongly affects the output target (e.g., if duration=0 then result='no'). However, the duration is not known before a call is made, and once the call has ended the result is obviously known. This input should therefore be included only for benchmark purposes and should be discarded if the intention is to build a realistic predictive model.
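
The caveat about duration can be applied when choosing the model inputs. The sketch below, in plain Python, derives a realistic feature list from a benchmark one by dropping columns that are only known after the call. The names `benchmark_cols`, `LEAKY_COLS` and `realistic_cols` are hypothetical and are not part of the original notebook.

```python
# Numeric inputs described in this document (column names as used in the notebook).
benchmark_cols = [
    "age", "duration", "campaign", "pdays", "previous",
    "emp_var_rate", "cons_price_idx", "cons_conf_idx",
]

# Assumption: any column known only after the call has ended is treated as leaky.
LEAKY_COLS = {"duration"}

# Keep the original order and drop the leaky columns for a realistic model.
realistic_cols = [c for c in benchmark_cols if c not in LEAKY_COLS]

print(realistic_cols)
```

A list like `realistic_cols` could be passed to a feature assembler in place of the benchmark list when the goal is a model that can be used before the call is made.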

## Other attributes:

12 - campaign: number of contacts performed during this campaign and for this client

13 - pdays: number of days that passed by after the client was last contacted from a previous campaign (numeric; 999 means client was not previously contacted)

14 - previous: number of contacts performed before this campaign and for this client (numeric)

15 - poutcome: outcome of the previous marketing campaign (categorical: 'failure','nonexistent','success')
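
Because 999 in pdays is a sentinel rather than a real number of days, feeding the raw value to a linear model may distort the fitted weight. One possible encoding, shown here as a plain-Python sketch, splits the column into a "previously contacted" flag and an optional day count. The helper `split_pdays` and the constant `NOT_CONTACTED` are hypothetical; the original notebook uses pdays as is.

```python
from typing import Optional, Tuple

NOT_CONTACTED = 999  # sentinel value documented for pdays


def split_pdays(pdays: int) -> Tuple[int, Optional[int]]:
    """Return (previously_contacted flag, days since last contact or None)."""
    if pdays == NOT_CONTACTED:
        return 0, None
    return 1, pdays


# Show the encoding for the sentinel and for two real day counts.
for value in (999, 3, 0):
    print(value, split_pdays(value))
```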

## Social and economic context attributes

16 - emp_var_rate: employment variation rate - quarterly indicator (numeric)

17 - cons_price_idx: consumer price index - monthly indicator (numeric) 

18 - cons_conf_idx: consumer confidence index - monthly indicator (numeric) 

19 - euribor3m: euribor 3 month rate - daily indicator (numeric)

20 - no_employed: number of employees - quarterly indicator (numeric)

# Data download

In [4]:
%sh

wget --no-check-certificate 'https://onedrive.live.com/download?cid=EA5ED07A88AEEA57&resid=EA5ED07A88AEEA57%2115521&authkey=ALUhyTQc2xMJwas' -O bank.csv 


In [5]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import *  # includes lit; may shadow built-ins such as sum and max

sqlContext = SQLContext(sc)  # used later to build the confusion-matrix DataFrame

import numpy as np  # array operations
import pandas as pd  # tabular data handling
import matplotlib.pyplot as plt  # plotting
import seaborn as sns  # statistical plotting built on matplotlib

In [6]:
# Read the downloaded CSV with a header row and inferred column types
bankDF = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load("file:/databricks/driver/bank.csv")


## Data explored and explained

In [8]:
display(bankDF)

## Data cleaning

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

In [11]:
bankDF.printSchema()

In [12]:
# Keep only the rows in which every numeric column is non-null
numeric_cols = ['age', 'duration', 'campaign', 'pdays', 'previous', 'emp_var_rate',
                'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'no_employed']
bank_cleaned = bankDF.dropna(subset=numeric_cols)
display(bank_cleaned)

Exploring the data, we see that the clients who subscribed to the product are all older than 21 years.

In [14]:
bank_cleaned.select('age','result').distinct().show()

In [15]:
# Run once to register the cleaned data as the bank table queried by the SQL cells below
#bank_cleaned.write.saveAsTable('bank') 

In [16]:
#Drop the table

#%sql
#drop table bank

In [17]:
%sql
SELECT duration,poutcome,result FROM bank GROUP BY duration,poutcome,result ORDER BY duration desc

In [18]:
%sql
SELECT campaign, COUNT(campaign) AS cnt FROM bank GROUP BY campaign

## Data transformation

In [20]:
# Convert the result column into the numeric label MLlib expects (a double)
def labelForResults(s):
    if s == 'yes':
        return 1.0
    else:
        return 0.0

label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = bank_cleaned.select(label(bank_cleaned.result).alias('label'), bank_cleaned.duration).where('label >= 0')


In [21]:
display(labeledData)

In [22]:
reduced_numeric_cols = ['age',
                          'duration',
                          'campaign',
                          'pdays',
                          'previous',
                          'emp_var_rate',
                          'cons_price_idx',
                          'cons_conf_idx'
                         ]

In [23]:
#reduced_numeric_cols.show()

In [24]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

label_indexer = StringIndexer(inputCol = 'result', outputCol = 'label')
assembler = VectorAssembler(
    inputCols = reduced_numeric_cols,
    outputCol = 'features')

## Data modeling
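
For intuition about what the fitted classifier in this section computes, the sketch below evaluates a logistic model by hand: a weighted sum of the features is passed through the sigmoid function to give a probability, which is then compared with a threshold. The weights, bias and inputs are made up for illustration, and `predict_probability` is a hypothetical helper, not a Spark API.

```python
import math


def predict_probability(weights, bias, features):
    """Logistic model: sigmoid of the weighted sum of the features."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    # Numerically stable sigmoid: never exponentiate a large positive number.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


# Hypothetical weights and inputs, chosen only to illustrate the calculation.
weights = [0.5, -0.25]
bias = 0.0
p = predict_probability(weights, bias, [2.0, 4.0])  # z = 1.0 - 1.0 = 0.0
label = 1.0 if p > 0.5 else 0.0  # assumes a decision threshold of 0.5
print(p, label)
```

Training chooses the weights and bias from the data; the regParam setting used below penalizes large weights during that fit.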

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

classifier = LogisticRegression(regParam=0.01, labelCol = 'label', featuresCol = 'features')

pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

(train, test) = bank_cleaned.randomSplit([0.7, 0.3])
model = pipeline.fit(train)

In [27]:
display(train)

In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
raw = evaluator.evaluate(predictions)  # no metric given: the default is areaUnderROC, so this matches auroc
"The AUROC is %s and the AUPR is %s and raw is %s." % (auroc, aupr, raw)

## Prediction

## Model evaluation
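
The AUROC reported above can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way on made-up labels and scores; the function `auroc` is a hypothetical helper, and Spark derives its value from the ROC curve, so the numbers may differ slightly on real data.

```python
def auroc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy labels and scores, made up for illustration.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(auroc(labels, scores))  # 3 of the 4 pairs are ordered correctly -> 0.75
```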

In [31]:
Successes = predictions.where("(label = 0 AND prediction = 0) OR  (label = 1 AND prediction = 1)").count()
Result = predictions.count()

# Single-argument print calls behave the same under Python 2 and Python 3
print("There were %d outputs and there were %d successful predictions" % (Result, Successes))
print("This is a %s%% success rate" % ((float(Successes) / float(Result)) * 100))

## Visualization

In [33]:
truePositive = int(predictions.where("(label = 1 AND prediction = 1)").count())
trueNegative = int(predictions.where("(label = 0 AND prediction = 0)").count())
falsePositive = int(predictions.where("(label = 0 AND prediction = 1)").count())
falseNegative = int(predictions.where("(label = 1 AND prediction = 0)").count())

print([['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]])
resultDF = sqlContext.createDataFrame([['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]], ['metric', 'value'])
display(resultDF)
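
The four counts above are enough to derive the usual summary metrics. The sketch below shows the arithmetic in plain Python; `confusion_metrics` is a hypothetical helper and the counts passed to it are made up, not results from this notebook.

```python
def confusion_metrics(tp, tn, fp, fn):
    """Derive accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / float(total) if total else 0.0
    precision = tp / float(tp + fp) if (tp + fp) else 0.0
    recall = tp / float(tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Made-up counts, used only to illustrate the formulas.
print(confusion_metrics(tp=30, tn=50, fp=10, fn=10))
```

Calling `confusion_metrics(truePositive, trueNegative, falsePositive, falseNegative)` would apply the same formulas to the counts computed in the cell above.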