In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
ss = SparkSession.builder.master('local[*]').appName('LoanStat2018Q4').getOrCreate()
sc = ss.sparkContext
ss

In [3]:
loan_stat_path = f'{os.path.abspath(os.path.curdir)}/data/LoanStats_2018Q4.csv'
loan_df = ss.read.csv(loan_stat_path, inferSchema=True, header=True)

In [8]:
loan_df.count()

128416

In [9]:
loan_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: doubl

In [16]:
df_sel = loan_df.select(
    'term', 'home_ownership', 'grade', 'purpose', 'int_rate', 'addr_state', 'loan_status', 
    'application_type', 'loan_amnt', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util',
    'total_acc', 'num_tl_90g_dpd_24m', 'dti_joint'
)

In [20]:
df_sel.describe().show()

+-------+----------+--------------+------+--------+--------+----------+------------------+----------------+------------------+----------+------------------+------------------+-------------------+----------+------------------+--------------------+------------------+
|summary|      term|home_ownership| grade| purpose|int_rate|addr_state|       loan_status|application_type|         loan_amnt|emp_length|        annual_inc|               dti|        delinq_2yrs|revol_util|         total_acc|  num_tl_90g_dpd_24m|         dti_joint|
+-------+----------+--------------+------+--------+--------+----------+------------------+----------------+------------------+----------+------------------+------------------+-------------------+----------+------------------+--------------------+------------------+
|  count|    128412|        128412|128412|  128412|  128412|    128412|            128412|          128412|            128412|    128412|            128412|            128175|             128412|    128

In [21]:
df_sel.describe('loan_amnt', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util',
    'total_acc').show()

+-------+------------------+----------+------------------+------------------+-------------------+----------+------------------+
|summary|         loan_amnt|emp_length|        annual_inc|               dti|        delinq_2yrs|revol_util|         total_acc|
+-------+------------------+----------+------------------+------------------+-------------------+----------+------------------+
|  count|            128412|    128412|            128412|            128175|             128412|    128256|            128412|
|   mean| 15971.32102139987|      null|  82797.3278609476|19.933177530719757|0.22783696227766875|      null|22.677413325857398|
| stddev|10150.384232741908|      null|108298.46579150065|20.143542243475515| 0.7337929617806063|      null|12.129215673024758|
|    min|              1000|    1 year|               0.0|               0.0|                  0|        0%|                 2|
|    max|             40000|       n/a|         9757200.0|             999.0|                 24|    99.

In [23]:
df_sel.select(
    regexp_extract(col('emp_length'), '\\d+', 0).alias('emplength_cleaned'),
    col('emp_length')
).show()

+-----------------+----------+
|emplength_cleaned|emp_length|
+-----------------+----------+
|                1|  < 1 year|
|               10| 10+ years|
|                1|  < 1 year|
|                 |       n/a|
|                5|   5 years|
|                9|   9 years|
|                3|   3 years|
|               10| 10+ years|
|                 |       n/a|
|               10| 10+ years|
|                1|  < 1 year|
|               10| 10+ years|
|               10| 10+ years|
|                6|   6 years|
|                5|   5 years|
|               10| 10+ years|
|                2|   2 years|
|               10| 10+ years|
|                1|  < 1 year|
|                6|   6 years|
+-----------------+----------+
only showing top 20 rows



In [31]:
df_sel = df_sel.withColumn('term_cleaned', regexp_replace(col('term'), 'months', '')).withColumn('emplen_cleaned', regexp_extract(col('emp_length'), '\\d+', 0))

In [32]:
df_sel.select('term', 'term_cleaned', 'emp_length', 'emplen_cleaned').show()

+----------+------------+----------+--------------+
|      term|term_cleaned|emp_length|emplen_cleaned|
+----------+------------+----------+--------------+
| 36 months|         36 |  < 1 year|             1|
| 36 months|         36 | 10+ years|            10|
| 60 months|         60 |  < 1 year|             1|
| 60 months|         60 |       n/a|              |
| 60 months|         60 |   5 years|             5|
| 36 months|         36 |   9 years|             9|
| 36 months|         36 |   3 years|             3|
| 36 months|         36 | 10+ years|            10|
| 36 months|         36 |       n/a|              |
| 36 months|         36 | 10+ years|            10|
| 60 months|         60 |  < 1 year|             1|
| 60 months|         60 | 10+ years|            10|
| 60 months|         60 | 10+ years|            10|
| 36 months|         36 |   6 years|             6|
| 60 months|         60 |   5 years|             5|
| 60 months|         60 | 10+ years|            10|
| 36 months|

