In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (countDistinct, avg, stddev, corr, format_number,
                                   regexp_replace, dayofmonth, hour, dayofyear,month,
                                   year)

In [2]:
spark = SparkSession.builder.appName("credit").getOrCreate()

In [3]:
spark

In [4]:
# loading the dataset
df = spark.read.csv("application_record.csv", header = True, inferSchema = True)

In [5]:
df.show()

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

In [6]:
df.show(vertical=True)

-RECORD 0-----------------------------------
 ID                  | 5008804              
 CODE_GENDER         | M                    
 FLAG_OWN_CAR        | Y                    
 FLAG_OWN_REALTY     | Y                    
 CNT_CHILDREN        | 0                    
 AMT_INCOME_TOTAL    | 427500.0             
 NAME_INCOME_TYPE    | Working              
 NAME_EDUCATION_TYPE | Higher education     
 NAME_FAMILY_STATUS  | Civil marriage       
 NAME_HOUSING_TYPE   | Rented apartment     
 DAYS_BIRTH          | -12005               
 DAYS_EMPLOYED       | -4542                
 FLAG_MOBIL          | 1                    
 FLAG_WORK_PHONE     | 1                    
 FLAG_PHONE          | 0                    
 FLAG_EMAIL          | 0                    
 OCCUPATION_TYPE     | null                 
 CNT_FAM_MEMBERS     | 2.0                  
-RECORD 1-----------------------------------
 ID                  | 5008805              
 CODE_GENDER         | M                    
 FLAG_OWN_

In [7]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)



In [8]:
df.describe().show()

+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+-------------------+------------------+----------+-------------------+-------------------+-------------------+--------------------+------------------+
|summary|               ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|       CNT_CHILDREN|  AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|         DAYS_BIRTH|     DAYS_EMPLOYED|FLAG_MOBIL|    FLAG_WORK_PHONE|         FLAG_PHONE|         FLAG_EMAIL|     OCCUPATION_TYPE|   CNT_FAM_MEMBERS|
+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+-------------------+------------------+----------+-------------------+-------------------+-------------------+--------------------+---------

In [9]:
# creating a new column Age from DAYS_BIRTH
new_df = df.withColumn('AGE', df['DAYS_BIRTH'] / 365 * (-1))

In [10]:
new_df.select('AGE').show()

+------------------+
|               AGE|
+------------------+
| 32.89041095890411|
| 32.89041095890411|
| 58.83287671232877|
| 52.35616438356164|
| 52.35616438356164|
| 52.35616438356164|
| 52.35616438356164|
| 61.54520547945206|
| 61.54520547945206|
| 61.54520547945206|
|46.224657534246575|
|46.224657534246575|
|46.224657534246575|
|48.706849315068496|
|48.706849315068496|
|48.706849315068496|
|48.706849315068496|
|48.706849315068496|
|48.706849315068496|
| 29.23013698630137|
+------------------+
only showing top 20 rows



In [11]:
# creating a new column YEARS_WORKED from DAYS_EMPLOYED
new_df = new_df.withColumn('YEARS_WORKED', df['DAYS_EMPLOYED'] / 365 * (-1))

In [12]:
new_df.select('YEARS_WORKED').show()

+-------------------+
|       YEARS_WORKED|
+-------------------+
| 12.443835616438356|
| 12.443835616438356|
|  3.106849315068493|
|   8.35890410958904|
|   8.35890410958904|
|   8.35890410958904|
|   8.35890410958904|
|-1000.6657534246575|
|-1000.6657534246575|
|-1000.6657534246575|
|  2.106849315068493|
|  2.106849315068493|
|  2.106849315068493|
|  3.271232876712329|
|  3.271232876712329|
|  3.271232876712329|
|  3.271232876712329|
|  3.271232876712329|
|  3.271232876712329|
|  3.021917808219178|
+-------------------+
only showing top 20 rows



In [13]:
# replacing -1000 with 40 in. YEARS_WORKED
new_df = new_df.replace(-1000.6657534246575, 40, 'YEARS_WORKED')

In [14]:
new_df.select('YEARS_WORKED').show()

