## libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

### Creating the Spark Session

In [49]:
spark=SparkSession.builder.master("local[2]").appName("Machine Learning with Bank Data Using Pyspark").getOrCreate()

In [4]:
spark

In [5]:
sc = spark.sparkContext
sc

In [6]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

<pyspark.sql.context.SQLContext at 0x24fb85f0288>

### Load The Data From a File Into a Dataframe

In [7]:
df = spark.read.csv('C:\\Users\\Majid\\Desktop\\Jupeyetr\\pyspark\\bank.csv',header = 'True',inferSchema='True')

In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

In [9]:
# We also cache the data so that we only read it from disk once.
df.cache()

df.is_cached            # Checks if df is cached

True

In [10]:
df.dtypes
#column names and their data types

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]

In [11]:
df.printSchema()
#Print out the schema 

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [12]:
df.show(5, truncate=False, vertical=True)
## Print the first 5 rows 

-RECORD 0---------------
 age       | 59         
 job       | admin.     
 marital   | married    
 education | secondary  
 default   | no         
 balance   | 2343       
 housing   | yes        
 loan      | no         
 contact   | unknown    
 day       | 5          
 month     | may        
 duration  | 1042       
 campaign  | 1          
 pdays     | -1         
 previous  | 0          
 poutcome  | unknown    
 deposit   | yes        
-RECORD 1---------------
 age       | 56         
 job       | admin.     
 marital   | married    
 education | secondary  
 default   | no         
 balance   | 45         
 housing   | no         
 loan      | no         
 contact   | unknown    
 day       | 5          
 month     | may        
 duration  | 1467       
 campaign  | 1          
 pdays     | -1         
 previous  | 0          
 poutcome  | unknown    
 deposit   | yes        
-RECORD 2---------------
 age       | 41         
 job       | technician 
 marital   | married    


In [14]:
df.describe().show(5)
# Compute basic statistics for numeric and string columns.This include count, mean, stddev, min, and max. If no columns are
# given, this function computes statistics for all numerical or string columns.

+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+-------+
|summary|               age|    job| marital|education|default|           balance|housing| loan| contact|               day|month|          duration|          campaign|             pdays|          previous|poutcome|deposit|
+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+-------+
|  count|             11162|  11162|   11162|    11162|  11162|             11162|  11162|11162|   11162|             11162|11162|             11162|             11162|             11162|             11162|   11162|  11162|
|   mean|41.231947679627304|   null|    null|     null|   null|1528.5385235620856|   null| null|    null

In [15]:
df.describe(["age", "balance", "education", "loan"]).show(5)

+-------+------------------+------------------+---------+-----+
|summary|               age|           balance|education| loan|
+-------+------------------+------------------+---------+-----+
|  count|             11162|             11162|    11162|11162|
|   mean|41.231947679627304|1528.5385235620856|     null| null|
| stddev|11.913369192215518| 3225.413325946149|     null| null|
|    min|                18|             -6847|  primary|   no|
|    max|                95|             81204|  unknown|  yes|
+-------+------------------+------------------+---------+-----+



In [18]:
df.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'balance',
 'housing',
 'loan',
 'contact',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'deposit']

In [19]:
df = df.select('age', 'job', 'marital', 'education', 'default',  'balance', 
'housing', 'loan', 'contact', 'duration',  'campaign',
 'pdays', 'previous', 'poutcome', 'deposit')
# a new DataFrame using numerical and categorical column

In [20]:
df.show(5)

+---+----------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+-------+
|age|       job|marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|poutcome|deposit|
+---+----------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+-------+
| 59|    admin.|married|secondary|     no|   2343|    yes|  no|unknown|    1042|       1|   -1|       0| unknown|    yes|
| 56|    admin.|married|secondary|     no|     45|     no|  no|unknown|    1467|       1|   -1|       0| unknown|    yes|
| 41|technician|married|secondary|     no|   1270|    yes|  no|unknown|    1389|       1|   -1|       0| unknown|    yes|
| 55|  services|married|secondary|     no|   2476|    yes|  no|unknown|     579|       1|   -1|       0| unknown|    yes|
| 54|    admin.|married| tertiary|     no|    184|     no|  no|unknown|     673|       2|   -1|       0| unknown|    yes|
+---+----------+-------+

In [16]:
catCols = ['job', 'marital', 'education', 'default','housing', 'loan', 'contact', 'poutcome']
# The index of string vlaues multiple columns

