In [1]:
#installing pyspark
! pip install pyspark



In [2]:
from pyspark.sql import SparkSession

In [3]:
# Creating a new SparkSession named 'Project'.
spark=SparkSession.builder.appName('Project').getOrCreate()

In [4]:
spark

In [5]:
#Creating a PySpark dataframe ' df_application_record ' by reading a CSV file named 'application_record.csv' 
#The option('header', 'true') method specifies that the first row of the CSV file contains the column names.
#The inferSchema=True option specifies that Spark should attempt to infer the data types of each column.
df_application_record=spark.read.csv('dataset/application_record.csv', header = True , inferSchema = True)

In [6]:
# Printing the schema of dataframe df_application_record.
df_application_record.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)



In [7]:
# Displaying the df_application_record dataframe 
df_application_record.show()

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

In [8]:
# Displayimg the number of rows and column in df_application_record dataframe
num_rows1 = df_application_record.count()
num_cols1 = len(df_application_record.columns)
print("No. of Rows in application_record data frame:" ,num_rows1 )
print("No. of Columns in application_record data frame:" , num_cols1)

No. of Rows in application_record data frame: 438557
No. of Columns in application_record data frame: 18


In [10]:
#Creating a PySpark dataframe ' df_credit_record ' by reading a CSV file named 'credit_record.csv' 
#The option('header', 'true') method specifies that the first row of the CSV file contains the column names.
#The inferSchema=True option specifies that Spark should attempt to infer the data types of each column.
df_credit_record = spark.read.csv('dataset/credit_record.csv', header = True , inferSchema=True)

In [11]:
# Printing the schema of dataframe df_credit_record.
df_credit_record.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- STATUS: string (nullable = true)



In [12]:
# Displaying the df_credit_record dataframe 
df_credit_record.show()

+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|             0|     X|
|5001711|            -1|     0|
|5001711|            -2|     0|
|5001711|            -3|     0|
|5001712|             0|     C|
|5001712|            -1|     C|
|5001712|            -2|     C|
|5001712|            -3|     C|
|5001712|            -4|     C|
|5001712|            -5|     C|
|5001712|            -6|     C|
|5001712|            -7|     C|
|5001712|            -8|     C|
|5001712|            -9|     0|
|5001712|           -10|     0|
|5001712|           -11|     0|
|5001712|           -12|     0|
|5001712|           -13|     0|
|5001712|           -14|     0|
|5001712|           -15|     0|
+-------+--------------+------+
only showing top 20 rows



In [13]:
# Displayimg the number of rows and column in df_credit_record dataframe
num_rows2 = df_credit_record.count()
num_cols2 = len(df_credit_record.columns)
print("No. of Rows in credit_record data frame: " ,num_rows2)
print("No. of Columns in credit_record data frame: " ,num_cols2)

No. of Rows in credit_record data frame:  1048575
No. of Columns in credit_record data frame:  3


In [14]:
# Mergeing both application_record and credit record on ID and creating new dataframe df_final
df_final= df_application_record.join(df_credit_record, on="ID")

In [15]:
# Displayimg the number of rows and column in df_final dataframe
num_rows3 = df_final.count()
num_cols3 = len(df_final.columns)
print("No. of Rows in final data frame:" ,num_rows3)
print("No. of Columns in final data frame:",num_cols3)

No. of Rows in final data frame: 777715
No. of Columns in final data frame: 20


In [16]:
# Checking Null values
from pyspark.sql.functions import col,sum
null_counts = df_final.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_final.columns))
null_counts.show()

+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
| ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|  0|          0|           0|              0|           0|               0|               0|                  0|                 0|                0|         0|  

In [17]:
# Removing Null values
df_final = df_final.dropna()

In [18]:
# Checking Null values again
null_counts2 = df_final.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_final.columns))
null_counts2.show()

+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
| ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|  0|          0|           0|              0|           0|               0|               0|                  0|                 0|                0|         0|  

In [19]:
# Displayimg the number of rows and column in df_final dataframe after removing Null values
num_rows4 = df_final.count()
num_cols4 = len(df_final.columns)
print("No. of Rows in final data frame after droping null value: ",num_rows4)
print("No. of Columns in final data frame after droping null value: " ,num_cols4)

No. of Rows in final data frame after droping null value:  537667
No. of Columns in final data frame after droping null value:  20