+------------------+
|      YEARS_WORKED|
+------------------+
|12.443835616438356|
|12.443835616438356|
| 3.106849315068493|
|  8.35890410958904|
|  8.35890410958904|
|  8.35890410958904|
|  8.35890410958904|
|              40.0|
|              40.0|
|              40.0|
| 2.106849315068493|
| 2.106849315068493|
| 2.106849315068493|
| 3.271232876712329|
| 3.271232876712329|
| 3.271232876712329|
| 3.271232876712329|
| 3.271232876712329|
| 3.271232876712329|
| 3.021917808219178|
+------------------+
only showing top 20 rows



In [15]:
new_df.groupBy('CODE_GENDER').count().show()

+-----------+------+
|CODE_GENDER| count|
+-----------+------+
|          F|294440|
|          M|144117|
+-----------+------+



In [16]:
new_df = new_df.replace(['F','M'],['0','1'],'CODE_GENDER')

In [17]:
new_df.select('CODE_GENDER').show()

+-----------+
|CODE_GENDER|
+-----------+
|          1|
|          1|
|          1|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          0|
+-----------+
only showing top 20 rows



In [18]:
new_df.groupBy('FLAG_OWN_CAR').count().show()

+------------+------+
|FLAG_OWN_CAR| count|
+------------+------+
|           Y|163098|
|           N|275459|
+------------+------+



In [19]:
new_df = new_df.replace(['Y','N'],['1','0'],'FLAG_OWN_CAR')

In [20]:
new_df.select('FLAG_OWN_CAR').show()

+------------+
|FLAG_OWN_CAR|
+------------+
|           1|
|           1|
|           1|
|           0|
|           0|
|           0|
|           0|
|           0|
|           0|
|           0|
|           1|
|           1|
|           1|
|           1|
|           1|
|           1|
|           1|
|           1|
|           1|
|           1|
+------------+
only showing top 20 rows



In [21]:
new_df.groupBy('FLAG_OWN_REALTY').count().show()

+---------------+------+
|FLAG_OWN_REALTY| count|
+---------------+------+
|              Y|304074|
|              N|134483|
+---------------+------+



In [22]:
new_df = new_df.replace(['Y','N'],['1','0'],'FLAG_OWN_REALTY')

In [23]:
new_df.select('FLAG_OWN_REALTY').show()

+---------------+
|FLAG_OWN_REALTY|
+---------------+
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              0|
+---------------+
only showing top 20 rows



In [24]:
new_df.groupBy('CNT_CHILDREN').count().show()

+------------+------+
|CNT_CHILDREN| count|
+------------+------+
|          12|     4|
|           1| 88527|
|           6|     4|
|           3|  5430|
|           5|   133|
|          19|     1|
|           9|     5|
|           4|   486|
|           7|     9|
|          14|     3|
|           2| 39884|
|           0|304071|
+------------+------+



In [25]:
new_df.groupBy('NAME_INCOME_TYPE').count().show()

+--------------------+------+
|    NAME_INCOME_TYPE| count|
+--------------------+------+
|             Student|    17|
|Commercial associate|100757|
|       State servant| 36186|
|             Working|226104|
|           Pensioner| 75493|
+--------------------+------+



In [26]:
new_df = new_df.replace(['Student','Commercial associate','State servant','Working','Pensioner'],
                        ['1','2','3','4','5'],'NAME_INCOME_TYPE')

new_df.select('NAME_INCOME_TYPE').show()

+----------------+
|NAME_INCOME_TYPE|
+----------------+
|               4|
|               4|
|               4|
|               2|
|               2|
|               2|
|               2|
|               5|
|               5|
|               5|
|               4|
|               4|
|               4|
|               2|
|               2|
|               2|
|               2|
|               2|
|               2|
|               4|
+----------------+
only showing top 20 rows



In [27]:
new_df.groupBy('NAME_EDUCATION_TYPE').count().show()

+--------------------+------+
| NAME_EDUCATION_TYPE| count|
+--------------------+------+
|     Academic degree|   312|
|   Incomplete higher| 14851|
|Secondary / secon...|301821|
|     Lower secondary|  4051|
|    Higher education|117522|
+--------------------+------+



In [28]:
new_df = new_df.replace(['Academic degree','Incomplete higher','Secondary / secondary special','Lower secondary','Higher education'],
                        ['1','2','3','4','5'],'NAME_EDUCATION_TYPE')

