In [1]:
# importing
from mleap import pyspark
from pyspark.sql import functions as f
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

In [7]:
# Loading
file_path = "/sparta/ey_an/input/german_credit_data_labels.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show()

+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+----+
| ID|Age|   Sex|Job|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|Risk|
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+----+
|  0| 67|  male|  2|    own|             NA|          little|         1169|       6|           radio/TV|good|
|  1| 22|female|  2|    own|         little|        moderate|         5951|      48|           radio/TV| bad|
|  2| 49|  male|  1|    own|         little|              NA|         2096|      12|          education|good|
|  3| 45|  male|  2|   free|         little|          little|         7882|      42|furniture/equipment|good|
|  4| 53|  male|  2|   free|         little|          little|         4870|      24|                car| bad|
|  5| 35|  male|  1|   free|             NA|              NA|         9055|      36|          education|good|
|  6| 53| 

Dropping unwanted columns that doesnt add to model featuring

In [8]:
# Data dropping and vectorization
df = df.drop('Sex', 'Job')
df.show()

+---+---+-------+---------------+----------------+-------------+--------+-------------------+----+
| ID|Age|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|Risk|
+---+---+-------+---------------+----------------+-------------+--------+-------------------+----+
|  0| 67|    own|             NA|          little|         1169|       6|           radio/TV|good|
|  1| 22|    own|         little|        moderate|         5951|      48|           radio/TV| bad|
|  2| 49|    own|         little|              NA|         2096|      12|          education|good|
|  3| 45|   free|         little|          little|         7882|      42|furniture/equipment|good|
|  4| 53|   free|         little|          little|         4870|      24|                car| bad|
|  5| 35|   free|             NA|              NA|         9055|      36|          education|good|
|  6| 53|    own|     quite rich|              NA|         2835|      24|furniture/equipment|good|
|  7| 35| 

Vectorisation of Risk Column

In [9]:
encoder = StringIndexer(inputCol='Risk', outputCol='Binary_Risk')
df = encoder.fit(df).transform(df)
df = df.drop('Risk')
df.show()

+---+---+-------+---------------+----------------+-------------+--------+-------------------+-----------+
| ID|Age|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|Binary_Risk|
+---+---+-------+---------------+----------------+-------------+--------+-------------------+-----------+
|  0| 67|    own|             NA|          little|         1169|       6|           radio/TV|        0.0|
|  1| 22|    own|         little|        moderate|         5951|      48|           radio/TV|        1.0|
|  2| 49|    own|         little|              NA|         2096|      12|          education|        0.0|
|  3| 45|   free|         little|          little|         7882|      42|furniture/equipment|        0.0|
|  4| 53|   free|         little|          little|         4870|      24|                car|        1.0|
|  5| 35|   free|             NA|              NA|         9055|      36|          education|        0.0|
|  6| 53|    own|     quite rich|             

Vectorisation of entire data

In [10]:
df = df.withColumn('Checking_little', f.when(f.col('Checking account') == "little", 1).otherwise(0))
df = df.withColumn('Checking_null', f.when(f.col('Checking account') == "NA", 1.0).otherwise(0))
df = df.withColumn('Checking_moderate', f.when(f.col('Checking account') == "moderate", 1).otherwise(0))
df = df.withColumn('Savings_little', f.when(f.col('Saving accounts') == "little", 1).otherwise(0))
df = df.withColumn('Savings_null', f.when(f.col('Saving accounts') == "little", 1).otherwise(0))
df = df.withColumn('Purpose_radio/TV', f.when(f.col('Purpose') == "radio/TV", 1).otherwise(0))
df = df.withColumn('Housing_own', f.when(f.col('Housing') == "own", 1).otherwise(0))
df = df.withColumn('Credit_big', f.when(f.col('Credit amount') > 10000, 1).otherwise(0))
df = df.withColumn('Duration_short', f.when(f.col('Duration') < 12, 1).otherwise(0))
df = df.withColumn('Age_young', f.when(f.col('Age') < 27, 1).otherwise(0)) 

df = df.drop('Age', 'Housing', 'Saving accounts', 'Checking account', 'Credit amount', 'Duration', 'Purpose')
df.show()

+---+-----------+---------------+-------------+-----------------+--------------+------------+----------------+-----------+----------+--------------+---------+
| ID|Binary_Risk|Checking_little|Checking_null|Checking_moderate|Savings_little|Savings_null|Purpose_radio/TV|Housing_own|Credit_big|Duration_short|Age_young|
+---+-----------+---------------+-------------+-----------------+--------------+------------+----------------+-----------+----------+--------------+---------+
|  0|        0.0|              1|          0.0|                0|             0|           0|               1|          1|         0|             1|        0|
|  1|        1.0|              0|          0.0|                1|             1|           1|               1|          1|         0|             0|        1|
|  2|        0.0|              0|          1.0|                0|             1|           1|               0|          1|         0|             0|        0|
|  3|        0.0|              1|          0.0

In [11]:
# Defining features
features = df.columns
features = features[2:]
features

['Checking_little',
 'Checking_null',
 'Checking_moderate',
 'Savings_little',
 'Savings_null',
 'Purpose_radio/TV',
 'Housing_own',
 'Credit_big',
 'Duration_short',
 'Age_young']

In [13]:
# Feature pipeline
continuous_feature_assembler = VectorAssembler(inputCols=features, outputCol="all_features")
featurePipeline = Pipeline(stages=[continuous_feature_assembler])
sparkFeaturePipelineModel = featurePipeline.fit(df)
print("Finished constructing the pipeline")
df.schema

Finished constructing the pipeline


StructType(List(StructField(ID,IntegerType,true),StructField(Binary_Risk,DoubleType,true),StructField(Checking_little,IntegerType,false),StructField(Checking_null,DoubleType,false),StructField(Checking_moderate,IntegerType,false),StructField(Savings_little,IntegerType,false),StructField(Savings_null,IntegerType,false),StructField(Purpose_radio/TV,IntegerType,false),StructField(Housing_own,IntegerType,false),StructField(Credit_big,IntegerType,false),StructField(Duration_short,IntegerType,false),StructField(Age_young,IntegerType,false)))

In [22]:
# Train a Decision Tree Model
# Create model
dt = DecisionTreeClassifier(featuresCol="all_features", labelCol="Binary_Risk")
pipeline_dt = [sparkFeaturePipelineModel] + [dt]
sparkPipelineEstimatordt = Pipeline(stages = pipeline_dt)
sparkPipelinedt = sparkPipelineEstimatordt.fit(df)
print("Complete: Training Decision Tree")

Complete: Training Decision Tree


In [23]:
# saving pipeline
%savePipeline   --pipelineName an-decisiontree-model \
                --pipelineModelObject sparkPipelinedt \
                --dataframe df \
                --description "Decision Tree model for predicting risk probability"

{"message":"Model 'an-decisiontree-model' correctly uploaded."}


In [24]:
# deploy model
%deployPipeline --pipelineName an-decisiontree-model

Model 'an-decisiontree-model' correctly deployed from repository.