In [20]:
# Transforming Status column values
# 0: 1-29 days past due 
# 1: 30-59 days past due
# 2: 60-89 days overdue 
# 3: 90-119 days overdue
# 4: 120-149 days overdue 
# 5: Overdue or bad debts, write-offs for more than 150 days 
# C: paid off that month 
# X: No loan for the month
df_final.groupBy("STATUS").count().show()

+------+------+
|STATUS| count|
+------+------+
|     3|   181|
|     0|200930|
|     5|  1087|
|     C|226185|
|     X|102167|
|     1|  6423|
|     4|   152|
|     2|   542|
+------+------+



In [21]:
df_final.groupBy("STATUS").count().show()

+------+------+
|STATUS| count|
+------+------+
|     3|   181|
|     0|200930|
|     5|  1087|
|     C|226185|
|     X|102167|
|     1|  6423|
|     4|   152|
|     2|   542|
+------+------+



In [22]:
# replacing the status values as good (1) = 'X','C','0' & bad (0)= Others.
# replacing FLAG_OWN_CAR, FLAG_OWN_REALITY, CODE_GENDER value as 0 & 1
from pyspark.sql.functions import when
df_final = (df_final
            .withColumn("STATUS", when(df_final.STATUS.isin(['X','C']),1).otherwise(0))
            .withColumn('CODE_GENDER', when(df_final['CODE_GENDER'] == 'M', 0).otherwise(1))
            .withColumn('FLAG_OWN_CAR', when(df_final['FLAG_OWN_CAR'] == 'Y', 0).otherwise(1))
            .withColumn('FLAG_OWN_REALTY', when(df_final['FLAG_OWN_REALTY'] == 'Y', 0).otherwise(1))
           )

In [23]:
df_final.groupBy("STATUS").count().show()

+------+------+
|STATUS| count|
+------+------+
|     1|328352|
|     0|209315|
+------+------+



In [24]:
#checking result on first 10 rows of dataframe
df_final.show(10)

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|5008806|          0|           0|              0|           0|        112500.0|         Working|Secondary / secon...|           Married|House / apa

In [25]:
# taking a random sample of 1% of the rows in df_final and then the first 10 rows of that sample will be displayed using the show() method.
df_final.sample(fraction=0.01).show(10)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-------------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|  NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-------------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|5008930|          1|           1|              0|           0|        297000.0|Commercial associate|Secondary / secon...|Si

In [26]:
#counting the unique values of column NAME_INCOME_TYPE
from pyspark.sql.functions import count
df_final.groupBy('NAME_INCOME_TYPE').agg(count('*').alias('count')).show()

+--------------------+------+
|    NAME_INCOME_TYPE| count|
+--------------------+------+
|             Student|   322|
|Commercial associate|151412|
|       State servant| 52733|
|             Working|332868|
|           Pensioner|   332|
+--------------------+------+



In [27]:
# Merging the columns Student and Pensioner into new column - Others
df_final = df_final.withColumn('NAME_INCOME_TYPE', when(col('NAME_INCOME_TYPE').isin(['Working', 'Commercial associate', 'State servant']), col('NAME_INCOME_TYPE')).otherwise('others'))

In [28]:
df_final.groupBy('NAME_INCOME_TYPE').agg(count('*').alias('count')).show()

+--------------------+------+
|    NAME_INCOME_TYPE| count|
+--------------------+------+
|Commercial associate|151412|
|              others|   654|
|       State servant| 52733|
|             Working|332868|
+--------------------+------+



In [29]:
#counting the unique values of column NAME_EDUCATION_TYPE
df_final.groupBy('NAME_EDUCATION_TYPE').agg(count('*').alias('count')).show()

+--------------------+------+
| NAME_EDUCATION_TYPE| count|
+--------------------+------+
|   Incomplete higher| 20590|
|Secondary / secon...|358317|
|     Lower secondary|  4556|
|    Higher education|153770|
|     Academic degree|   434|
+--------------------+------+



In [30]:
from pyspark.sql.functions import split
df_final = df_final.withColumn('NAME_EDUCATION_TYPE', split(col('NAME_EDUCATION_TYPE'), '/').getItem(0))

In [31]:
df_final.groupBy('NAME_EDUCATION_TYPE').agg(count('*').alias('count')).show()

