In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('LendingClub').getOrCreate()

In [0]:
df_full = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("dbfs:/FileStore/shared_uploads/goenkaadityakol@gmail.com/LoanStats_2018Q4.csv")

In [0]:
df = df_full.select("term","home_ownership","grade","purpose","int_rate","addr_state","loan_status","application_type","loan_amnt","emp_length","annual_inc",
                   "dti","delinq_2yrs","revol_util","total_acc","num_tl_90g_dpd_24m","dti_joint")

In [0]:
df.cache()

Out[4]: DataFrame[term: string, home_ownership: string, grade: string, purpose: string, int_rate: string, addr_state: string, loan_status: string, application_type: string, loan_amnt: int, emp_length: string, annual_inc: double, dti: double, delinq_2yrs: int, revol_util: string, total_acc: int, num_tl_90g_dpd_24m: int, dti_joint: double]

In [0]:
temp_table_name = "loanstats"
df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql
select * from loanstats

term,home_ownership,grade,purpose,int_rate,addr_state,loan_status,application_type,loan_amnt,emp_length,annual_inc,dti,delinq_2yrs,revol_util,total_acc,num_tl_90g_dpd_24m,dti_joint
36 months,MORTGAGE,B,debt_consolidation,10.33%,OR,Current,Individual,10000,< 1 year,280000.0,6.15,2,38%,23,0,
36 months,RENT,C,debt_consolidation,13.56%,NY,Current,Individual,2500,10+ years,55000.0,18.24,0,10.30%,34,0,
60 months,MORTGAGE,C,debt_consolidation,13.56%,PA,Current,Individual,12000,< 1 year,40000.0,19.23,0,55.50%,9,0,
60 months,MORTGAGE,C,debt_consolidation,14.47%,TX,Current,Joint App,15000,,30000.0,41.6,0,37.20%,30,0,34.95
60 months,MORTGAGE,D,debt_consolidation,17.97%,NC,Current,Individual,16000,5 years,51000.0,21.91,0,24.80%,27,0,
36 months,RENT,E,credit_card,23.40%,WV,Current,Individual,9600,9 years,65000.0,23.01,1,37.50%,20,0,
36 months,RENT,E,debt_consolidation,23.40%,NJ,Current,Individual,4000,3 years,90000.0,26.33,0,19.20%,20,0,
36 months,MORTGAGE,D,car,20.89%,NJ,Current,Joint App,3500,10+ years,40000.0,9.09,0,33.50%,18,0,10.59
36 months,MORTGAGE,B,home_improvement,12.98%,KY,Current,Individual,9600,,35704.0,0.84,0,11.50%,23,0,
36 months,OWN,E,debt_consolidation,23.40%,AL,Current,Individual,8000,10+ years,43000.0,33.24,0,81.30%,16,0,


In [0]:
%sql
select distinct emp_length from loanstats limit 50

emp_length
5 years
9 years
1 year
""
2 years
7 years
8 years
4 years
6 years
3 years


In [0]:
from pyspark.sql.functions import regexp_extract,regexp_replace,col

In [0]:
regex_string = "\\d+"
df.select(regexp_extract('emp_length',regex_string,0).alias('emp_length_converted'),'emp_length').show(10)

+--------------------+----------+
|emp_length_converted|emp_length|
+--------------------+----------+
|                   1|  < 1 year|
|                  10| 10+ years|
|                   1|  < 1 year|
|                    |       n/a|
|                   5|   5 years|
|                   9|   9 years|
|                   3|   3 years|
|                  10| 10+ years|
|                    |       n/a|
|                  10| 10+ years|
+--------------------+----------+
only showing top 10 rows



In [0]:
df = df.withColumn('emplen_cleaned',regexp_extract('emp_length',regex_string,0)).withColumn('term_cleaned',regexp_replace('term',"months",""))

In [0]:
df.select('term','term_cleaned','emp_length','emplen_cleaned').show(10)

+----------+------------+----------+--------------+
|      term|term_cleaned|emp_length|emplen_cleaned|
+----------+------------+----------+--------------+
| 36 months|         36 |  < 1 year|             1|
| 36 months|         36 | 10+ years|            10|
| 60 months|         60 |  < 1 year|             1|
| 60 months|         60 |       n/a|              |
| 60 months|         60 |   5 years|             5|
| 36 months|         36 |   9 years|             9|
| 36 months|         36 |   3 years|             3|
| 36 months|         36 | 10+ years|            10|
| 36 months|         36 |       n/a|              |
| 36 months|         36 | 10+ years|            10|
+----------+------------+----------+--------------+
only showing top 10 rows



