Requirements

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=7b1307c40379608b61160e4802daea312ad094d0f40eaec706f3ae38d4755b75
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


Creating a Spark Session

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Loading the data

In [3]:
df=spark.read.format("csv") \
  .option('header','True')\
  .option('delimiter',',')\
  .option('inferSchema','True')\
  .load('/content/drive/MyDrive/LoanStats_2018Q4.csv')

In [29]:
len(df.columns)

145

In [28]:
df.columns

['id',
 'member_id',
 'loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan',
 'url',
 'desc',
 'purpose',
 'title',
 'zip_code',
 'addr_state',
 'dti',
 'delinq_2yrs',
 'earliest_cr_line',
 'inq_last_6mths',
 'mths_since_last_delinq',
 'mths_since_last_record',
 'open_acc',
 'pub_rec',
 'revol_bal',
 'revol_util',
 'total_acc',
 'initial_list_status',
 'out_prncp',
 'out_prncp_inv',
 'total_pymnt',
 'total_pymnt_inv',
 'total_rec_prncp',
 'total_rec_int',
 'total_rec_late_fee',
 'recoveries',
 'collection_recovery_fee',
 'last_pymnt_d',
 'last_pymnt_amnt',
 'next_pymnt_d',
 'last_credit_pull_d',
 'collections_12_mths_ex_med',
 'mths_since_last_major_derog',
 'policy_code',
 'application_type',
 'annual_inc_joint',
 'dti_joint',
 'verification_status_joint',
 'acc_now_delinq',
 'tot_coll_amt',
 'tot_cur_

In [None]:
df.show()

+----+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+----+----+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+----------------+------+-----------+------------+

Data Aggregation

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: doubl

In [None]:
#check how many rows are there in the dataset
df.count()

128412

In [None]:
#creating a temporary table to run the SQL queries.
df.createOrReplaceTempView("loanstats")

In [None]:
spark.sql("select * from loanstats").show()

+----+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+----+----+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+----------------+------+-----------+------------+

In [None]:
spark.sql("select count(*) from loanstats")

DataFrame[count(1): bigint]

In [None]:
#It will provide the statistics(mean,cunt,min,max) for the numerical values in df.
df.describe().show()

+-------+----+---------+------------------+------------------+------------------+----------+--------+------------------+------+---------+------------------+----------+--------------+-----------------+-------------------+-------+------------------+----------+----+----+--------+--------+--------+----------+------------------+-------------------+----------------+-------------------+----------------------+----------------------+-----------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-------------------+--------------------+-----------------------+------------+------------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+------------------+------------------+-------------------------+--------------+------------------+------------------+------------------+-----

In [4]:
#selecting only the required column which will be only usable.
df_sel = df.select("term","home_ownership","grade","purpose",
                   "int_rate","addr_state","loan_status","application_type",
                   "loan_amnt","emp_length","annual_inc","dti","delinq_2yrs",
                   "revol_util","total_acc","num_tl_90g_dpd_24m","dti_joint")

In [None]:
df_sel.describe().show()

+-------+----------+--------------+------+--------+--------+----------+------------------+----------------+------------------+----------+-----------------+------------------+-------------------+----------+------------------+--------------------+------------------+
|summary|      term|home_ownership| grade| purpose|int_rate|addr_state|       loan_status|application_type|         loan_amnt|emp_length|       annual_inc|               dti|        delinq_2yrs|revol_util|         total_acc|  num_tl_90g_dpd_24m|         dti_joint|
+-------+----------+--------------+------+--------+--------+----------+------------------+----------------+------------------+----------+-----------------+------------------+-------------------+----------+------------------+--------------------+------------------+
|  count|    128412|        128412|128412|  128412|  128412|    128412|            128412|          128412|            128412|    128412|           128412|            128175|             128412|    128256|

In [None]:
#further limiting the columns
df_sel.describe("term","loan_amnt","emp_length","annual_inc","dti","delinq_2yrs","revol_util","total_acc").show()

+-------+----------+------------------+----------+-----------------+------------------+-------------------+----------+------------------+
|summary|      term|         loan_amnt|emp_length|       annual_inc|               dti|        delinq_2yrs|revol_util|         total_acc|
+-------+----------+------------------+----------+-----------------+------------------+-------------------+----------+------------------+
|  count|    128412|            128412|    128412|           128412|            128175|             128412|    128256|            128412|
|   mean|      null| 15971.32102139987|      null| 82797.3278609476|19.933177530719778|0.22783696227766875|      null|22.677413325857398|
| stddev|      null|10150.384232741915|      null|108298.4657915005|20.143542243475476| 0.7337929617806072|      null|12.129215673024733|
|    min| 36 months|              1000|    1 year|              0.0|               0.0|                  0|        0%|                 2|
|    max| 60 months|             4

In [None]:
#caching-it will load the dataframe into executor memeory, so that it can process it fast
#subsequent operations can be performed faster without having to recompute the DataFrame from the original source or any intermediate transformations.
#NOte: If you cache too many DataFrames or too large DataFrames, it can lead to excessive memory consumption, which might cause memory-related issues in your Spark cluster.
df_sel.cache()

DataFrame[term: string, home_ownership: string, grade: string, purpose: string, int_rate: string, addr_state: string, loan_status: string, application_type: string, loan_amnt: int, emp_length: string, annual_inc: double, dti: double, delinq_2yrs: int, revol_util: string, total_acc: int, num_tl_90g_dpd_24m: int, dti_joint: double]

In [None]:
df_sel.describe("loan_amnt","emp_length","dti","delinq_2yrs","revol_util","total_acc").show()

+-------+------------------+----------+------------------+-------------------+----------+------------------+
|summary|         loan_amnt|emp_length|               dti|        delinq_2yrs|revol_util|         total_acc|
+-------+------------------+----------+------------------+-------------------+----------+------------------+
|  count|            128412|    128412|            128175|             128412|    128256|            128412|
|   mean| 15971.32102139987|      null|19.933177530719778|0.22783696227766875|      null|22.677413325857398|
| stddev|10150.384232741915|      null|20.143542243475476| 0.7337929617806072|      null|12.129215673024733|
|    min|              1000|    1 year|               0.0|                  0|        0%|                 2|
|    max|             40000|       n/a|             999.0|                 24|    99.90%|               160|
+-------+------------------+----------+------------------+-------------------+----------+------------------+



In [None]:
#Checking employee length varible
spark.sql("select distinct emp_length from loanstats limit 50").show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|    1 year|
|       n/a|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [None]:
from pyspark.sql.functions import regexp_replace, regexp_extract
from pyspark.sql.functions import col

regexp_string='years|year|\\+|\\<'  #before + we nneed to prefix \\ or else it will take the + symbol as an operator.
df_sel.select(regexp_replace(col('emp_length'),regexp_string,"").alias("emplenght_cleaned"),col("emp_length")).show()

+-----------------+----------+
|emplenght_cleaned|emp_length|
+-----------------+----------+
|               1 |  < 1 year|
|              10 | 10+ years|
|               1 |  < 1 year|
|              n/a|       n/a|
|               5 |   5 years|
|               9 |   9 years|
|               3 |   3 years|
|              10 | 10+ years|
|              n/a|       n/a|
|              10 | 10+ years|
|               1 |  < 1 year|
|              10 | 10+ years|
|              10 | 10+ years|
|               6 |   6 years|
|               5 |   5 years|
|              10 | 10+ years|
|               2 |   2 years|
|              10 | 10+ years|
|               1 |  < 1 year|
|               6 |   6 years|
+-----------------+----------+
only showing top 20 rows



In [None]:
#other way using extracting only the digits
from pyspark.sql.functions import regexp_replace, regexp_extract
regexp_string='\\d+'
df_sel.select(regexp_extract(col('emp_length'),regexp_string,0).alias("emplenght_cleaned"),col("emp_length")).show()

+-----------------+----------+
|emplenght_cleaned|emp_length|
+-----------------+----------+
|                1|  < 1 year|
|               10| 10+ years|
|                1|  < 1 year|
|                 |       n/a|
|                5|   5 years|
|                9|   9 years|
|                3|   3 years|
|               10| 10+ years|
|                 |       n/a|
|               10| 10+ years|
|                1|  < 1 year|
|               10| 10+ years|
|               10| 10+ years|
|                6|   6 years|
|                5|   5 years|
|               10| 10+ years|
|                2|   2 years|
|               10| 10+ years|
|                1|  < 1 year|
|                6|   6 years|
+-----------------+----------+
only showing top 20 rows



In [None]:
#other way using extracting only the digits
from pyspark.sql.functions import regexp_replace, regexp_extract
#regexp_string='\\d+'
#df_sel.select(regexp_extract(col('emp_length'),regexp_string,0).alias("emplenght_cleaned"),col("emp_length")).show()
reg='months'
df_sel.select(regexp_replace(col('term'),reg,"").alias("term_cleaned"),col('term')).show()

'''regexp_string='years|year|\\+|\\<'  #before + we nneed to prefix \\ or else it will take the + symbol as an operator.
df_sel.select(regexp_replace(col('emp_length'),regexp_string,"").alias("emplenght_cleaned"),col("emp_length")).show()'''

+------------+----------+
|term_cleaned|      term|
+------------+----------+
|         36 | 36 months|
|         36 | 36 months|
|         60 | 60 months|
|         60 | 60 months|
|         60 | 60 months|
|         36 | 36 months|
|         36 | 36 months|
|         36 | 36 months|
|         36 | 36 months|
|         36 | 36 months|
|         60 | 60 months|
|         60 | 60 months|
|         60 | 60 months|
|         36 | 36 months|
|         60 | 60 months|
|         60 | 60 months|
|         36 | 36 months|
|         60 | 60 months|
|         60 | 60 months|
|         36 | 36 months|
+------------+----------+
only showing top 20 rows



'regexp_string=\'years|year|\\+|\\<\'  #before + we nneed to prefix \\ or else it will take the + symbol as an operator.\ndf_sel.select(regexp_replace(col(\'emp_length\'),regexp_string,"").alias("emplenght_cleaned"),col("emp_length")).show()'

In [None]:
#assigning all the above cleaned steps to df_sel
df_sel=df_sel.withColumn("term_cleaned",regexp_replace(col("term"),"months","")) \
             .withColumn("emplen_cleaned",regexp_extract(col("emp_length"),"\\d+",0))

In [None]:
df_sel.select("term","term_cleaned","emp_length","emplen_cleaned").show()

+----------+------------+----------+--------------+
|      term|term_cleaned|emp_length|emplen_cleaned|
+----------+------------+----------+--------------+
| 36 months|         36 |  < 1 year|             1|
| 36 months|         36 | 10+ years|            10|
| 60 months|         60 |  < 1 year|             1|
| 60 months|         60 |       n/a|              |
| 60 months|         60 |   5 years|             5|
| 36 months|         36 |   9 years|             9|
| 36 months|         36 |   3 years|             3|
| 36 months|         36 | 10+ years|            10|
| 36 months|         36 |       n/a|              |
| 36 months|         36 | 10+ years|            10|
| 60 months|         60 |  < 1 year|             1|
| 60 months|         60 | 10+ years|            10|
| 60 months|         60 | 10+ years|            10|
| 36 months|         36 |   6 years|             6|
| 60 months|         60 |   5 years|             5|
| 60 months|         60 | 10+ years|            10|
| 36 months|

In [None]:
df_sel.printSchema()

root
 |-- term: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- num_tl_90g_dpd_24m: integer (nullable = true)
 |-- dti_joint: double (nullable = true)
 |-- term_cleaned: string (nullable = true)
 |-- emplen_cleaned: string (nullable = true)



In [None]:
#Creating a temporary variable for df_sel
df_sel.createOrReplaceTempView("loanstatus_sel")

In [None]:
spark.sql("select * from loanstatus_sel").show()

+----------+--------------+-----+------------------+--------+----------+-----------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+------------+--------------+
|      term|home_ownership|grade|           purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|  dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|term_cleaned|emplen_cleaned|
+----------+--------------+-----+------------------+--------+----------+-----------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+------------+--------------+
| 36 months|      MORTGAGE|    B|debt_consolidation|  10.33%|        OR|    Current|      Individual|    10000|  < 1 year|  280000.0| 6.15|          2|       38%|       23|                 0|     null|         36 |             1|
| 36 months|          RENT|    C|debt_consolidation|  13.56%|        NY|    Curr

Coorelation Matrix

In [None]:
'''df_sel.stat.corr('annual_inc','loan_amnt')'''  #stat-refers to statistic function,corr-corelation between two column
                                           #In here we are checking if the two columns are correlated to eacbh other
                                           #-1- Negatively correlated
                                           #+1 - Positively correlated
                                           #0 - No Correlation

                                           #OR
spark.sql("select corr(annual_inc,loan_amnt) from loanstatus_sel").show() #in sql

+---------------------------+
|corr(annual_inc, loan_amnt)|
+---------------------------+
|        0.20103225337914624|
+---------------------------+



In [None]:
#from above one we can see it is not so much correlated.

In [None]:
spark.sql("select corr(loan_amnt,term_cleaned) from loanstatus_sel").show()

+-----------------------------+
|corr(loan_amnt, term_cleaned)|
+-----------------------------+
|           0.3925941141844244|
+-----------------------------+



In [None]:
#still not a strong correlation

In [None]:
#Crosstab - displays the frequency distribution of values for one column against another, providing a summary of the relationship between the two columns.
df_sel.stat.crosstab('loan_status','grade').show()

+------------------+-----+-----+-----+-----+----+---+---+
| loan_status_grade|    A|    B|    C|    D|   E|  F|  G|
+------------------+-----+-----+-----+-----+----+---+---+
|        Fully Paid| 1188| 1333| 1175|  807| 360| 36|  9|
|   In Grace Period|   74|  112|  146|  122|  54|  4|  1|
|       Charged Off|   16|   35|   38|   19|   8|  3|  3|
|Late (31-120 days)|   68|  164|  234|  220| 142| 14|  7|
|           Current|36639|34139|29323|15823|5352|321| 79|
| Late (16-30 days)|   26|   78|  102|   81|  46|  9|  2|
+------------------+-----+-----+-----+-----+----+---+---+



In [None]:
freq=df_sel.stat.freqItems(['purpose','grade'],0.3)
freq.collect()

[Row(purpose_freqItems=['debt_consolidation', 'credit_card', 'other'], grade_freqItems=['A', 'B', 'C'])]

In [None]:
#similar way
df_sel.groupBy('purpose').count().orderBy(col('count').desc()).show()

+------------------+-----+
|           purpose|count|
+------------------+-----+
|debt_consolidation|70603|
|       credit_card|34961|
|  home_improvement| 7512|
|             other| 7094|
|    major_purchase| 2303|
|           medical| 1499|
|    small_business| 1051|
|               car| 1037|
|             house|  823|
|          vacation|  802|
|            moving|  656|
|  renewable_energy|   71|
+------------------+-----+



Aggregate Function

In [None]:
from pyspark.sql.functions import count,mean,stddev_pop,min,max,avg

In [None]:
#Checking how many null values are present in the column
from pyspark.sql.functions import isnan,when,count,col
df_sel.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+



In [None]:
#DTI Column
df_sel.describe("dti","revol_util").show()

+-------+------------------+----------+
|summary|               dti|revol_util|
+-------+------------------+----------+
|  count|            128175|    128256|
|   mean|19.933177530719778|      null|
| stddev|20.143542243475476|      null|
|    min|               0.0|        0%|
|    max|             999.0|    99.90%|
+-------+------------------+----------+



In [None]:
from pyspark.sql.functions import regexp_replace, regexp_extract
#spark.sql("select ceil(regexp_replace(revol_util,'\%',"")), count(*) from loanstatus_sel group by ceil(regexp_replace(revol_util,'\%',""))")
spark.sql("select ceil(regexp_replace(revol_util, '\%', '')), count(*) from loanstatus_sel group by ceil(regexp_replace(revol_util, '\%', ''))").show()

+-----------------------------------------+--------+
|CEIL(regexp_replace(revol_util, \%, , 1))|count(1)|
+-----------------------------------------+--------+
|                                       29|    1824|
|                                       26|    1776|
|                                       65|    1297|
|                                       54|    1582|
|                                       19|    1537|
|                                        0|    1132|
|                                      112|       2|
|                                       22|    1665|
|                                        7|     944|
|                                       77|     964|
|                                       34|    1909|
|                                      184|       1|
|                                      126|       1|
|                                       94|     536|
|                                       50|    1697|
|                                      110|   

In [None]:
df_sel=df_sel.withColumn("revolutil_cleaned",regexp_extract(col("revol_util"),"\\d+",0))

In [None]:
df_sel.describe('revol_util','revolutil_cleaned').show()

+-------+----------+------------------+
|summary|revol_util| revolutil_cleaned|
+-------+----------+------------------+
|  count|    128256|            128256|
|   mean|      null| 43.76206961077844|
| stddev|      null|24.801849528207015|
|    min|        0%|                 0|
|    max|    99.90%|                99|
+-------+----------+------------------+



In [None]:
#Defining a function since the columns may conatin a lot of null values in it
def fill_avg(df,colname):
  return df.select(colname).agg(avg(colname))

In [None]:
rev_avg=fill_avg(df_sel,'revolutil_cleaned')

In [None]:
#assigning it to the data frame
from pyspark.sql.functions import lit
rev_avg=fill_avg(df_sel,'revolutil_cleaned').first()[0]
df_sel=df_sel.withColumn('rev_avg',lit(rev_avg))

In [None]:
#coalesce
'''
  It will take two variables
    -if the first is not null it will take it.
    -if the first is null it will take the second variable
'''
from pyspark.sql.functions import coalesce
df_sel=df_sel.withColumn('revolutil_cleaned',coalesce(col('revolutil_cleaned'),col('rev_avg')))
'''
In the above one['revolutil_cleaned'] it will take all the not null values  and whereever there is null it will use the 'rev_avg'.
So where ever there is a null it will substitute with the average(will convert the two columns into single columns)
'''

"\nIn the above one['revolutil_cleaned'] it will take all the not null values  and whereever there is null it will use the 'rev_avg'.\nSo where ever there is a null it will substitute with the average(will convert the two columns into single columns)\n"

In [None]:
#converting the datatype into double for the column 'revolutil_cleaned'
df_sel=df_sel.withColumn('revolutil_cleaned',df_sel['revolutil_cleaned'].cast('double'))


In [None]:
df_sel.describe('revol_util','revolutil_cleaned').show()

+-------+----------+------------------+
|summary|revol_util| revolutil_cleaned|
+-------+----------+------------------+
|  count|    128256|            128412|
|   mean|      null|43.762069610778525|
| stddev|      null|24.786779696453955|
|    min|        0%|               0.0|
|    max|    99.90%|             183.0|
+-------+----------+------------------+



In [None]:
#Checking how many null values are present in the column
from pyspark.sql.functions import isnan,when,count,col
df_sel.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+-----------------+-------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|revolutil_cleaned|rev_avg|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+-----------------+-------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|                0|      0|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+-----------

In [None]:
#checking which values are null in the 'dti' column
spark.sql("select * from loanstatus_sel where dti is null").show()

+----------+--------------+-----+------------------+--------+----------+-----------+----------------+---------+----------+----------+----+-----------+----------+---------+------------------+---------+
|      term|home_ownership|grade|           purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc| dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|
+----------+--------------+-----+------------------+--------+----------+-----------+----------------+---------+----------+----------+----+-----------+----------+---------+------------------+---------+
| 60 months|      MORTGAGE|    B|    major_purchase|  10.72%|        AZ|    Current|       Joint App|    13000|       n/a|       0.0|null|          0|       26%|       47|                 0|    33.06|
| 60 months|          RENT|    C|debt_consolidation|  16.91%|        CA|    Current|       Joint App|    18000|       n/a|       0.0|null|          0|    35.20%|       12|                 0|    17

In [None]:
#from the above one we can see the applciariontype='Joint App' the values are null still narrowing doewn

In [None]:
spark.sql("select application_type,dti,dti_joint from loanstatus_sel where dti is null").show()

+----------------+----+---------+
|application_type| dti|dti_joint|
+----------------+----+---------+
|       Joint App|null|    33.06|
|       Joint App|null|    17.67|
|       Joint App|null|     27.3|
|       Joint App|null|     8.74|
|       Joint App|null|    11.68|
|       Joint App|null|    28.01|
|       Joint App|null|    15.06|
|       Joint App|null|    12.79|
|       Joint App|null|     8.05|
|       Joint App|null|    24.22|
|       Joint App|null|    20.04|
|       Joint App|null|    12.75|
|       Joint App|null|    24.89|
|       Joint App|null|    22.61|
|       Joint App|null|    19.01|
|       Joint App|null|    15.34|
|       Joint App|null|      0.6|
|       Joint App|null|    23.69|
|       Joint App|null|    17.29|
|       Joint App|null|    29.25|
+----------------+----+---------+
only showing top 20 rows



In [None]:
#from the above insights ,
'''
we are going to create a new column name 'dti_cleaned' in that with the help of coalesce function, 'dti' records null values will be filled with the vlaues of 'dti_joint'
'''
df_sel=df_sel.withColumn('dti_cleaned',coalesce(col("dti"),col("dti_joint")))
#checking the result
from pyspark.sql.functions import isnan,when,count,col
df_sel.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df_sel.columns]).show()

