
## Client Requirement Updated Scenario

Before we were only storing member_id for those members who had social inquiry of any kind. Now, based on further analysis and communication, client want us to add extra details regarding the enquiry, as it is seen to be useful for calculating loan score


In [0]:
if 'spark' in locals():
    spark.stop()
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/itv008542/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [0]:
spark

## Understanding Loan Score Calculation Logic

In [0]:
loans_def_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.option("inferSchema", True) \
.load("/user/itv008542/lendingclubproject/raw/loans_defaulters_csv")

In [0]:
loans_def_raw_df

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
6d5091b3fcaaeb4ea...,0.0,0.0,0.0,0.0,1.0,0.0,30.0,
b5e7938b0a2da4cea...,1.0,0.0,0.0,0.0,4.0,0.0,6.0,
91060b858433e8a61...,0.0,0.0,0.0,0.0,0.0,0.0,,
cab1fa9f533688b0a...,0.0,0.0,0.0,0.0,0.0,0.0,,
f74e401c1ab0adf78...,1.0,0.0,0.0,0.0,3.0,0.0,12.0,
8aef4bb29d609d8d6...,0.0,0.0,0.0,0.0,0.0,0.0,,
538b4653da3b1e814...,0.0,0.0,0.0,0.0,0.0,0.0,49.0,
b24d55f21390533c5...,1.0,0.0,0.0,0.0,0.0,0.0,3.0,
1035c5401b0ca76d0...,0.0,0.0,1.0,1.0,1.0,0.0,,106.0
cb0f1777593e77909...,0.0,0.0,0.0,0.0,0.0,0.0,75.0,


