## Import Spark and supporting libraries

In [1]:
import pyspark
import numpy as np # linear algebra
import pandas as pd # data processing
import matplotlib.pyplot as plt
%matplotlib inline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
from pyspark.sql import SparkSession

# Local Spark session running with two threads ("local[2]").
# getOrCreate() returns the already-running session if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Classification")
    .getOrCreate()
)

# Import data

In [3]:
# Read the bank-additional-full CSV: first row is the header, column types are inferred.
# NOTE(review): absolute local path -- TODO make this configurable for other machines.
df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("D:/KULIAHHH/semester 8/BDATA/PROYEK/bank-additional-full.csv")
)

# Rename columns whose names contain dots

In [4]:
# Replace the dotted column names with underscore versions
# (presumably so they can be referenced without back-quoting in Spark).
dotted_to_snake = {
    "emp.var.rate": "emp_var_rate",
    "cons.price.idx": "cons_price_idx",
    "cons.conf.idx": "cons_conf_idx",
    "nr.employed": "nr_employed",
}
for old_name, new_name in dotted_to_snake.items():
    df = df.withColumnRenamed(old_name, new_name)

### Select feature and label columns

In [5]:
# Keep only the columns used below as model inputs, plus the target column 'y'.
selected_cols = [
    'age', 'job', 'marital', 'education', 'default', 'housing', 'loan',
    'contact', 'duration', 'campaign', 'pdays', 'poutcome', 'emp_var_rate',
    'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'y',
]
df = df.select(*selected_cols)
cols = df.columns

# Encode Categorical Features

3 tahap yang akan dilakukan adalah:
- String Indexing: konsepnya mirip dengan Label Encoding, di mana setiap nilai unik pada fitur kategorikal diberi label berupa nilai numerik.

- One-Hot Encoding: fitur yang sudah di-indexing ditransformasi menggunakan OneHotEncoder sehingga menghasilkan sebuah vektor biner.

- Vector Assembler: menggabungkan semua fitur ke dalam sebuah kolom vektor yang siap digunakan untuk membangun model machine learning.


In [6]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Categorical inputs: each one is string-indexed into "<name>Index" and then
# one-hot encoded into a "<name>classVec" vector column.
categoricalColumns = [
    'job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome'
]
# Numeric inputs go straight into the assembler.
numericCols = ['age', 'duration', 'campaign', 'pdays', 'emp_var_rate', 'cons_price_idx',
               'cons_conf_idx', 'euribor3m', 'nr_employed']

stages = []
for cat_col in categoricalColumns:
    index_col = cat_col + 'Index'
    indexer = StringIndexer(inputCol=cat_col, outputCol=index_col)
    one_hot = OneHotEncoder(inputCols=[index_col], outputCols=[cat_col + "classVec"])
    stages.extend([indexer, one_hot])

# Target: index the string column 'y' into the numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='y', outputCol='label')
stages.append(label_stringIdx)