new_df.select('NAME_EDUCATION_TYPE').show()

+-------------------+
|NAME_EDUCATION_TYPE|
+-------------------+
|                  5|
|                  5|
|                  3|
|                  3|
|                  3|
|                  3|
|                  3|
|                  5|
|                  5|
|                  5|
|                  5|
|                  5|
|                  5|
|                  3|
|                  3|
|                  3|
|                  3|
|                  3|
|                  3|
|                  2|
+-------------------+
only showing top 20 rows



In [29]:
new_df.groupBy('NAME_FAMILY_STATUS').count().show()

+--------------------+------+
|  NAME_FAMILY_STATUS| count|
+--------------------+------+
|           Separated| 27251|
|             Married|299828|
|Single / not married| 55271|
|               Widow| 19675|
|      Civil marriage| 36532|
+--------------------+------+



In [30]:
new_df = new_df.replace(['Separated','Married','Single / not married','Widow','Civil marriage'],
                        ['1','2','3','4','5'],'NAME_FAMILY_STATUS')

new_df.select('NAME_FAMILY_STATUS').show()

+------------------+
|NAME_FAMILY_STATUS|
+------------------+
|                 5|
|                 5|
|                 2|
|                 3|
|                 3|
|                 3|
|                 3|
|                 1|
|                 1|
|                 1|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
|                 2|
+------------------+
only showing top 20 rows



In [31]:
new_df.groupBy('NAME_HOUSING_TYPE').count().show()

+-------------------+------+
|  NAME_HOUSING_TYPE| count|
+-------------------+------+
|  House / apartment|393831|
|Municipal apartment| 14214|
|    Co-op apartment|  1539|
|   Rented apartment|  5974|
|   Office apartment|  3922|
|       With parents| 19077|
+-------------------+------+



In [32]:
new_df = new_df.replace(['House / apartment','Municipal apartment','Co-op apartment','Rented apartment','Office apartment','With parents'],
                        ['1','2','3','4','5','6'],'NAME_HOUSING_TYPE')

new_df.select('NAME_HOUSING_TYPE').show()

+-----------------+
|NAME_HOUSING_TYPE|
+-----------------+
|                4|
|                4|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
|                1|
+-----------------+
only showing top 20 rows



In [33]:
new_df.groupBy('NAME_HOUSING_TYPE').count().show()

+-----------------+------+
|NAME_HOUSING_TYPE| count|
+-----------------+------+
|                3|  1539|
|                5|  3922|
|                6| 19077|
|                1|393831|
|                4|  5974|
|                2| 14214|
+-----------------+------+



In [34]:
new_df.groupBy('OCCUPATION_TYPE').count().show()

+--------------------+------+
|     OCCUPATION_TYPE| count|
+--------------------+------+
|                null|134203|
|            Managers| 35487|
|            HR staff|   774|
|      Medicine staff| 13520|
|         Accountants| 15985|
|            Laborers| 78240|
|      Cleaning staff|  5845|
|Private service s...|  3456|
|             Drivers| 26090|
|         Sales staff| 41098|
|       Realty agents|  1041|
|            IT staff|   604|
|      Security staff|  7993|
|         Secretaries|  2044|
|  Low-skill Laborers|  2140|
|          Core staff| 43007|
|       Cooking staff|  8076|
|High skill tech s...| 17289|
|Waiters/barmen staff|  1665|
+--------------------+------+



In [35]:
new_df.groupBy('FLAG_MOBIL').count().show()

+----------+------+
|FLAG_MOBIL| count|
+----------+------+
|         1|438557|
+----------+------+



In [36]:
new_df.groupBy('FLAG_EMAIL').count().show()

+----------+------+
|FLAG_EMAIL| count|
+----------+------+
|         1| 47455|
|         0|391102|
+----------+------+



In [37]:
new_df.groupBy('FLAG_WORK_PHONE').count().show()

+---------------+------+
|FLAG_WORK_PHONE| count|
+---------------+------+
|              1| 90401|
|              0|348156|
+---------------+------+



In [38]:
new_df.groupBy('FLAG_PHONE').count().show()

+----------+------+
|FLAG_PHONE| count|
+----------+------+
|         1|126204|
|         0|312353|
+----------+------+