In [0]:
loans_def_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- pub_rec_bankruptcies: double (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- total_rec_late_fee: string (nullable = true)
 |-- mths_since_last_delinq: string (nullable = true)
 |-- mths_since_last_record: string (nullable = true)



In [0]:
loans_def_raw_df.createOrReplaceTempView("loan_defaulters")

In [0]:
spark.sql("select distinct(delinq_2yrs) from loan_defaulters")

delinq_2yrs
1.0
I bike to work on...
271 monthly payme...
VISA and AMEX cre...
etc. and I feel t...
183xx
AZ
017xx
923xx
446xx


In [0]:
spark.sql("select delinq_2yrs, count(*) as total from loan_defaulters group by delinq_2yrs order by total desc").show(40)

+------------------+-------+
|       delinq_2yrs|  total|
+------------------+-------+
|               0.0|1838878|
|               1.0| 281335|
|               2.0|  81285|
|               3.0|  29539|
|               4.0|  13179|
|               5.0|   6599|
|               6.0|   3717|
|               7.0|   2062|
|               8.0|   1223|
|               9.0|    818|
|              10.0|    556|
|              11.0|    363|
|              12.0|    264|
|              13.0|    165|
|              14.0|    120|
|              15.0|     87|
|              null|     63|
|              16.0|     55|
|              18.0|     30|
|              17.0|     30|
|              19.0|     23|
|              20.0|     17|
|              21.0|     12|
|                CA|      8|
|                TX|      6|
|    small_business|      5|
|              22.0|      5|
|                IL|      5|
|debt_consolidation|      5|
|              24.0|      4|
|                FL|      4|
|             

In [0]:
loan_defaulters_schema = "member_id string, delinq_2yrs float, delinq_amnt float, pub_rec float, pub_rec_bankruptcies float,inq_last_6mths float, total_rec_late_fee float, mths_since_last_delinq float, mths_since_last_record float"

In [0]:
loans_def_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(loan_defaulters_schema) \
.load("/user/itv008542/lendingclubproject/raw/loans_defaulters_csv")

In [0]:
loans_def_raw_df.createOrReplaceTempView("loan_defaulters")

In [0]:
spark.sql("select delinq_2yrs, count(*) as total from loan_defaulters group by delinq_2yrs order by total desc").show(40)

+-----------+-------+
|delinq_2yrs|  total|
+-----------+-------+
|        0.0|1838878|
|        1.0| 281335|
|        2.0|  81285|
|        3.0|  29539|
|        4.0|  13179|
|        5.0|   6599|
|        6.0|   3717|
|        7.0|   2062|
|        8.0|   1223|
|        9.0|    818|
|       10.0|    556|
|       11.0|    363|
|       12.0|    264|
|       null|    261|
|       13.0|    165|
|       14.0|    120|
|       15.0|     87|
|       16.0|     55|
|       18.0|     30|
|       17.0|     30|
|       19.0|     23|
|       20.0|     17|
|       21.0|     12|
|       22.0|      5|
|       24.0|      4|
|       26.0|      3|
|       23.0|      2|
|       25.0|      2|
|       3.44|      2|
|       30.0|      2|
|       29.0|      2|
|       3.45|      1|
|      13.76|      1|
|      21.72|      1|
|       58.0|      1|
|      17.17|      1|
|       6.52|      1|
|       5.52|      1|
|       14.1|      1|
|      22.62|      1|
+-----------+-------+
only showing top 40 rows



In [0]:
from pyspark.sql.functions import col

In [0]:
loans_def_processed_df = loans_def_raw_df.withColumn("delinq_2yrs", col("delinq_2yrs").cast("integer")).fillna(0, subset = ["delinq_2yrs"])

In [0]:
loans_def_processed_df.createOrReplaceTempView("loan_defaulters")

In [0]:
spark.sql("select count(*) from loan_defaulters where delinq_2yrs is null")

count(1)
0


In [0]:
spark.sql("select delinq_2yrs, count(*) as total from loan_defaulters group by delinq_2yrs order by total desc").show(40)

+-----------+-------+
|delinq_2yrs|  total|
+-----------+-------+
|          0|1839141|
|          1| 281337|
|          2|  81285|
|          3|  29545|
|          4|  13180|
|          5|   6601|
|          6|   3719|
|          7|   2063|
|          8|   1226|
|          9|    821|
|         10|    558|
|         11|    363|
|         12|    266|
|         13|    167|
|         14|    123|
|         15|     90|
|         16|     56|
|         17|     33|
|         18|     32|
|         19|     24|
|         20|     19|
|         21|     16|
|         22|      7|
|         24|      6|
|         23|      5|
|         26|      4|
|         30|      2|
|         25|      2|
|         29|      2|
|         27|      1|
|         35|      1|
|         28|      1|
|         42|      1|
|         39|      1|
|         32|      1|
|         58|      1|
|         36|      1|
+-----------+-------+



In [0]:
loans_def_delinq_df = spark.sql("select member_id,delinq_2yrs, delinq_amnt, int(mths_since_last_delinq) from loan_defaulters where delinq_2yrs > 0 or mths_since_last_delinq > 0")

In [0]:
loans_def_delinq_df

member_id,delinq_2yrs,delinq_amnt,mths_since_last_delinq
6d5091b3fcaaeb4ea...,0,0.0,30
b5e7938b0a2da4cea...,1,0.0,6
f74e401c1ab0adf78...,1,0.0,12
538b4653da3b1e814...,0,0.0,49
b24d55f21390533c5...,1,0.0,3
cb0f1777593e77909...,0,0.0,75
a962f4d59caec5fa1...,0,0.0,54
23857480ccf555ce4...,0,0.0,42
f2c4010f700d8c9c4...,0,0.0,29
0d3c568ff6944b11c...,0,0.0,33


In [0]:
loans_def_delinq_df.count()

1106163

In [0]:
loans_def_detail_records_enq_df = spark.sql("select member_id, pub_rec, pub_rec_bankruptcies, inq_last_6mths from loan_defaulters")

In [0]:
loans_def_detail_records_enq_df

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
6d5091b3fcaaeb4ea...,0.0,0.0,1.0
b5e7938b0a2da4cea...,0.0,0.0,4.0
91060b858433e8a61...,0.0,0.0,0.0
cab1fa9f533688b0a...,0.0,0.0,0.0
f74e401c1ab0adf78...,0.0,0.0,3.0
8aef4bb29d609d8d6...,0.0,0.0,0.0
538b4653da3b1e814...,0.0,0.0,0.0
b24d55f21390533c5...,0.0,0.0,0.0
1035c5401b0ca76d0...,1.0,1.0,1.0
cb0f1777593e77909...,0.0,0.0,0.0


In [0]:
loans_def_p_pub_rec_df = loans_def_processed_df.withColumn("pub_rec", col("pub_rec").cast("integer")).fillna(0, subset = ["pub_rec"])

In [0]:
loans_def_p_pub_rec_bankruptcies_df = loans_def_p_pub_rec_df.withColumn("pub_rec_bankruptcies", col("pub_rec_bankruptcies").cast("integer")).fillna(0, subset = ["pub_rec_bankruptcies"])

In [0]:
loans_def_p_inq_last_6mths_df = loans_def_p_pub_rec_bankruptcies_df.withColumn("inq_last_6mths", col("inq_last_6mths").cast("integer")).fillna(0, subset = ["inq_last_6mths"])

In [0]:
loans_def_p_inq_last_6mths_df.createOrReplaceTempView("loan_defaulters")

In [0]:
loans_def_detail_records_enq_df = spark.sql("select member_id, pub_rec, pub_rec_bankruptcies, inq_last_6mths from loan_defaulters")

In [0]:
loans_def_detail_records_enq_df

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
6d5091b3fcaaeb4ea...,0,0,1
b5e7938b0a2da4cea...,0,0,4
91060b858433e8a61...,0,0,0
cab1fa9f533688b0a...,0,0,0
f74e401c1ab0adf78...,0,0,3
8aef4bb29d609d8d6...,0,0,0
538b4653da3b1e814...,0,0,0
b24d55f21390533c5...,0,0,0
1035c5401b0ca76d0...,1,1,1
cb0f1777593e77909...,0,0,0


In [0]:
loans_def_detail_records_enq_df.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv") \
.save()

In [0]:
loans_def_detail_records_enq_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet") \
.save()

In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv
! hadoop fs -ls /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet

Found 3 items
-rw-r--r--   3 itv008542 supergroup          0 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv/_SUCCESS
-rw-r--r--   3 itv008542 supergroup   81767094 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv/part-00000-4ee05a8e-73d3-4413-8947-5cedb7f9a995-c000.csv
-rw-r--r--   3 itv008542 supergroup   78743512 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv/part-00001-4ee05a8e-73d3-4413-8947-5cedb7f9a995-c000.csv
Found 3 items
-rw-r--r--   3 itv008542 supergroup          0 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet/_SUCCESS
-rw-r--r--   3 itv008542 supergroup   77102271 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet/part-00000-60756812-6a06-4f37-bc1f-a36ca6e3ce71-c000.snappy.parquet
-rw-r--r--   3 itv008542 supe

In [0]:
! hadoop fs -head /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv/part-00000-4ee05a8e-73d3-4413-8947-5cedb7f9a995-c000.csv

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
6d5091b3fcaaeb4eac37445042b6b79e8b21f16943b9c32c4f5dd74dcb0f2210,0,0,1
b5e7938b0a2da4ceaa75ea2e3c111d1e33ca0f944f954b6d65671ef395d21713,0,0,4
91060b858433e8a6107be9b76b324e304b5802631fc081b449188fe4b4e32205,0,0,0
cab1fa9f533688b0aab7c6be7b6256d15ad1dd5f3c4ca061a49cfeb26901977d,0,0,0
f74e401c1ab0adf788a7279dd70a27459fcc47733d35eed71149ff371d2a7283,0,0,3
8aef4bb29d609d8d684fd709af965bd690fc9ef3a2691777f38e0c10d622fe97,0,0,0
538b4653da3b1e8142b50baacf97a57b721eb6ec565f5fb63a1b71d191e9e934,0,0,0
b24d55f21390533c512f78f822153dc247e984dcd0cfe031c341ccef1c689817,0,0,0
1035c5401b0ca76d02c334b813302a9b72825e70b1659e8bc8ce5eea6c5d1f52,1,1,1
cb0f1777593e77909d0d359a6d9e633f28568500e17806f815395c92f0cd375d,0,0,0
a962f4d59caec5fa1d51d5ad6ccf761573b476dbc25295e9645a491fa2bc6997,0,0,0
e7592ab57b3afd9f10d50541f66994def18784791690158a17b220c450751b96,0,0,0
603afa9d1be879b7bb0685aa87fe166f4b90bde7bd1304ec55c845c587940e8f,0,0,0
9fe2d59ddf2a4f37e54e225

## Processing - Permanent Table Creation on Cleaned Data


## Requirement 1
Some people from other team wants to use cleaned data for analysis purpose. Need to create a permanent table on top of cleaned data


In [0]:
customers_df = spark.read \
.format("parquet") \
.load("/user/itv008542/lendingclubproject/raw/cleaned/customers_parquet")

In [0]:
customers_df

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
c5285504f6fe07df0...,,6,OWN,75000.0,MT,590xx,USA,A,A5,Verified,103108.0,Individual,,,2024-02-07 02:40:...
423472a2a5feccdbe...,Assistant General...,1,MORTGAGE,62000.0,RI,028xx,USA,A,A5,Source Verified,277024.0,Individual,,,2024-02-07 02:40:...
5dfbbc290399123f6...,RN,3,RENT,83000.0,NY,104xx,USA,B,B3,Source Verified,33300.0,Individual,,,2024-02-07 02:40:...
4947cb2f94eaa67f3...,Real estate broker,10,RENT,130000.0,CA,926xx,USA,C,C3,Not Verified,93334.0,Individual,,,2024-02-07 02:40:...
72e7d16536ebd5d1c...,Bus Operator,10,MORTGAGE,120000.0,NY,117xx,USA,C,C1,Not Verified,451501.0,Individual,,,2024-02-07 02:40:...
1d661a84e5e614169...,site leader,8,MORTGAGE,32000.0,VA,242xx,USA,C,C2,Verified,122934.0,Individual,,,2024-02-07 02:40:...
410599dabdbbfa060...,Police Officer,10,MORTGAGE,82000.0,OH,442xx,USA,B,B5,Not Verified,226936.0,Individual,,,2024-02-07 02:40:...
cb15cc131f37ff4cb...,Operations Liaiso...,10,RENT,75000.0,NJ,074xx,USA,D,D4,Source Verified,71850.0,Individual,,,2024-02-07 02:40:...
48774a798366a9c93...,Client Services M...,10,MORTGAGE,70000.0,WA,980xx,USA,A,A3,Not Verified,50946.0,Individual,,,2024-02-07 02:40:...
77c3ce033df62339d...,Service Technician,10,MORTGAGE,50000.0,TX,795xx,USA,B,B2,Source Verified,71889.0,Individual,,,2024-02-07 02:40:...


In [0]:
# Create a database to store all our relevant datas
spark.sql("create database itv008542_lending_club")


#### 1. Create customers table

In [0]:
spark.sql("""create external table itv008542_lending_club.customers(
member_id string, emp_title string, emp_length int, 
home_ownership string, annual_income float, address_state string, address_zipcode string, address_country string, grade string, 
sub_grade string, verification_status string, total_high_credit_limit float, application_type string, join_annual_income float, 
verification_status_joint string, ingest_date timestamp)
stored as parquet location  '/user/itv008542/lendingclubproject/raw/cleaned/customers_parquet'
""")

In [0]:
spark.sql("select * from itv008542_lending_club.customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
c5285504f6fe07df0...,,6,OWN,75000.0,MT,590xx,USA,A,A5,Verified,103108.0,Individual,,,2024-02-07 02:40:...
423472a2a5feccdbe...,Assistant General...,1,MORTGAGE,62000.0,RI,028xx,USA,A,A5,Source Verified,277024.0,Individual,,,2024-02-07 02:40:...
5dfbbc290399123f6...,RN,3,RENT,83000.0,NY,104xx,USA,B,B3,Source Verified,33300.0,Individual,,,2024-02-07 02:40:...
4947cb2f94eaa67f3...,Real estate broker,10,RENT,130000.0,CA,926xx,USA,C,C3,Not Verified,93334.0,Individual,,,2024-02-07 02:40:...
72e7d16536ebd5d1c...,Bus Operator,10,MORTGAGE,120000.0,NY,117xx,USA,C,C1,Not Verified,451501.0,Individual,,,2024-02-07 02:40:...
1d661a84e5e614169...,site leader,8,MORTGAGE,32000.0,VA,242xx,USA,C,C2,Verified,122934.0,Individual,,,2024-02-07 02:40:...
410599dabdbbfa060...,Police Officer,10,MORTGAGE,82000.0,OH,442xx,USA,B,B5,Not Verified,226936.0,Individual,,,2024-02-07 02:40:...
cb15cc131f37ff4cb...,Operations Liaiso...,10,RENT,75000.0,NJ,074xx,USA,D,D4,Source Verified,71850.0,Individual,,,2024-02-07 02:40:...
48774a798366a9c93...,Client Services M...,10,MORTGAGE,70000.0,WA,980xx,USA,A,A3,Not Verified,50946.0,Individual,,,2024-02-07 02:40:...
77c3ce033df62339d...,Service Technician,10,MORTGAGE,50000.0,TX,795xx,USA,B,B2,Source Verified,71889.0,Individual,,,2024-02-07 02:40:...



#### 2. Create loans table

In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/raw/cleaned

Found 12 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 03:04 /user/itv008542/lendingclubproject/raw/cleaned/customers_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 02:41 /user/itv008542/lendingclubproject/raw/cleaned/customers_parquet
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 04:05 /user/itv008542/lendingclubproject/raw/cleaned/loans_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 11:19 /user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 06:47 /user/itv008542/lendingclubproject/raw/cleaned/loans_defaulters_deling_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 06:48 /user/itv008542/lendingclubproject/raw/cleaned/loans_defaulters_deling_parquet
drwxr-xr-x   - itv008542 super

In [0]:
spark.sql("""
create external table itv008542_lending_club.loans(
loan_id string, member_id string, loan_amount float, funded_amount float,
loan_term_years integer, interest_rate float, monthly_installment float, issue_date string,
loan_status string, loan_purpose string, loan_title string, ingest_date timestamp)
stored as parquet
location '/user/itv008542/lendingclubproject/raw/cleaned/loans_parquet'
""")

In [0]:
spark.sql("select * from itv008542_lending_club.loans")

loan_id,member_id,loan_amount,funded_amount,loan_term_years,interest_rate,monthly_installment,issue_date,loan_status,loan_purpose,loan_title,ingest_date
128664977,34680bca04aef4ac5...,10000.0,10000.0,5,9.93,212.13,Feb-2018,Current,credit_card,Credit card refin...,2024-02-07 04:04:...
129133385,487c223c3c924963a...,25000.0,25000.0,5,25.81,745.71,Feb-2018,Current,debt_consolidation,Debt consolidation,2024-02-07 04:04:...
129227575,042bf442e5b77d702...,19000.0,19000.0,3,13.58,645.51,Feb-2018,Fully Paid,debt_consolidation,Debt consolidation,2024-02-07 04:04:...
129090652,a48155acd1afbbabc...,18000.0,18000.0,3,9.93,580.22,Feb-2018,Current,debt_consolidation,Debt consolidation,2024-02-07 04:04:...
129138233,f5bb6c38570c1625f...,7000.0,7000.0,3,12.61,234.55,Feb-2018,Current,debt_consolidation,Debt consolidation,2024-02-07 04:04:...
128934532,709bdec3d98b1c0b6...,18000.0,18000.0,3,7.35,558.68,Feb-2018,Current,credit_card,Credit card refin...,2024-02-07 04:04:...
129044423,38149670b64a8b6d4...,20000.0,20000.0,3,20.0,743.28,Feb-2018,Current,credit_card,Credit card refin...,2024-02-07 04:04:...
129225672,d1b6e7d2de14abea9...,7500.0,7500.0,3,14.07,256.59,Feb-2018,Current,home_improvement,Home improvement,2024-02-07 04:04:...
129168278,5314dd716ff1ccb4b...,10200.0,10200.0,3,7.96,319.45,Feb-2018,Current,credit_card,Credit card refin...,2024-02-07 04:04:...
127798780,7e402da58e14ac1fa...,15000.0,15000.0,5,12.61,338.31,Feb-2018,Current,credit_card,Credit card refin...,2024-02-07 04:04:...



#### 3. Create loan repayments table

In [0]:
spark.sql("""CREATE EXTERNAL TABLE itv008542_lending_club.loans_repayments(loan_id string, total_principal_received float,
total_interest_received float,total_late_fee_received float,total_payment_received float,last_payment_amount float,
last_payment_date string,next_payment_date string,ingest_date timestamp)
stored as parquet LOCATION '/user/itv008542/lendingclubproject/raw/cleaned/loans_repayments_parquet'
""")

In [0]:
spark.sql("select * from itv008542_lending_club.loans_repayments")

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
14408468,5000.0,561.49,0.0,5561.49,4019.97,,,2024-02-07 06:25:...
14520567,11000.0,1716.53,0.0,12716.525,353.13,,,2024-02-07 06:25:...
14708730,21760.43,4807.25,0.0,26567.68,830.24,,,2024-02-07 06:25:...
14491003,1500.0,192.68,0.0,1692.6753,1342.22,,,2024-02-07 06:25:...
14510981,10000.0,2128.02,0.0,12128.021,336.52,,,2024-02-07 06:25:...
14177845,5380.29,3227.55,0.0,9415.65,300.28,,,2024-02-07 06:25:...
13947687,10000.0,3859.88,0.0,13859.883,5032.64,,,2024-02-07 06:25:...
12905731,18000.0,5535.46,0.0,23535.46,652.67,,,2024-02-07 06:25:...
14137736,12000.0,1424.07,0.0,13424.07,6487.77,,,2024-02-07 06:25:...
13207083,3200.0,812.68,0.0,4012.68,1977.27,,,2024-02-07 06:25:...



#### 4. Create loan defaulters delinq

In [0]:
spark.sql("""CREATE EXTERNAL TABLE itv008542_lending_club.loans_defaulters_delinq(
member_id string, delinq_2yrs integer, delinq_amnt float, mths_since_last_delinq integer)
stored as parquet LOCATION '/user/itv008542/lendingclubproject/raw/cleaned/loans_defaulters_deling_parquet'""")

In [0]:
spark.sql("select * from itv008542_lending_club.loans_defaulters_delinq")

member_id,delinq_2yrs,delinq_amnt,mths_since_last_delinq
6d5091b3fcaaeb4ea...,0,0.0,30
b5e7938b0a2da4cea...,1,0.0,6
f74e401c1ab0adf78...,1,0.0,12
538b4653da3b1e814...,0,0.0,49
b24d55f21390533c5...,1,0.0,3
cb0f1777593e77909...,0,0.0,75
a962f4d59caec5fa1...,0,0.0,54
23857480ccf555ce4...,0,0.0,42
f2c4010f700d8c9c4...,0,0.0,29
0d3c568ff6944b11c...,0,0.0,33



#### 5. Create loan defaulters detail enquiry table

In [0]:
spark.sql("""CREATE EXTERNAL TABLE itv008542_lending_club.loans_defaulters_detail_rec_enq(
member_id string, pub_rec integer, pub_rec_bankruptcies integer, inq_last_6mths integer)
stored as parquet LOCATION '/user/itv008542/lendingclubproject/raw/cleaned/loans_def_detail_records_enq_df_parquet'""")

In [0]:
spark.sql("select * from itv008542_lending_club.loans_defaulters_detail_rec_enq")

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
6d5091b3fcaaeb4ea...,0,0,1
b5e7938b0a2da4cea...,0,0,4
91060b858433e8a61...,0,0,0
cab1fa9f533688b0a...,0,0,0
f74e401c1ab0adf78...,0,0,3
8aef4bb29d609d8d6...,0,0,0
538b4653da3b1e814...,0,0,0
b24d55f21390533c5...,0,0,0
1035c5401b0ca76d0...,1,1,1
cb0f1777593e77909...,0,0,0


In [0]:
spark.sql("DROP TABLE IF EXISTS itv008542_lending_club.loans_defaulters_detail_rec_enq")

## Access Patterns - Quick Access(Old Data) & Slow Access(New Data)

In [0]:
spark.sql("use itv008542_lending_club")
spark.sql("show tables").show(truncate=False)

+----------------------+-------------------------------+-----------+
|database              |tableName                      |isTemporary|
+----------------------+-------------------------------+-----------+
|itv008542_lending_club|customers                      |false      |
|itv008542_lending_club|loans                          |false      |
|itv008542_lending_club|loans_defaulters_delinq        |false      |
|itv008542_lending_club|loans_defaulters_detail_rec_enq|false      |
|itv008542_lending_club|loans_repayments               |false      |
+----------------------+-------------------------------+-----------+




## Requirement 2

- A complete view of these 5 datasets in one single view.
- Need the most upto date data
- New data comes every 24 hrs, the underlying tables are refreshed every 24 hrs
- For this, as a solution we will create view

In [0]:
spark.sql("""
create or replace view itv008542_lending_club.customers_loan_v as select
l.loan_id,
c.member_id,
c.emp_title,
c.emp_length,
c.home_ownership,
c.annual_income,
c.address_state,
c.address_zipcode,
c.address_country,
c.grade,
c.sub_grade,
c.verification_status,
c.total_high_credit_limit,
c.application_type,
c.join_annual_income,
c.verification_status_joint,
l.loan_amount,
l.funded_amount,
l.loan_term_years,
l.interest_rate,
l.monthly_installment,
l.issue_date,
l.loan_status,
l.loan_purpose,
r.total_principal_received,
r.total_interest_received,
r.total_late_fee_received,
r.last_payment_date,
r.next_payment_date,
d.delinq_2yrs,
d.delinq_amnt,
d.mths_since_last_delinq,
e.pub_rec,
e.pub_rec_bankruptcies,
e.inq_last_6mths

FROM itv008542_lending_club.customers c
LEFT JOIN itv008542_lending_club.loans l on c.member_id = l.member_id
LEFT JOIN itv008542_lending_club.loans_repayments r ON l.loan_id = r.loan_id
LEFT JOIN itv008542_lending_club.loans_defaulters_delinq d ON c.member_id = d.member_id
LEFT JOIN itv008542_lending_club.loans_defaulters_detail_rec_enq e ON c.member_id = e.member_id
""")

In [0]:
spark.sql("select * from itv008542_lending_club.customers_loan_v")

loan_id,member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,loan_amount,funded_amount,loan_term_years,interest_rate,monthly_installment,issue_date,loan_status,loan_purpose,total_principal_received,total_interest_received,total_late_fee_received,last_payment_date,next_payment_date,delinq_2yrs,delinq_amnt,mths_since_last_delinq,pub_rec,pub_rec_bankruptcies,inq_last_6mths
71175798,000c8875b71a6b47c...,Director of Gvt O...,1,MORTGAGE,100000.0,CA,920xx,USA,A,A4,Not Verified,761500.0,Individual,,,3000.0,3000.0,3,7.39,93.17,Feb-2016,Fully Paid,debt_consolidation,3000.0,114.47,0.0,,,1.0,0.0,13.0,0,0,0
139313300,000fc98fc1ca5faa3...,coord,10,MORTGAGE,220000.0,CA,952xx,USA,A,A3,Not Verified,227457.0,Joint App,230000.0,,20000.0,20000.0,3,7.21,619.47,Aug-2018,Current,credit_card,3558.76,769.52,0.0,Apr-2019,Apr-2019,,,,0,0,0
143195702,0012728d9f616bdf2...,,6,OWN,35000.0,SC,294xx,USA,A,A5,Not Verified,20900.0,Individual,,,9000.0,9000.0,3,8.46,283.95,Dec-2018,Current,debt_consolidation,666.17,181.45,0.0,Apr-2019,Apr-2019,0.0,0.0,24.0,0,0,0
2284564,00151ece27c7ca280...,Northglenn Police...,8,RENT,54000.0,CO,802xx,USA,C,C1,Not Verified,40650.0,Individual,,,12800.0,12800.0,3,14.33,439.53,Nov-2012,Fully Paid,debt_consolidation,12800.0,2192.68,0.0,,,0.0,0.0,29.0,0,0,1
3845340,002800d49886390d2...,RehabCare,2,RENT,60000.0,TX,787xx,USA,D,D3,Not Verified,37248.0,Individual,,,15000.0,15000.0,3,18.75,547.95,Mar-2013,Charged Off,debt_consolidation,12856.06,4702.54,0.0,,,,,,0,0,0
360493,003715c2aec34dd43...,Apex Technologies...,1,MORTGAGE,90000.0,PA,171xx,USA,C,C3,Not Verified,,Individual,,,12000.0,12000.0,3,12.41,400.93,Oct-2008,Fully Paid,debt_consolidation,12000.0,208.02,0.0,,,,,,0,0,2
95004059,003769d7f54c7859e...,Trainer,10,MORTGAGE,35000.0,NC,282xx,USA,E,E1,Verified,184961.0,Individual,,,10000.0,10000.0,5,22.74,280.42,Dec-2016,Charged Off,medical,90.92,176.87,0.0,,,1.0,0.0,20.0,0,0,0
142641777,0037bb910c0a758f5...,Network Engineer,2,RENT,105000.0,IL,601xx,USA,C,C3,Source Verified,50260.0,Individual,,,2000.0,2000.0,3,15.02,69.36,Oct-2018,Current,other,227.25,116.21,0.0,Apr-2019,Apr-2019,0.0,0.0,66.0,0,0,0
91518424,003d7bee408492f11...,Flight Attendant,10,RENT,75000.0,AZ,852xx,USA,A,A5,Not Verified,166387.0,Individual,,,10000.0,10000.0,3,7.99,313.32,Oct-2016,Current,other,7864.12,1213.28,0.0,Apr-2019,Apr-2019,,,,0,0,0
116883875,003e1e6cbd2920bbb...,Operations Superv...,9,MORTGAGE,92000.0,LA,705xx,USA,A,A5,Source Verified,375806.0,Individual,,,10000.0,10000.0,3,7.97,313.23,Aug-2017,Current,debt_consolidation,4980.61,970.21,0.0,Apr-2019,Apr-2019,1.0,0.0,23.0,1,1,0



## Requirement 3

- Another team came and asked that they would need a very quick access of datas from the view table ceated above.

- Solution 1: we have a job that runs every 7 DAYS ONE TIME, THE JOIN OF 5 TABLE IS DONE AND THE RESULTS ARE PUT IN A TABLE The results are faster but the data might be older in this case ( max 7 days old ). Since its heavy job, we cannot affor running it daily. So, we need to communicate if the team is okay with this.



In [0]:
spark.sql("""
create table itv008542_lending_club.customers_loan_t as select
l.loan_id,
c.member_id,
c.emp_title,
c.emp_length,
c.home_ownership,
c.annual_income,
c.address_state,
c.address_zipcode,
c.address_country,
c.grade,
c.sub_grade,
c.verification_status,
c.total_high_credit_limit,
c.application_type,
c.join_annual_income,
c.verification_status_joint,
l.loan_amount,
l.funded_amount,
l.loan_term_years,
l.interest_rate,
l.monthly_installment,
l.issue_date,
l.loan_status,
l.loan_purpose,
r.total_principal_received,
r.total_interest_received,
r.total_late_fee_received,
r.last_payment_date,
r.next_payment_date,
d.delinq_2yrs,
d.delinq_amnt,
d.mths_since_last_delinq,
e.pub_rec,
e.pub_rec_bankruptcies,
e.inq_last_6mths

FROM itv008542_lending_club.customers c
LEFT JOIN itv008542_lending_club.loans l on c.member_id = l.member_id
LEFT JOIN itv008542_lending_club.loans_repayments r ON l.loan_id = r.loan_id
LEFT JOIN itv008542_lending_club.loans_defaulters_delinq d ON c.member_id = d.member_id
LEFT JOIN itv008542_lending_club.loans_defaulters_detail_rec_enq e ON c.member_id = e.member_id
""")

In [0]:
spark.sql("select * from itv008542_lending_club.customers_loan_t")

loan_id,member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,loan_amount,funded_amount,loan_term_years,interest_rate,monthly_installment,issue_date,loan_status,loan_purpose,total_principal_received,total_interest_received,total_late_fee_received,last_payment_date,next_payment_date,delinq_2yrs,delinq_amnt,mths_since_last_delinq,pub_rec,pub_rec_bankruptcies,inq_last_6mths
71175798,000c8875b71a6b47c...,Director of Gvt O...,1,MORTGAGE,100000.0,CA,920xx,USA,A,A4,Not Verified,761500.0,Individual,,,3000.0,3000.0,3,7.39,93.17,Feb-2016,Fully Paid,debt_consolidation,3000.0,114.47,0.0,,,1.0,0.0,13.0,0,0,0
139313300,000fc98fc1ca5faa3...,coord,10,MORTGAGE,220000.0,CA,952xx,USA,A,A3,Not Verified,227457.0,Joint App,230000.0,,20000.0,20000.0,3,7.21,619.47,Aug-2018,Current,credit_card,3558.76,769.52,0.0,Apr-2019,Apr-2019,,,,0,0,0
143195702,0012728d9f616bdf2...,,6,OWN,35000.0,SC,294xx,USA,A,A5,Not Verified,20900.0,Individual,,,9000.0,9000.0,3,8.46,283.95,Dec-2018,Current,debt_consolidation,666.17,181.45,0.0,Apr-2019,Apr-2019,0.0,0.0,24.0,0,0,0
2284564,00151ece27c7ca280...,Northglenn Police...,8,RENT,54000.0,CO,802xx,USA,C,C1,Not Verified,40650.0,Individual,,,12800.0,12800.0,3,14.33,439.53,Nov-2012,Fully Paid,debt_consolidation,12800.0,2192.68,0.0,,,0.0,0.0,29.0,0,0,1
3845340,002800d49886390d2...,RehabCare,2,RENT,60000.0,TX,787xx,USA,D,D3,Not Verified,37248.0,Individual,,,15000.0,15000.0,3,18.75,547.95,Mar-2013,Charged Off,debt_consolidation,12856.06,4702.54,0.0,,,,,,0,0,0
360493,003715c2aec34dd43...,Apex Technologies...,1,MORTGAGE,90000.0,PA,171xx,USA,C,C3,Not Verified,,Individual,,,12000.0,12000.0,3,12.41,400.93,Oct-2008,Fully Paid,debt_consolidation,12000.0,208.02,0.0,,,,,,0,0,2
95004059,003769d7f54c7859e...,Trainer,10,MORTGAGE,35000.0,NC,282xx,USA,E,E1,Verified,184961.0,Individual,,,10000.0,10000.0,5,22.74,280.42,Dec-2016,Charged Off,medical,90.92,176.87,0.0,,,1.0,0.0,20.0,0,0,0
142641777,0037bb910c0a758f5...,Network Engineer,2,RENT,105000.0,IL,601xx,USA,C,C3,Source Verified,50260.0,Individual,,,2000.0,2000.0,3,15.02,69.36,Oct-2018,Current,other,227.25,116.21,0.0,Apr-2019,Apr-2019,0.0,0.0,66.0,0,0,0
91518424,003d7bee408492f11...,Flight Attendant,10,RENT,75000.0,AZ,852xx,USA,A,A5,Not Verified,166387.0,Individual,,,10000.0,10000.0,3,7.99,313.32,Oct-2016,Current,other,7864.12,1213.28,0.0,Apr-2019,Apr-2019,,,,0,0,0
116883875,003e1e6cbd2920bbb...,Operations Superv...,9,MORTGAGE,92000.0,LA,705xx,USA,A,A5,Source Verified,375806.0,Individual,,,10000.0,10000.0,3,7.97,313.23,Aug-2017,Current,debt_consolidation,4980.61,970.21,0.0,Apr-2019,Apr-2019,1.0,0.0,23.0,1,1,0


In [0]:
! hadoop fs -ls /user/itv008542/warehouse

Found 8 items
drwxr-xr-x   - itv008542 supergroup          0 2024-01-28 04:53 /user/itv008542/warehouse/assignments_hotel_usecase.db
drwxr-xr-x   - itv008542 supergroup          0 2024-01-27 11:55 /user/itv008542/warehouse/cust_transaction_db.db
drwxr-xr-x   - itv008542 supergroup          0 2023-10-23 05:06 /user/itv008542/warehouse/itv008542_bucketingdb.db
drwxr-xr-x   - itv008542 supergroup          0 2023-10-16 03:41 /user/itv008542/warehouse/itv008542_cachingdemo_db.db
drwxr-xr-x   - itv008542 supergroup          0 2024-01-25 10:52 /user/itv008542/warehouse/itv008542_cachingdemo_db_ext.db
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 02:02 /user/itv008542/warehouse/itv008542_lending_club.db
drwxr-xr-x   - itv008542 supergroup          0 2023-11-25 03:18 /user/itv008542/warehouse/itv008542_optimizejoin.db
drwxr-xr-x   - itv008542 supergroup          0 2024-01-20 11:04 /user/itv008542/warehouse/itv008542_retail.db


In [0]:
! hadoop fs -ls /user/itv008542/warehouse/itv008542_lending_club.db

Found 1 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 02:04 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t


In [0]:
! hadoop fs -ls /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t

Found 200 items
-rwxr-xr-x   3 itv008542 supergroup    3088103 2024-02-08 02:03 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t/part-00000-7f3c5175-85b0-4f6b-8115-6e7e5807419a-c000
-rwxr-xr-x   3 itv008542 supergroup    3184150 2024-02-08 02:03 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t/part-00001-7f3c5175-85b0-4f6b-8115-6e7e5807419a-c000
-rwxr-xr-x   3 itv008542 supergroup    3180029 2024-02-08 02:03 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t/part-00002-7f3c5175-85b0-4f6b-8115-6e7e5807419a-c000
-rwxr-xr-x   3 itv008542 supergroup    3147046 2024-02-08 02:03 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t/part-00003-7f3c5175-85b0-4f6b-8115-6e7e5807419a-c000
-rwxr-xr-x   3 itv008542 supergroup    3122171 2024-02-08 02:03 /user/itv008542/warehouse/itv008542_lending_club.db/customers_loan_t/part-00004-7f3c5175-85b0-4f6b-8115-6e7e5807419a-c000
-rwxr-xr-x   3 itv008542 supergroup    3169552 2024-02

## Identifying the Bad Data - Final cleaning


## Requirement 4

- Calculate the loan score
- Higher the loan score, higher the chances of getting your loan approved, and vice versa
- 3 Major criteria to calculate loan score -- loan repayment history (last payment, total payment received) - only 20% weight -- loan defaulters history (delinq-delinquet 2 yrs, pub_rec, ub_rec_bankruptcies, inq_last_6mnths) - only 45 % -- financial health data (home_ownership, loan_status, funded amount, grade pts-) - only 35%

if our credit limit is 40k, and you just take 2 k loan then this is good.

customers - home_ownership, grade, high credit limit

loans - monthly installment, loan_status, funded amount

loans_repayments - last payment, toal payment received

loan_def_delinq - delinq-delinquet 2 yrs

loan_def_detail_record_enq - pub_rec, ub_rec_bankruptcies, inq_last_6mnths

customers - member id should be unique loan_def_delinq - member id unique loan_def_detail_record_enq - member id unique

Bad data should be sent to upstream team, and ask what is valid record


In [0]:
# Checking number of records per customer
spark.sql("""select member_id, count(*) as total from itv008542_lending_club.customers
group by member_id order by total desc """)

member_id,total
e4c167053d5418230...,5
ad8e5d384dae17e06...,4
76b577467eda5bdbc...,4
3f87585a20f702838...,4
819453be77718d747...,3
c92062bb371842b3d...,3
035bf3d8288d803bd...,3
53789bea7edc660ed...,3
291ca1b09ef11ca3e...,3
5d52e7773cb0efff3...,3


In [0]:
spark.sql("""select * from itv008542_lending_club.customers 
where member_id like 'e4c167053d5418230%'""")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,171165.0,Individual,,,2024-02-07 02:40:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,129833.0,Individual,,,2024-02-07 02:40:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,207300.0,Individual,,,2024-02-07 02:40:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,110907.0,Individual,,,2024-02-07 02:40:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,138780.0,Individual,,,2024-02-07 02:40:...


In the above data, we can see that for a single member there are different records. As a developer we would not know which record to keep, so we need to compile such records and send it to our clients/upstream teams for further analysis

In [0]:
spark.sql("use itv008542_lending_club")
spark.sql("show tables").show(truncate=False)

+----------------------+-------------------------------+-----------+
|database              |tableName                      |isTemporary|
+----------------------+-------------------------------+-----------+
|itv008542_lending_club|customers                      |false      |
|itv008542_lending_club|customers_loan_t               |false      |
|itv008542_lending_club|customers_loan_v               |false      |
|itv008542_lending_club|loans                          |false      |
|itv008542_lending_club|loans_defaulters_delinq        |false      |
|itv008542_lending_club|loans_defaulters_detail_rec_enq|false      |
|itv008542_lending_club|loans_repayments               |false      |
+----------------------+-------------------------------+-----------+



In [0]:
# Checking number of records per customer
spark.sql("""select member_id, count(*) as total from itv008542_lending_club.loans_defaulters_delinq
group by member_id order by total desc """)

member_id,total
1f392553105eebc55...,3
6f88761291d14b65e...,3
41d36a5e709d6218a...,3
ee2d0dd6ad9e048b8...,3
793b618a7de10f813...,3
bca141343d9405a9a...,3
9240fa4e744ff0846...,3
b67f4aa39e82954f2...,3
e4c167053d5418230...,3
f99c6e9cfe3a7a2d2...,2


In [0]:
spark.sql("""select * from itv008542_lending_club.loans_defaulters_delinq
where member_id like 'e4c167053d5418230%'""")

member_id,delinq_2yrs,delinq_amnt,mths_since_last_delinq
e4c167053d5418230...,3,0.0,13
e4c167053d5418230...,1,0.0,16
e4c167053d5418230...,1,0.0,37


In [0]:
spark.sql("""select member_id, count(*) as total from itv008542_lending_club.loans_defaulters_detail_rec_enq
group by member_id order by total desc """)

member_id,total
e3b0c44298fc1c149...,33
e4c167053d5418230...,5
3f87585a20f702838...,4
ad8e5d384dae17e06...,4
76b577467eda5bdbc...,4
3ae415acd6bbfaac1...,3
22593a1870543b2db...,3
035bf3d8288d803bd...,3
066ddaa64bee66dff...,3
291ca1b09ef11ca3e...,3


In [0]:
spark.sql("""select * from itv008542_lending_club.loans_defaulters_detail_rec_enq
where member_id like 'e4c167053d5418230%'""")

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
e4c167053d5418230...,0,0,0
e4c167053d5418230...,0,0,0
e4c167053d5418230...,0,0,3
e4c167053d5418230...,0,0,2
e4c167053d5418230...,0,0,1


In [0]:
bad_data_customer_df = spark.sql("""select member_id from(select member_id, count(*)
as total from itv008542_lending_club.customers group by member_id having total > 1)""")

In [0]:
bad_data_customer_df.count()

3157

In [0]:
bad_data_customer_df

member_id
761b2f1e61999e14e...
4231a55d0199c619a...
f99c6e9cfe3a7a2d2...
a53e2f488d2d76a30...
61be6498c93cadf89...
5130d0087970e032e...
fc0e468bff11ac7c3...
f284044b881f218c0...
01b2223757c3b62e7...
675151e58a628e87b...


In [0]:
bad_data_loans_defaulters_delinq_df = spark.sql("""select member_id from(select member_id, count(*)
as total from itv008542_lending_club.loans_defaulters_delinq        
group by member_id having total > 1)""")

In [0]:
bad_data_loans_defaulters_delinq_df.count()

939

In [0]:
bad_data_loans_defaulters_delinq_df

member_id
cd1fdca829c443fa7...
f99c6e9cfe3a7a2d2...
a53e2f488d2d76a30...
d4782ddad8591f44d...
7115ace193e13d8f3...
cbede54df344cdb94...
675151e58a628e87b...
22593a1870543b2db...
3b94fbab00b7a0ec4...
0fa3654dfc604c1d4...


In [0]:
bad_data_loans_defaulters_detail_rec_enq_df = spark.sql("""select member_id from(select member_id, count(*)
as total from itv008542_lending_club.loans_defaulters_detail_rec_enq
group by member_id having total > 1)""")

In [0]:
bad_data_loans_defaulters_detail_rec_enq_df.count()

3189

In [0]:
bad_data_loans_defaulters_detail_rec_enq_df

member_id
a2af7506825a7dcff...
7115ace193e13d8f3...
d4782ddad8591f44d...
f99c6e9cfe3a7a2d2...
fc0e468bff11ac7c3...
2bae2e4dd6d5f2b21...
61be6498c93cadf89...
5130d0087970e032e...
761b2f1e61999e14e...
cbede54df344cdb94...


#### Store all bad records in a folder to review later on

In [0]:
! hadoop fs -mkdir /user/itv008542/lendingclubproject/bad

In [0]:
bad_data_customer_df.repartition(1).write \
.format("csv") \
.option("header", True) \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/bad/bad_data_customers") \
.save()

In [0]:
bad_data_loans_defaulters_delinq_df.repartition(1).write \
.format("csv") \
.option("header", True) \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/bad/bad_data_loans_defaulters_delinq") \
.save()

In [0]:
bad_data_loans_defaulters_detail_rec_enq_df.repartition(1).write \
.format("csv") \
.option("header", True) \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/bad/bad_data_loans_defaulters_detail_rec_enq") \
.save()

In [0]:
! hadoop fs -ls  /user/itv008542/lendingclubproject/bad

Found 4 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 03:51 /user/itv008542/lendingclubproject/bad/bad_customer_data_final
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 03:42 /user/itv008542/lendingclubproject/bad/bad_data_customers
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 03:43 /user/itv008542/lendingclubproject/bad/bad_data_loans_defaulters_delinq
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 03:43 /user/itv008542/lendingclubproject/bad/bad_data_loans_defaulters_detail_rec_enq


I want to create a consolidated files which have all member ids uniquely from above three

In [0]:
# union followed by distinct
bad_customer_data_union_df = bad_data_customer_df.select("member_id") \
.union(bad_data_loans_defaulters_delinq_df.select("member_id")) \
.union(bad_data_loans_defaulters_detail_rec_enq_df.select("member_id"))

In [0]:
bad_customer_data_final_df = bad_customer_data_union_df.distinct()

In [0]:
bad_customer_data_final_df.count()

3189

In [0]:
bad_customer_data_final_df.repartition(1).write \
.format("csv") \
.option("header", True) \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/bad/bad_customer_data_final") \
.save()

In [0]:
! hadoop fs -ls  /user/itv008542/lendingclubproject/bad/bad_customer_data_final

Found 2 items
-rw-r--r--   3 itv008542 supergroup          0 2024-02-08 03:51 /user/itv008542/lendingclubproject/bad/bad_customer_data_final/_SUCCESS
-rw-r--r--   3 itv008542 supergroup     207295 2024-02-08 03:51 /user/itv008542/lendingclubproject/bad/bad_customer_data_final/part-00000-a1961ffa-663c-473e-a26e-891f4b6c2ede-c000.csv


In [0]:
! hadoop fs -head  /user/itv008542/lendingclubproject/bad/bad_customer_data_final/part-00000-a1961ffa-663c-473e-a26e-891f4b6c2ede-c000.csv

member_id
f74ca3285851a8c5a077c1af2a4d184d2f55bf5861ec92914be084df1f889e1b
b33bff4d9c078c25dcf5aaf7f69d6c628d7fe8de5ad4c6f16f6040fd7071e032
793681798e3c70fb0261cf52c3f7d47105e33a9a491b01a622e218682e08dade
cf60ca5d476cabca8c071b617f59e111ab47be6bb1d86d38f3259b3383075c10
cc36e715f23147ae01f232b6bc1df49632e229c8787f8842995eaa9724753449
a5a8f249f10fc7d823dd529bae00fa18a0433102882886a20be2d0921d18af0c
1c4c7f1541769ac6c216d8c75703c42a934ef8b48fc1f58c260d69fb7fd52263
1396169b283852f1fd805614582cfa4e29df38ce5c7185be7beaac112f27d91b
2e6ad5a0aaaf31adb79162f3288950973e06253f26e5824b0d98b4d2a0d20078
f6776ae4f355c1aa8a3a8df574b077657bb21fba135ce795d0976473750ad69e
f876bc5308177f391f1c4bd7bef905ceae2a7fd3b49f34f547ca6a5d8d854232
149029765aa6c93db4781614061d1150376134b46c976be7d5ac187a93121645
11099eff1a2ced61feb243f45becc12a9f0dcef8ba6bdfb9c15e799e80335acf
d7f7fcf5ff64fff2dce80dd666747652001f46f91266a8fae505125bc493aafc
983690808b2ed9a75ba15c345fd9d0580e93bf7947ad8d9404b82cb19b02324a
b5ded5638e54e16

In [0]:
! hadoop fs -cat  /user/itv008542/lendingclubproject/bad/bad_customer_data_final/part-00000-a1961ffa-663c-473e-a26e-891f4b6c2ede-c000.csv | wc -l

3190


## Segregating the Identified Bad Data from the Normal Data

For our future analysis we will remove records of all those customers who fall under bad data category for now


In [0]:
bad_customer_data_final_df.createOrReplaceTempView("bad_data_customer")

In [0]:
customers_df = spark.sql("""select * from itv008542_lending_club.customers
where member_id NOT IN (select member_id from bad_data_customer)
""")

In [0]:
! hadoop fs -mkdir /user/itv008542/lendingclubproject/raw/cleaned_new

mkdir: `/user/itv008542/lendingclubproject/raw/cleaned_new': File exists


In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/raw

Found 6 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-07 11:57 /user/itv008542/lendingclubproject/raw/cleaned
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 04:03 /user/itv008542/lendingclubproject/raw/cleaned_new
drwxr-xr-x   - itv008542 supergroup          0 2024-02-06 12:12 /user/itv008542/lendingclubproject/raw/customers_data_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-06 12:19 /user/itv008542/lendingclubproject/raw/loans_data_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-06 12:24 /user/itv008542/lendingclubproject/raw/loans_defaulters_csv
drwxr-xr-x   - itv008542 supergroup          0 2024-02-06 12:22 /user/itv008542/lendingclubproject/raw/loans_repayments_csv


In [0]:
customers_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/raw/cleaned_new/customers_parquet") \
.save()

In [0]:
loans_defaulters_delinq_df = spark.sql("""select * from itv008542_lending_club.loans_defaulters_delinq
where member_id NOT IN (select member_id from bad_data_customer)
""")

In [0]:
loans_defaulters_delinq_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_delinq_parquet") \
.save()

In [0]:
loans_defaulters_detail_rec_enq_df = spark.sql("""select * from itv008542_lending_club.loans_defaulters_detail_rec_enq
where member_id NOT IN (select member_id from bad_data_customer)
""")

In [0]:
loans_defaulters_detail_rec_enq_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_detail_rec_enq_parquet") \
.save()

In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/raw/cleaned_new

Found 3 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 04:09 /user/itv008542/lendingclubproject/raw/cleaned_new/customers_parquet
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 04:11 /user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_delinq_parquet
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 04:11 /user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_detail_rec_enq_parquet


In [0]:
spark.sql("""
create EXTERNAL TABLE itv008542_lending_club.customers_new(member_id string, emp_title string, emp_length int, home_ownership string, 
annual_income float, address_state string, address_zipcode string, address_country string, grade string, 
sub_grade string, verification_status string, total_high_credit_limit float, application_type string, 
join_annual_income float, verification_status_joint string, ingest_date timestamp)
stored as parquet
LOCATION '/user/itv008542/lendingclubproject/raw/cleaned_new/customers_parquet'
""")

In [0]:
spark.sql("""
create EXTERNAL TABLE itv008542_lending_club.loans_defaulters_delinq_new(member_id string,delinq_2yrs integer, delinq_amnt float, mths_since_last_delinq integer)
stored as parquet
LOCATION '/user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_delinq_parquet'
""")

In [0]:
spark.sql("""
create EXTERNAL TABLE itv008542_lending_club.loans_defaulters_detail_rec_enq_new(member_id string, pub_rec integer, pub_rec_bankruptcies integer, inq_last_6mths integer)
stored as parquet
LOCATION '/user/itv008542/lendingclubproject/raw/cleaned_new/loans_defaulters_detail_rec_enq_parquet'
""")

In [0]:
spark.sql("""select member_id, count(*) as total 
from itv008542_lending_club.customers_new
group by member_id order by total desc""")

member_id,total
ac1cc53aabeae5308...,1
aaae850209661c2ec...,1
dd4aef442f35c3774...,1
ffe8dde59ce73ba9f...,1
dcee5341a1a6b429c...,1
1827d16989b2eb01e...,1
9995599d198128f2d...,1
2bd3d9880a18beb9e...,1
1b9d1275c2f09143e...,1
c342d21c2e49fc97b...,1


# Processing and Storing the Final Loan Score 

## Loan Score Calculation

### Requirement 4

- Higher the loan score, higher the chances of getting your loan approved, and vice versa
- 3 Major criteria to calculate loan score

  - loan repayment history (last payment, total payment received) - only 20% weight

  - loan defaulters history (delinq-delinquet 2 yrs, pub_rec, ub_rec_bankruptcies, inq_last_6mnths) - only 45%

  - financial health data (home_ownership, loan_status, funded amount, grade pts-) - only 35%



## Associating points to the grades in order to calculate the Loan Score

In [0]:
spark.conf.set("spark.sql.unacceptable_rated_pts", 0)
spark.conf.set("spark.sql.very_bad_rated_pts", 100)
spark.conf.set("spark.sql.bad_rated_pts", 250)
spark.conf.set("spark.sql.good_rated_pts", 500)
spark.conf.set("spark.sql.very_good_rated_pts", 650)
spark.conf.set("spark.sql.excellent_rated_pts", 800)

In [0]:
spark.conf.set("spark.sql.unacceptable_grade_pts", 750)
spark.conf.set("spark.sql.very_bad_grade_pts", 1000)
spark.conf.set("spark.sql.bad_grade_pts", 1500)
spark.conf.set("spark.sql.good_grade_pts", 2000)
spark.conf.set("spark.sql.very_good_grade_pts", 2500)

## The tables required to calculate the Loan Score

customers_new 

loans

loans_repayments

loans_defaulters_delinq_new

loans_defaulters_detail_red_enq_new

## Loan Score Calculation Criteria 1: Payment History(ph)

- last payment amount < monthly installment * 0.5, then very_bad_rates_pts
- last payment amount >= monthly installment * 0.5 and < monthly installment, then bad_rates_pts
- last payment amount = monthly installment, then good_rates_pts


In [0]:
bad_customer_data_final_df = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("/user/itv008542/lendingclubproject/bad/bad_customer_data_final")

In [0]:
bad_customer_data_final_df.createOrReplaceTempView("bad_data_customer")

In [0]:
ph_df = spark.sql("""select c.member_id, \
   case \
   when p.last_payment_amount < (c.monthly_installment * 0.5) then ${spark.sql.very_bad_rated_pts} \
   when p.last_payment_amount >= (c.monthly_installment * 0.5) and p.last_payment_amount < c.monthly_installment then ${spark.sql.very_bad_rated_pts} \
   when (p.last_payment_amount = (c.monthly_installment)) then ${spark.sql.good_rated_pts} \
   when p.last_payment_amount > (c.monthly_installment) and p.last_payment_amount <= (c.monthly_installment * 1.50) then ${spark.sql.very_good_rated_pts} \
   when p.last_payment_amount > (c.monthly_installment * 1.50) then ${spark.sql.excellent_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   end as last_payment_pts, \
   case \
   when p.total_payment_received >= (c.funded_amount * 0.50) then ${spark.sql.very_good_rated_pts} \
   when p.total_payment_received < (c.funded_amount * 0.50) and p.total_payment_received > 0 then ${spark.sql.good_rated_pts} \
   when p.total_payment_received = 0 or (p.total_payment_received) is null then ${spark.sql.unacceptable_rated_pts} \
   end as total_payment_pts \
from itv008542_lending_club.loans_repayments p \
inner join itv005857_lending_club.loans c on c.loan_id = p.loan_id 
where member_id NOT IN (select member_id from bad_data_customer)""")

In [0]:
ph_df.createOrReplaceTempView("ph_pts")

In [0]:
spark.sql("select * from ph_pts")

member_id,last_payment_pts,total_payment_pts
dcec9334e70f1cc95...,800,650
fc58ca61f51f9dcac...,500,650
2fb62a6ca51063b11...,500,650
488268a5531951622...,800,650
ade6026208e48f5f9...,500,650
7c8b0ca6acddfaeb1...,800,650
a707b7fe7c38bad65...,800,650
1df639cddea30c288...,800,650
22d67005e12d8726d...,500,650
009cf312bd46551b4...,500,650


## Loan Score Calculation Criteria 2: Loan Defaulters History(ldh)

In [0]:
spark.sql("use itv008542_lending_club")
spark.sql("show tables").show(truncate=False)

+----------------------+-----------------------------------+-----------+
|database              |tableName                          |isTemporary|
+----------------------+-----------------------------------+-----------+
|itv008542_lending_club|customers                          |false      |
|itv008542_lending_club|customers_loan_t                   |false      |
|itv008542_lending_club|customers_loan_v                   |false      |
|itv008542_lending_club|customers_new                      |false      |
|itv008542_lending_club|loans                              |false      |
|itv008542_lending_club|loans_defaulters_delinq            |false      |
|itv008542_lending_club|loans_defaulters_delinq_new        |false      |
|itv008542_lending_club|loans_defaulters_detail_rec_enq    |false      |
|itv008542_lending_club|loans_defaulters_detail_rec_enq_new|false      |
|itv008542_lending_club|loans_repayments                   |false      |
|                      |bad_data_customer          

In [0]:
ldh_ph_df = spark.sql(
    "select p.*, \
    CASE \
    WHEN d.delinq_2yrs = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN d.delinq_2yrs BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN d.delinq_2yrs BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN d.delinq_2yrs > 5 OR d.delinq_2yrs IS NULL THEN ${spark.sql.unacceptable_grade_pts} \
    END AS delinq_pts, \
    CASE \
    WHEN l.pub_rec = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.pub_rec BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.pub_rec BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.pub_rec > 5 OR l.pub_rec IS NULL THEN ${spark.sql.very_bad_rated_pts} \
    END AS public_records_pts, \
    CASE \
    WHEN l.pub_rec_bankruptcies = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.pub_rec_bankruptcies BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.pub_rec_bankruptcies BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.pub_rec_bankruptcies > 5 OR l.pub_rec_bankruptcies IS NULL THEN ${spark.sql.very_bad_rated_pts} \
    END as public_bankruptcies_pts, \
    CASE \
    WHEN l.inq_last_6mths = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.inq_last_6mths BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.inq_last_6mths BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.inq_last_6mths > 5 OR l.inq_last_6mths IS NULL THEN ${spark.sql.unacceptable_rated_pts} \
    END AS enq_pts \
    FROM itv008542_lending_club.loans_defaulters_detail_rec_enq_new l \
    INNER JOIN itv008542_lending_club.loans_defaulters_delinq_new d ON d.member_id = l.member_id  \
    INNER JOIN ph_pts p ON p.member_id = l.member_id where l.member_id NOT IN (select member_id from bad_data_customer)")

In [0]:
ldh_ph_df.createOrReplaceTempView("ldh_ph_pts")

In [0]:
spark.sql("select * from ldh_ph_pts")

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts
000c8875b71a6b47c...,800,650,250,800,800,800
0012728d9f616bdf2...,500,500,800,800,800,800
00151ece27c7ca280...,800,650,800,800,800,250
003769d7f54c7859e...,500,500,250,800,800,800
0037bb910c0a758f5...,500,500,800,800,800,800
003e1e6cbd2920bbb...,500,650,250,250,250,800
004017b21bd4d6271...,100,650,750,800,800,800
005b4c3db3fce07dc...,500,650,250,250,800,250
00710707c563c2119...,800,650,250,800,800,800
007da79904f69970d...,800,650,250,800,800,800


## Loan Score Calculation Criteria 3: Financial health (fh)

In [0]:
fh_ldh_ph_df = spark.sql("select ldef.*, \
   CASE \
   WHEN LOWER(l.loan_status) LIKE '%fully paid%' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%current%' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%in grace period%' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%late (16-30 days)%' OR LOWER(l.loan_status) LIKE '%late (31-120 days)%' THEN ${spark.sql.very_bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%charged off%' THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   END AS loan_status_pts, \
   CASE \
   WHEN LOWER(a.home_ownership) LIKE '%own' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%rent' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%mortgage' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%any' OR LOWER(a.home_ownership) IS NULL THEN ${spark.sql.very_bad_rated_pts} \
   END AS home_pts, \
   CASE \
   WHEN l.funded_amount <= (a.total_high_credit_limit * 0.10) THEN ${spark.sql.excellent_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.10) AND l.funded_amount <= (a.total_high_credit_limit * 0.20) THEN ${spark.sql.very_good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.20) AND l.funded_amount <= (a.total_high_credit_limit * 0.30) THEN ${spark.sql.good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.30) AND l.funded_amount <= (a.total_high_credit_limit * 0.50) THEN ${spark.sql.bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.50) AND l.funded_amount <= (a.total_high_credit_limit * 0.70) THEN ${spark.sql.very_bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.70) THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   END AS credit_limit_pts, \
   CASE \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A1' THEN ${spark.sql.excellent_rated_pts} \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A2' THEN (${spark.sql.excellent_rated_pts} * 0.95) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A3' THEN (${spark.sql.excellent_rated_pts} * 0.90) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A4' THEN (${spark.sql.excellent_rated_pts} * 0.85) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A5' THEN (${spark.sql.excellent_rated_pts} * 0.80) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B1' THEN (${spark.sql.very_good_rated_pts}) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B2' THEN (${spark.sql.very_good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B3' THEN (${spark.sql.very_good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B4' THEN (${spark.sql.very_good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B5' THEN (${spark.sql.very_good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C1' THEN (${spark.sql.good_rated_pts}) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C2' THEN (${spark.sql.good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C3' THEN (${spark.sql.good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C4' THEN (${spark.sql.good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C5' THEN (${spark.sql.good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D1' THEN (${spark.sql.bad_rated_pts}) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D2' THEN (${spark.sql.bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D3' THEN (${spark.sql.bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D4' THEN (${spark.sql.bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D5' THEN (${spark.sql.bad_rated_pts} * 0.80) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E1' THEN (${spark.sql.very_bad_rated_pts}) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E2' THEN (${spark.sql.very_bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E3' THEN (${spark.sql.very_bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E4' THEN (${spark.sql.very_bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E5' THEN (${spark.sql.very_bad_rated_pts} * 0.80) \
   WHEN (a.grade) in ('F', 'G') THEN (${spark.sql.unacceptable_rated_pts}) \
   END AS grade_pts \
   FROM ldh_ph_pts ldef \
   INNER JOIN itv008542_lending_club.loans l ON ldef.member_id = l.member_id \
   INNER JOIN itv008542_lending_club.customers_new a ON a.member_id = ldef.member_id where ldef.member_id NOT IN (select member_id from bad_data_customer)") 

In [0]:
fh_ldh_ph_df.createOrReplaceTempView("fh_ldh_ph_pts")

In [0]:
spark.sql("select * from fh_ldh_ph_pts")

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts,loan_status_pts,home_pts,credit_limit_pts,grade_pts
000c8875b71a6b47c...,800,650,250,800,800,800,800,250,800,680.0
0012728d9f616bdf2...,500,500,800,800,800,800,500,800,250,640.0
00151ece27c7ca280...,800,650,800,800,800,250,800,500,250,500.0
003769d7f54c7859e...,500,500,250,800,800,800,0,250,800,100.0
0037bb910c0a758f5...,500,500,800,800,800,800,500,500,800,450.0
003e1e6cbd2920bbb...,500,650,250,250,250,800,500,250,800,640.0
004017b21bd4d6271...,100,650,750,800,800,800,800,250,800,500.0
005b4c3db3fce07dc...,500,650,250,250,800,250,500,250,500,520.0
00710707c563c2119...,800,650,250,800,800,800,800,250,800,520.0
007da79904f69970d...,800,650,250,800,800,800,800,500,250,800.0


#### Final loan score calculation by considering all the 3 criterias with the following %**

#### 1. Payment History = 20%
#### 2. Loan Defaults = 45%
#### 3. Financial Health = 35%

In [0]:
loan_score = spark.sql("SELECT member_id, \
((last_payment_pts+total_payment_pts)*0.20) as payment_history_pts, \
((delinq_pts + public_records_pts + public_bankruptcies_pts + enq_pts) * 0.45) as defaulters_history_pts, \
((loan_status_pts + home_pts + credit_limit_pts + grade_pts)*0.35) as financial_health_pts \
FROM fh_ldh_ph_pts")

In [0]:
loan_score

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts
000c8875b71a6b47c...,290.0,1192.5,885.5
0012728d9f616bdf2...,200.0,1440.0,766.5
00151ece27c7ca280...,290.0,1192.5,717.5
003769d7f54c7859e...,200.0,1192.5,402.5
0037bb910c0a758f5...,200.0,1440.0,787.5
003e1e6cbd2920bbb...,230.0,697.5,766.5
004017b21bd4d6271...,150.0,1417.5,822.5
005b4c3db3fce07dc...,230.0,697.5,619.5
00710707c563c2119...,290.0,1192.5,829.5
007da79904f69970d...,290.0,1192.5,822.5


In [0]:
final_loan_score = loan_score.withColumn('loan_score', loan_score.payment_history_pts + loan_score.defaulters_history_pts + loan_score.financial_health_pts)

In [0]:
final_loan_score.createOrReplaceTempView("loan_score_eval")

In [0]:
spark.sql("select * from loan_score_eval")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score
000c8875b71a6b47c...,290.0,1192.5,885.5,2368.0
0012728d9f616bdf2...,200.0,1440.0,766.5,2406.5
00151ece27c7ca280...,290.0,1192.5,717.5,2200.0
003769d7f54c7859e...,200.0,1192.5,402.5,1795.0
0037bb910c0a758f5...,200.0,1440.0,787.5,2427.5
003e1e6cbd2920bbb...,230.0,697.5,766.5,1694.0
004017b21bd4d6271...,150.0,1417.5,822.5,2390.0
005b4c3db3fce07dc...,230.0,697.5,619.5,1547.0
00710707c563c2119...,290.0,1192.5,829.5,2312.0
007da79904f69970d...,290.0,1192.5,822.5,2305.0


In [0]:
loan_score_final = spark.sql("select ls.*, \
case \
WHEN loan_score > ${spark.sql.very_good_grade_pts} THEN 'A' \
WHEN loan_score <= ${spark.sql.very_good_grade_pts} AND loan_score > ${spark.sql.good_grade_pts} THEN 'B' \
WHEN loan_score <= ${spark.sql.good_grade_pts} AND loan_score > ${spark.sql.bad_grade_pts} THEN 'C' \
WHEN loan_score <= ${spark.sql.bad_grade_pts} AND loan_score  > ${spark.sql.very_bad_grade_pts} THEN 'D' \
WHEN loan_score <= ${spark.sql.very_bad_grade_pts} AND loan_score > ${spark.sql.unacceptable_grade_pts} THEN 'E'  \
WHEN loan_score <= ${spark.sql.unacceptable_grade_pts} THEN 'F' \
end as loan_final_grade \
from loan_score_eval ls")

In [0]:
loan_score_final.createOrReplaceTempView("loan_final_table")

In [0]:
spark.sql("select * from loan_final_table where loan_final_grade in ('C')")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score,loan_final_grade
003769d7f54c7859e...,200.0,1192.5,402.5,1795.0,C
003e1e6cbd2920bbb...,230.0,697.5,766.5,1694.0,C
005b4c3db3fce07dc...,230.0,697.5,619.5,1547.0,C
00fc8144cb210ba8c...,230.0,697.5,717.5,1645.0,C
0134d807fbc9e8ff8...,230.0,1192.5,553.0,1975.5,C
013630bb77d0f3c6a...,290.0,1192.5,399.0,1881.5,C
017ce564dc0d6f975...,200.0,945.0,591.5,1736.5,C
01b48af926b98a718...,200.0,1192.5,573.125,1965.625,C
01d0c48835e969a01...,200.0,1192.5,262.5,1655.0,C
024f077b5b4e47ea0...,230.0,1192.5,437.5,1860.0,C


In [0]:
spark.sql("select count(*) from loan_final_table")

count(1)
1102587


In [0]:
! hadoop fs -mkdir /user/itv008542/lendingclubproject/processed

In [0]:
loan_score_final.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008542/lendingclubproject/processed/loan_score") \
.save()

In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/processed

Found 1 items
drwxr-xr-x   - itv008542 supergroup          0 2024-02-08 11:26 /user/itv008542/lendingclubproject/processed/loan_score


In [0]:
! hadoop fs -ls /user/itv008542/lendingclubproject/processed/loan_score

Found 201 items
-rw-r--r--   3 itv008542 supergroup          0 2024-02-08 11:26 /user/itv008542/lendingclubproject/processed/loan_score/_SUCCESS
-rw-r--r--   3 itv008542 supergroup     370133 2024-02-08 11:25 /user/itv008542/lendingclubproject/processed/loan_score/part-00000-a16902a1-907a-4b1b-98e5-23765d5d39e2-c000.snappy.parquet
-rw-r--r--   3 itv008542 supergroup     385474 2024-02-08 11:25 /user/itv008542/lendingclubproject/processed/loan_score/part-00001-a16902a1-907a-4b1b-98e5-23765d5d39e2-c000.snappy.parquet
-rw-r--r--   3 itv008542 supergroup     378579 2024-02-08 11:25 /user/itv008542/lendingclubproject/processed/loan_score/part-00002-a16902a1-907a-4b1b-98e5-23765d5d39e2-c000.snappy.parquet
-rw-r--r--   3 itv008542 supergroup     380626 2024-02-08 11:25 /user/itv008542/lendingclubproject/processed/loan_score/part-00003-a16902a1-907a-4b1b-98e5-23765d5d39e2-c000.snappy.parquet
-rw-r--r--   3 itv008542 supergroup     381041 2024-02-08 11:25 /user/itv008542/lendingclubproject/proc

In [0]:
! hadoop fs -head /user/itv008542/lendingclubproject/processed/loan_score/part-00199-a16902a1-907a-4b1b-98e5-23765d5d39e2-c000.snappy.parquet

D�<f2e87e83b4a03143cfff8803d8311467e0022aeff961f240016ad2bb02822	D�>74b25482c536ebf11d5c9d5445b47cf3307691543d1207a6a54b52538b8821@%�<823c9c882e665800c6c6b65c77f9ca7c2ab31c49330a1a6a15c68e82e86c4M �>9013f9a4037290d1cfba2f1bd06c895c0149504b67389e8ca69a70c10d3476@�t934bb0748e79df905c0663e0341de1�Xc04aac3d2e95babe95e437b���5377508f21df12dcb3576c5233d39938aa465fec6�<19276a2321ac6a50)T�>9b7cfa56812914728195c5b56bae9a7bc0aae6824c044550c505230d4e668ddf0f012bf16656c24748c69c1469c506ac887d719ca1a181effb85c)Tx55de8b5dd72029be9d555c0572423ba�h6244f2ce5e96c6eddd119bcad49

In [0]:
spark.stop()