# Data Cleaning

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Read Data. 
The data is of accepted applicants for loan from year 2007 to 2019 third quarter of Lending Club from Kaggle.

In [3]:
applicants_data = spark.read.csv('Data/appl_accepted_20072019Q3.csv', header = True, inferSchema = True)
applicants_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string 

In [4]:
# total rows
total_rows = applicants_data.count()

In [5]:
print(f'Number of Columns: {len(applicants_data.columns)}')
print(f'Number of Rows: {total_rows}')

Number of Columns: 150
Number of Rows: 2650550


# Check Null Values in Columns

Finding what percentage of each columns are null values, allows us to safely remove the columns.

In [6]:
import pyspark.sql.functions as f

In [7]:
data_null = applicants_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in applicants_data.columns[:]])

Percentage of Null Values in each Columns:

In [8]:
null_values = data_null.head(1)[0]
null_values_dict = null_values.asDict()

In [9]:
for i in null_values_dict.keys():
    null_values_dict[i] = (null_values_dict[i]*100 // total_rows)
print('Percentage of columns have null value: ')
null_values_dict

Percentage of columns have null value: 


{'id': 0,
 'member_id': 100,
 'loan_amnt': 0,
 'funded_amnt': 0,
 'funded_amnt_inv': 0,
 'term': 0,
 'int_rate': 0,
 'installment': 0,
 'grade': 0,
 'sub_grade': 0,
 'emp_title': 8,
 'emp_length': 6,
 'home_ownership': 0,
 'annual_inc': 0,
 'verification_status': 0,
 'issue_d': 0,
 'loan_status': 0,
 'pymnt_plan': 0,
 'url': 0,
 'desc': 95,
 'purpose': 0,
 'title': 0,
 'zip_code': 0,
 'addr_state': 0,
 'dti': 0,
 'delinq_2yrs': 0,
 'earliest_cr_line': 0,
 'fico_range_low': 0,
 'fico_range_high': 0,
 'inq_last_6mths': 0,
 'mths_since_last_delinq': 52,
 'mths_since_last_record': 84,
 'open_acc': 0,
 'pub_rec': 0,
 'revol_bal': 0,
 'revol_util': 0,
 'total_acc': 0,
 'initial_list_status': 0,
 'out_prncp': 0,
 'out_prncp_inv': 0,
 'total_pymnt': 0,
 'total_pymnt_inv': 0,
 'total_rec_prncp': 0,
 'total_rec_int': 0,
 'total_rec_late_fee': 0,
 'recoveries': 0,
 'collection_recovery_fee': 0,
 'last_pymnt_d': 0,
 'last_pymnt_amnt': 0,
 'next_pymnt_d': 52,
 'last_credit_pull_d': 0,
 'last_fico_r

## Remove columns

Columns that have null values greater than 25% can be safely removed

### Analysing Columns to remove :

* The 'member_id' does not have any value. So we can remove it. We can also remove 'id' as is in an indexer and all the values are unique.
* Columns like 'url', 'desc', 'purpose', 'title' can also be removed as this can be arbitary values.
* Removing cloumns related to joint account, hardship plans, secondary applicant as we want to address only individual applicants 
* Also removing some unwanted columns or columns that have redundant data

In [10]:
columns_need = ['loan_amnt',
                'funded_amnt', 'funded_amnt_inv',
                'term', 'int_rate',
                'installment', 'grade',
                'emp_length', 'home_ownership',
                'annual_inc', 'verification_status', 
                'loan_status', 'pymnt_plan',
                'addr_state', 'dti',
                'delinq_2yrs', 'fico_range_low',
                'fico_range_high', 'pub_rec',
                'revol_bal', 'revol_util',
                'initial_list_status', 
                'total_pymnt', 'total_pymnt_inv',
                'total_rec_prncp', 'total_rec_int',
                'policy_code', 'application_type',
                'acc_now_delinq',
                'tot_cur_bal', 'total_rev_hi_lim',
                'avg_cur_bal', 'bc_open_to_buy',
                'bc_util', 'chargeoff_within_12_mths', 
                'delinq_amnt', 'mort_acc',
                'num_accts_ever_120_pd',
                'num_actv_bc_tl', 'num_actv_rev_tl',
                'num_bc_sats', 'num_bc_tl',
                'num_il_tl', 'num_op_rev_tl',
                'num_rev_accts', 'num_rev_tl_bal_gt_0',
                'num_sats', 'pct_tl_nvr_dlq',
                'pub_rec_bankruptcies',
                'tax_liens', 'tot_hi_cred_lim', 
                'total_bal_ex_mort', 'total_bc_limit',
                'total_il_high_credit_limit'
]

print(f'Columns needed: {len(columns_need)}')

Columns needed: 54


In [11]:
data = applicants_data.select(columns_need)
print(f'Number of Columns left: {len(data.columns)}')

Number of Columns left: 54


# Analysing Data

Check which columns needs to clean and what data type the columns need to be converted.

In [12]:
from pyspark.sql.functions import desc

In [13]:
data.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_range_high: string (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- revol_bal: string (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- total_pymnt: string (nullable = true)
 |-- total_py

In [14]:
# Check if the following columns are within valid range.

data.describe(['loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'int_rate', 'installment']).show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|         loan_amnt|       funded_amnt|   funded_amnt_inv|          int_rate|       installment|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|           2650517|           2650517|           2650517|           2650517|           2650517|
|   mean|15265.709010732624|15261.216556996238|15245.323552900643|13.092829115115583|450.39694753891115|
| stddev| 9389.330622863554|   9387.9057302899|  9391.76261888991| 4.832138364571083|271.04262288987746|
|    min|             500.0|             500.0|               0.0|              10.0|              4.93|
|    max|           40000.0|           40000.0|           40000.0|              9.99|           1719.83|
+-------+------------------+------------------+------------------+------------------+------------------+



In [15]:
# term
data.select('term').distinct().show()

+----------+
|      term|
+----------+
| 36 months|
|      null|
| 60 months|
+----------+



In [16]:
# grade
data.select('grade').distinct().show()

+-----+
|grade|
+-----+
|    F|
| null|
|    E|
|    B|
|    D|
|    C|
|    A|
|    G|
+-----+



In [17]:
# emp_length
data.groupBy('emp_length').count().show()

+----------+------+
|emp_length| count|
+----------+------+
|   5 years|164437|
|   9 years| 88764|
| reactors"|     1|
|      null|181960|
|    1 year|175093|
|   2 years|237167|
|   7 years|106311|
|   8 years|104055|
|   4 years|159896|
|   6 years|118842|
|   3 years|210773|
| 10+ years|865494|
|  < 1 year|237757|
+----------+------+



In [18]:
# home_ownership
data.groupBy('home_ownership').count().show()

+--------------+-------+
|home_ownership|  count|
+--------------+-------+
|          null|     33|
|           OWN| 298621|
|          RENT|1044044|
|       2 years|      1|
|      MORTGAGE|1304266|
|           ANY|   3348|
|         OTHER|    182|
|          NONE|     55|
+--------------+-------+



In [19]:
# annual_inc
data.select('annual_inc').orderBy(desc('annual_inc')).show() # need to convert in double

+----------+
|annual_inc|
+----------+
|  MORTGAGE|
|  999999.0|
|   99999.9|
|  99999.85|
|  99999.84|
|  99999.84|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
|   99999.0|
+----------+
only showing top 20 rows



The 'annual_inc' columns contain some arbitrary string values that need to be converted into double data type.


In [20]:
# verification_status
data.select('verification_status').distinct().show()

+-------------------+
|verification_status|
+-------------------+
|            38000.0|
|           Verified|
|               null|
|    Source Verified|
|       Not Verified|
+-------------------+



In [21]:
# loan_status
data.select('loan_status').distinct().show()

+--------------------+
|         loan_status|
+--------------------+
|          Fully Paid|
|             Default|
|                null|
|     In Grace Period|
|Does not meet the...|
|         Charged Off|
|            Oct-2015|
|  Late (31-120 days)|
|             Current|
|Does not meet the...|
|   Late (16-30 days)|
+--------------------+



In [22]:
# pymnt_plan
data.select('pymnt_plan').distinct().show()

+-----------+
| pymnt_plan|
+-----------+
|       null|
|          n|
|Charged Off|
|          y|
+-----------+



In [23]:
# addr_state
data.select('addr_state').distinct().count()

271

The 'addr_state' column contains some arbitrary values because the number if states is more than the valid states in US.

In [24]:
# Specifing valid USA states
states = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA',
          'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
          'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT',
          'VA', 'WA', 'WV', 'WI', 'WY', 'DC']
data = data.filter(data['addr_state'].isin(states))

In [25]:
# dti
data.describe('dti').distinct().orderBy(desc('dti')).show() # need to convert to double

+-------+----------------+
|summary|             dti|
+-------+----------------+
|    max|           999.0|
|  count|         2647636|
|   mean|19.0949296126808|
| stddev|15.2492954899586|
|    min|            -1.0|
+-------+----------------+



In [26]:
data.describe(['dti', 'delinq_2yrs']).show()

+-------+----------------+-------------------+
|summary|             dti|        delinq_2yrs|
+-------+----------------+-------------------+
|  count|         2647636|            2650232|
|   mean|19.0949296126808|0.29613030104534244|
| stddev|15.2492954899586| 0.8534141318110475|
|    min|            -1.0|                0.0|
|    max|           999.0|                9.0|
+-------+----------------+-------------------+



'dti' cannot be negative so filtering out negative values


In [27]:
data = data.filter(data['dti'] > 0)  # filter out negative dti values

In [28]:
# fico_range_low, fico_range_high
data.select(['fico_range_low', 'fico_range_high']).describe().show() # need to convert to double

+-------+-----------------+-----------------+
|summary|   fico_range_low|  fico_range_high|
+-------+-----------------+-----------------+
|  count|          2634468|          2634468|
|   mean|699.6339659468249|703.6341508038814|
| stddev|33.27281959633915|33.27362998844954|
|    min|            610.0|            614.0|
|    max|            845.0|            850.0|
+-------+-----------------+-----------------+



In [29]:
# policy_code
data.groupBy('policy_code').count().orderBy(desc('count')).show() # Can remove policy_code because it only contain
                                                                  # value 1 

# remove 'policy_code' column
data = data.drop('policy_code')

+-----------+-------+
|policy_code|  count|
+-----------+-------+
|        1.0|2634468|
+-----------+-------+



In [30]:
# int_rate
data.groupBy('int_rate').count().show()

+--------+-----+
|int_rate|count|
+--------+-----+
|   13.87|   39|
|    20.5| 1415|
|   15.41| 2743|
|   15.49| 3795|
|   12.85| 2070|
|   11.83|  225|
|  28.95%|   29|
|   14.49|16080|
|   12.35| 2543|
|  25.65%|  889|
|   26.14|  134|
|   28.69| 1411|
|    5.42|  557|
|  18.94%| 2004|
|    8.49| 2796|
|  23.40%| 1194|
|   18.84| 1958|
|   16.01| 5161|
|    9.32|  168|
|    12.8|   25|
+--------+-----+
only showing top 20 rows



**Some of the values in 'int_rate' have '%' sign at the end that needs to be removed.**

In [31]:
# remove '%' from end of the number
data = data.withColumn("int_rate", f.regexp_replace(data["int_rate"], "%", ""))

**Similary, 'revol_util' also have '%' sign in some values which can be removed **

In [32]:
# revol_util
data.groupBy('revol_util').count().show()

# remove '%' from end of the number
data = data.withColumn("revol_util", f.regexp_replace(data["revol_util"], "%", ""))

+----------+-----+
|revol_util|count|
+----------+-----+
|     102.0|  198|
|      78.9| 1988|
|      58.7| 2973|
|      10.7| 1392|
|       8.5| 1104|
|      73.7| 2294|
|      91.8| 1387|
|     110.9|    5|
|     118.5|    1|
|     119.8|    1|
|     122.2|    1|
|    15.20%|  432|
|   105.10%|    3|
|      107%|    1|
|      34.4| 2897|
|      69.3| 2585|
|      47.5| 2947|
|      45.4| 2944|
|      20.5| 2015|
|       1.0|  896|
+----------+-----+
only showing top 20 rows



In [33]:
data.describe(['pub_rec', 'revol_bal', 'revol_util', 'total_pymnt', 'total_pymnt_inv']).show()

+-------+-------------------+------------------+-----------------+-----------------+------------------+
|summary|            pub_rec|         revol_bal|       revol_util|      total_pymnt|   total_pymnt_inv|
+-------+-------------------+------------------+-----------------+-----------------+------------------+
|  count|            2634440|           2634468|          2632350|          2634468|           2634468|
|   mean|0.18476412444390458|16888.801473390453|49.65538769160617|11018.45744976548|11003.349826454498|
| stddev| 0.5417298527092995|22940.049844157882|24.71200593988717| 9719.85776999284| 9714.429928562506|
|    min|                0.0|               0.0|                0|              0.0|               0.0|
|    max|                9.0|           99998.0|            99.90|          9999.99|           9999.99|
+-------+-------------------+------------------+-----------------+-----------------+------------------+



## Type Casting

In [34]:
data.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_range_high: string (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- revol_bal: string (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- total_pymnt: string (nullable = true)
 |-- total_py

Converting columns to Appropriate Data type

In [35]:
# Columns to cast in Double
toDouble = ['int_rate',
            'annual_inc',
            'dti',
            'fico_range_low',
            'fico_range_high',
            'revol_bal',
            'revol_util',
            'total_pymnt',
            'total_pymnt_inv',
            'total_rec_prncp',
            'total_rec_int',
            'tot_cur_bal',
            'total_rev_hi_lim',
            'avg_cur_bal',
            'bc_util',
            'acc_now_delinq'
           ]

# Columns to cast in Int
toInt = ['delinq_2yrs',
         'pub_rec']
            

In [36]:
from pyspark.sql.types import *

#cast columns to double datatype
for column in toDouble:
    data = data.withColumn(column, data[column].cast(DoubleType()))

In [37]:
#cast columns to int datatype
for column in toInt:
    data = data.withColumn(column, data[column].cast(IntegerType()))

data.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- fico_range_low: double (nullable = true)
 |-- fico_range_high: double (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: double (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- total_pymnt: double (nullable = true)
 |-- total_

## Removing all rows with null values

### Check the null values in each column

In [38]:
print(f'Total number of rows {data.count()}')

Total number of rows 2634468


In [39]:
df_null = data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in data.columns[:]])
null_values = df_null.head(1)[0]
df_null_to_remove = null_values.asDict() 

for i in df_null_to_remove.keys():
    df_null_to_remove[i] = (df_null_to_remove[i]*100 // total_rows)
print('Percentage of columns have null value: ')
df_null_to_remove

Percentage of columns have null value: 


{'loan_amnt': 0,
 'funded_amnt': 0,
 'funded_amnt_inv': 0,
 'term': 0,
 'int_rate': 0,
 'installment': 0,
 'grade': 0,
 'emp_length': 6,
 'home_ownership': 0,
 'annual_inc': 0,
 'verification_status': 0,
 'loan_status': 0,
 'pymnt_plan': 0,
 'addr_state': 0,
 'dti': 0,
 'delinq_2yrs': 0,
 'fico_range_low': 0,
 'fico_range_high': 0,
 'pub_rec': 0,
 'revol_bal': 0,
 'revol_util': 0,
 'initial_list_status': 0,
 'total_pymnt': 0,
 'total_pymnt_inv': 0,
 'total_rec_prncp': 0,
 'total_rec_int': 0,
 'application_type': 0,
 'acc_now_delinq': 0,
 'tot_cur_bal': 2,
 'total_rev_hi_lim': 2,
 'avg_cur_bal': 2,
 'bc_open_to_buy': 2,
 'bc_util': 2,
 'chargeoff_within_12_mths': 0,
 'delinq_amnt': 0,
 'mort_acc': 1,
 'num_accts_ever_120_pd': 2,
 'num_actv_bc_tl': 2,
 'num_actv_rev_tl': 2,
 'num_bc_sats': 2,
 'num_bc_tl': 2,
 'num_il_tl': 2,
 'num_op_rev_tl': 2,
 'num_rev_accts': 2,
 'num_rev_tl_bal_gt_0': 2,
 'num_sats': 2,
 'pct_tl_nvr_dlq': 2,
 'pub_rec_bankruptcies': 0,
 'tax_liens': 0,
 'tot_hi_cre

In [40]:
final_data = data.na.drop()

In [41]:
total_final_data = final_data.count()
print(f'Number of Columns: {len(final_data.columns)}')
print(f'Number of Rows: {total_final_data}')

Number of Columns: 53
Number of Rows: 2362633


In [42]:
print(f'Data loss after cleaning: {((total_rows-total_final_data)*100) //total_rows }%')

Data loss after cleaning: 10%


## Save Cleaned dataframe to csv file

In [43]:
final_data.coalesce(1).write.mode('overwrite').option('header', 'true').csv('Data/CleanedDataOutput')