In [1]:
# Import packages
import os
import time
import pyspark
import findspark
import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.mllib.regression import LabeledPoint

from pyspark.ml import Pipeline
#from pyspark.ml.feature import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString

from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, explode
from pyspark.sql import SparkSession, SQLContext, Row, HiveContext

In [2]:
findspark.init()

# Creatingt Spark SQL environment
spark =SparkSession\
   .builder\
   .appName("test")\
   .enableHiveSupport().getOrCreate()

sc= spark.sparkContext
sqlContext= SQLContext(sc)

findspark.find()

'C:\\spark\\spark-3.0.1-bin-hadoop2.7'

In [None]:
spark.sql.debug.maxToStringFields

In [3]:
# spark is an existing SparkSession
train = sqlContext.read.format("csv")\
   .option("header", "true")\
   .load("C:/Users/affiqazrin/Desktop/dataset/Data_FinalProject.csv")

In [4]:
#target (y=deposit)
train.groupBy("y").count().show()

+---+-----+
|  y|count|
+---+-----+
| no|36548|
|yes| 4640|
+---+-----+



In [5]:
# Convert Feature Types
train.createOrReplaceTempView("train")

train = spark.sql("select \
                    cast(age as int) as age, \
                    cast(job as string) as job, \
                    cast(marital as string) as marital, \
                    cast(education as string) as education, \
                    cast(default as string) as default, \
                    cast(housing as string) as housing, \
                    cast(loan as string) as loan, \
                    cast(contact as string) as contact, \
                    cast(day_of_week as string) as day, \
                    cast(month as string) as month, \
                    cast(duration as int) as duration, \
                    cast(campaign as int) as campaign, \
                    cast(pdays as int) as pdays, \
                    cast(previous as int) as previous, \
                    cast(poutcome as string) as poutcome, \
                    cast(y as string) as deposit \
                from train")

# Data Types
train.dtypes
[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'string'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'string'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]

In [6]:
# dropping null values
train = train.dropna()

In [7]:
# Spliting in train and test set. Beware : It sorts the dataset
(traindf, testdf) = train.randomSplit([0.7,0.3])

In [8]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
jobIndexer = StringIndexer(inputCol='job', outputCol="indexedJob")
maritalIndexer = StringIndexer(inputCol='marital', outputCol="indexedMarital")
educationIndexer = StringIndexer(inputCol='education', outputCol="indexedEducation")

housingIndexer = StringIndexer(inputCol='housing', outputCol="indexedHousing")
loanIndexer = StringIndexer(inputCol='loan', outputCol="indexedLoan")
contactIndexer = StringIndexer(inputCol='contact', outputCol="indexedContact")

dayIndexer = StringIndexer(inputCol='day', outputCol="indexedDay")
monthIndexer = StringIndexer(inputCol='month', outputCol="indexedMonth")
poutcomeIndexer = StringIndexer(inputCol='poutcome', outputCol="indexedPoutcome")

depositIndexer = StringIndexer(inputCol='deposit', outputCol="indexedDeposit")

In [9]:
# One Hot Encoder on indexed features
jobEncoder = OneHotEncoder(inputCol='indexedJob', outputCol="jobVec")
maritalEncoder = OneHotEncoder(inputCol='indexedMarital', outputCol="maritalVec")
educationEncoder = OneHotEncoder(inputCol='indexedEducation', outputCol="educationVec")

housingEncoder = OneHotEncoder(inputCol='indexedHousing', outputCol="housingVec")
loanEncoder = OneHotEncoder(inputCol='indexedLoan', outputCol="loanVec")
contactEncoder = OneHotEncoder(inputCol='indexedContact', outputCol="contactVec")

dayEncoder = OneHotEncoder(inputCol='indexedDay', outputCol="dayVec")
monthEncoder = OneHotEncoder(inputCol='indexedMonth', outputCol="monthVec")
poutcomeEncoder = OneHotEncoder(inputCol='indexedPoutcome', outputCol="poutcomeVec")

