In [None]:
import os
import numpy as np
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql import functions as F

In [None]:
# spark-submit arguments that ship the XGBoost4J jars to the JVM. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches the JVM gateway, so this cell has to run
# before the SparkSession cell below.
XGB_SUBMIT_ARGS = """
    --jars xgboost4j-spark_2.12-1.2.0.jar,xgboost4j_2.12-1.2.0.jar
    pyspark-shell"""
os.environ['PYSPARK_SUBMIT_ARGS'] = XGB_SUBMIT_ARGS

In [None]:
# Local-mode session using every available core. getOrCreate() reuses an existing
# session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("your_session_name")
    .getOrCreate()
)

In [None]:
# Register the sparkxgb Python wrapper zip with the SparkContext; the
# `from sparkxgb.xgboost import ...` cell below depends on this having run.
spark.sparkContext.addPyFile(path="sparkxgb_1.24.zip")

### Import supporting libraries for model development

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics

In [None]:
# Raw training data; all later cells build on this DataFrame.
data = spark.read.load('train.parquet', format='parquet')

In [None]:
# Binary target for the classifier: 1 when Loan_Status == 'Y', otherwise 0.
# A null Loan_Status makes the condition null, which also falls through to 0.
status_is_yes = F.col('Loan_Status') == 'Y'
data = data.withColumn('label', F.when(status_is_yes, 1).otherwise(0))

#### Use StringIndexer to convert categorical (string) columns to numeric indices

In [None]:
def _string_indexer(input_col, output_col):
    """Build a StringIndexer mapping string column `input_col` to numeric `output_col`.

    handleInvalid="keep" puts labels unseen during fit (and nulls) into an extra
    bucket instead of raising.
    """
    return (StringIndexer()
            .setInputCol(input_col)
            .setOutputCol(output_col)
            .setHandleInvalid("keep"))


# One indexer per categorical column, unpacked into the names the pipeline uses.
index1, index2, index3, index4, index5 = [
    _string_indexer(source, target)
    for source, target in [
        ("Gender", "GenderIndex"),
        ("Married", "MarriedIndex"),
        ("Education", "EducationIndex"),
        ("Self_Employed", "SelfEmployedIndex"),
        ("Property_Area", "PropertyAreaIndex"),
    ]
]

In [None]:
# Model inputs: the five indexed categoricals first, then the raw numeric columns.
indexed_cols = ['GenderIndex', 'MarriedIndex', 'EducationIndex', 'SelfEmployedIndex', 'PropertyAreaIndex']
numeric_cols = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']
features = indexed_cols + numeric_cols

# handleInvalid='keep' emits NaN for null/NaN inputs instead of failing the row.
vec_assembler = (VectorAssembler()
                 .setInputCols(features)
                 .setOutputCol('features')
                 .setHandleInvalid('keep'))

# NOTE(review): missing=0.0 tells XGBoost to treat every 0.0 feature value as missing.
# This is the documented workaround for the SparseVectors VectorAssembler may emit,
# but it also hides genuine zeros from the model -- e.g. StringIndexer's index 0.0
# (the most frequent category by default) and any numeric column that is really 0.
# Confirm this trade-off is intended.
xgb = XGBoostClassifier(
    featuresCol="features",
    labelCol="label",
    objective="binary:logistic",
    missing=0.0,
    seed=1712,
)

In [None]:
# Indexing -> feature assembly -> XGBoost, fitted and applied as a single unit.
pipeline = Pipeline(stages=[index1, index2, index3, index4, index5, vec_assembler, xgb])

In [None]:
# 70/30 train/test split of the labelled DataFrame built above. The fixed seed makes
# the split repeatable (for the same input partitioning).
trainDF, testDF = data.randomSplit([0.7, 0.3], seed=1712)

In [None]:
# Fit every stage (indexers, assembler, XGBoost) on the training split only.
model = pipeline.fit(dataset=trainDF)

In [None]:
# Score the held-out split, keeping the id, the model's prediction and the true label.
predictions = model.transform(testDF).select('Loan_ID', 'prediction', 'label')

In [None]:
# Peek at a handful of scored rows.
predictions.show(n=5)

### Performance check

In [None]:
from pyspark.sql.types import DoubleType