In [33]:
df_sel.stat.crosstab('loan_status', 'grade').show()

+------------------+-----+-----+-----+-----+----+---+---+----+
| loan_status_grade|    A|    B|    C|    D|   E|  F|  G|null|
+------------------+-----+-----+-----+-----+----+---+---+----+
|   In Grace Period|   74|  112|  146|  122|  54|  4|  1|   0|
|        Fully Paid| 1188| 1333| 1175|  807| 360| 36|  9|   0|
|              null|    0|    0|    0|    0|   0|  0|  0|   4|
|Late (31-120 days)|   68|  164|  234|  220| 142| 14|  7|   0|
| Late (16-30 days)|   26|   78|  102|   81|  46|  9|  2|   0|
|           Current|36639|34139|29323|15823|5352|321| 79|   0|
|       Charged Off|   16|   35|   38|   19|   8|  3|  3|   0|
+------------------+-----+-----+-----+-----+----+---+---+----+



In [37]:
freq = df_sel.stat.freqItems(['purpose', 'grade'], 0.3)

In [39]:
freq.collect()

[Row(purpose_freqItems=['debt_consolidation', 'credit_card', None], grade_freqItems=['A', 'C', 'B'])]

In [41]:
df_sel.groupby('purpose').count().orderBy(col('count').desc()).show()

+------------------+-----+
|           purpose|count|
+------------------+-----+
|debt_consolidation|70603|
|       credit_card|34961|
|  home_improvement| 7512|
|             other| 7094|
|    major_purchase| 2303|
|           medical| 1499|
|    small_business| 1051|
|               car| 1037|
|             house|  823|
|          vacation|  802|
|            moving|  656|
|  renewable_energy|   71|
|              null|    4|
+------------------+-----+



In [49]:
quantile_probs = [0.25, 0.5, 0.75, 0.9]
rel_err = 0.05
df_sel.stat.approxQuantile('loan_amnt', quantile_probs, rel_err)

[8000.0, 13000.0, 20000.0, 30000.0]

In [51]:
quantile_probs = [0.25, 0.5, 0.75, 0.9]
rel_err = 0.5
df_sel.stat.approxQuantile('loan_amnt', quantile_probs, rel_err)

[1000.0, 1000.0, 40000.0, 40000.0]