In [39]:
new_df.groupBy('CNT_FAM_MEMBERS').count().show()

+---------------+------+
|CNT_FAM_MEMBERS| count|
+---------------+------+
|            8.0|     4|
|            7.0|   124|
|            1.0| 84492|
|            4.0| 37356|
|           11.0|     5|
|           14.0|     4|
|            3.0| 77128|
|            2.0|233891|
|            6.0|   459|
|           20.0|     1|
|            5.0|  5081|
|           15.0|     3|
|            9.0|     9|
+---------------+------+



In [40]:
new_df = new_df.drop("OCCUPATION_TYPE")
new_df = new_df.drop("DAYS_BIRTH")
new_df = new_df.drop("DAYS_EMPLOYED")
new_df = new_df.drop("FLAG_MOBIL")
new_df = new_df.drop("FLAG_WORK_PHONE")
new_df = new_df.drop("FLAG_PHONE")
new_df = new_df.drop("FLAG_EMAIL")

In [41]:
new_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- YEARS_WORKED: double (nullable = true)



In [42]:
final_df = new_df

In [43]:
final_df = final_df.select(final_df['ID'].cast('integer'),
               final_df['CODE_GENDER'].cast('integer'),
               final_df['FLAG_OWN_CAR'].cast('integer'),
               final_df['FLAG_OWN_REALTY'].cast('integer'),
               final_df['CNT_CHILDREN'].cast('integer'),
               final_df['AMT_INCOME_TOTAL'].cast('float'),
               final_df['NAME_INCOME_TYPE'].cast('integer'),
               final_df['NAME_EDUCATION_TYPE'].cast('integer'),
               final_df['NAME_FAMILY_STATUS'].cast('integer'),
               final_df['NAME_HOUSING_TYPE'].cast('integer'),
               final_df['CNT_FAM_MEMBERS'].cast('integer'),
               final_df['AGE'].cast('integer'),
               final_df['YEARS_WORKED'].cast('integer'))

In [44]:
final_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: integer (nullable = true)
 |-- FLAG_OWN_CAR: integer (nullable = true)
 |-- FLAG_OWN_REALTY: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: float (nullable = true)
 |-- NAME_INCOME_TYPE: integer (nullable = true)
 |-- NAME_EDUCATION_TYPE: integer (nullable = true)
 |-- NAME_FAMILY_STATUS: integer (nullable = true)
 |-- NAME_HOUSING_TYPE: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- YEARS_WORKED: integer (nullable = true)



In [45]:
final_df.show(vertical = True )

-RECORD 0-----------------------
 ID                  | 5008804  
 CODE_GENDER         | 1        
 FLAG_OWN_CAR        | 1        
 FLAG_OWN_REALTY     | 1        
 CNT_CHILDREN        | 0        
 AMT_INCOME_TOTAL    | 427500.0 
 NAME_INCOME_TYPE    | 4        
 NAME_EDUCATION_TYPE | 5        
 NAME_FAMILY_STATUS  | 5        
 NAME_HOUSING_TYPE   | 4        
 CNT_FAM_MEMBERS     | 2        
 AGE                 | 32       
 YEARS_WORKED        | 12       
-RECORD 1-----------------------
 ID                  | 5008805  
 CODE_GENDER         | 1        
 FLAG_OWN_CAR        | 1        
 FLAG_OWN_REALTY     | 1        
 CNT_CHILDREN        | 0        
 AMT_INCOME_TOTAL    | 427500.0 
 NAME_INCOME_TYPE    | 4        
 NAME_EDUCATION_TYPE | 5        
 NAME_FAMILY_STATUS  | 5        
 NAME_HOUSING_TYPE   | 4        
 CNT_FAM_MEMBERS     | 2        
 AGE                 | 32       
 YEARS_WORKED        | 12       
-RECORD 2-----------------------
 ID                  | 5008806  
 CODE_GEND

# SECOND DATASET

In [46]:
sdf = spark.read.csv("credit_record.csv", header=True, inferSchema=True)

In [47]:
sdf.show()