def _binary_scores(cm):
    """Compute (accuracy, precision, recall) for the positive class of a binary task.

    Parameters
    ----------
    cm : array-like, shape (2, 2)
        Confusion matrix laid out like ``MulticlassMetrics.confusionMatrix()``:
        predicted labels in columns, actual labels in rows, both in ascending label
        order -- so with labels {0.0, 1.0}, index 1 is the positive class.

    Returns
    -------
    tuple of float
        ``(accuracy, precision, recall)``. A metric whose denominator is zero
        (e.g. the model never predicts the positive class) is returned as NaN
        explicitly rather than through a numpy divide-by-zero warning.

    Raises
    ------
    ValueError
        If ``cm`` is not 2x2, which happens when the evaluated split contains
        only one label class.
    """
    cm = np.asarray(cm, dtype=float)
    if cm.shape != (2, 2):
        raise ValueError(
            "expected a 2x2 confusion matrix, got shape {}; the evaluated split "
            "probably contains only one label class".format(cm.shape)
        )
    (tn, fp), (fn, tp) = cm

    def _ratio(numerator, denominator):
        # Undefined ratio -> explicit NaN, without numpy's RuntimeWarning.
        return float(numerator / denominator) if denominator else float("nan")

    return _ratio(tn + tp, cm.sum()), _ratio(tp, tp + fp), _ratio(tp, tp + fn)


# (prediction, label) rows with both columns as doubles, the input MulticlassMetrics expects.
predictionAndLabels = (predictions
                       .select('prediction', 'label')
                       .withColumn('label', F.col('label').cast(DoubleType()))
                       .rdd)

metrics = MulticlassMetrics(predictionAndLabels)

cm = metrics.confusionMatrix().toArray()

accuracy, precision, recall = _binary_scores(cm)

In [None]:
# Show the test-split scores as a single (accuracy, precision, recall) tuple.
(accuracy, precision, recall)

#### Hyperparameter tuning
- Be careful when running this section: it is very expensive and takes a long time, because it fits the pipeline for every combination of parameters in every cross-validation fold (576 combinations × 3 folds).

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# Area under the ROC curve, scored from the model's rawPrediction column against
# `label`. These are BinaryClassificationEvaluator's defaults, spelled out explicitly.
xgbEval = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)

In [None]:
# Model tuning using cross validation and a param grid: exhaustive search over the
# XGBoost stage's hyperparameters, scored by 3-fold CV on the training split, then
# evaluated once on the held-out test split.
# Grid size: 3*2*2*2*2*2*3*2 = 576 combinations x 3 folds = 1,728 pipeline fits,
# plus one final refit of the best combination.
paramGrid = (ParamGridBuilder()
             .addGrid(xgb.alpha, [1e-5, 1e-2, 0.1])
             .addGrid(xgb.eta, [0.001, 0.01])
             .addGrid(xgb.numRound, [150, 160])
             .addGrid(xgb.maxDepth, [3, 6])
             .addGrid(xgb.minChildWeight, [3.0, 4.0])
             .addGrid(xgb.gamma, [0.0, 0.1])
             .addGrid(xgb.colsampleBytree, [0.3, 0.4, 0.5])
             .addGrid(xgb.subsample, [0.4, 0.6])
             .build())

# Pin the fold assignment. CrossValidator's default seed is derived from Python's
# hash() of the class name, which varies between Python sessions unless
# PYTHONHASHSEED is fixed. Reuse the notebook's seed so CV results are repeatable.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                    evaluator=xgbEval, numFolds=3, seed=1712)
cvModel = cv.fit(trainDF)
cvPreds = cvModel.transform(testDF)
xgbEval.evaluate(cvPreds)

In [None]:
# cvModel.bestModel is a PipelineModel with no params of its own, so calling
# extractParamMap() on it returns an empty map rather than the tuned XGBoost
# settings. Report the winning grid point instead: avgMetrics[i] is the mean
# cross-validated metric for the i-th estimator param map.
_pick_best = np.argmax if xgbEval.isLargerBetter() else np.argmin
best_index = int(_pick_best(cvModel.avgMetrics))
best_params = {param.name: value
               for param, value in cvModel.getEstimatorParamMaps()[best_index].items()}
best_params