In [0]:
table_name = "loanstatus_sel"
df.createOrReplaceTempView(table_name)

In [0]:
%sql
select * from loanstatus_sel

term,home_ownership,grade,purpose,int_rate,addr_state,loan_status,application_type,loan_amnt,emp_length,annual_inc,dti,delinq_2yrs,revol_util,total_acc,num_tl_90g_dpd_24m,dti_joint,emplen_cleaned,term_cleaned
36 months,MORTGAGE,B,debt_consolidation,10.33%,OR,Current,Individual,10000,< 1 year,280000.0,6.15,2,38%,23,0,,1.0,36
36 months,RENT,C,debt_consolidation,13.56%,NY,Current,Individual,2500,10+ years,55000.0,18.24,0,10.30%,34,0,,10.0,36
60 months,MORTGAGE,C,debt_consolidation,13.56%,PA,Current,Individual,12000,< 1 year,40000.0,19.23,0,55.50%,9,0,,1.0,60
60 months,MORTGAGE,C,debt_consolidation,14.47%,TX,Current,Joint App,15000,,30000.0,41.6,0,37.20%,30,0,34.95,,60
60 months,MORTGAGE,D,debt_consolidation,17.97%,NC,Current,Individual,16000,5 years,51000.0,21.91,0,24.80%,27,0,,5.0,60
36 months,RENT,E,credit_card,23.40%,WV,Current,Individual,9600,9 years,65000.0,23.01,1,37.50%,20,0,,9.0,36
36 months,RENT,E,debt_consolidation,23.40%,NJ,Current,Individual,4000,3 years,90000.0,26.33,0,19.20%,20,0,,3.0,36
36 months,MORTGAGE,D,car,20.89%,NJ,Current,Joint App,3500,10+ years,40000.0,9.09,0,33.50%,18,0,10.59,10.0,36
36 months,MORTGAGE,B,home_improvement,12.98%,KY,Current,Individual,9600,,35704.0,0.84,0,11.50%,23,0,,,36
36 months,OWN,E,debt_consolidation,23.40%,AL,Current,Individual,8000,10+ years,43000.0,33.24,0,81.30%,16,0,,10.0,36


In [0]:
df.groupBy('purpose').count().show()

+------------------+-----+
|           purpose|count|
+------------------+-----+
|             other| 7094|
|    small_business| 1051|
|debt_consolidation|70603|
|       credit_card|34961|
|            moving|  656|
|          vacation|  802|
|  renewable_energy|   71|
|             house|  823|
|               car| 1037|
|    major_purchase| 2303|
|           medical| 1499|
|  home_improvement| 7512|
|              null|    2|
+------------------+-----+



In [0]:
df.groupBy('purpose').count().orderBy(col('count').desc()).show()

+------------------+-----+
|           purpose|count|
+------------------+-----+
|debt_consolidation|70603|
|       credit_card|34961|
|  home_improvement| 7512|
|             other| 7094|
|    major_purchase| 2303|
|           medical| 1499|
|    small_business| 1051|
|               car| 1037|
|             house|  823|
|          vacation|  802|
|            moving|  656|
|  renewable_energy|   71|
|              null|    2|
+------------------+-----+



In [0]:
from pyspark.sql.functions import isnan, when, count, col, avg
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|emplen_cleaned|term_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+
|   2|             2|    2|      2|       2|         2|          2|               2|        2|         2|         2|239|          2|       158|        2|                 2|   111632|             2|           2|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+---

In [0]:
df = df.na.drop("all",subset=["loan_status"])

In [0]:
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|emplen_cleaned|term_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|             0|           0|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+---

In [0]:
df.select('revol_util','dti','dti_joint').describe().show()

+-------+----------+------------------+------------------+
|summary|revol_util|               dti|         dti_joint|
+-------+----------+------------------+------------------+
|  count|    128256|            128175|             16782|
|   mean|      null|19.933177530719703|19.226602312000953|
| stddev|      null| 20.14354224347552|  8.14163126459557|
|    min|        0%|               0.0|               0.0|
|    max|    99.90%|             999.0|             39.99|
+-------+----------+------------------+------------------+



In [0]:
df = df.withColumn('revolutil_cleaned',regexp_extract('revol_util',"\\d+",0))
df.select('revol_util','revolutil_cleaned').describe().show()

