In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create (or reuse) the Hive-enabled Spark session for the loan-score job,
# connecting to the standalone master and capping executor resources.
# NOTE(review): "spark.hadoop.defaultFS" is forwarded to Hadoop as "defaultFS";
# the standard Hadoop property is "fs.defaultFS" - confirm this setting has the
# intended effect before relying on HDFS path resolution.
spark = (
    SparkSession.builder
    .appName("loan_score")
    .master("spark://10.208.36.84:7077")
    .config("spark.hadoop.defaultFS", "hdfs://10.208.36.84:9000")
    .config("spark.sql.warehouse.dir", "/home/hadoop/spark_workspace/shared_folder/tushar/Tables_data")
    .config("spark.executor.cores", 2)
    .config("spark.executor.memory", "4g")
    .config("spark.cores.max", "8")
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 14:19:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Points awarded per individual criterion, from worst to best rating.
# They are stored in the session conf so the SQL cells can reference them
# through ${spark.sql.<name>} placeholders.
for conf_key, points in (
    ("spark.sql.unacceptable_rated_pts", 0),
    ("spark.sql.very_bad_rated_pts", 100),
    ("spark.sql.bad_rated_pts", 250),
    ("spark.sql.good_rated_pts", 500),
    ("spark.sql.very_good_rated_pts", 650),
    ("spark.sql.excellent_rated_pts", 800),
):
    spark.conf.set(conf_key, points)

In [4]:
# Total-score thresholds used when the final letter grade is assigned.
# Do not confuse these with the per-criterion "*_rated_pts" values.
for conf_key, threshold in (
    ("spark.sql.unacceptable_grade_pts", 750),
    ("spark.sql.very_bad_grade_pts", 1000),
    ("spark.sql.bad_grade_pts", 1500),
    ("spark.sql.good_grade_pts", 2000),
    ("spark.sql.very_good_grade_pts", 2500),
):
    spark.conf.set(conf_key, threshold)

In [5]:
# Load the consolidated bad-data CSV export (header row present, column types inferred).
bad_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .format("csv")
    .load("/home/tushar/Documents/project/bad_data/consolidated")
)

                                                                                

In [6]:
# Register the bad-data frame as temp view "bad_data_customer" so later SQL cells can exclude those member_ids.
bad_df.createOrReplaceTempView("bad_data_customer")

In [7]:
# Make project_trendy the current database for the SQL cells that follow.
spark.sql("use project_trendy")

DataFrame[]

<p>The loan score calculation is based on three things:</p>
<ul>
<li>Loan Repayment History</li>
<li>Loan Defaulters History</li>
<li>Financial Health Data</li>
</ul>

In [8]:
# Preview three rows of project_trendy.loans to check the columns used for scoring.
spark.sql("select * from project_trendy.loans limit 3").show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------+--------------------+-----------+-------------+---------------+-------------+-------------------+----------+-----------+------------------+--------------------+-----------+
| loan_id|           member_id|loan_amount|funded_amount|loan_term_years|interest_rate|monthly_installment|issue_date|loan_status|      loan_purpose|          loan_title|ingest_date|
+--------+--------------------+-----------+-------------+---------------+-------------+-------------------+----------+-----------+------------------+--------------------+-----------+
|57267443|48e96acef66a1b3aa...|    15000.0|      15000.0|              3|        19.99|               null|  Aug-2015| Fully Paid|debt_consolidation|  Debt consolidation|       null|
|57186326|a26f0e7701a6ab87d...|     6000.0|       6000.0|              3|         6.89|               null|  Aug-2015| Fully Paid|debt_consolidation|  Debt consolidation|       null|
|57044889|eeb80dce9907d9aa4...|     9625.0|       9625.0|              3|         8.1

                                                                                