# Vector Assembler: concatenate the one-hot vectors and numeric columns into 'features'.
assemblerInputs = [cat_col + "classVec" for cat_col in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

# ML Pipeline and Modeling

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Random seed for the classifier. Without an explicit value PySpark derives the
# default seed from Python's hash() of the class name, which changes between
# Python processes unless PYTHONHASHSEED is fixed -- so runs were not reproducible.
SEED = 42

# Gradient-boosted tree classifier on the assembled 'features' vector and the
# indexed 'label' column, with 10 boosting iterations.
stage_gbtClassifier = GBTClassifier(
    featuresCol='features', labelCol='label', maxIter=10, seed=SEED
)

- Menggabungkan model dengan stages yang berisi tahapan string indexing, one-hot encoding, label indexing, dan vector assembler.
- Membuat pipeline dari tahapan tersebut, lalu melatih (fit) model.

In [8]:
# Build the full pipeline (feature stages + GBT classifier) WITHOUT mutating
# `stages`. The previous in-place `stages.append(...)` added another classifier
# every time this cell was re-run. The duplicate stage then made Pipeline.fit
# fail, because its output columns (rawPrediction, probability, prediction)
# already existed.
pipeline = Pipeline(stages=stages + [stage_gbtClassifier])

# Fit every stage on the training data.
# NOTE(review): the pipeline is fitted on all rows of `df`, and the later
# "test" cells reload the same CSV file, so the reported evaluation is
# in-sample. TODO: hold out rows first, e.g.
# `train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)`, and fit on train_df.
model = pipeline.fit(df)

# Apply the fitted pipeline to the training data.
sample_data_train = model.transform(df)

# View some of the generated columns.
sample_data_train.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(39,[8,11,18,21,2...|  0.0|[1.30176325134733...|[0.93108819481038...|       0.0|
|(39,[3,11,15,22,2...|  0.0|[1.31292810233643...|[0.93250721620814...|       0.0|
|(39,[3,11,15,21,2...|  0.0|[1.30573432492754...|[0.93159604511677...|       0.0|
|(39,[0,11,19,21,2...|  0.0|[1.31292810233643...|[0.93250721620814...|       0.0|
|(39,[3,11,15,21,2...|  0.0|[1.30047513377923...|[0.93092271222613...|       0.0|
|(39,[3,11,16,22,2...|  0.0|[1.31292810233643...|[0.93250721620814...|       0.0|
|(39,[0,11,17,21,2...|  0.0|[1.31292810233643...|[0.93250721620814...|       0.0|
|(39,[1,11,20,22,2...|  0.0|[1.30573432492754...|[0.93159604511677...|       0.0|
|(39,[2,12,17,21,2...|  0.0|[1.26977656877971...|[0.92686854256749...|       0.0|
|(39,[3,12,15,21

# Prepare Test Data

Impor dan praproses data test.

In [20]:
# NOTE(review): this reads the very same CSV file that the pipeline was fitted
# on in the training cell. Every "test" row was therefore also a training row,
# and the evaluation below is in-sample, not a held-out estimate.
# TODO: evaluate on a hold-out split of `df` (e.g. df.randomSplit) instead.
test_df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("D:/KULIAHHH/semester 8/BDATA/PROYEK/bank-additional-full.csv")
)

In [21]:
# Apply the same dotted-name -> underscore renames as for the training data.
test_dotted_to_snake = {
    "emp.var.rate": "emp_var_rate",
    "cons.price.idx": "cons_price_idx",
    "cons.conf.idx": "cons_conf_idx",
    "nr.employed": "nr_employed",
}
for old_name, new_name in test_dotted_to_snake.items():
    test_df = test_df.withColumnRenamed(old_name, new_name)

In [22]:
# Keep the same feature columns and target 'y' that the pipeline expects.
test_selected_cols = [
    'age', 'job', 'marital', 'education', 'default', 'housing', 'loan',
    'contact', 'duration', 'campaign', 'pdays', 'poutcome', 'emp_var_rate',
    'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'y',
]
test_df = test_df.select(*test_selected_cols)
cols = test_df.columns

# Testing Model

In [23]:
# Run the fitted pipeline (feature stages + classifier) over the test rows.
sample_data_test = model.transform(dataset=test_df)

In [24]:
# Preview the label next to the model's outputs for the first rows.
preview_cols = ['label', 'features', 'rawPrediction', 'probability', 'prediction']
sample_data_test.select(*preview_cols).show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(39,[8,11,18,21,2...|[1.30176325134733...|[0.93108819481038...|       0.0|
|  0.0|(39,[3,11,15,22,2...|[1.31292810233643...|[0.93250721620814...|       0.0|
|  0.0|(39,[3,11,15,21,2...|[1.30573432492754...|[0.93159604511677...|       0.0|
|  0.0|(39,[0,11,19,21,2...|[1.31292810233643...|[0.93250721620814...|       0.0|
|  0.0|(39,[3,11,15,21,2...|[1.30047513377923...|[0.93092271222613...|       0.0|
|  0.0|(39,[3,11,16,22,2...|[1.31292810233643...|[0.93250721620814...|       0.0|
|  0.0|(39,[0,11,17,21,2...|[1.31292810233643...|[0.93250721620814...|       0.0|
|  0.0|(39,[1,11,20,22,2...|[1.30573432492754...|[0.93159604511677...|       0.0|
|  0.0|(39,[2,12,17,21,2...|[1.26977656877971...|[0.92686854256749...|       0.0|
|  0.0|(39,[3,12

# Evaluate Model

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve over the transformed test rows, using the
# evaluator's default input columns.
# NOTE(review): test_df is read from the same CSV file the model was fitted on,
# so this number is an in-sample score, not a held-out estimate.
gbtEval = BinaryClassificationEvaluator(metricName="areaUnderROC")
gbtROC = gbtEval.evaluate(sample_data_test)
print("Test Area Under ROC: " + str(gbtROC))

Test Area Under ROC: 0.9476449693695207


# Save Model

In [1]:
import pickle
Pkl_Filename = "model.pkl"  

with open(Pkl_Filename, 'wb') as file:  
    pickle.dump(model, file)

In [2]:
# Load the Model back from file
with open(Pkl_Filename, 'rb') as file:  
    Pickled_LR_Model = pickle.load(file)

Pickled_LR_Model