+-------+----------+-----------------+
|summary|revol_util|revolutil_cleaned|
+-------+----------+-----------------+
|  count|    128256|           128256|
|   mean|      null|43.76206961077844|
| stddev|      null|24.80184952820707|
|    min|        0%|                0|
|    max|    99.90%|               99|
+-------+----------+-----------------+



In [0]:
def fillavg(df,colname):
    return df.select(colname).agg(avg(colname))

In [0]:
rev_avg = fillavg(df,'revolutil_cleaned')

In [0]:
type(rev_avg)
a = rev_avg.collect()[0][0]

In [0]:
from pyspark.sql.functions import lit
df = df.withColumn('rev_avg',lit(a))
df.select('rev_avg').show(10)

+-----------------+
|          rev_avg|
+-----------------+
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
|43.76206961077844|
+-----------------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import coalesce
df = df.withColumn('revolutil_cleaned',coalesce('revolutil_cleaned','rev_avg'))

In [0]:
df = df.withColumn('revolutil_cleaned',df['revolutil_cleaned'].cast('double'))

In [0]:
df.select('revolutil_cleaned').describe().show()

+-------+------------------+
|summary| revolutil_cleaned|
+-------+------------------+
|  count|            128412|
|   mean| 43.76206961077843|
| stddev|24.786779696453944|
|    min|               0.0|
|    max|             183.0|
+-------+------------------+



In [0]:
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+-----------------+-------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|emplen_cleaned|term_cleaned|revolutil_cleaned|rev_avg|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+-----------------+-------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|             0|           0|                0|      0|
+----+--------------+-----+-------+--------+--------

In [0]:
df = df.withColumn('dti_cleaned',coalesce('dti','dti_joint'))

In [0]:
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+-----------------+-------+-----------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|emplen_cleaned|term_cleaned|revolutil_cleaned|rev_avg|dti_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+--------------+------------+-----------------+-------+-----------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|             0|           0|                0|      0|          0|
+---

In [0]:
df.groupBy('loan_status').count().show()

+------------------+------+
|       loan_status| count|
+------------------+------+
|        Fully Paid|  4908|
|   In Grace Period|   513|
|       Charged Off|   122|
|Late (31-120 days)|   849|
|           Current|121676|
| Late (16-30 days)|   344|
+------------------+------+



In [0]:
df = df.withColumn('bad_loan',when(df['loan_status'].isin(['In Grace Period','Charged Off','Late (31-120 days)','Late (16-30 days)']),'Yes').otherwise('No'))

In [0]:
df.groupBy('bad_loan').count().show()

+--------+------+
|bad_loan| count|
+--------+------+
|      No|126584|
|     Yes|  1828|
+--------+------+



In [0]:
df.printSchema()

root
 |-- term: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- num_tl_90g_dpd_24m: integer (nullable = true)
 |-- dti_joint: double (nullable = true)
 |-- emplen_cleaned: string (nullable = true)
 |-- term_cleaned: string (nullable = true)
 |-- revolutil_cleaned: double (nullable = true)
 |-- rev_avg: double (nullable = false)
 |-- dti_cleaned: double (nullable = true)
 |-- bad_loan: string (nullable = false)



In [0]:
df_final = df.drop('revol_util','dti','dti_joint')

In [0]:
df_final.printSchema()

root
 |-- term: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- num_tl_90g_dpd_24m: integer (nullable = true)
 |-- emplen_cleaned: string (nullable = true)
 |-- term_cleaned: string (nullable = true)
 |-- revolutil_cleaned: double (nullable = true)
 |-- rev_avg: double (nullable = false)
 |-- dti_cleaned: double (nullable = true)
 |-- bad_loan: string (nullable = false)



In [0]:
df_final.stat.crosstab('bad_loan','grade').show()

+--------------+-----+-----+-----+-----+----+---+---+
|bad_loan_grade|    A|    B|    C|    D|   E|  F|  G|
+--------------+-----+-----+-----+-----+----+---+---+
|            No|37827|35472|30498|16630|5712|357| 88|
|           Yes|  184|  389|  520|  442| 250| 30| 13|
+--------------+-----+-----+-----+-----+----+---+---+



In [0]:
permanent_table_name = 'lc_loan_data_final1'
df_final.write.format('parquet').saveAsTable(permanent_table_name)

In [0]:
%sql
select * from lc_loan_data_final1