In [9]:
# Payment-history points: one row per repayment record joined to its loan (on loan_id).
#
# last_payment_pts compares the last payment with the monthly installment:
#   below 50%            -> very bad
#   50% up to (not incl.) 100% -> bad   (previously also "very bad", which made this
#                                        band indistinguishable from the one below
#                                        and left the "bad" tier unused)
#   exactly 100%         -> good
#   above 100% up to 150% -> very good
#   above 150%           -> excellent
#   anything else (e.g. a NULL amount or installment) -> unacceptable
# In the recorded run project_trendy.loans shows monthly_installment as null,
# which is why every row lands in the ELSE branch (0 points) - check the ingest.
#
# total_payment_pts compares the total received with the funded amount.
# The ELSE branch keeps the column non-NULL for inputs that match no band
# (e.g. a NULL funded_amount), so a NULL cannot propagate into the final score.
#
# The ${spark.sql.*} placeholders are replaced by Spark with the session conf values.
ph_df = spark.sql("""
    SELECT c.member_id,
        CASE
            WHEN p.last_payment_amount < (c.monthly_installment * 0.5)
                THEN ${spark.sql.very_bad_rated_pts}
            WHEN p.last_payment_amount >= (c.monthly_installment * 0.5)
                 AND p.last_payment_amount < c.monthly_installment
                THEN ${spark.sql.bad_rated_pts}
            WHEN p.last_payment_amount = (c.monthly_installment)
                THEN ${spark.sql.good_rated_pts}
            WHEN p.last_payment_amount > (c.monthly_installment)
                 AND p.last_payment_amount <= (c.monthly_installment * 1.50)
                THEN ${spark.sql.very_good_rated_pts}
            WHEN p.last_payment_amount > (c.monthly_installment * 1.50)
                THEN ${spark.sql.excellent_rated_pts}
            ELSE ${spark.sql.unacceptable_rated_pts}
        END AS last_payment_pts,
        CASE
            WHEN p.total_payment_received >= (c.funded_amount * 0.50)
                THEN ${spark.sql.very_good_rated_pts}
            WHEN p.total_payment_received < (c.funded_amount * 0.50)
                 AND p.total_payment_received > 0
                THEN ${spark.sql.good_rated_pts}
            WHEN p.total_payment_received = 0 OR p.total_payment_received IS NULL
                THEN ${spark.sql.unacceptable_rated_pts}
            ELSE ${spark.sql.unacceptable_rated_pts}
        END AS total_payment_pts
    FROM project_trendy.loans_repayments AS p
    INNER JOIN project_trendy.loans AS c ON c.loan_id = p.loan_id
""")

In [10]:
# Expose the payment-history points to SQL as temp view "loan_score".
ph_df.createOrReplaceTempView("loan_score")

In [11]:
# Display rows of the loan_score view to sanity-check the assigned points.
spark.sql("select * from loan_score").show()

                                                                                

+--------------------+----------------+-----------------+
|           member_id|last_payment_pts|total_payment_pts|
+--------------------+----------------+-----------------+
|48e96acef66a1b3aa...|               0|              650|
|a26f0e7701a6ab87d...|               0|              650|
|eeb80dce9907d9aa4...|               0|              650|
|7402edb6f0ddc36f9...|               0|              650|
|e01ab149a9a148d65...|               0|              650|
|40f90b73b901d4409...|               0|              650|
|96600ee521db9560d...|               0|              500|
|603d52eee7a658910...|               0|              650|
|985f379c5a65583bf...|               0|              650|
|005bb3c40d7cf116d...|               0|              650|
|c5816e78c25d6b6ad...|               0|              650|
|8e582a7286c763c94...|               0|              650|
|dba24d2a7e49b160f...|               0|              650|
|9f01942ec481d6a7c...|               0|              500|
|17be387404072

In [12]:
# Print the column names, types and nullability of the payment-history frame.
ph_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- last_payment_pts: integer (nullable = false)
 |-- total_payment_pts: integer (nullable = true)



In [13]:
# Display ph_df directly; this is the same data as the loan_score view queried above.
ph_df.show()

