In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('Loan Score') \
.config('spark.shuffle.useOldFetchProtocol' , 'true') \
.config('spark.sql.warehouse.dir' , 'user/itv006879/warehouse') \
.enableHiveSupport() \
.master('yarn') \
.getOrCreate()

## Associating points to grades for usage in Loan score

In [2]:
spark.conf.set('spark.sql.unacceptable_rated_pts' , 0)
spark.conf.set('spark.sql.very_bad_rated_pts' , 100)
spark.conf.set('spark.sql.bad_rated_pts' , 250)
spark.conf.set('spark.sql.good_rated_pts' , 500)
spark.conf.set('spark.sql.very_good_rated_pts' , 650)
spark.conf.set('spark.sql.excellent_rated_pts' , 800)

In [None]:
spark.conf.set('spark.sql.unacceptable_grade_pts' , 750)
spark.conf.set('spark.sql.very_bad_grade_pts' , 1000)
spark.conf.set('spark.sql.bad_rated_grade_pts' , 1500)
spark.conf.set('spark.sql.good_grade_pts' , 2000)
spark.conf.set('spark.sql.very_good_grade_pts' , 2500)

# The Tables required are:-

### customers_new

### loans

### loans_repayment

### loans_defaulters_delinq_new

### loans_defaulters_detail_records_enq_new




# 1. Payment history points

In [None]:
bad_data_cust_df =spark.read \
.format('csv') \
.option('header' , 'true')\
.load('user/itv006879/bad_data/*')

In [None]:
bad_data_cust_df.createOrReplaceTempView('bad_data_customers')

In [None]:
ph_df = spark.sql(''' select c.member_id ,
case
   when p.last_payment_amount < (c.monthly_installments * 0.5) then ${spark.sql.very_bad_rated_pts}
   when p.last_payment_amount > (c.monthly_installments * 0.5) and p.last_payment_amount < c.monthly_installment then ${spark.sql.bad_rated_pts}
   when p.last_payment_amount = (c.monthly_installments * 0.5) then ${spark.sql.good_rated_pts}
   when p.last_payment_amount > (c.monthly_installments ) and p.last_payment < (c.monthly_installment * 1.50) then ${spark.sql.very_good_rated_pts}
   when p.last_payment_amount > (c.monthly_installments * 1.50)  then ${spark.sql.excellent_rated_pts}
end as last_payment_pts ,
case
   when p.total_payment_received >= (c.funded_payment * 0.5) then ${spark.sql.very_good_rated_pts}
   when p.total_payment_received < (c.funded_payment * 0.5) and p.total_payment_received > 0 then ${spark.sql.good_rated_pts}
   when p.total_payment_received = 0 or p.total_payment_received is null then ${spark.sql.unacceptable_rated_pts}
end as total_payment_received_pts
from lending_club_proj_6879.loan_repayments p 
inner join loans c on c.loan_id = p.loan_id where not member_id in (select * from bad_data_customers)
''')

In [None]:
ph_df.createOrReplaceTempView('ph_pts')

# 2.Loan defaulter history points

In [None]:
ldh_ph_df = spark.sql(''' select p.*
case
when d.delinq_2yrs = 0 then ${spark.sql.excellent_rated_pts}
when d.delinq_2yrs between 1 and 2 then ${spark.sql.bad_rated_pts}
when d.delinq_2yrs between 3 and 5 then ${spark.sql.very_bad_rated_pts}
when d.delinq_2yrs > 5 or d.delinq_2yrs is null then ${spark.sql.unacceptable_rated_pts}
end as delinq_pts ,
case
when l.pub_rec = 0 then ${spark.sql.excellent_rated_pts}
when l.pub_rec between 1 and 2  then ${spark.sql.bad_rated_pts}
when l.pub_rec between 3 and 5 then ${spark.sql.very_bad_rated_pts}
when l.pub_rec > 5 or d.delinq_2yrs is null then ${spark.sql.unacceptable_rated_pts}
end as public_record_pts,
case
when l.pub_rec_bankruptcies = 0 then ${spark.sql.excellent_rated_pts}
when l.pub_rec_bankruptcies between 1 and 2  then ${spark.sql.bad_rated_pts}
when l.pub_rec_bankruptcies between 3 and 5 then ${spark.sql.very_bad_rated_pts}
when l.pub_rec_bankruptcies > 5 or d.delinq_2yrs is null then ${spark.sql.unacceptable_rated_pts}
end as public_bankruptcies_pts,
case
when l.inq_last_6mths = 0 then ${spark.sql.excellent_rated_pts}
when l.inq_last_6mths between 1 and 2  then ${spark.sql.bad_rated_pts}
when l.inq_last_6mths between 3 and 5 then ${spark.sql.very_bad_rated_pts}
when l.inq_last_6mths > 5 or d.delinq_2yrs is null then ${spark.sql.unacceptable_rated_pts}
end as enq_pts
from  lending_club_proj_6879.loans_defaulters_detail_records_enq_new l
inner join lending_club_proj_6879.loans_defaulters_delinq_new d on d.member_id = l.member_id 
inner join ph_pts p on p.member_id = l.member where not l.member_id in (select * from bad_data_customers)
''')

