In [1]:
# Assign name to the job
sc.appName='Cedit_Risk_Clssification'
print sc.master

spark://master-172-31-47-102:7077


In [None]:
# local file direcory for the input directory
#project_root='file:///home/ubuntu/spark/Notebooks/german_credit/'
# HDFS path for the input directory
hdfs_project_root='german_credit/'
# Name of the input file
raw_data_file=hdfs_project_root+'german_credit.csv'

from pyspark.sql import Row

# Read file from hdfs
raw_data = sc.textFile(raw_data_file)
# Read first line as header and replace spaces from them
header=raw_data.first().replace(' ','_')

# remove header from actual data
only_data = raw_data.filter(lambda r : r.replace(' ','_') != header)

# By deafault everything is read as string,convert the data to double/float
def to_type(data):
    return [float(i) for i in data]

#only_data=only_data.map(lambda r : r.split(',')).map(to_type).collect()
#print only_data


In [None]:
# import the inbuilt data types from pyspark.sql module
from pyspark.sql.types import *

#This line if you want the deafault header in the schema
#fields = [StructField(field_name,DoubleType(), True) for field_name in header.split(',')]
#schema = StructType(fields)

# Changing the header as the deafault headers are lengthy 
schema = StructType([StructField('creditability', DoubleType(), True),
                     StructField('balance', DoubleType(), True),
                     StructField('duration', DoubleType(), True),
                     StructField('history', DoubleType(), True),
                     StructField('purpose', DoubleType(), True),
                     StructField('amount', DoubleType(), True),
                     StructField('savings', DoubleType(), True),
                     StructField('employment', DoubleType(), True),
                     StructField('instPercent', DoubleType(), True),
                     StructField('sexMarried', DoubleType(), True),
                     StructField('guarantors', DoubleType(), True),
                     StructField('residenceDuration', DoubleType(), True),
                     StructField('assets', DoubleType(), True),
                     StructField('age', DoubleType(), True),
                     StructField('concCredit', DoubleType(), True),
                     StructField('apartment', DoubleType(), True),
                     StructField('credits', DoubleType(), True),
                     StructField('occupation', DoubleType(), True),
                     StructField('dependents', DoubleType(), True),
                     StructField('hasPhone', DoubleType(), True),
                     StructField('foreign', DoubleType(), True)
                    ])
# Map the schema to the data and create data frame
Customers = sqlContext.createDataFrame(only_data, schema)

#check schema
Customers.printSchema()
# Check the actual data now
Customers.show()

In [4]:
# register the data frame as table
Customers.registerTempTable("credit")
# query the credability table to check average balance amount,average loan and average duration for 
# each class of customer i.e. 1 and 0
results =  sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, \
                          avg(duration) as avgdur  FROM credit GROUP BY creditability ")
# cheack the result of the query
results.show()

+-------------+------------------+------------------+------------------+
|creditability|        avgbalance|            avgamt|            avgdur|
+-------------+------------------+------------------+------------------+
|          1.0|2.8657142857142857| 2985.442857142857|19.207142857142856|
|          0.0|1.9033333333333333|3938.1266666666666|             24.86|
+-------------+------------------+------------------+------------------+



In [5]:
# check summary for the column balance
Customers.describe("balance").show()
# use domain specific functions to query
Customers.groupBy("creditability").avg("balance").show()

+-------+-----------------+
|summary|          balance|
+-------+-----------------+
|  count|             1000|
|   mean|            2.577|
| stddev|1.257637727110893|
|    min|              1.0|
|    max|              4.0|
+-------+-----------------+

+-------------+------------------+
|creditability|      avg(balance)|
+-------------+------------------+
|          1.0|2.8657142857142857|
|          0.0|1.9033333333333333|
+-------------+------------------+



In [6]:
# Explicitly form an array with all the feature names 
featureCols = ["balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign"]

# import VectorAssembler to convert entire data frame into vector representation
from pyspark.ml.feature import VectorAssembler

# pass the settings for the vectoriser.
# pass the input columns to be vectorised and then pass the name of the output column which would hold the vectorised
# output. The output column will have features stored in a DenseVector data structure 
assembler=VectorAssembler(inputCols=featureCols,
                          outputCol="features"
                          )
# Transform the data as per the vectoriser defined
output = assembler.transform(Customers)
# Check the data structure
print(output.select("features", "creditability").take(5))


[Row(features=DenseVector([1.0, 18.0, 4.0, 2.0, 1049.0, 1.0, 2.0, 4.0, 2.0, 1.0, 4.0, 2.0, 21.0, 3.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0]), creditability=1.0), Row(features=DenseVector([1.0, 9.0, 4.0, 0.0, 2799.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0, 1.0, 36.0, 3.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0]), creditability=1.0), Row(features=DenseVector([2.0, 12.0, 2.0, 9.0, 841.0, 2.0, 4.0, 2.0, 2.0, 1.0, 4.0, 1.0, 23.0, 3.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0]), creditability=1.0), Row(features=DenseVector([1.0, 12.0, 4.0, 0.0, 2122.0, 1.0, 3.0, 3.0, 3.0, 1.0, 2.0, 1.0, 39.0, 3.0, 1.0, 2.0, 2.0, 2.0, 1.0, 2.0]), creditability=1.0), Row(features=DenseVector([1.0, 12.0, 4.0, 0.0, 2171.0, 1.0, 3.0, 4.0, 3.0, 1.0, 4.0, 2.0, 38.0, 1.0, 2.0, 2.0, 2.0, 1.0, 1.0, 2.0]), creditability=1.0)]