+-------------------+------+
|NAME_EDUCATION_TYPE| count|
+-------------------+------+
|  Incomplete higher| 20590|
|    Lower secondary|  4556|
|         Secondary |358317|
|   Higher education|153770|
|    Academic degree|   434|
+-------------------+------+



In [32]:
#counting the unique values of column NAME_FAMILY_STATUS
df_final.groupBy('NAME_FAMILY_STATUS').agg(count('*').alias('count')).show()
#df_final.groupBy("NAME_FAMILY_STATUS").count().show()

+--------------------+------+
|  NAME_FAMILY_STATUS| count|
+--------------------+------+
|           Separated| 31394|
|             Married|384003|
|Single / not married| 65944|
|               Widow| 12243|
|      Civil marriage| 44083|
+--------------------+------+



In [33]:
df_final = df_final.withColumn("NAME_FAMILY_STATUS", split(col("NAME_FAMILY_STATUS"), "/").getItem(0))

In [34]:
df_final.groupBy("NAME_FAMILY_STATUS").count().show()

+------------------+------+
|NAME_FAMILY_STATUS| count|
+------------------+------+
|           Single | 65944|
|         Separated| 31394|
|           Married|384003|
|             Widow| 12243|
|    Civil marriage| 44083|
+------------------+------+



In [35]:
#counting the unique values of column NAME_HOUSING_TYPE
df_final.groupBy("NAME_HOUSING_TYPE").count().show()

+-------------------+------+
|  NAME_HOUSING_TYPE| count|
+-------------------+------+
|  House / apartment|474177|
|Municipal apartment| 18023|
|    Co-op apartment|  3396|
|   Rented apartment|  8561|
|   Office apartment|  4159|
|       With parents| 29351|
+-------------------+------+



In [36]:
df_final = df_final.withColumn("NAME_HOUSING_TYPE", split(col("NAME_HOUSING_TYPE"), "/").getItem(0))

In [37]:
df_final.groupBy("NAME_HOUSING_TYPE").count().show()

+-------------------+------+
|  NAME_HOUSING_TYPE| count|
+-------------------+------+
|             House |474177|
|Municipal apartment| 18023|
|    Co-op apartment|  3396|
|   Rented apartment|  8561|
|   Office apartment|  4159|
|       With parents| 29351|
+-------------------+------+



In [38]:
#checking the result on random sample from dataframe
df_final.sample(fraction=0.01).show(100)

+-------+-----------+------------+---------------+------------+----------------+--------------------+-------------------+------------------+-------------------+----------+-------------+----------+---------------+----------+----------+--------------------+---------------+--------------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|  NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|     OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+-------------------+------------------+-------------------+----------+-------------+----------+---------------+----------+----------+--------------------+---------------+--------------+------+
|5008895|          1|           1|              0|           0|        297000.0|Commercial associate|         Secondar

In [39]:
#rounding off the values in column AMT_INCOME_TOTAL
df_final = df_final.withColumn('AMT_INCOME_TOTAL', col('AMT_INCOME_TOTAL')/1000)
df_final = df_final.withColumn('AMT_INCOME_TOTAL', col('AMT_INCOME_TOTAL').cast('integer'))
df_final.show(5)

+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS|
+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+
|5008806|          0|           0|              0|           0|             112|         Working|         Secondary |           Married|           Hous

In [40]:
#converting days into years in column DAYS_BIRTH and DAYS_EMPLOYED and assigning them into new columns Age and employee_from_years 
from pyspark.sql.functions import round
df_final = df_final.withColumn('Age', round(-(col('DAYS_BIRTH')/365)))
df_final = df_final.withColumn('employee_from_years', round(-(col('DAYS_EMPLOYED')/365)))

# Drop the column 'DAYS_BIRTH' & 'DAYS_EMPLOYED'.
df_final = df_final.drop('DAYS_BIRTH', 'DAYS_EMPLOYED')
df_final.show(5)

+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+----+-------------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS| Age|employee_from_years|
+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+--------------+------+----+-------------------+
|5008806|          0|           0|              0|           0|             112|         Working|         Secondary |           Married|           Hous