+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+-----------------+-------+-----------+
|term|home_ownership|grade|purpose|int_rate|addr_state|loan_status|application_type|loan_amnt|emp_length|annual_inc|dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|revolutil_cleaned|rev_avg|dti_cleaned|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+---+-----------+----------+---------+------------------+---------+-----------------+-------+-----------+
|   0|             0|    0|      0|       0|         0|          0|               0|        0|         0|         0|237|          0|       156|        0|                 0|   111630|                0|      0|          0|
+----+--------------+-----+-------+--------+----------+-----------+----------------+---------+----------+----------+

In [6]:
df_sel.groupby('loan_status').count().show()

+------------------+------+
|       loan_status| count|
+------------------+------+
|        Fully Paid|  4908|
|   In Grace Period|   513|
|       Charged Off|   122|
|Late (31-120 days)|   849|
|           Current|121676|
| Late (16-30 days)|   344|
+------------------+------+



In [7]:
df_sel.where(df_sel.loan_status.isin(["Late (31-120 days)", "Charged Off","In Grace Period","Late (16-30 days)"])).show()

+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+
|      term|home_ownership|grade|           purpose|int_rate|addr_state|       loan_status|application_type|loan_amnt|emp_length|annual_inc|  dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|
+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+
| 36 months|          RENT|    B|debt_consolidation|  10.33%|        PA|Late (31-120 days)|      Individual|    16000|   3 years|   71000.0|16.58|          0|    11.80%|       28|                 0|     null|
| 60 months|      MORTGAGE|    D|debt_consolidation|  22.35%|        OH|Late (31-120 days)|       Joint App|    17500|   9 years|   50000.0|36.92|          0|    35