+--------------------+----------------+-----------------+
|           member_id|last_payment_pts|total_payment_pts|
+--------------------+----------------+-----------------+
|48e96acef66a1b3aa...|               0|              650|
|a26f0e7701a6ab87d...|               0|              650|
|eeb80dce9907d9aa4...|               0|              650|
|7402edb6f0ddc36f9...|               0|              650|
|e01ab149a9a148d65...|               0|              650|
|40f90b73b901d4409...|               0|              650|
|96600ee521db9560d...|               0|              500|
|603d52eee7a658910...|               0|              650|
|985f379c5a65583bf...|               0|              650|
|005bb3c40d7cf116d...|               0|              650|
|c5816e78c25d6b6ad...|               0|              650|
|8e582a7286c763c94...|               0|              650|
|dba24d2a7e49b160f...|               0|              650|
|9f01942ec481d6a7c...|               0|              500|
|17be387404072

                                                                                

In [16]:
# Select project_trendy as the current database (the same statement already ran earlier in this notebook).
spark.sql("use project_trendy")

DataFrame[]

In [21]:
# Loan-defaulters-history points, appended to the payment-history columns (p.*).
#
# Each criterion (delinquencies in the last 2 years, public records, public
# record bankruptcies, enquiries in the last 6 months) is scored the same way:
#   0 -> excellent, 1-2 -> bad, 3-5 -> very bad, more than 5 or NULL -> lowest bucket.
#
# Fixes compared with the previous version:
# * delinq_pts used ${spark.sql.unacceptable_grade_pts} (750, a threshold for
#   the final letter grade) for the worst bucket, so members with more than 5
#   delinquencies scored higher than members with 1-2. It now uses
#   ${spark.sql.unacceptable_rated_pts} like enq_pts does.
# * "member_id NOT IN (subquery)" returns no rows at all when the subquery
#   yields a NULL member_id; NOT EXISTS excludes the same bad-data members
#   without that pitfall.
#
# NOTE(review): public_records_pts and public_bankruptcies_pts give "very bad"
# (not "unacceptable") to the >5 / NULL bucket - confirm this is intended.
ldh_ph_df = spark.sql("""
    SELECT p.*,
        CASE
            WHEN d.delinq_2yrs = 0 THEN ${spark.sql.excellent_rated_pts}
            WHEN d.delinq_2yrs BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts}
            WHEN d.delinq_2yrs BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts}
            WHEN d.delinq_2yrs > 5 OR d.delinq_2yrs IS NULL THEN ${spark.sql.unacceptable_rated_pts}
        END AS delinq_pts,
        CASE
            WHEN l.pub_rec = 0 THEN ${spark.sql.excellent_rated_pts}
            WHEN l.pub_rec BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts}
            WHEN l.pub_rec BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts}
            WHEN l.pub_rec > 5 OR l.pub_rec IS NULL THEN ${spark.sql.very_bad_rated_pts}
        END AS public_records_pts,
        CASE
            WHEN l.pub_rec_bankruptcies = 0 THEN ${spark.sql.excellent_rated_pts}
            WHEN l.pub_rec_bankruptcies BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts}
            WHEN l.pub_rec_bankruptcies BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts}
            WHEN l.pub_rec_bankruptcies > 5 OR l.pub_rec_bankruptcies IS NULL THEN ${spark.sql.very_bad_rated_pts}
        END AS public_bankruptcies_pts,
        CASE
            WHEN l.inq_last_6mths = 0 THEN ${spark.sql.excellent_rated_pts}
            WHEN l.inq_last_6mths BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts}
            WHEN l.inq_last_6mths BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts}
            WHEN l.inq_last_6mths > 5 OR l.inq_last_6mths IS NULL THEN ${spark.sql.unacceptable_rated_pts}
        END AS enq_pts
    FROM project_trendy.new_loans_defaulters_detail_rec_enq l
    INNER JOIN project_trendy.loans_defaulters_delinq d ON d.member_id = l.member_id
    INNER JOIN loan_score p ON p.member_id = l.member_id
    WHERE NOT EXISTS (
        SELECT 1 FROM bad_data_customer b WHERE b.member_id = l.member_id
    )
""")

In [22]:
# Display the payment-history columns together with the four defaulters-history point columns.
ldh_ph_df.show()

[Stage 15:>                                                         (0 + 1) / 1]