+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|             0|     X|
|5001711|            -1|     0|
|5001711|            -2|     0|
|5001711|            -3|     0|
|5001712|             0|     C|
|5001712|            -1|     C|
|5001712|            -2|     C|
|5001712|            -3|     C|
|5001712|            -4|     C|
|5001712|            -5|     C|
|5001712|            -6|     C|
|5001712|            -7|     C|
|5001712|            -8|     C|
|5001712|            -9|     0|
|5001712|           -10|     0|
|5001712|           -11|     0|
|5001712|           -12|     0|
|5001712|           -13|     0|
|5001712|           -14|     0|
|5001712|           -15|     0|
+-------+--------------+------+
only showing top 20 rows



In [48]:
sdf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- STATUS: string (nullable = true)



In [49]:
sdf.describe().show()

+-------+------------------+-------------------+-------------------+
|summary|                ID|     MONTHS_BALANCE|             STATUS|
+-------+------------------+-------------------+-------------------+
|  count|           1048575|            1048575|            1048575|
|   mean| 5068286.424673486|-19.136998307226474|0.05824863961501482|
| stddev|46150.578505385296| 14.023497688326556|  0.394987864205249|
|    min|           5001711|                -60|                  0|
|    max|           5150487|                  0|                  X|
+-------+------------------+-------------------+-------------------+



In [50]:
sdf.groupBy('STATUS').count().show()

+------+------+
|STATUS| count|
+------+------+
|     3|   320|
|     0|383120|
|     5|  1693|
|     C|442031|
|     X|209230|
|     1| 11090|
|     4|   223|
|     2|   868|
+------+------+



In [51]:
# Making Status to binary classification by making Good clients = 0 and bad clients =1
sdf = sdf.replace(['C', 'X'],['0','0'],'STATUS')
sdf = sdf.replace(['2', '3', '4', '5'],['1', '1', '1', '1'],'STATUS')
sdf.select('STATUS').show()

+------+
|STATUS|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
+------+
only showing top 20 rows



In [52]:
sdf = sdf.drop('MONTHS_BALANCE')

In [53]:
sdf.show()

+-------+------+
|     ID|STATUS|
+-------+------+
|5001711|     0|
|5001711|     0|
|5001711|     0|
|5001711|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
|5001712|     0|
+-------+------+
only showing top 20 rows



In [54]:
print((sdf.count(), len(sdf.columns)))

(1048575, 2)


In [55]:
sdf.groupBy('STATUS').count().show()

+------+-------+
|STATUS|  count|
+------+-------+
|     0|1034381|
|     1|  14194|
+------+-------+



In [56]:
sdf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- STATUS: string (nullable = true)



In [57]:
sdf = sdf.select(sdf['ID'].cast('integer'),
    sdf['STATUS'].cast('integer'))

In [58]:
sdf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- STATUS: integer (nullable = true)



# Merging Datasets

In [59]:
data = final_df.join(sdf, on=['ID'], how = 'inner')
data.show(vertical=True)

-RECORD 0-----------------------
 ID                  | 5008804  
 CODE_GENDER         | 1        
 FLAG_OWN_CAR        | 1        
 FLAG_OWN_REALTY     | 1        
 CNT_CHILDREN        | 0        
 AMT_INCOME_TOTAL    | 427500.0 
 NAME_INCOME_TYPE    | 4        
 NAME_EDUCATION_TYPE | 5        
 NAME_FAMILY_STATUS  | 5        
 NAME_HOUSING_TYPE   | 4        
 CNT_FAM_MEMBERS     | 2        
 AGE                 | 32       
 YEARS_WORKED        | 12       
 STATUS              | 0        
-RECORD 1-----------------------
 ID                  | 5008804  
 CODE_GENDER         | 1        
 FLAG_OWN_CAR        | 1        
 FLAG_OWN_REALTY     | 1        
 CNT_CHILDREN        | 0        
 AMT_INCOME_TOTAL    | 427500.0 
 NAME_INCOME_TYPE    | 4        
 NAME_EDUCATION_TYPE | 5        
 NAME_FAMILY_STATUS  | 5        
 NAME_HOUSING_TYPE   | 4        
 CNT_FAM_MEMBERS     | 2        
 AGE                 | 32       
 YEARS_WORKED        | 12       
 STATUS              | 0        
-RECORD 2-

In [60]:
print((data.count(), len(data.columns)))

(777715, 14)


In [61]:
data = data.drop_duplicates()
print((data.count(), len(data.columns)))