In [52]:
df_sel.select([count(when(isnan(colmn) | col(colmn).isNull(), colmn)).alias(colmn) for colmn in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen|emplen_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+
|   4|             4|    4|      4|       4|         4|          4|               4|        4|         4|         4|241|          4|       160|        4|                 4|   111634|           4|     4|             4|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+--------

In [53]:
df_sel = df_sel.na.drop('all', subset=['loan_status'])

In [54]:
df_sel.select([count(when(isnan(colmn) | col(colmn).isNull(), colmn)).alias(colmn) for colmn in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen|emplen_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|           0|     0|             0|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+--------

In [55]:
df_sel.describe('dti', 'revol_util').show()

+-------+------------------+----------+
|summary|               dti|revol_util|
+-------+------------------+----------+
|  count|            128175|    128256|
|   mean|19.933177530719757|      null|
| stddev|20.143542243475515|      null|
|    min|               0.0|        0%|
|    max|             999.0|    99.90%|
+-------+------------------+----------+



In [57]:
df_sel = df_sel.withColumn('revolutil_cleaned', regexp_extract(col('revol_util'), '\\d+', 0))

In [58]:
df_sel.describe('revol_util', 'revolutil_cleaned').show()

+-------+----------+------------------+
|summary|revol_util| revolutil_cleaned|
+-------+----------+------------------+
|  count|    128256|            128256|
|   mean|      null| 43.76206961077844|
| stddev|      null|24.801849528206994|
|    min|        0%|                 0|
|    max|    99.90%|                99|
+-------+----------+------------------+



In [61]:
def fill_avg(df, colmn):
    return df.select(colmn).agg(avg(colmn))

In [65]:
rev_avg = fill_avg(df_sel, 'revolutil_cleaned').first()[0]
print(rev_avg)
df_sel = df_sel.withColumn('rev_avg', lit(rev_avg))

43.76206961077847


In [63]:
df_sel = df_sel.withColumn('revolutil_cleaned', coalesce(col('revolutil_cleaned'), col('rev_avg')))

In [66]:
df_sel = df_sel.withColumn('revolutil_cleaned', df_sel['revolutil_cleaned'].cast('double'))

In [67]:
df_sel.describe('revolutil_cleaned', 'revol_util').show()

+-------+-----------------+----------+
|summary|revolutil_cleaned|revol_util|
+-------+-----------------+----------+
|  count|           128412|    128256|
|   mean|43.76206961077847|      null|
| stddev|24.78677969645399|      null|
|    min|              0.0|        0%|
|    max|            183.0|    99.90%|
+-------+-----------------+----------+



In [68]:
df_sel.select([count(when(isnan(colmn) | col(colmn).isNull(), colmn)).alias(colmn) for colmn in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen|emplen_cleaned|revolutil_cleaned|rev_avg|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|           0|     0|             0|                0|      0|
+----+--------------+---

In [69]:
df_sel = df_sel.withColumn('dti_cleaned', coalesce(col('dti'), col('dti_joint')))

In [70]:
df_sel.select([count(when(isnan(colmn) | col(colmn).isNull(), colmn)).alias(colmn) for colmn in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-------+-----------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen|emplen_cleaned|revolutil_cleaned|rev_avg|dti_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-------+-----------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|           0|     0|             0|               

In [72]:
df_sel.groupby('loan_status').count().show()

+------------------+------+
|       loan_status| count|
+------------------+------+
|        Fully Paid|  4908|
|   In Grace Period|   513|
|       Charged Off|   122|
|Late (31-120 days)|   849|
|           Current|121676|
| Late (16-30 days)|   344|
+------------------+------+



In [74]:
df_sel.where(df_sel['loan_status'].isin(['Late (16-30 days)', 'Charged Off', 'In Grace Period', 'Late (31-120 days)'])).show()

+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-----------------+-----------+
|      term|home_ownership|grade|           purpose|int_rate|addr_state|       loan_status|application_type|loan_amnt|emp_length|annual_inc|  dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen|emplen_cleaned|revolutil_cleaned|          rev_avg|dti_cleaned|
+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+------------+------+--------------+-----------------+-----------------+-----------+
| 36 months|          RENT|    B|debt_consolidation|  10.33%|        PA|Late (31-120 days)|      Individual|    16000|   3 y

In [75]:
df_sel = df_sel.withColumn(
    'bad_loan', 
    when(
        df_sel['loan_status'].isin(['Late (16-30 days)', 'Charged Off', 'In Grace Period', 'Late (31-120 days)']), 
        'Yes'
    ).otherwise('No')
)

In [76]:
df_sel.groupby('bad_loan').count().show()

+--------+------+
|bad_loan| count|
+--------+------+
|      No|126584|
|     Yes|  1828|
+--------+------+



In [77]:
df_sel_final = df_sel.drop('revol_util', 'dti', 'dti_joint')

In [78]:
df_sel_final.printSchema()

root
 |-- term: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- num_tl_90g_dpd_24m: integer (nullable = true)
 |-- term_cleaned: string (nullable = true)
 |-- emplen: string (nullable = true)
 |-- emplen_cleaned: string (nullable = true)
 |-- revolutil_cleaned: double (nullable = true)
 |-- rev_avg: double (nullable = false)
 |-- dti_cleaned: double (nullable = true)
 |-- bad_loan: string (nullable = false)



In [79]:
df_sel_final.crosstab('bad_loan', 'grade').show()

+--------------+-----+-----+-----+-----+----+---+---+
|bad_loan_grade|    A|    B|    C|    D|   E|  F|  G|
+--------------+-----+-----+-----+-----+----+---+---+
|            No|37827|35472|30498|16630|5712|357| 88|
|           Yes|  184|  389|  520|  442| 250| 30| 13|
+--------------+-----+-----+-----+-----+----+---+---+



In [80]:
df_sel_final.describe('dti_cleaned').show()

+-------+------------------+
|summary|       dti_cleaned|
+-------+------------------+
|  count|            128412|
|   mean|19.930878500451705|
| stddev| 20.12807970280371|
|    min|               0.0|
|    max|             999.0|
+-------+------------------+

