In [1]:
from pyspark.sql import SparkSession
 
spark=SparkSession.\
builder.\
config('spark.shuffle.useOldFetchProtocol','true').\
config('spark.ui.port','0').\
config("spark.sql.warehouse.dir","/user/itv016478/warehouse").\
enableHiveSupport().\
master('yarn').\
getOrCreate()

In [2]:
loans_def_raw_df= spark.read \
.format("csv") \
.option("header",True) \
.option("inferSchema",True) \
.load("/user/itv016478/lendingclubproject/raw/loans_defaulters_csv")

In [3]:
loans_def_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- pub_rec_bankruptcies: double (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- total_rec_late_fee: string (nullable = true)
 |-- mths_since_last_delinq: string (nullable = true)
 |-- mths_since_last_record: string (nullable = true)



In [4]:
loans_def_raw_df.createOrReplaceTempView("loan_defaulters")


In [5]:
spark.sql("select distinct (delinq_2yrs) from loan_defaulters")

delinq_2yrs
1.0
271 monthly payme...
I bike to work on...
183xx
VISA and AMEX cre...
etc. and I feel t...
AZ
017xx
923xx
446xx


In [6]:
spark.sql("select delinq_2yrs, count(*)as total from loan_defaulters GROUP BY delinq_2yrs ORDER BY total desc ").show(50)

+------------------+-------+
|       delinq_2yrs|  total|
+------------------+-------+
|               0.0|1838878|
|               1.0| 281335|
|               2.0|  81285|
|               3.0|  29539|
|               4.0|  13179|
|               5.0|   6599|
|               6.0|   3717|
|               7.0|   2062|
|               8.0|   1223|
|               9.0|    818|
|              10.0|    556|
|              11.0|    363|
|              12.0|    264|
|              13.0|    165|
|              14.0|    120|
|              15.0|     87|
|              null|     63|
|              16.0|     55|
|              18.0|     30|
|              17.0|     30|
|              19.0|     23|
|              20.0|     17|
|              21.0|     12|
|                CA|      8|
|                TX|      6|
|    small_business|      5|
|                IL|      5|
|              22.0|      5|
|debt_consolidation|      5|
|                FL|      4|
|              24.0|      4|
|             

In [7]:
#1) want to make all as Datatype  float except member_id  string
#if any thing under float data type is not a number ...if its alphabets or symbol then it will come as NULL

In [8]:
loans_defaulter_schema = "member_id string,delinq_2yrs float,delinq_amnt float, pub_rec float ,pub_rec_bankruptcies float ,inq_last_6mths float , total_rec_late_fee float , mths_since_last_delinq float ,mths_since_last_record float "

In [9]:
loans_def_raw_df= spark.read \
.format("csv") \
.option("header",True) \
.schema(loans_defaulter_schema) \
.load("/user/itv016478/lendingclubproject/raw/loans_defaulters_csv")

In [10]:
loans_def_raw_df.createOrReplaceTempView("loan_defaulters")
spark.sql("select delinq_2yrs, count(*)as total from loan_defaulters GROUP BY delinq_2yrs ORDER BY total desc ").show(50)

+-----------+-------+
|delinq_2yrs|  total|
+-----------+-------+
|        0.0|1838878|
|        1.0| 281335|
|        2.0|  81285|
|        3.0|  29539|
|        4.0|  13179|
|        5.0|   6599|
|        6.0|   3717|
|        7.0|   2062|
|        8.0|   1223|
|        9.0|    818|
|       10.0|    556|
|       11.0|    363|
|       12.0|    264|
|       null|    261|
|       13.0|    165|
|       14.0|    120|
|       15.0|     87|
|       16.0|     55|
|       17.0|     30|
|       18.0|     30|
|       19.0|     23|
|       20.0|     17|
|       21.0|     12|
|       22.0|      5|
|       24.0|      4|
|       26.0|      3|
|       23.0|      2|
|       29.0|      2|
|       25.0|      2|
|       3.44|      2|
|       30.0|      2|
|       6.52|      1|
|       39.0|      1|
|      26.24|      1|
|       3.45|      1|
|      21.72|      1|
|       9.44|      1|
|       8.56|      1|
|       1.41|      1|
|      17.18|      1|
|      13.76|      1|
|       5.52|      1|
|       14

In [11]:
#2) Check how many null counts are there now ...which was earlier not a number .
spark.sql("select count(*) from  loan_defaulters where  delinq_2yrs is null")

count(1)
261


In [12]:
#3) want to convert/cast it to integer from float(decimal) and fill all NA with 0
from pyspark.sql.functions import col


In [13]:
#cast with integer & fill all NA with 0 in col "delinq_2yrs" (.fillna(0,subset= ["delinq_2yrs"])
loans_def_processed_df=loans_def_raw_df.withColumn("delinq_2yrs", col("delinq_2yrs").cast("integer")).fillna(0,subset= ["delinq_2yrs"])

In [14]:
#Now see the count where delinq_2yrs is null ie 0 now
loans_def_processed_df.createOrReplaceTempView("loan_defaulters")
spark.sql("select count(*) from  loan_defaulters where  delinq_2yrs is null")

count(1)
0


In [15]:
#Now this data is sorted for 0 "delingq_2yrs" ..total count is 1839141....
spark.sql("select delinq_2yrs, count(*)as total from loan_defaulters GROUP BY delinq_2yrs ORDER BY total desc ").show (40)

+-----------+-------+
|delinq_2yrs|  total|
+-----------+-------+
|          0|1839141|
|          1| 281337|
|          2|  81285|
|          3|  29545|
|          4|  13180|
|          5|   6601|
|          6|   3719|
|          7|   2063|
|          8|   1226|
|          9|    821|
|         10|    558|
|         11|    363|
|         12|    266|
|         13|    167|
|         14|    123|
|         15|     90|
|         16|     56|
|         17|     33|
|         18|     32|
|         19|     24|
|         20|     19|
|         21|     16|
|         22|      7|
|         24|      6|
|         23|      5|
|         26|      4|
|         25|      2|
|         29|      2|
|         30|      2|
|         27|      1|
|         28|      1|
|         35|      1|
|         39|      1|
|         32|      1|
|         58|      1|
|         42|      1|
|         36|      1|
+-----------+-------+



In [16]:
spark.sql("select * from loan_defaulters where delinq_2yrs = 0")

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
d86a1c7244ace602c...,0,0.0,1.0,1.0,0.0,0.0,,91.0
25b8cd9ff47e50eea...,0,0.0,0.0,0.0,1.0,0.0,51.0,
f9773942305b2dc9c...,0,0.0,1.0,1.0,0.0,0.0,,88.0
b22f99c198f3738a7...,0,0.0,0.0,0.0,0.0,0.0,72.0,
4b3b7068c15b0e046...,0,0.0,0.0,0.0,4.0,0.0,68.0,
0c7a9f8a6d5b209d2...,0,0.0,1.0,1.0,0.0,0.0,,85.0
f1cc7e6ae9c735148...,0,0.0,0.0,0.0,0.0,0.0,,
8f8faa7eeb2a1ee41...,0,0.0,0.0,0.0,0.0,0.0,,
dad3f5ea8788186cb...,0,0.0,0.0,0.0,2.0,0.0,,
a20867c0985126202...,0,0.0,0.0,0.0,2.0,0.0,,


In [17]:
#4) Making 2 different dataframes

In [18]:
#For delingq category dataframe : Df1
#Want to save data in dataframe who has delayed/who is in defaulter list
loans_def_delinq_df= spark.sql("select member_id, delinq_2yrs, delinq_amnt, int(mths_since_last_delinq) from loan_defaulters where delinq_2yrs > 0 OR mths_since_last_delinq > 0 ")
loans_def_delinq_df

member_id,delinq_2yrs,delinq_amnt,mths_since_last_delinq
25b8cd9ff47e50eea...,0,0.0,51
b22f99c198f3738a7...,0,0.0,72
4b3b7068c15b0e046...,0,0.0,68
9468bb9f4c7e39e72...,1,0.0,7
a14a75a803373848c...,0,0.0,68
c1573ce869bafcbe9...,0,0.0,30
7a748acb9cb11d073...,0,0.0,40
3029a35037c3d47f2...,0,0.0,59
989b1622ddf441235...,0,0.0,75
69d87d14c315f222d...,0,0.0,55


In [19]:
loans_def_delinq_df.count()

1106163

In [20]:
#For inquiry category dataframe : Df2
#Want to save data in df whose inquiry has came in last few days or has some public record
loans_def_enq_df= spark.sql("select member_id from loan_defaulters where pub_rec > 0.0 or pub_rec_bankruptcies > 0.0 or inq_last_6mths > 0.0")
loans_def_enq_df

member_id
d86a1c7244ace602c...
25b8cd9ff47e50eea...
f9773942305b2dc9c...
4b3b7068c15b0e046...
0c7a9f8a6d5b209d2...
dad3f5ea8788186cb...
a20867c0985126202...
b62a92faa291b619c...
b9c65cbaf8241cbe8...
38edbe648d5b71710...


In [21]:
loans_def_enq_df.count()

1070125

In [22]:
loans_def_delinq_df.write \
.option("header",True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_delinq_csv")\
.save()

In [23]:
loans_def_enq_df.write \
.option("header",True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_enq_csv")\
.save()

In [24]:
loans_def_delinq_df.write \
.option("header",True) \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_delinq_parquet")\
.save()

In [25]:
loans_def_enq_df.write \
.option("header",True) \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_enq_parquet")\
.save()

In [26]:
loans_def_detail_records_enq_df= spark.sql("select member_id , pub_rec, pub_rec_bankruptcies, inq_last_6mths from loan_defaulters ").show()


+--------------------+-------+--------------------+--------------+
|           member_id|pub_rec|pub_rec_bankruptcies|inq_last_6mths|
+--------------------+-------+--------------------+--------------+
|d86a1c7244ace602c...|    1.0|                 1.0|           0.0|
|25b8cd9ff47e50eea...|    0.0|                 0.0|           1.0|
|f9773942305b2dc9c...|    1.0|                 1.0|           0.0|
|b22f99c198f3738a7...|    0.0|                 0.0|           0.0|
|4b3b7068c15b0e046...|    0.0|                 0.0|           4.0|
|0c7a9f8a6d5b209d2...|    1.0|                 1.0|           0.0|
|f1cc7e6ae9c735148...|    0.0|                 0.0|           0.0|
|8f8faa7eeb2a1ee41...|    0.0|                 0.0|           0.0|
|dad3f5ea8788186cb...|    0.0|                 0.0|           2.0|
|a20867c0985126202...|    0.0|                 0.0|           2.0|
|b62a92faa291b619c...|    2.0|                 1.0|           2.0|
|9468bb9f4c7e39e72...|    0.0|                 0.0|           

In [27]:
loans_def_pub_rec_df= loans_def_processed_df.withColumn("pub_rec", col("pub_rec").cast("integer")).fillna(0,subset= ["pub_rec"]).show()


+--------------------+-----------+-----------+-------+--------------------+--------------+------------------+----------------------+----------------------+
|           member_id|delinq_2yrs|delinq_amnt|pub_rec|pub_rec_bankruptcies|inq_last_6mths|total_rec_late_fee|mths_since_last_delinq|mths_since_last_record|
+--------------------+-----------+-----------+-------+--------------------+--------------+------------------+----------------------+----------------------+
|d86a1c7244ace602c...|          0|        0.0|      1|                 1.0|           0.0|               0.0|                  null|                  91.0|
|25b8cd9ff47e50eea...|          0|        0.0|      0|                 0.0|           1.0|               0.0|                  51.0|                  null|
|f9773942305b2dc9c...|          0|        0.0|      1|                 1.0|           0.0|               0.0|                  null|                  88.0|
|b22f99c198f3738a7...|          0|        0.0|      0|          

In [28]:
loans_def_bankrupt_df= loans_def_processed_df.withColumn("pub_rec_bankruptcies", col("pub_rec_bankruptcies").cast("integer")).fillna(0,subset= ["pub_rec_bankruptcies"])
loans_def_bankrupt_df

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
d86a1c7244ace602c...,0,0.0,1.0,1,0.0,0.0,,91.0
25b8cd9ff47e50eea...,0,0.0,0.0,0,1.0,0.0,51.0,
f9773942305b2dc9c...,0,0.0,1.0,1,0.0,0.0,,88.0
b22f99c198f3738a7...,0,0.0,0.0,0,0.0,0.0,72.0,
4b3b7068c15b0e046...,0,0.0,0.0,0,4.0,0.0,68.0,
0c7a9f8a6d5b209d2...,0,0.0,1.0,1,0.0,0.0,,85.0
f1cc7e6ae9c735148...,0,0.0,0.0,0,0.0,0.0,,
8f8faa7eeb2a1ee41...,0,0.0,0.0,0,0.0,0.0,,
dad3f5ea8788186cb...,0,0.0,0.0,0,2.0,0.0,,
a20867c0985126202...,0,0.0,0.0,0,2.0,0.0,,


In [29]:
loans_def_inq_last_6mths_df= loans_def_processed_df.withColumn("inq_last_6mths", col("inq_last_6mths").cast("integer")).fillna(0,subset= ["inq_last_6mths"])
loans_def_inq_last_6mths_df  # Now this df is the latest and cleaned dataset

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
d86a1c7244ace602c...,0,0.0,1.0,1.0,0,0.0,,91.0
25b8cd9ff47e50eea...,0,0.0,0.0,0.0,1,0.0,51.0,
f9773942305b2dc9c...,0,0.0,1.0,1.0,0,0.0,,88.0
b22f99c198f3738a7...,0,0.0,0.0,0.0,0,0.0,72.0,
4b3b7068c15b0e046...,0,0.0,0.0,0.0,4,0.0,68.0,
0c7a9f8a6d5b209d2...,0,0.0,1.0,1.0,0,0.0,,85.0
f1cc7e6ae9c735148...,0,0.0,0.0,0.0,0,0.0,,
8f8faa7eeb2a1ee41...,0,0.0,0.0,0.0,0,0.0,,
dad3f5ea8788186cb...,0,0.0,0.0,0.0,2,0.0,,
a20867c0985126202...,0,0.0,0.0,0.0,2,0.0,,


In [30]:
#Making new table based on latest and clean dataframe
loans_def_inq_last_6mths_df.createOrReplaceTempView("loan_defaulters")

In [31]:
loans_def_detail_records_enq_df= spark.sql("select member_id , pub_rec, pub_rec_bankruptcies, inq_last_6mths from loan_defaulters ").show()


+--------------------+-------+--------------------+--------------+
|           member_id|pub_rec|pub_rec_bankruptcies|inq_last_6mths|
+--------------------+-------+--------------------+--------------+
|d86a1c7244ace602c...|    1.0|                 1.0|             0|
|25b8cd9ff47e50eea...|    0.0|                 0.0|             1|
|f9773942305b2dc9c...|    1.0|                 1.0|             0|
|b22f99c198f3738a7...|    0.0|                 0.0|             0|
|4b3b7068c15b0e046...|    0.0|                 0.0|             4|
|0c7a9f8a6d5b209d2...|    1.0|                 1.0|             0|
|f1cc7e6ae9c735148...|    0.0|                 0.0|             0|
|8f8faa7eeb2a1ee41...|    0.0|                 0.0|             0|
|dad3f5ea8788186cb...|    0.0|                 0.0|             2|
|a20867c0985126202...|    0.0|                 0.0|             2|
|b62a92faa291b619c...|    2.0|                 1.0|             2|
|9468bb9f4c7e39e72...|    0.0|                 0.0|           

In [32]:
loans_def_detail_records_enq_df = spark.sql("""
    SELECT member_id, pub_rec, pub_rec_bankruptcies, inq_last_6mths
    FROM loan_defaulters
""")

In [35]:
loans_def_detail_records_enq_df.write \
.option("header",True) \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_detail_records_enq_parquet")\
.save()

In [34]:
loans_def_detail_records_enq_df.write \
.option("header",True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv016478/lendingclubproject/cleaned/loans_def_detail_records_enq_csv")\
.save()