In [41]:
df_final.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: integer (nullable = false)
 |-- FLAG_OWN_CAR: integer (nullable = false)
 |-- FLAG_OWN_REALTY: integer (nullable = false)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: integer (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- STATUS: integer (nullable = false)
 |-- Age: double (nullable = true)
 |-- employee_from_years: double (nullable = true)



In [44]:
#Dropping columns ID , CNT_CHILDREN , FLAG_MOBIL , FLAG_WORK_PHONE , FLAG_EMAIL 
df_final = df_final.drop('ID', 'CNT_CHILDREN', 'FLAG_MOBIL', 'FLAG_WORK_PHONE', 'FLAG_EMAIL')
df_final.printSchema()

root
 |-- CODE_GENDER: integer (nullable = false)
 |-- FLAG_OWN_CAR: integer (nullable = false)
 |-- FLAG_OWN_REALTY: integer (nullable = false)
 |-- AMT_INCOME_TOTAL: integer (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- STATUS: integer (nullable = false)
 |-- Age: double (nullable = true)
 |-- employee_from_years: double (nullable = true)



In [53]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
df_pandas = df_final.toPandas()
plt.figure(figsize = (15,10))
sns.heatmap(df_pandas.corr(), annot = True )
plt.title('Correlation between the Features', size = 15)
plt.show()

ValueError: could not convert string to float: 'Working'

<Figure size 1500x1000 with 0 Axes>

In [54]:
#df_pandas.head()
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 537667 entries, 0 to 537666
Data columns (total 15 columns):
 #   Column               Non-Null Count   Dtype  
---  ------               --------------   -----  
 0   CODE_GENDER          537667 non-null  int32  
 1   FLAG_OWN_CAR         537667 non-null  int32  
 2   FLAG_OWN_REALTY      537667 non-null  int32  
 3   AMT_INCOME_TOTAL     537667 non-null  int32  
 4   NAME_INCOME_TYPE     537667 non-null  object 
 5   NAME_EDUCATION_TYPE  537667 non-null  object 
 6   NAME_FAMILY_STATUS   537667 non-null  object 
 7   NAME_HOUSING_TYPE    537667 non-null  object 
 8   FLAG_PHONE           537667 non-null  int32  
 9   OCCUPATION_TYPE      537667 non-null  object 
 10  CNT_FAM_MEMBERS      537667 non-null  float64
 11  MONTHS_BALANCE       537667 non-null  int32  
 12  STATUS               537667 non-null  int32  
 13  Age                  537667 non-null  float64
 14  employee_from_years  537667 non-null  float64
dtypes: float64(3), in

In [55]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
def one_hot_encode(df, columns):
    for col_name in columns:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index")
        df = indexer.fit(df).transform(df)

        encoder = OneHotEncoder(inputCols=[f"{col_name}_index"], outputCols=[f"{col_name}_one_hot"], dropLast=True)
        df = encoder.fit(df).transform(df)

        df = df.drop(col_name).drop(f"{col_name}_index")

    return df

In [56]:
categories=['NAME_INCOME_TYPE','NAME_EDUCATION_TYPE','NAME_FAMILY_STATUS','NAME_HOUSING_TYPE','OCCUPATION_TYPE']
final_df = one_hot_encode(df_final, categories)
final_df.show(5)

+-----------+------------+---------------+----------------+----------+---------------+--------------+------+----+-------------------+------------------------+---------------------------+--------------------------+-------------------------+-----------------------+
|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|AMT_INCOME_TOTAL|FLAG_PHONE|CNT_FAM_MEMBERS|MONTHS_BALANCE|STATUS| Age|employee_from_years|NAME_INCOME_TYPE_one_hot|NAME_EDUCATION_TYPE_one_hot|NAME_FAMILY_STATUS_one_hot|NAME_HOUSING_TYPE_one_hot|OCCUPATION_TYPE_one_hot|
+-----------+------------+---------------+----------------+----------+---------------+--------------+------+----+-------------------+------------------------+---------------------------+--------------------------+-------------------------+-----------------------+
|          0|           0|              0|             112|         0|            2.0|             0|     1|59.0|                3.0|           (3,[0],[1.0])|              (4,[0],[1.0])|             (4,[0],[1

In [None]:
# from sklearn.model_selection import train_test_split
# x_train, x_test, y_train, y_test=train_test_split(X,y,test_size=0.25,random_state=50)

In [57]:
n_rows = final_df.count()
n_cols = len(final_df.columns)
print("No. of Rows in Final data frame:")
print(n_rows)
print("No. of Columns in Final data frame:")
print(n_cols)

No. of Rows in Final data frame:
537667
No. of Columns in Final data frame:
15


In [58]:
from pyspark.sql.functions import countDistinct
unique_count1 = final_df.select(countDistinct('CODE_GENDER')).collect()[0][0]
unique_count2 = final_df.select(countDistinct('STATUS')).collect()[0][0]
unique_count3 = final_df.select(countDistinct('FLAG_OWN_CAR')).collect()[0][0]
unique_count4 = final_df.select(countDistinct('FLAG_OWN_REALTY')).collect()[0][0]
print("Number of unique values in 'CODE_GENDER','FLAG_OWN_CAR','FLAG_OWN_REALTY','STATUS' is" , unique_count1 ,unique_count2 ,unique_count3 ,unique_count4)

Number of unique values in 'CODE_GENDER','FLAG_OWN_CAR','FLAG_OWN_REALTY','STATUS' is 2 2 2 2


**Logistic Regression**

In [59]:
# Logistic Regression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Combine all input features into a single vector
assembler = VectorAssembler(inputCols=[c for c in final_df.columns if c != 'STATUS'],outputCol='features')
data = assembler.transform(final_df)

# Split data into train and test sets
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=2)

# Define the logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='STATUS')

# Train the model
lr_model = lr.fit(train_data)

# Make predictions on test data
predictions = lr_model.transform(test_data)

# Evaluate the model's performance on test data
evaluator = BinaryClassificationEvaluator(labelCol='STATUS')
auc = evaluator.evaluate(predictions)

print(f"Test AUC: {auc:.4f}")

Test AUC: 0.5851


**Random Forest Classifier**

In [60]:
# Random Forest classifier
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Combine all input features into a single vector
assembler = VectorAssembler(inputCols=[c for c in final_df.columns if c != 'STATUS'], outputCol='features')
data = assembler.transform(final_df)

# Scale the features
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)

# Split data into train and test sets
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=2)

# Define the Random Forest classifier model
rf = RandomForestClassifier(featuresCol='scaledFeatures', labelCol='STATUS')

# Train the model
rf_model = rf.fit(train_data)

# Make predictions on test data
predictions = rf_model.transform(test_data)

# Evaluate the model's performance on test data
evaluator = MulticlassClassificationEvaluator(labelCol='STATUS', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

print(f"Test accuracy: {accuracy:.4f}")

Test accuracy: 0.6117


**Support Vector machine**

In [61]:
# Support Vector machine
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

# Combine all input features into a single vector
assembler = VectorAssembler(inputCols=[c for c in final_df.columns if c != 'STATUS'],
                            outputCol='features')
data = assembler.transform(final_df)

# Split data into train and test sets
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=2)

# Define the SVM model
svm = LinearSVC(featuresCol='features', labelCol='STATUS', maxIter=10, regParam=0.1)

# Train the model
svm_model = svm.fit(train_data)

# Make predictions on test data
predictions = svm_model.transform(test_data)

# Evaluate the model's performance on test data
evaluator = BinaryClassificationEvaluator(labelCol='STATUS')
auc = evaluator.evaluate(predictions)

print(f"Test AUC: {auc:.4f}")

Test AUC: 0.5708


**Decision Tree Classifier**

In [62]:
# Decision Tree Classifier
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Combine all input features into a single vector
assembler = VectorAssembler(inputCols=[c for c in final_df.columns if c != 'STATUS'], outputCol='features')
data = assembler.transform(final_df)

# Scale the features
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)

# Split data into train and test sets
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=2)

# Define the Decision Tree Classifier model
dt = DecisionTreeClassifier(featuresCol='scaledFeatures', labelCol='STATUS')

# Train the model
dt_model = dt.fit(train_data)

# Make predictions on test data
predictions = dt_model.transform(test_data)

# Evaluate the model's performance on test data
evaluator = MulticlassClassificationEvaluator(labelCol='STATUS', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

print(f"Test accuracy: {accuracy:.4f}")



# Evaluate the model's performance on test data
evaluator = BinaryClassificationEvaluator(labelCol='STATUS')
auc = evaluator.evaluate(predictions)

Test accuracy: 0.6254
