In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=d804a6476f6db2c8a828b1b21de67fa87708f93a414325cd9561d746861c042c
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [3]:
# Start (or reuse) a Spark session running locally on all available cores.
spark = (
  SparkSession.builder
  .master('local[*]')
  .appName("Default Prediction with Spark")
  .getOrCreate()
)

In [6]:
# Read the raw CSV: comma separated, first row is the header, column types inferred from the data.
dataset = spark.read.csv(
  'dataset.csv',
  sep=',',
  header=True,
  inferSchema=True,
)

In [7]:
#TARGET is the response variable: 0 = customer paid on time, 1 = customer defaulted
#The classes are heavily imbalanced (far more 0s than 1s); no resampling or re-weighting is applied to balance them
target_counts = dataset.groupBy('TARGET').count()
target_counts.show()

+------+------+
|TARGET| count|
+------+------+
|     1| 24825|
|     0|282686|
+------+------+



In [9]:
#Keep only the response variable and the candidate predictors used by the models
selected_columns = [
  'TARGET',
  'NAME_CONTRACT_TYPE', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY',
  'CNT_CHILDREN', 'AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE',
  'NAME_INCOME_TYPE', 'NAME_EDUCATION_TYPE', 'NAME_HOUSING_TYPE',
  'REGION_POPULATION_RELATIVE', 'DAYS_BIRTH', 'DAYS_EMPLOYED', 'FLAG_MOBIL',
  'OCCUPATION_TYPE', 'CNT_FAM_MEMBERS', 'ORGANIZATION_TYPE',
  'NAME_TYPE_SUITE', 'NAME_FAMILY_STATUS',
  'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
]
dataset = dataset.select(*selected_columns)

In [12]:
# Preview the first five rows of the selected columns.
dataset.show(5)

+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+----------------+--------------------+-----------------+--------------------------+----------+-------------+----------+---------------+---------------+--------------------+---------------+--------------------+-------------------+------------------+-------------------+
|TARGET|NAME_CONTRACT_TYPE|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|AMT_CREDIT|AMT_ANNUITY|AMT_GOODS_PRICE|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_HOUSING_TYPE|REGION_POPULATION_RELATIVE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|   ORGANIZATION_TYPE|NAME_TYPE_SUITE|  NAME_FAMILY_STATUS|       EXT_SOURCE_1|      EXT_SOURCE_2|       EXT_SOURCE_3|
+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+----------------+--------------------+-------------

In [13]:
# Inspect the inferred column types; the string columns are the ones encoded later.
dataset.printSchema()