In [10]:
# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=['age',
                                       'duration',
                                       'campaign',
                                       'pdays',
                                       'previous',
                                       
                                       "jobVec",
                                       "maritalVec",
                                       "educationVec",
                                       "housingVec",
                                       "loanVec",
                                       "contactVec",
                                       "dayVec",
                                       "monthVec",
                                       "poutcomeVec"],outputCol="features")

In [None]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedDeposit", featuresCol="features")

In [None]:
# Train a GradientBoostedTreeClassifier model.
gbt = GBTClassifier(labelCol="indexedDeposit", featuresCol="features", maxIter=10)

In [11]:
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedDeposit", featuresCol="features")

In [12]:
# Chain indexers and forest in a Pipeline
# Change the model choosen in the cell previously in the Pipeline
pipeline = Pipeline(stages=[jobIndexer,
                            maritalIndexer,
                            educationIndexer,
                            housingIndexer,
                            loanIndexer,
                            contactIndexer,
                            dayIndexer,
                            monthIndexer,
                            poutcomeIndexer, 
                            
                            jobEncoder,
                            maritalEncoder,
                            educationEncoder,
                            housingEncoder,
                            loanEncoder,
                            contactEncoder,
                            dayEncoder,
                            monthEncoder,
                            poutcomeEncoder,
                            
                            assembler,
                            
                            depositIndexer,
                            
                            dt])
 
# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)
 
# Predictions
predictions = model.transform(testdf)

In [13]:
# Select example rows to display.
predictions.columns 
 
# Select example rows to display.
predictions.select("prediction",
                   "deposit",
                   "indexedDeposit",
                   "features").show()

+----------+-------+--------------+--------------------+
|prediction|deposit|indexedDeposit|            features|
+----------+-------+--------------+--------------------+
|       0.0|     no|           0.0|(46,[0,1,2,3,15,1...|
|       0.0|    yes|           1.0|(46,[0,1,2,3,15,1...|
|       0.0|    yes|           1.0|(46,[0,1,2,3,15,1...|
|       0.0|     no|           0.0|(46,[0,1,2,3,15,1...|
|       0.0|     no|           0.0|(46,[0,1,2,3,15,1...|
|       0.0|    yes|           1.0|(46,[0,1,2,3,4,15...|
|       0.0|     no|           0.0|(46,[0,1,2,3,15,1...|
|       0.0|     no|           0.0|(46,[0,1,2,3,5,17...|
|       0.0|     no|           0.0|(46,[0,1,2,3,6,17...|
|       0.0|     no|           0.0|(46,[0,1,2,3,6,17...|
|       0.0|     no|           0.0|(46,[0,1,2,3,6,17...|
|       0.0|     no|           0.0|(46,[0,1,2,3,11,1...|
|       0.0|     no|           0.0|(46,[0,1,2,3,11,1...|
|       0.0|     no|           0.0|(46,[0,1,2,3,8,17...|
|       0.0|     no|           

In [None]:
predictions.toPandas().to_csv('Data_FinalProject_READY5_dt.csv')

In [14]:
# Select (prediction, true label) and compute test error
predictions = predictions.select(col("indexedDeposit").cast("Float"),col("prediction"))

evaluator = MulticlassClassificationEvaluator(labelCol="indexedDeposit", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.096454


In [15]:
rfModel = model.stages[6]
print(rfModel)  # summary only
 
evaluator = MulticlassClassificationEvaluator(labelCol="indexedDeposit", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)
 
evaluatorf1 = MulticlassClassificationEvaluator(labelCol="indexedDeposit", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)
 
evaluatorwp = MulticlassClassificationEvaluator(labelCol="indexedDeposit", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluatorwp.evaluate(predictions)
print("weightedPrecision = %g" % wp)
 
evaluatorwr = MulticlassClassificationEvaluator(labelCol="indexedDeposit", predictionCol="prediction", metricName="weightedRecall")
wr = evaluatorwr.evaluate(predictions)
print("weightedRecall = %g" % wr)

StringIndexerModel: uid=StringIndexer_126b03b9aa5e, handleInvalid=error
Accuracy = 0.903546
f1 = 0.891051
weightedPrecision = 0.888666
weightedRecall = 0.903546