+--------------------+----------------+-----------------+----------+------------------+-----------------------+-------+
|           member_id|last_payment_pts|total_payment_pts|delinq_pts|public_records_pts|public_bankruptcies_pts|enq_pts|
+--------------------+----------------+-----------------+----------+------------------+-----------------------+-------+
|000152208b3e77b5b...|               0|              650|       800|               800|                    800|    800|
|0001cfa200f7480b9...|               0|              650|       250|               250|                    250|    250|
|00026136ec721b938...|               0|              500|       250|               800|                    800|    250|
|0004656412a18b9c1...|               0|              650|       250|               800|                    800|    800|
|0004f290201c29d93...|               0|              500|       800|               800|                    800|    800|
|0005c3b2e23d18d3a...|               0| 

                                                                                

In [23]:
# Expose the combined payment + defaulters points to SQL as temp view "ldh_ph_pts".
ldh_ph_df.createOrReplaceTempView("ldh_ph_pts")

In [28]:
# List the tables of the current database plus the session's temp views (isTemporary = true).
spark.sql("show tables").show()

+--------------+--------------------+-----------+
|     namespace|           tableName|isTemporary|
+--------------+--------------------+-----------+
|project_trendy|           customers|      false|
|project_trendy|      customers_loan|      false|
|project_trendy|    customers_loan_v|      false|
|project_trendy|               loans|      false|
|project_trendy|loans_defaulters_...|      false|
|project_trendy|loans_defaulters_...|      false|
|project_trendy|    loans_repayments|      false|
|project_trendy|        new_customer|      false|
|project_trendy|new_loans_default...|      false|
|project_trendy|new_loans_default...|      false|
|              |   bad_data_customer|       true|
|              |          ldh_ph_pts|       true|
|              |          loan_score|       true|
+--------------+--------------------+-----------+



In [29]:
def _grade_pts_case():
    """Build the SQL CASE expression that scores a customer's grade / sub-grade.

    Sub-grade 1 earns the full points of its grade tier (A = excellent,
    B = very good, C = good, D = bad, E = very bad); sub-grades 2 to 5 earn
    95%, 90%, 85% and 80% of that value. Grades F and G score the
    "unacceptable" points, and so does any grade / sub-grade combination not
    listed (the ELSE branch), so the expression never evaluates to NULL.

    Returns:
        str: a "CASE ... END" fragment that still contains the
        ${spark.sql.*} placeholders for Spark to substitute.
    """
    tier_points = (
        ("A", "${spark.sql.excellent_rated_pts}"),
        ("B", "${spark.sql.very_good_rated_pts}"),
        ("C", "${spark.sql.good_rated_pts}"),
        ("D", "${spark.sql.bad_rated_pts}"),
        ("E", "${spark.sql.very_bad_rated_pts}"),
    )
    # Factors stay textual so the generated SQL literals are exactly the ones
    # the hand-written query used; None means "no multiplier" (sub-grade 1).
    sub_grade_factors = (None, "0.95", "0.90", "0.85", "0.80")

    branches = []
    for grade, points in tier_points:
        for position, factor in enumerate(sub_grade_factors, start=1):
            value = points if factor is None else "({} * {})".format(points, factor)
            branches.append(
                "WHEN a.grade = '{0}' AND a.sub_grade = '{0}{1}' THEN {2}".format(grade, position, value)
            )
    branches.append("WHEN a.grade IN ('F', 'G') THEN ${spark.sql.unacceptable_rated_pts}")
    branches.append("ELSE ${spark.sql.unacceptable_rated_pts}")
    return "CASE\n" + "\n".join(branches) + "\nEND"