term,home_ownership,grade,purpose,int_rate,addr_state,loan_status,application_type,loan_amnt,emp_length,annual_inc,delinq_2yrs,total_acc,num_tl_90g_dpd_24m,emplen_cleaned,term_cleaned,revolutil_cleaned,rev_avg,dti_cleaned,bad_loan
36 months,MORTGAGE,C,credit_card,13.56%,NC,Current,Individual,16000,4 years,40000.0,0,9,0,4.0,36,54.0,43.76206961077844,13.53,No
60 months,MORTGAGE,C,debt_consolidation,13.56%,NJ,Current,Individual,25000,5 years,68000.0,1,19,0,5.0,60,1.0,43.76206961077844,6.6,No
36 months,MORTGAGE,A,debt_consolidation,6.67%,CT,Current,Individual,15000,5 years,60000.0,0,25,0,5.0,36,36.0,43.76206961077844,22.22,No
36 months,RENT,B,debt_consolidation,10.08%,TX,Current,Individual,10000,1 year,60000.0,0,9,0,1.0,36,40.0,43.76206961077844,16.84,No
36 months,MORTGAGE,B,credit_card,11.55%,AZ,Current,Individual,12000,5 years,46000.0,0,37,0,5.0,36,34.0,43.76206961077844,30.11,No
36 months,RENT,A,debt_consolidation,8.46%,NY,Current,Joint App,5975,10+ years,40496.0,0,30,0,10.0,36,18.0,43.76206961077844,42.53,No
60 months,MORTGAGE,B,debt_consolidation,11.55%,NV,Fully Paid,Individual,24000,4 years,55000.0,0,39,0,4.0,60,9.0,43.76206961077844,3.72,No
60 months,RENT,D,debt_consolidation,19.92%,IL,Current,Joint App,30000,2 years,45500.0,0,18,0,2.0,60,90.0,43.76206961077844,37.46,No
36 months,RENT,C,debt_consolidation,13.56%,TX,Current,Individual,15000,2 years,45000.0,0,13,0,2.0,36,61.0,43.76206961077844,15.6,No
36 months,RENT,A,credit_card,7.84%,AZ,Current,Individual,14000,10+ years,61000.0,1,23,0,10.0,36,56.0,43.76206961077844,30.87,No


In [0]:
df_final.write.format('com.databricks.spark.csv').option("header","true").save("dbfs:/FileStore/df/Sample.csv")

In [0]:
df_final.display()

term,home_ownership,grade,purpose,int_rate,addr_state,loan_status,application_type,loan_amnt,emp_length,annual_inc,delinq_2yrs,total_acc,num_tl_90g_dpd_24m,emplen_cleaned,term_cleaned,revolutil_cleaned,rev_avg,dti_cleaned,bad_loan
36 months,MORTGAGE,B,debt_consolidation,10.33%,OR,Current,Individual,10000,< 1 year,280000.0,2,23,0,1.0,36,38.0,43.76206961077844,6.15,No
36 months,RENT,C,debt_consolidation,13.56%,NY,Current,Individual,2500,10+ years,55000.0,0,34,0,10.0,36,10.0,43.76206961077844,18.24,No
60 months,MORTGAGE,C,debt_consolidation,13.56%,PA,Current,Individual,12000,< 1 year,40000.0,0,9,0,1.0,60,55.0,43.76206961077844,19.23,No
60 months,MORTGAGE,C,debt_consolidation,14.47%,TX,Current,Joint App,15000,,30000.0,0,30,0,,60,37.0,43.76206961077844,41.6,No
60 months,MORTGAGE,D,debt_consolidation,17.97%,NC,Current,Individual,16000,5 years,51000.0,0,27,0,5.0,60,24.0,43.76206961077844,21.91,No
36 months,RENT,E,credit_card,23.40%,WV,Current,Individual,9600,9 years,65000.0,1,20,0,9.0,36,37.0,43.76206961077844,23.01,No
36 months,RENT,E,debt_consolidation,23.40%,NJ,Current,Individual,4000,3 years,90000.0,0,20,0,3.0,36,19.0,43.76206961077844,26.33,No
36 months,MORTGAGE,D,car,20.89%,NJ,Current,Joint App,3500,10+ years,40000.0,0,18,0,10.0,36,33.0,43.76206961077844,9.09,No
36 months,MORTGAGE,B,home_improvement,12.98%,KY,Current,Individual,9600,,35704.0,0,23,0,,36,11.0,43.76206961077844,0.84,No
36 months,OWN,E,debt_consolidation,23.40%,AL,Current,Individual,8000,10+ years,43000.0,0,16,0,10.0,36,81.0,43.76206961077844,33.24,No