In [None]:
ldh_ph_df.createOrReplaceTempView('ldh_ph_pts')

## 3. Financial Health points

In [None]:
fh_ldh_ph_df = spark.sql(''' select ldef.* ,
case
when lower(l.loan_status) is like '%fully paid%' then ${spark.sql.excellent_rated_pts}
when lower(l.loan_status) is like '%current%' then ${spark.sql.good_rated_pts}
when lower(l.loan_status) is like '%in grace period%' then ${spark.sql.bad_rated_pts}
when lower(l.loan_status) is like '%late (16-30 days)%' or lower(loan_status) is like '%late (31 - 120 days)%' then ${spark.sql.very_bad_rated_pts}
when lower(l.loan_status) is like '%charged off%' then ${spark.sql.unacceptable_rated_pts}
end as loan_status_pts ,
case
when lower(a.home_ownership) like '%own'then ${spark.sql.excellent_rated_pts}
when lower(a.home_ownership) is like '%rent%' then ${spark.sql.good_rated_pts}
when lower(a.home_ownership) is like '%mortgage%' then ${spark.sql.bad_rated_pts}
when lower(a.home_ownership) is like '%any%' or lower(loan_status) is null then ${spark.sql.unacceptable_rated_pts}
end as home_pts
case
when l.funded_amount <= (a.total_high_credit_limit * 0.1) then ${spark.sql.excellent_rated_pts}
when l.funded_amount > (a.total_high_credit_limit * 0.1) and l.funded_amount < (a.total_high_credit_limit * 0.3) then ${spark.sql.good_rated_pts}
when l.funded_amount <= (a.total_high_credit_limit * 0.3) and l.funded_amount < (a.total_high_credit_limit * 0.5) then ${spark.sql.bad_rated_pts}
when l.funded_amount <= (a.total_high_credit_limit * 0.5) and l.funded_amount < (a.total_high_credit_limit * 0.9) then ${spark.sql.very_bad_rated_pts}
when l.funded_amount = a.total_high_credit_limit then ${spark.sql.unacceptable_rated_pts}
end as funded_amt_pts
case
when a.grade = 'A' and a.sub_grade in ('A1','A2' , 'A3' , 'A4' , 'A5') then ${spark.sql.excellent_rated_pts}
when a.grade = 'B' and a.sub_grade in ('B1','B2' , 'B3' , 'B4' , 'B5') then ${spark.sql.good_rated_pts}
when a.grade = 'C' and a.sub_grade in ('C1','C2' , 'C3' , 'C4' , 'C5') then ${spark.sql.bad_rated_pts}
when a.grade = 'D' and a.sub_grade in ('D1','D2' , 'D3' , 'D4' , 'D5') then ${spark.sql.very_bad_rated_pts}
when a.grade = 'E' and a.sub_grade in ('E1','E2' , 'E3' , 'E4' , 'E5') then ${spark.sql.unacceptable_rated_pts}
end as grade_pts
from ldh_ph_pts ldef
inner join lending_club_proj_6879.loans l on ldef.member_id = l.member_id 
inner join lending_club_proj_6879.customer_new a on a.member_id = ledf.member_id 
''')

In [None]:
fh_ldh_ph_df.createOrReplaceTempView('fh_ldh_ph_pts')

## Loan score calculation

### payment_history = 20%
### loan_default_history = 45%
### financial health = 35%

In [None]:
loan_score_df = spark.sql('''
select ((last_payment_pts + total_payment_received_pts) * 0.2 ) as payment_history_pts ,
((delinq_pts + public_record_pts + public_bankruptcies_pts + enq_pts) * 0.45) as loan_default_history_pts,
((loan_status_pts + home_pts + funded_amt_pts + grade_pts) * 0.35) as c
from fh_ldh_ph_pts
''')

In [None]:
loan_score_df = loan_score_df.withColumn('Loan_total_score' , loan_score_df.payment_history_pts + loan_score_df.loan_default_history_pts + loan_score_df.loan_score_df )

In [None]:
loan_score_df.createOrReplaceTempView('final_loan_score')

In [None]:
loan_score_eval = spark.sql('''
select ls.* ,
case
when ls.Loan_total_score >= ${'spark.sql.very_good_grade_pts'} then A+
when ls.Loan_total_score between ${'spark.sql.good_grade_pts'} and ${'spark.sql.very_good_grade_pts'}-1 then B
when ls.Loan_total_score betwneen ${'spark.sql.bad_rated_grade_pts'} and  ${'spark.sql.good_grade_pts'}-1  then C
when ls.Loan_total_score between ${'spark.sql.very_bad_grade_pts'}  and ${'spark.sql.bad_rated_grade_pts'}-1 then D
when ls.Loan_total_score between ${'spark.sql.unacceptable_grade_pts'} and ${'spark.sql.very_bad_grade_pts'} -1 then E
when ls.Loan_total_score < ${'spark.sql.unacceptable_grade_pts'} then F
end as loan_grade
from final_loan_score ls
''')

In [None]:
loan_score_eval.repartition(1).write \
.format('parquet') \
.option('header' , 'true') \
.mode('overwrite') \
.option('path' , '/user/itv006879/processed/') \
.save()