# Financial-health points, appended to the payment + defaulters columns (ldef.*):
#   loan_status_pts  - from the loan status text,
#   home_pts         - from the home-ownership category,
#   credit_limit_pts - funded amount as a share of the total high credit limit,
#   grade_pts        - from grade / sub-grade (see _grade_pts_case).
#
# Fixes compared with the previous version:
# * home_pts and grade_pts had no ELSE branch, so an unlisted ownership value
#   or grade produced NULL, which turned the member's financial-health score,
#   total score and final grade into NULL. Unlisted ownership values now score
#   like the ANY / NULL bucket, unlisted grades like F / G.
# * The 25 copy-pasted sub-grade branches are generated by a helper.
# * "NOT IN (subquery)" returns no rows when the subquery yields a NULL
#   member_id; NOT EXISTS excludes the same members without that pitfall.
fh_ldh_ph_df = spark.sql("""
    SELECT ldef.*,
        CASE
            WHEN LOWER(l.loan_status) LIKE '%fully paid%' THEN ${spark.sql.excellent_rated_pts}
            WHEN LOWER(l.loan_status) LIKE '%current%' THEN ${spark.sql.good_rated_pts}
            WHEN LOWER(l.loan_status) LIKE '%in grace period%' THEN ${spark.sql.bad_rated_pts}
            WHEN LOWER(l.loan_status) LIKE '%late (16-30 days)%'
                 OR LOWER(l.loan_status) LIKE '%late (31-120 days)%' THEN ${spark.sql.very_bad_rated_pts}
            WHEN LOWER(l.loan_status) LIKE '%charged off%' THEN ${spark.sql.unacceptable_rated_pts}
            ELSE ${spark.sql.unacceptable_rated_pts}
        END AS loan_status_pts,
        CASE
            WHEN LOWER(a.home_ownership) LIKE '%own' THEN ${spark.sql.excellent_rated_pts}
            WHEN LOWER(a.home_ownership) LIKE '%rent' THEN ${spark.sql.good_rated_pts}
            WHEN LOWER(a.home_ownership) LIKE '%mortgage' THEN ${spark.sql.bad_rated_pts}
            WHEN LOWER(a.home_ownership) LIKE '%any' OR a.home_ownership IS NULL THEN ${spark.sql.very_bad_rated_pts}
            ELSE ${spark.sql.very_bad_rated_pts}
        END AS home_pts,
        CASE
            WHEN l.funded_amount <= (a.total_high_credit_limit * 0.10) THEN ${spark.sql.excellent_rated_pts}
            WHEN l.funded_amount > (a.total_high_credit_limit * 0.10)
                 AND l.funded_amount <= (a.total_high_credit_limit * 0.20) THEN ${spark.sql.very_good_rated_pts}
            WHEN l.funded_amount > (a.total_high_credit_limit * 0.20)
                 AND l.funded_amount <= (a.total_high_credit_limit * 0.30) THEN ${spark.sql.good_rated_pts}
            WHEN l.funded_amount > (a.total_high_credit_limit * 0.30)
                 AND l.funded_amount <= (a.total_high_credit_limit * 0.50) THEN ${spark.sql.bad_rated_pts}
            WHEN l.funded_amount > (a.total_high_credit_limit * 0.50)
                 AND l.funded_amount <= (a.total_high_credit_limit * 0.70) THEN ${spark.sql.very_bad_rated_pts}
            WHEN l.funded_amount > (a.total_high_credit_limit * 0.70) THEN ${spark.sql.unacceptable_rated_pts}
            ELSE ${spark.sql.unacceptable_rated_pts}
        END AS credit_limit_pts,
        """ + _grade_pts_case() + """ AS grade_pts
    FROM ldh_ph_pts ldef
    INNER JOIN project_trendy.loans l ON ldef.member_id = l.member_id
    INNER JOIN project_trendy.new_customer a ON a.member_id = ldef.member_id
    WHERE NOT EXISTS (
        SELECT 1 FROM bad_data_customer b WHERE b.member_id = ldef.member_id
    )
""")

In [30]:
# Expose all per-criterion point columns to SQL as temp view "fh_ldh_ph_pts".
fh_ldh_ph_df.createOrReplaceTempView("fh_ldh_ph_pts")