In [17]:
inputs=catCols
outputs=["{0}_indexed".format(c) for c in catCols]


indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in catCols]
#  label indexer that maps a string column of labels to an ML column of label indices

In [18]:
# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]
# A one-hot encoder that maps a column of category indices to a column of binary vectors, with
# at most a single one-value per row that indicates the input category index.

In [29]:
# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="rawFeatures")

numericCols = ['age', 'balance', 'duration',  'campaign', 'pdays', 'previous']

pipeline = Pipeline(stages=indexers + encoders+ [assembler])
model=pipeline.fit(df)
transformed = model.transform(df)

In [30]:
transformed.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'balance',
 'housing',
 'loan',
 'contact',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'deposit',
 'job_indexed',
 'marital_indexed',
 'education_indexed',
 'default_indexed',
 'housing_indexed',
 'loan_indexed',
 'contact_indexed',
 'poutcome_indexed',
 'job_indexed_encoded',
 'marital_indexed_encoded',
 'education_indexed_encoded',
 'default_indexed_encoded',
 'housing_indexed_encoded',
 'loan_indexed_encoded',
 'contact_indexed_encoded',
 'poutcome_indexed_encoded',
 'rawFeatures']

In [31]:
transformed.select('rawFeatures').first()

Row(rawFeatures=SparseVector(32, {3: 1.0, 12: 1.0, 15: 1.0, 19: 1.0, 22: 1.0, 23: 1.0, 26: 1.0, 28: 1.0}))

In [32]:
transformed.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
 |-- job_indexed: double (nullable = false)
 |-- marital_indexed: double (nullable = false)
 |-- education_indexed: double (nullable = false)
 |-- default_indexed: double (nullable = false)
 |-- housing_indexed: double (nullable = false)
 |-- loan_indexed: double (nullable = false)
 |-- contact_indexed: double (nullable = false)
 |-- po

### Training and Testing Data

In [37]:
(trainingData, testData) = transformed.randomSplit([0.7, 0.3],seed = 11)

In [38]:
trainingData.show(5,truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------
 age                       | 18                                                              
 job                       | student                                                         
 marital                   | single                                                          
 education                 | primary                                                         
 default                   | no                                                              
 balance                   | 608                                                             
 housing                   | no                                                              
 loan                      | no                                                              
 contact                   | cellular                                                        
 day                       | 12                             

In [39]:
testData.show(5,truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------
 age                       | 18                                                              
 job                       | student                                                         
 marital                   | single                                                          
 education                 | unknown                                                         
 default                   | no                                                              
 balance                   | 108                                                             
 housing                   | no                                                              
 loan                      | no                                                              
 contact                   | cellular                                                        
 day                       | 9                              

## Modeling

Next step is to Modeling ... 

#### LogisticRegression

In [42]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="loan_indexed", featuresCol="rawFeatures")
#Training algo
lrModel = lr.fit(trainingData)

lr_prediction = lrModel.transform(testData)

lr_prediction.select("prediction", "loan_indexed", "rawFeatures").show()

evaluator = MulticlassClassificationEvaluator(labelCol="loan_indexed", predictionCol="prediction", metricName="accuracy")

+----------+------------+--------------------+
|prediction|loan_indexed|         rawFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[3,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[0,13,16,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0| 

In [43]:
lr_accuracy = evaluator.evaluate(lr_prediction)

print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 


#### DecisionTreeClassifier

In [44]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="loan_indexed", featuresCol="rawFeatures")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "loan_indexed", "rawFeatures").show()

+----------+------------+--------------------+
|prediction|loan_indexed|         rawFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[3,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[0,13,16,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0| 

In [46]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))

Accuracy of DecisionTreeClassifier is = 1
Test Error of DecisionTreeClassifier = 0 


#### RandomForestClassifier

In [47]:
from pyspark.ml.classification import RandomForestClassifier
rf = DecisionTreeClassifier(labelCol="loan_indexed", featuresCol="rawFeatures")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "loan_indexed", "rawFeatures").show()

+----------+------------+--------------------+
|prediction|loan_indexed|         rawFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[3,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,15,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       0.0|         0.0|(32,[7,13,18,19,2...|
|       1.0|         1.0|(32,[0,13,16,19,2...|
|       0.0|         0.0|(32,[7,13,17,19,2...|
|       0.0| 

In [48]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

Accuracy of RandomForestClassifier is = 1
Test Error of RandomForestClassifier  = 0 