In [None]:
# Import the tarnsformer to be used with the labels
from pyspark.ml.feature import StringIndexer
# Define the indexer for the label with input and output column
indexer = StringIndexer(inputCol="creditability", outputCol="label")
# tarnsform the label
indexed = indexer.fit(output).transform(output)
indexed.show()

In [8]:
# split the training data into train and validation set
splitSeed = 5043
trainingData, testData = indexed.randomSplit([0.7, 0.3], splitSeed)

In [9]:
testData.count()

299

Now we have the training data in a vectorised form which most of the ML algorithms Understands.
Also we have the validation data in the same form of training data.

In [10]:
# import RandomForest classifier
from pyspark.ml.classification import RandomForestClassifier

# Define the classifier with all the ncessary settings and hyper-parameters
classifier = RandomForestClassifier(numTrees=20, maxDepth=3, labelCol="label", seed=42,impurity="gini", \
                                    featureSubsetStrategy="auto")
# Pass the data to the Random Forst classfier function for the transformation which would return the tranformed 
# values aka model
model = classifier.fit(trainingData)

# Now pass the new data to the learned model to see the prediction
predictions = model.transform(testData)

# Since the predictions is a data frame we can use the the domain specific function to query the predicted label ,
# actual label and the actual vectorised data 
predictions.select("prediction", "label", "features").show(10)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  1.0|[2.0,36.0,2.0,5.0...|
|       0.0|  0.0|[1.0,6.0,4.0,0.0,...|
|       0.0|  0.0|[1.0,8.0,4.0,0.0,...|
|       0.0|  0.0|[1.0,11.0,4.0,0.0...|
|       0.0|  0.0|[2.0,36.0,4.0,3.0...|
|       0.0|  1.0|[4.0,18.0,4.0,6.0...|
|       0.0|  0.0|[1.0,15.0,2.0,3.0...|
|       0.0|  0.0|[2.0,18.0,2.0,6.0...|
|       0.0|  0.0|[2.0,24.0,4.0,9.0...|
|       0.0|  0.0|[3.0,36.0,2.0,3.0...|
+----------+-----+--------------------+
only showing top 10 rows



### We have successfuly build a Random Forest classfication model to categorise a bank customer's credebility. Now let's check its performance with some evaluation  metrics

##### Let's check the misclassification ratio by comparing the actual label to the predicted label

In [11]:
# Import an evaluator module for binary classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# define the settings for the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label")
# pass the test data for the evaluation
predictions = model.transform(testData)

accuracy = evaluator.evaluate(predictions)
print("accuracy before pipeline fitting" ,accuracy)



('accuracy before pipeline fitting', 0.750615763546798)


#### Also if needed you can check the statistical significance of the model by checking other errors

In [15]:
from pyspark.mllib.evaluation import RegressionMetrics
ab = predictions.select("prediction","label")
rm =  RegressionMetrics(ab.map(lambda a:(a[0],a[1])))

print("MSE: " , rm.meanSquaredError)
print("MAE: " , rm.meanAbsoluteError)
print("RMSE Squared: " , rm.rootMeanSquaredError)
print("R Squared: " , rm.r2)
print("Explained Variance: " , rm.explainedVariance )

('MSE: ', 0.31772575250836116)
('MAE: ', 0.3177257525083612)
('RMSE Squared: ', 0.563671670840713)
('R Squared: ', -0.4575636288998355)
('Explained Variance: ', 0.12343262379615441)


#### Till now we have build and validated the model with a fixed set of hyper-meters.But in a real apllication we should 
#### try a set of parameters before finalising the model. This procedure is known as Model-Selection.

#### We will perform the model selction with parameter grid which has multiple values for the hyper-parametrs and it gets 
#### passed to the training process.

In [16]:

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

grid = ParamGridBuilder() \
      .addGrid(classifier.maxBins, [35, 40]) \
      .addGrid(classifier.maxDepth, [9, 18]) \
      .addGrid(classifier.numTrees, [50, 80]) \
      .addGrid(classifier.impurity, ["entropy", "gini"]) \
      .build()

from pyspark.ml.evaluation import BinaryClassificationEvaluator
cv = CrossValidator(estimator=classifier, estimatorParamMaps=grid, evaluator=evaluator,numFolds=10)
pipelineFittedModel = cv.fit(trainingData)

predictions2 = pipelineFittedModel.transform(testData)
accuracy2 = evaluator.evaluate(predictions2)
print("accuracy after pipeline fitting" , accuracy2)

('accuracy after pipeline fitting', 0.7671130952380953)


## As we can see there is slight change in accuracy. Right now e have the pipeline built and ready for the classification.
### To Enhance the accuracy any further you can tune the features and play around with parameters your self.

In [17]:
ab2 = predictions2.select("prediction","label")
rm2 =  RegressionMetrics(ab2.map(lambda a:(a[0],a[1])))

print("MSE: " , rm2.meanSquaredError)
print("MAE: " , rm2.meanAbsoluteError)
print("RMSE Squared: " , rm2.rootMeanSquaredError)
print("R Squared: " , rm2.r2)
print("Explained Variance: " , rm2.explainedVariance )

('MSE: ', 0.2809364548494983)
('MAE: ', 0.2809364548494983)
('RMSE Squared: ', 0.5300343902517065)
('R Squared: ', -0.2887931034482758)
('Explained Variance: ', 0.1605351170568561)