In [31]:
# Print the schema to check the type and nullability of every point column before they are summed.
fh_ldh_ph_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- last_payment_pts: integer (nullable = false)
 |-- total_payment_pts: integer (nullable = true)
 |-- delinq_pts: integer (nullable = true)
 |-- public_records_pts: integer (nullable = true)
 |-- public_bankruptcies_pts: integer (nullable = true)
 |-- enq_pts: integer (nullable = true)
 |-- loan_status_pts: integer (nullable = false)
 |-- home_pts: integer (nullable = true)
 |-- credit_limit_pts: integer (nullable = false)
 |-- grade_pts: decimal(12,2) (nullable = true)



<p>Final score weights:</p>
<ul>
<li>Payment History = 20%</li>
<li>Loan Defaults = 45%</li>
<li>Financial Health = 35%</li>
</ul>

In [32]:
# Weight the three groups of points: payment history 20%, defaulters history 45%,
# financial health 35%. Each group is the plain sum of its point columns, so a
# NULL in any column makes that group's score NULL.
loan_score = spark.table("fh_ldh_ph_pts").selectExpr(
    "member_id",
    "((last_payment_pts+total_payment_pts)*0.20) as payment_history_pts",
    "((delinq_pts + public_records_pts + public_bankruptcies_pts + enq_pts) * 0.45) as defaulters_history_pts",
    "((loan_status_pts + home_pts + credit_limit_pts + grade_pts)*0.35) as financial_health_pts",
)

In [33]:
# Display the three weighted component scores per member.
loan_score.show()

[Stage 31:>                                                         (0 + 1) / 1]

+--------------------+-------------------+----------------------+--------------------+
|           member_id|payment_history_pts|defaulters_history_pts|financial_health_pts|
+--------------------+-------------------+----------------------+--------------------+
|000152208b3e77b5b...|             130.00|               1440.00|            665.0000|
|0001cfa200f7480b9...|             130.00|                450.00|            665.0000|
|00026136ec721b938...|             100.00|                945.00|            653.6250|
|0004656412a18b9c1...|             130.00|               1192.50|            988.7500|
|0004f290201c29d93...|             100.00|               1440.00|            682.5000|
|0005c3b2e23d18d3a...|             130.00|                945.00|            759.5000|
|0005f7eea1cc9ebeb...|             100.00|               1440.00|            672.0000|
|00068d8b359a1a6e1...|             130.00|                945.00|            962.5000|
|0006f75e5d4813efb...|             130.00| 

                                                                                

In [34]:
# Total score = sum of the three weighted component scores, added as column "loan_score".
final_loan_score = loan_score.withColumn(
    "loan_score",
    loan_score["payment_history_pts"]
    + loan_score["defaulters_history_pts"]
    + loan_score["financial_health_pts"],
)

In [35]:
# Expose the totals to SQL as temp view "loan_score_eval" for the grading query.
final_loan_score.createOrReplaceTempView("loan_score_eval")

In [36]:
# Map the total score to a letter grade using the ${spark.sql.*_grade_pts}
# thresholds. CASE branches are evaluated in order, so each branch only needs
# its lower bound: a row reaches it only after failing every higher threshold.
# There is no ELSE branch, so a NULL loan_score yields a NULL grade.
loan_score_final = spark.sql("""
    SELECT ls.*,
        CASE
            WHEN loan_score > ${spark.sql.very_good_grade_pts} THEN 'A'
            WHEN loan_score > ${spark.sql.good_grade_pts} THEN 'B'
            WHEN loan_score > ${spark.sql.bad_grade_pts} THEN 'C'
            WHEN loan_score > ${spark.sql.very_bad_grade_pts} THEN 'D'
            WHEN loan_score > ${spark.sql.unacceptable_grade_pts} THEN 'E'
            WHEN loan_score <= ${spark.sql.unacceptable_grade_pts} THEN 'F'
        END AS loan_final_grade
    FROM loan_score_eval ls
""")

In [37]:
# Expose the graded result as temp view "loan_final_table" (not queried again in the cells shown here).
loan_score_final.createOrReplaceTempView("loan_final_table")

In [38]:
# Persist the graded scores as parquet, replacing any previous output at this path.
loan_score_final.write.save(
    path="/home/tushar/Documents/project/final_processed/loan_score",
    format="parquet",
    mode="overwrite",
)

                                                                                