(40747, 14)


In [62]:
data.describe().show()

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|summary|                ID|       CODE_GENDER|      FLAG_OWN_CAR|    FLAG_OWN_REALTY|      CNT_CHILDREN|  AMT_INCOME_TOTAL|  NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS| NAME_HOUSING_TYPE|   CNT_FAM_MEMBERS|               AGE|      YEARS_WORKED|             STATUS|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|  count|             40747|             40747|             40747|              40747|             40747|             40747|             40747|              40747|

In [63]:
data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: integer (nullable = true)
 |-- FLAG_OWN_CAR: integer (nullable = true)
 |-- FLAG_OWN_REALTY: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: float (nullable = true)
 |-- NAME_INCOME_TYPE: integer (nullable = true)
 |-- NAME_EDUCATION_TYPE: integer (nullable = true)
 |-- NAME_FAMILY_STATUS: integer (nullable = true)
 |-- NAME_HOUSING_TYPE: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- YEARS_WORKED: integer (nullable = true)
 |-- STATUS: integer (nullable = true)



In [64]:
# data.toPandas().to_csv("updated.csv")

# Machine Learning

In [65]:
data.columns

['ID',
 'CODE_GENDER',
 'FLAG_OWN_CAR',
 'FLAG_OWN_REALTY',
 'CNT_CHILDREN',
 'AMT_INCOME_TOTAL',
 'NAME_INCOME_TYPE',
 'NAME_EDUCATION_TYPE',
 'NAME_FAMILY_STATUS',
 'NAME_HOUSING_TYPE',
 'CNT_FAM_MEMBERS',
 'AGE',
 'YEARS_WORKED',
 'STATUS']

In [66]:
data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: integer (nullable = true)
 |-- FLAG_OWN_CAR: integer (nullable = true)
 |-- FLAG_OWN_REALTY: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: float (nullable = true)
 |-- NAME_INCOME_TYPE: integer (nullable = true)
 |-- NAME_EDUCATION_TYPE: integer (nullable = true)
 |-- NAME_FAMILY_STATUS: integer (nullable = true)
 |-- NAME_HOUSING_TYPE: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- YEARS_WORKED: integer (nullable = true)
 |-- STATUS: integer (nullable = true)



In [67]:
from pyspark.ml.feature import VectorAssembler

In [68]:
assembler = VectorAssembler (inputCols= ['CODE_GENDER','FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'CNT_CHILDREN',
                                         'AMT_INCOME_TOTAL', 'NAME_INCOME_TYPE', 'NAME_EDUCATION_TYPE',
                                         'NAME_FAMILY_STATUS', 'NAME_HOUSING_TYPE', 'CNT_FAM_MEMBERS',
                                         'AGE', 'YEARS_WORKED'],
                             outputCol= 'features')

In [69]:
output = assembler.transform(data)

In [70]:
final_data = output.select('features', 'STATUS')

In [71]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [72]:
final_data.groupBy('STATUS').count().show()

+------+-----+
|STATUS|count|
+------+-----+
|     1| 4291|
|     0|36456|
+------+-----+



In [73]:
(train, test) = final_data.randomSplit([0.7, 0.3])

train.show(5,False)
test.show(5,False)

+-------------------------------------------------------+------+
|features                                               |STATUS|
+-------------------------------------------------------+------+
|[0.0,0.0,0.0,0.0,45000.0,3.0,3.0,2.0,1.0,2.0,50.0,10.0]|0     |
|[0.0,0.0,0.0,0.0,67500.0,4.0,3.0,2.0,1.0,2.0,50.0,3.0] |0     |
|[0.0,0.0,0.0,0.0,67500.0,5.0,3.0,1.0,1.0,1.0,58.0,40.0]|1     |
|[0.0,0.0,0.0,0.0,76500.0,5.0,3.0,4.0,1.0,1.0,65.0,40.0]|0     |
|[0.0,0.0,0.0,0.0,90000.0,5.0,3.0,3.0,2.0,1.0,64.0,40.0]|0     |
+-------------------------------------------------------+------+
only showing top 5 rows

