In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [3]:
spark = SparkSession.builder.appName("Bank").getOrCreate()

In [4]:
df = spark.read.csv('bank.csv', header=True, inferSchema=True)

In [13]:
df.cache()

DataFrame[age: int, job: string, marital: string, education: string, default: string, balance: int, housing: string, loan: string, contact: string, day: int, month: string, duration: int, campaign: int, pdays: int, previous: int, poutcome: string, deposit: string]

In [14]:
df.is_cached

True

In [7]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [12]:
df.count()

11162

In [11]:
len(df.columns)

17

In [16]:
catCols = ['job', 'marital', 'education', 'default','housing', 'loan', 'contact', 'poutcome']
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in catCols
]
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="rawFeatures")

numericCols = ['age', 'balance', 'duration',  'campaign', 'pdays', 'previous']

pipeline = Pipeline(stages=indexers + encoders+ [assembler])
model=pipeline.fit(df)
transformed = model.transform(df)
transformed.show(5)

+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+-------------------+-----------------------+-------------------------+-----------------------+-----------------------+--------------------+-----------------------+------------------------+--------------------+
|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|poutcome_indexed|job_indexed_encoded|marital_indexed_encoded|education_indexed_encoded|default_indexed_encoded|housing_indexed_encoded|loan_indexed_encoded|contact_indexed_encoded|poutcome_indexed_encoded|         rawFeatures|
+---+----------+-------+---------+-------+-------+-------+----+-------

In [17]:
(trainingData, testData) = transformed.randomSplit([0.7, 0.3],seed = 11)

In [18]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="loan_indexed", featuresCol="rawFeatures")
#Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "loan_indexed", "rawFeatures").show()
evaluator = MulticlassClassificationEvaluator(labelCol="loan_indexed", predictionCol="prediction", metricName="accuracy")

+----------+------------+--------------------+
|prediction|loan_indexed|         rawFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[3,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[0,13,16,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0| 

In [20]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 