root
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- ORGANIZATION_TYPE: string (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_FAMILY_ST

In [14]:
# Look at the raw DAYS_BIRTH values before adjusting their sign.
dataset.select('DAYS_BIRTH').show()

+----------+
|DAYS_BIRTH|
+----------+
|     -9461|
|    -16765|
|    -19046|
|    -19005|
|    -19932|
|    -16941|
|    -13778|
|    -18850|
|    -20099|
|    -14469|
|    -10197|
|    -20417|
|    -13439|
|    -14086|
|    -14583|
|     -8728|
|    -12931|
|     -9776|
|    -17718|
|    -11348|
+----------+
only showing top 20 rows



In [15]:
#DAYS_BIRTH is stored as negative numbers (e.g. -9461), so it is converted to positive values
#NOTE(review): the values look like a day offset rather than a calendar date - confirm against the data dictionary
#abs() is used instead of multiplying by -1 so that re-running this cell cannot flip the sign back to negative
dataset = dataset.withColumn("DAYS_BIRTH", f.abs(f.col("DAYS_BIRTH")))

## Label Encoding



In [16]:
#Selecting categorical binary columns (string columns with exactly two distinct values)
binary_cols = list()
output_cols = list()
for col_name, col_type in dataset.dtypes:
  if col_type == 'string':
    # count() is evaluated by Spark, so only the number of distinct values
    # travels back to the driver instead of the distinct rows themselves.
    if dataset.select(col_name).distinct().count() == 2:
      print(col_name+" , "+col_type)
      binary_cols.append(col_name)
      output_cols.append(col_name + '_' + 'ENCODED')

NAME_CONTRACT_TYPE , string
FLAG_OWN_CAR , string
FLAG_OWN_REALTY , string


In [82]:
#Label encoding the categorical binary columns
from pyspark.ml.feature import StringIndexer

# Every column in binary_cols gets a numeric counterpart named <column>_ENCODED.
indexer = StringIndexer(inputCols=binary_cols, outputCols=output_cols)
binary_indexer_model = indexer.fit(dataset)
dataset_indexed = binary_indexer_model.transform(dataset)

In [18]:
# The original string versions of the binary columns are no longer needed once encoded.
dataset_indexed = dataset_indexed.drop(*binary_cols)

## One-hot Encoding

In [19]:
#Selecting categorical polytomous columns (string columns with more than two distinct values)
poly_cols = list()
output_poly_cols = list()
output_onehot = list()
for col_name, col_type in dataset_indexed.dtypes:
  if col_type == 'string':
    # count() is evaluated by Spark, so only the number of distinct values
    # travels back to the driver instead of the distinct rows themselves.
    if dataset_indexed.select(col_name).distinct().count() > 2:
      print(col_name+" , "+col_type)
      poly_cols.append(col_name)
      output_poly_cols.append(col_name + '_' + 'ENCODED')
      output_onehot.append(col_name + '_' + 'ONEHOT')

CODE_GENDER , string
NAME_INCOME_TYPE , string
NAME_EDUCATION_TYPE , string
NAME_HOUSING_TYPE , string
OCCUPATION_TYPE , string
ORGANIZATION_TYPE , string
NAME_TYPE_SUITE , string
NAME_FAMILY_STATUS , string


In [20]:
#Counting the null/NaN values of every column in the dataset (not only the polytomous ones)
null_counts = [
  f.count(f.when(f.col(c).isNull() | f.isnan(c), c)).alias(c)
  for c in dataset_indexed.columns
]
dataset_indexed.select(null_counts).show()

+------+-----------+------------+----------------+----------+-----------+---------------+----------------+-------------------+-----------------+--------------------------+----------+-------------+----------+---------------+---------------+-----------------+---------------+------------------+------------+------------+------------+--------------------------+--------------------+-----------------------+
|TARGET|CODE_GENDER|CNT_CHILDREN|AMT_INCOME_TOTAL|AMT_CREDIT|AMT_ANNUITY|AMT_GOODS_PRICE|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_HOUSING_TYPE|REGION_POPULATION_RELATIVE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|ORGANIZATION_TYPE|NAME_TYPE_SUITE|NAME_FAMILY_STATUS|EXT_SOURCE_1|EXT_SOURCE_2|EXT_SOURCE_3|NAME_CONTRACT_TYPE_ENCODED|FLAG_OWN_CAR_ENCODED|FLAG_OWN_REALTY_ENCODED|
+------+-----------+------------+----------------+----------+-----------+---------------+----------------+-------------------+-----------------+--------------------------+----------+----------

In [21]:
#Rows with a null OCCUPATION_TYPE or NAME_TYPE_SUITE are removed
#EXT_SOURCE_1 has too many null values, so the whole column is removed instead

columns_requiring_values = ["OCCUPATION_TYPE", "NAME_TYPE_SUITE"]
dataset_indexed = dataset_indexed.dropna(subset=columns_requiring_values).drop("EXT_SOURCE_1")

In [83]:
#One-hot encoding in Spark takes two steps:
#  1. turn the categorical polytomous columns into numerical index columns
#  2. turn those numerical index columns into one-hot encoded vectors

#First step: Label encoding the categorical polytomous columns

indexer = StringIndexer(inputCols=poly_cols, outputCols=output_poly_cols)
poly_indexer_model = indexer.fit(dataset_indexed)
dataset_indexed = poly_indexer_model.transform(dataset_indexed)

In [84]:
#Second step: Convert the numerical index columns into one-hot encoded columns

from pyspark.ml.feature import OneHotEncoder

# Each <column>_ENCODED index column is expanded into a <column>_ONEHOT vector column.
encoder = OneHotEncoder(inputCols=output_poly_cols, outputCols=output_onehot)
encoder_model = encoder.fit(dataset_indexed)
df_onehot = encoder_model.transform(dataset_indexed)

In [24]:
# Only the one-hot vectors are kept; the original strings and their intermediate indices are dropped.
df_onehot = df_onehot.drop(*poly_cols, *output_poly_cols)

In [25]:
# Confirm that the categorical columns are now numeric or vector columns.
df_onehot.printSchema()

root
 |-- TARGET: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- EXT_SOURCE_2: double (nullable = true)
 |-- EXT_SOURCE_3: double (nullable = true)
 |-- NAME_CONTRACT_TYPE_ENCODED: double (nullable = false)
 |-- FLAG_OWN_CAR_ENCODED: double (nullable = false)
 |-- FLAG_OWN_REALTY_ENCODED: double (nullable = false)
 |-- CODE_GENDER_ONEHOT: vector (nullable = true)
 |-- NAME_INCOME_TYPE_ONEHOT: vector (nullable = true)
 |-- NAME_EDUCATION_TYPE_ONEHOT: vector (nullable = true)
 |-- NAME_HOUSING_TYPE_ONEHOT: vector (nullable = true)
 |-- OCCUPATION_TYP

## Imputing Missing Values

In [26]:
from pyspark.ml.feature import Imputer

In [27]:
#Missing values in AMT_ANNUITY, EXT_SOURCE_2 and EXT_SOURCE_3 are replaced by the column median
#The imputed values are written to new columns with an _OUT suffix

columns_to_impute = ["AMT_ANNUITY", "EXT_SOURCE_2", "EXT_SOURCE_3"]
imputer = Imputer(strategy='median')

# The setters return the imputer, so the chained call remains the value displayed by this cell.
imputer.setInputCols(columns_to_impute).setOutputCols([name + "_OUT" for name in columns_to_impute])

Imputer_69991882bc33

In [28]:
# Note: despite its name, `model` is the transformed DataFrame (with the *_OUT columns added),
# not the fitted imputer.
imputer_model = imputer.fit(df_onehot)
model = imputer_model.transform(df_onehot)

In [29]:
# Remove the original columns now that their imputed *_OUT versions exist.
unimputed_columns = ["AMT_ANNUITY", "EXT_SOURCE_2", "EXT_SOURCE_3"]
model = model.drop(*unimputed_columns)

In [30]:
# Check the schema after imputation: the *_OUT columns replace the three original ones.
model.printSchema()

root
 |-- TARGET: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- NAME_CONTRACT_TYPE_ENCODED: double (nullable = false)
 |-- FLAG_OWN_CAR_ENCODED: double (nullable = false)
 |-- FLAG_OWN_REALTY_ENCODED: double (nullable = false)
 |-- CODE_GENDER_ONEHOT: vector (nullable = true)
 |-- NAME_INCOME_TYPE_ONEHOT: vector (nullable = true)
 |-- NAME_EDUCATION_TYPE_ONEHOT: vector (nullable = true)
 |-- NAME_HOUSING_TYPE_ONEHOT: vector (nullable = true)
 |-- OCCUPATION_TYPE_ONEHOT: vector (nullable = true)
 |-- ORGANIZATION_TYPE_ONEHOT: vector (nullable = true)
 |-- NAME_TYPE_SUITE_ONEHOT: vector (nul

## Creating the vector

In [31]:
#We need to combine all feature columns into a single vector column in order to use Logistic Regression and Random Forest

from pyspark.ml.feature import VectorAssembler

In [32]:
# Rename TARGET to 'label'; the classifiers further down are created without a labelCol argument.
model = model.withColumnRenamed('TARGET', 'label')

In [33]:
# Feature columns = every column except 'label', in their existing order.
label_position = model.columns.index('label')
X = model.columns[:label_position] + model.columns[label_position + 1:]
X

['CNT_CHILDREN',
 'AMT_INCOME_TOTAL',
 'AMT_CREDIT',
 'AMT_GOODS_PRICE',
 'REGION_POPULATION_RELATIVE',
 'DAYS_BIRTH',
 'DAYS_EMPLOYED',
 'FLAG_MOBIL',
 'CNT_FAM_MEMBERS',
 'NAME_CONTRACT_TYPE_ENCODED',
 'FLAG_OWN_CAR_ENCODED',
 'FLAG_OWN_REALTY_ENCODED',
 'CODE_GENDER_ONEHOT',
 'NAME_INCOME_TYPE_ONEHOT',
 'NAME_EDUCATION_TYPE_ONEHOT',
 'NAME_HOUSING_TYPE_ONEHOT',
 'OCCUPATION_TYPE_ONEHOT',
 'ORGANIZATION_TYPE_ONEHOT',
 'NAME_TYPE_SUITE_ONEHOT',
 'NAME_FAMILY_STATUS_ONEHOT',
 'AMT_ANNUITY_OUT',
 'EXT_SOURCE_2_OUT',
 'EXT_SOURCE_3_OUT']

In [34]:
# Combine every column listed in X into one vector column named 'features'.
assembler = VectorAssembler(inputCols=X, outputCol='features')

In [35]:
# Build the feature vector, then keep only what the classifiers consume.
assembled = assembler.transform(model)
dataset_prep = assembled.select('features', 'label')

In [36]:
# Show ten assembled rows in full (truncate=False prints the whole vector).
dataset_prep.show(10, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                            |label|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(115,[1,2,3,4,5,6,7,8,13,14,20,24,29,46,102,109,112,113,114],[202500.0,406597.5,351000.0,0.018801,9461.0,-637.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24700.5,0.2629485927471776,0.13937578009978951])                            |1    |
|(115,[1,2,3,4,5,6,7,8,11,12,16,21,24,31,52,103,108,

In [37]:
#Splitting the dataset into train (70%) and test (30%)
#A fixed seed keeps the split, and therefore every metric computed below, reproducible across runs

train, test = dataset_prep.randomSplit([0.7, 0.3], seed=42)

In [38]:
# Number of rows in the training split.
train.count()

147269

In [39]:
# Number of rows in the test split.
test.count()

62944

## Logistic Regression

In [40]:
from pyspark.ml.classification import LogisticRegression

In [53]:
# threshold=0.15 sets the probability cut-off used to turn the model's probability into a 0/1 prediction.
lr = LogisticRegression().setThreshold(0.15)

# Fit on the training split and score the held-out test split.
model_lr = lr.fit(train)
predictions_lr_test = model_lr.transform(test)
predictions_lr_test.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(115,[0,1,2,3,4,5...|    0|[4.06926086986920...|[0.98319714546013...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[3.60116012629837...|[0.97343302513155...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[3.14546131039739...|[0.95872951182987...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[3.74708412773923...|[0.97695707934645...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[3.03051431930395...|[0.95393377955126...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[2.63547122638438...|[0.93310985097400...|       0.0|
|(115,[0,1,2,3,4,5...|    1|[3.39879080261609...|[0.96766672351968...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[3.03155676784471...|[0.95397956736792...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[2.59150534919907...|[0.93031287340339...|       0.0|
|(115,[0,1,2,3,4

## Metrics

In [54]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [55]:
evaluator = MulticlassClassificationEvaluator()

# Accuracy covers both classes; the remaining metrics are reported for label 1 only.
print("Accuracy: %f" % evaluator.evaluate(predictions_lr_test, {evaluator.metricName: "accuracy"}))
for title, metric in [("Precision", "precisionByLabel"), ("Recall", "recallByLabel"), ("F1", "fMeasureByLabel")]:
  score = evaluator.evaluate(predictions_lr_test, {evaluator.metricName: metric, evaluator.metricLabel: 1})
  print("%s: %f" % (title, score))

Accuracy: 0.827593
Precision: 0.233415
Recall: 0.401590
F1: 0.295233


In [56]:
#Area under the ROC curve for the logistic regression (single number, not the curve itself)
my_eval = BinaryClassificationEvaluator(labelCol='label')
auc_lr = my_eval.evaluate(predictions_lr_test)
print(auc_lr)

0.7364931666486951


In [57]:
def confusion_matrix(df):
  """Print the confusion matrix of a binary prediction DataFrame.

  Args:
    df: Spark DataFrame with a 'label' column (actual class) and a
      'prediction' column (predicted class); 1 is shown as "Defaulter"
      and 0 as "Non defaulter".

  Returns:
    None. The matrix is printed with actual classes as rows and predicted
    classes as columns.
  """
  def cell(label_value, prediction_value):
    # when() yields null for non-matching rows and count() skips nulls,
    # so only rows satisfying both conditions are tallied.
    matches = (f.col('label') == label_value) & (f.col('prediction') == prediction_value)
    return f.count(f.when(matches, 1))

  # All four cells come from one aggregation, i.e. a single pass over df
  # instead of four separate count() jobs.
  counts = df.agg(
    cell(1, 1).alias('tp'),
    cell(0, 0).alias('tn'),
    cell(0, 1).alias('fp'),
    cell(1, 0).alias('fn'),
  ).first()
  tp, tn, fp, fn = counts['tp'], counts['tn'], counts['fp'], counts['fn']

  print(' '*20, 'Predicted')
  print(' '*20, 'Defaulter', ' '*3 ,'Non defaulter')
  print(' '*4, 'Defaulter', ' '*6, int(tp), ' '*7, int(fn))
  print('Actual')
  print(' '*4, 'Non defaulter', ' '*2, int(fp), ' '*7, int(tn))

In [58]:
# Confusion matrix of the logistic regression on the test split.
confusion_matrix(predictions_lr_test)

                     Predicted
                     Defaulter     Non defaulter
     Defaulter        2273         3387
Actual
     Non defaulter    7465         49819


## Random Forest


In [59]:
from pyspark.ml.classification import RandomForestClassifier

In [67]:
# A fixed seed makes the randomized forest construction repeatable between runs.
# thresholds=[0.9, 0.1]: Spark predicts the class with the largest probability/threshold ratio,
# so class 1 is chosen as soon as its probability exceeds roughly 0.1.
rfc = RandomForestClassifier(seed=42).setThresholds([0.9, 0.1])

In [68]:
# Train the random forest on the training split, then score the held-out test split.
model_rfc = rfc.fit(train)
predictions_rfc_test = model_rfc.transform(test)

In [69]:
# Preview the random forest predictions on the test split.
predictions_rfc_test.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(115,[0,1,2,3,4,5...|    0|[18.3567116232080...|[0.91783558116040...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.3156218945254...|[0.91578109472627...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.3071018645747...|[0.91535509322873...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.3071018645747...|[0.91535509322873...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.2180497592217...|[0.91090248796108...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[17.8794815792501...|[0.89397407896250...|       1.0|
|(115,[0,1,2,3,4,5...|    1|[18.3699217990762...|[0.91849608995381...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.2502147393508...|[0.91251073696754...|       0.0|
|(115,[0,1,2,3,4,5...|    0|[18.2154600944029...|[0.91077300472014...|       0.0|
|(115,[0,1,2,3,4

In [70]:
#Copy element 0 of the probability vector into its own float column named "proba"
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

@udf(returnType=FloatType())
def firstelement(v):
  """Return the first entry of vector v as a Python float."""
  return float(v[0])

predictions_rfc_test = predictions_rfc_test.withColumn("proba", firstelement('probability'))

predictions_rfc_test.show()

+--------------------+-----+--------------------+--------------------+----------+----------+
|            features|label|       rawPrediction|         probability|prediction|     proba|
+--------------------+-----+--------------------+--------------------+----------+----------+
|(115,[0,1,2,3,4,5...|    0|[18.3567116232080...|[0.91783558116040...|       0.0| 0.9178356|
|(115,[0,1,2,3,4,5...|    0|[18.3156218945254...|[0.91578109472627...|       0.0| 0.9157811|
|(115,[0,1,2,3,4,5...|    0|[18.3071018645747...|[0.91535509322873...|       0.0| 0.9153551|
|(115,[0,1,2,3,4,5...|    0|[18.3071018645747...|[0.91535509322873...|       0.0| 0.9153551|
|(115,[0,1,2,3,4,5...|    0|[18.2180497592217...|[0.91090248796108...|       0.0| 0.9109025|
|(115,[0,1,2,3,4,5...|    0|[17.8794815792501...|[0.89397407896250...|       1.0|0.89397407|
|(115,[0,1,2,3,4,5...|    1|[18.3699217990762...|[0.91849608995381...|       0.0| 0.9184961|
|(115,[0,1,2,3,4,5...|    0|[18.2502147393508...|[0.91251073696754...|

In [71]:
# Accuracy covers both classes; the remaining metrics are reported for label 1 only.
print("Accuracy: %f" % evaluator.evaluate(predictions_rfc_test, {evaluator.metricName: "accuracy"}))
for title, metric in [("Precision", "precisionByLabel"), ("Recall", "recallByLabel"), ("F1", "fMeasureByLabel")]:
  score = evaluator.evaluate(predictions_rfc_test, {evaluator.metricName: metric, evaluator.metricLabel: 1})
  print("%s: %f" % (title, score))

Accuracy: 0.840255
Precision: 0.221730
Recall: 0.309364
F1: 0.258317


In [72]:
#Area under the ROC curve for the random forest (single number, not the curve itself)
my_eval = BinaryClassificationEvaluator(labelCol='label')
auc_rfc = my_eval.evaluate(predictions_rfc_test)
print(auc_rfc)

0.6966032147063185


In [73]:
# Confusion matrix of the random forest on the test split.
confusion_matrix(predictions_rfc_test)

                     Predicted
                     Defaulter     Non defaulter
     Defaulter        1751         3909
Actual
     Non defaulter    6146         51138