+--------------------------------------------------------+------+
|features                                                |STATUS|
+--------------------------------------------------------+------+
|[0.0,0.0,0.0,0.0,157500.0,5.0,3.0,2.0,1.0,2.0,55.0,40.0]|0     |
|[0.0,0.0,0.0,0.0,202500.0,2.0,2.0,3.0,1.0,1.0,58.0,0.0] |0     |
|[0.0,0.0,0.0,1.0,76500.0,4.0,3.0,2.0,1.0,3.0,27.0,2.0]  |0 

# Logistic Regression

In [74]:
from pyspark.ml.classification import LogisticRegression

In [75]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [76]:
lreg = LogisticRegression(featuresCol = 'features', labelCol = 'STATUS')
lregModel = lreg.fit(train) 

In [77]:
predictions = lregModel.transform(test)
predictions.select('features', 'STATUS', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------------+------+--------------------+----------+--------------------+
|            features|STATUS|       rawPrediction|prediction|         probability|
+--------------------+------+--------------------+----------+--------------------+
|[0.0,0.0,0.0,0.0,...|     0|[2.13393727567106...|       0.0|[0.89415820805598...|
|[0.0,0.0,0.0,0.0,...|     0|[2.02171387478188...|       0.0|[0.88305811117260...|
|[0.0,0.0,0.0,1.0,...|     0|[2.02314938981491...|       0.0|[0.88320627026601...|
|[0.0,0.0,0.0,2.0,...|     0|[2.08758438674291...|       0.0|[0.88969057739383...|
|[0.0,0.0,1.0,0.0,...|     1|[2.39709261232921...|       0.0|[0.91660533181562...|
|[0.0,0.0,1.0,0.0,...|     0|[2.27226358190389...|       0.0|[0.90655372155000...|
|[0.0,0.0,1.0,0.0,...|     0|[2.15328489047525...|       0.0|[0.89597533875285...|
|[0.0,0.0,1.0,0.0,...|     0|[2.30063868342582...|       0.0|[0.90892992065843...|
|[0.0,0.0,1.0,0.0,...|     0|[2.32426314096299...|       0.0|[0.91086666583257...|
|[0.

In [78]:
# Evaluate model
evl = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'STATUS')
accuracy = evl.evaluate(predictions)
print("Accuracy =  ", accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy =   0.5269756285671539
Test Error = 0.473024 


## DecisionTreeClassifier

In [79]:
from pyspark.ml.classification import DecisionTreeClassifier

In [80]:
dc = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'STATUS')
dcmodel = dc.fit(train)
predictions = dcmodel.transform(test)
predictions.select('features', 'STATUS', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------------+------+----------------+----------+--------------------+
|            features|STATUS|   rawPrediction|prediction|         probability|
+--------------------+------+----------------+----------+--------------------+
|[0.0,0.0,0.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,0.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,0.0,1.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,0.0,2.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     1|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     0|[25692.0,2992.0]|       0.0|[0.89569097754845...|
|[0.0,0.0,1.0,0.0,...|     0|[25692.0,2992.0]|      

In [81]:
# Evaluate model
evl = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'STATUS')
accuracy = evl.evaluate(predictions)
print("Accuracy =  ", accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy =   0.5
Test Error = 0.5 


## RandomForestClassifier

In [82]:
from pyspark.ml.classification import RandomForestClassifier
rfrst = RandomForestClassifier(featuresCol = 'features', labelCol = 'STATUS')
rfrstModel = rfrst.fit(train)
predictions = rfrstModel.transform(test)
predictions.select('features', 'STATUS', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------------+------+--------------------+----------+--------------------+
|            features|STATUS|       rawPrediction|prediction|         probability|
+--------------------+------+--------------------+----------+--------------------+
|[0.0,0.0,0.0,0.0,...|     0|[17.9949839358774...|       0.0|[0.89974919679387...|
|[0.0,0.0,0.0,0.0,...|     0|[17.7845216636363...|       0.0|[0.88922608318181...|
|[0.0,0.0,0.0,1.0,...|     0|[17.7028377655262...|       0.0|[0.88514188827631...|
|[0.0,0.0,0.0,2.0,...|     0|[17.9929882611810...|       0.0|[0.89964941305905...|
|[0.0,0.0,1.0,0.0,...|     1|[18.0699447280737...|       0.0|[0.90349723640368...|
|[0.0,0.0,1.0,0.0,...|     0|[17.8015003872434...|       0.0|[0.89007501936217...|
|[0.0,0.0,1.0,0.0,...|     0|[17.8508092037969...|       0.0|[0.89254046018984...|
|[0.0,0.0,1.0,0.0,...|     0|[18.0699447280737...|       0.0|[0.90349723640368...|
|[0.0,0.0,1.0,0.0,...|     0|[18.0467225298870...|       0.0|[0.90233612649435...|
|[0.

In [83]:
# Evaluate model
evl = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'STATUS')
accuracy = evl.evaluate(predictions)
print("Accuracy =  ", accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy =   0.5345151946341824
Test Error = 0.465485 


## NaiveBayes

In [84]:
from pyspark.ml.classification import NaiveBayes
NB = NaiveBayes(featuresCol = 'features', labelCol = 'STATUS')
NBModel = NB.fit(train)
predictions = NBModel.transform(test)
predictions.select('features', 'STATUS', 'rawPrediction', 'prediction', 'probability').show(10)

+--------------------+------+--------------------+----------+--------------------+
|            features|STATUS|       rawPrediction|prediction|         probability|
+--------------------+------+--------------------+----------+--------------------+
|[0.0,0.0,0.0,0.0,...|     0|[-1049.1675229179...|       0.0|[0.99787355022894...|
|[0.0,0.0,0.0,0.0,...|     0|[-662.3888302716,...|       0.0|[0.80340354150288...|
|[0.0,0.0,0.0,1.0,...|     0|[-431.70580479543...|       0.0|[0.93264492518956...|
|[0.0,0.0,0.0,2.0,...|     0|[-537.33747423581...|       0.0|[0.96182830812134...|
|[0.0,0.0,1.0,0.0,...|     1|[-631.79040225493...|       0.0|[0.97718427460799...|
|[0.0,0.0,1.0,0.0,...|     0|[-582.50609573859...|       0.0|[0.95884995068855...|
|[0.0,0.0,1.0,0.0,...|     0|[-555.40063124321...|       0.0|[0.93158512722771...|
|[0.0,0.0,1.0,0.0,...|     0|[-558.29745069345...|       0.0|[0.90704575980158...|
|[0.0,0.0,1.0,0.0,...|     0|[-1164.8880204302...|       0.0|[0.99938824434019...|
|[0.

In [85]:
# Evaluate model
evl = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'STATUS')
accuracy = evl.evaluate(predictions)
print("Accuracy =  ", accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy =   0.5142509860227502
Test Error = 0.485749 


## LinearSVC

In [86]:
from pyspark.ml.classification import LinearSVC
LSVC = LinearSVC(featuresCol = 'features', labelCol = 'STATUS')
LSVCMODEL = LSVC.fit(train)
predictions = LSVCMODEL.transform(test)
predictions.select('features', 'STATUS', 'rawPrediction', 'prediction').show(10)

+--------------------+------+--------------------+----------+
|            features|STATUS|       rawPrediction|prediction|
+--------------------+------+--------------------+----------+
|[0.0,0.0,0.0,0.0,...|     0|[1.04333809884336...|       0.0|
|[0.0,0.0,0.0,0.0,...|     0|[1.05928766898310...|       0.0|
|[0.0,0.0,0.0,1.0,...|     0|[1.03672497224395...|       0.0|
|[0.0,0.0,0.0,2.0,...|     0|[1.05766386091303...|       0.0|
|[0.0,0.0,1.0,0.0,...|     1|[1.15364625179383...|       0.0|
|[0.0,0.0,1.0,0.0,...|     0|[1.07757010379782...|       0.0|
|[0.0,0.0,1.0,0.0,...|     0|[1.04663184608456...|       0.0|
|[0.0,0.0,1.0,0.0,...|     0|[1.10317394888460...|       0.0|
|[0.0,0.0,1.0,0.0,...|     0|[1.03553594915269...|       0.0|
|[0.0,0.0,1.0,0.0,...|     0|[1.06834355905424...|       0.0|
+--------------------+------+--------------------+----------+
only showing top 10 rows



In [87]:
# Evaluate model
evl = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'STATUS')
accuracy = evl.evaluate(predictions)
print("Accuracy =  ", accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy =   0.4982691141944081
Test Error = 0.501731 