In [14]:
#adding a new column as bad loan
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
df_sel=df_sel.withColumn("bad_loan",when(df_sel.loan_status.isin(["Late (31-120 days)", "Charged Off","In Grace Period","Late (16-30 days)"]),'Yes').otherwise("No"))

In [15]:
df_sel.groupby('bad_loan').count().show()

+--------+------+
|bad_loan| count|
+--------+------+
|      No|126584|
|     Yes|  1828|
+--------+------+



In [16]:
df_sel.filter(df_sel.bad_loan == 'Yes').show()

+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+--------+--------+
|      term|home_ownership|grade|           purpose|int_rate|addr_state|       loan_status|application_type|loan_amnt|emp_length|annual_inc|  dti|delinq_2yrs|revol_util|total_acc|num_tl_90g_dpd_24m|dti_joint|bad loan|bad_loan|
+----------+--------------+-----+------------------+--------+----------+------------------+----------------+---------+----------+----------+-----+-----------+----------+---------+------------------+---------+--------+--------+
| 36 months|          RENT|    B|debt_consolidation|  10.33%|        PA|Late (31-120 days)|      Individual|    16000|   3 years|   71000.0|16.58|          0|    11.80%|       28|                 0|     null|     Yes|     Yes|
| 60 months|      MORTGAGE|    D|debt_consolidation|  22.35%|        OH|Late (31-120 days)| 

In [19]:
#Dropping the unnecessary columns
df_sel_final=df_sel.drop('revol_util','dti','dti_joint','bad loan')

In [20]:
df_sel_final.printSchema()

root
 |-- term: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- num_tl_90g_dpd_24m: integer (nullable = true)
 |-- bad_loan: string (nullable = false)



In [21]:
#permanent table
#saving table in the parquet foramt
permanent_table='lc_loan_data'
df_sel.write.format("parquet").saveAsTable(permanent_table)