In [73]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import getpass

# Login name of the user running the notebook; interpolated into the
# warehouse directory when the Spark session is configured.
username = getpass.getuser()

In [74]:
# Build (or reuse) a Hive-enabled Spark session on YARN. The warehouse
# directory is placed under the current user's /user/<username> directory.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.sql.warehouse.dir', f'/user/{username}/warehouse')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)


In [75]:
# Switch the session's current database so the unqualified table names
# used in later queries resolve against itv015278_lending_club.
spark.sql("USE itv015278_lending_club")

In [76]:
# Print up to 9 tables of the current database without truncating
# long table names.
tables_df = spark.sql("SHOW TABLES")
tables_df.show(n=9, truncate=False)

+----------------------+-------------------------------+-----------+
|database              |tableName                      |isTemporary|
+----------------------+-------------------------------+-----------+
|itv015278_lending_club|customers                      |false      |
|itv015278_lending_club|customers_loan_v               |false      |
|itv015278_lending_club|loans                          |false      |
|itv015278_lending_club|loans_defaulters_delinq        |false      |
|itv015278_lending_club|loans_defaulters_detail_rec_enq|false      |
|itv015278_lending_club|loans_repayments               |false      |
+----------------------+-------------------------------+-----------+



In [77]:
# Loans joined to their repayments on loan_id, restricted to rows where the
# loan amount is larger than the last payment amount. The DataFrame is left
# as the cell's final expression so the notebook renders it.
loans_above_last_payment_sql = "select member_id , loan_amount , total_principal_received , last_payment_amount from loans l inner join loans_repayments lr on l.loan_id = lr.loan_id where l.loan_amount > lr.last_payment_amount "
spark.sql(loans_above_last_payment_sql)

member_id,loan_amount,total_principal_received,last_payment_amount
dcec9334e70f1cc95...,12000.0,12000.0,11909.77
fc58ca61f51f9dcac...,13000.0,7918.89,463.43
2fb62a6ca51063b11...,6000.0,3742.07,201.42
488268a5531951622...,12000.0,12000.0,9594.21
ade6026208e48f5f9...,12000.0,7443.09,361.38
7c8b0ca6acddfaeb1...,10000.0,10000.0,8729.01
a707b7fe7c38bad65...,24000.0,24000.0,19752.63
1df639cddea30c288...,10000.0,10000.0,4789.04
22d67005e12d8726d...,3500.0,2198.77,115.24
009cf312bd46551b4...,21000.0,4589.16,544.64


In [78]:
# Invalidate Spark's cached entries for `customers` before querying it.
spark.sql("REFRESH TABLE customers")

# member_ids that appear on more than one row of `customers`, together with
# how many rows each one has.
duplicate_customer_ids_sql = """
select member_id , count(*) as total
from customers
group by member_id
having total > 1
"""
bad_data_customer_df = spark.sql(duplicate_customer_ids_sql)

In [79]:
# Keep only the member_id column of the duplicated-customer result
# (drops the per-id row count).
bad_data_customer_only_df = bad_data_customer_df.select("member_id")

In [80]:
# Render the duplicated customer member_ids as the cell output.
bad_data_customer_only_df

member_id
cd1fdca829c443fa7...
61be6498c93cadf89...
a53e2f488d2d76a30...
675151e58a628e87b...
761b2f1e61999e14e...
4231a55d0199c619a...
d4782ddad8591f44d...
f284044b881f218c0...
2bae2e4dd6d5f2b21...
01b2223757c3b62e7...


In [81]:
# Invalidate Spark's cached entries for `loans_defaulters_delinq` before
# querying it.
spark.sql("REFRESH TABLE loans_defaulters_delinq")

# member_ids that appear on more than one row of `loans_defaulters_delinq`;
# the inner query counts rows per id, the outer one keeps just the id.
duplicate_delinq_ids_sql = """
select member_id from (select member_id , count(*) as total
from loans_defaulters_delinq
group by member_id
having total > 1)
"""
bad_data_loans_defaulters_delinq_df = spark.sql(duplicate_delinq_ids_sql)

In [82]:
# Number of member_ids that occur more than once in loans_defaulters_delinq.
bad_data_loans_defaulters_delinq_df.count()

939

In [83]:

# Invalidate Spark's cached entries for `loans_defaulters_detail_rec_enq`
# before querying it, the same way the duplicate checks on `customers` and
# `loans_defaulters_delinq` do, so the query does not use a stale file listing.
spark.sql("REFRESH TABLE loans_defaulters_detail_rec_enq")

# member_ids that appear on more than one row of
# `loans_defaulters_detail_rec_enq`; the inner query counts rows per id, the
# outer one keeps just the id.
bad_data_loans_defaulters_detail_rec_df = spark.sql("""
select member_id from (select member_id , count(*) as total
from loans_defaulters_detail_rec_enq
group by member_id
having total > 1)
""")

In [84]:
 bad_data_loans_defaulters_detail_rec_df.count()

3189

In [85]:
# Save the duplicated customer member_ids as CSV with a header row,
# collapsed to one partition and replacing any previous output at the path.
bad_customers_csv_writer = (
    bad_data_customer_only_df
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header","True")
    .option("path","/user/itv015278/lendingclubproject/bad_data/customers")
)
bad_customers_csv_writer.save()

In [86]:
# Save the duplicated loans_defaulters_delinq member_ids as CSV with a header
# row, collapsed to one partition and replacing any previous output at the path.
bad_delinq_csv_writer = (
    bad_data_loans_defaulters_delinq_df
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header","True")
    .option("path","/user/itv015278/lendingclubproject/bad_data/loan_defaulter_delinq")
)
bad_delinq_csv_writer.save()

In [87]:
# Save the duplicated loans_defaulters_detail_rec_enq member_ids as CSV with a
# header row, collapsed to one partition and replacing any previous output at
# the path.
bad_detail_rec_csv_writer = (
    bad_data_loans_defaulters_detail_rec_df
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header","True")
    .option("path","/user/itv015278/lendingclubproject/bad_data/loan_defaulter_detail_record")
)
bad_detail_rec_csv_writer.save()

In [89]:
# Stack the three single-column member_id frames into one list of flagged
# ids. union() keeps duplicates, so an id flagged by several sources appears
# several times here.
union_data_of_all_bad_customers = (
    bad_data_customer_only_df
    .union(bad_data_loans_defaulters_delinq_df)
    .union(bad_data_loans_defaulters_detail_rec_df)
)

In [90]:
# Total number of flagged member_id rows across the three sources, before
# deduplication. count() is the DataFrame action that returns this number;
# a DataFrame has no stop() method, so the previous call raised
# AttributeError on a fresh run.
union_data_of_all_bad_customers.count()

7285

In [93]:
# Collapse the unioned list to one row per distinct member_id.
bad_customer_final_df = union_data_of_all_bad_customers.distinct()

In [94]:
# Register the flagged member_ids as the temp view `bad_data_customer` so the
# Spark SQL queries that follow can join against it.
bad_customer_final_df.createOrReplaceTempView("bad_data_customer")

In [111]:
# Customers whose member_id is not in bad_data_customer: a left join keeps
# every customer row, and the IS NULL filter retains only those without a
# match in the flagged list.
customers_without_bad_ids_sql = "select cust.* from customers cust left join bad_data_customer bad_cust on cust.member_id = bad_cust.member_id where bad_cust.member_id is NULL "
cleaned_customer_df = spark.sql(customers_without_bad_ids_sql)

In [112]:
# Write the cleaned customers, replacing any previous output at the path.
# The format is stated explicitly: without format(), save() uses whatever
# spark.sql.sources.default is set to, while the external table declared over
# this location (customers_new) is `stored as parquet`.
(
    cleaned_customer_df
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path","/user/itv015278/lendingclubproject/cleaned_new/customers_parquet")
    .save()
)

In [113]:
# loans_defaulters_delinq rows whose member_id is not in bad_data_customer:
# a left join keeps every row, and the IS NULL filter retains only those
# without a match in the flagged list.
delinq_without_bad_ids_sql = "select cust.* from loans_defaulters_delinq cust left join bad_data_customer bad_cust on cust.member_id = bad_cust.member_id where bad_cust.member_id is NULL "
cleaned_loan_defaulter_delinq_df = spark.sql(delinq_without_bad_ids_sql)

In [114]:
# Write the cleaned loans_defaulters_delinq rows, replacing any previous
# output at the path. The format is stated explicitly: without format(),
# save() uses whatever spark.sql.sources.default is set to, while the external
# table declared over this location (loans_defaulters_delinq_new) is
# `stored as parquet`.
(
    cleaned_loan_defaulter_delinq_df
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path","/user/itv015278/lendingclubproject/cleaned_new/loan_defaulters_delinq_parquet")
    .save()
)

In [115]:
# loans_defaulters_detail_rec_enq rows whose member_id is not in
# bad_data_customer: a left join keeps every row, and the IS NULL filter
# retains only those without a match in the flagged list.
detail_rec_enq_without_bad_ids_sql = "select cust.* from loans_defaulters_detail_rec_enq cust left join bad_data_customer bad_cust on cust.member_id = bad_cust.member_id where bad_cust.member_id is NULL "
cleaned_loans_defaulters_detail_rec_enq = spark.sql(detail_rec_enq_without_bad_ids_sql)

In [116]:
# Write the cleaned loans_defaulters_detail_rec_enq rows, replacing any
# previous output at the path. The format is stated explicitly: without
# format(), save() uses whatever spark.sql.sources.default is set to, while
# the external table declared over this location
# (loans_defaulters_detail_rec_enq_new) is `stored as parquet`.
(
    cleaned_loans_defaulters_detail_rec_enq
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path","/user/itv015278/lendingclubproject/cleaned_new/loan_defaulters_detail_rec_enq_parquet")
    .save()
)

In [117]:
# Declare an external parquet-backed table over the cleaned customers
# directory. IF NOT EXISTS makes the cell re-runnable: without it a second
# execution fails because customers_new already exists.
# NOTE(review): the declared column names/types are assumed to match the
# parquet files written to this location (e.g. `join_annual_income`) — confirm
# against the `customers` schema.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS itv015278_lending_club.customers_new(member_id string , emp_title string , emp_length int , home_ownership string , annual_income float , address_state string ,
address_zipcode string , address_country string , grade string , sub_grade string , verification_status string , total_high_credit_limit float , application_type string , join_annual_income float,
verification_status_joint string , ingest_date timestamp) stored as parquet LOCATION '/user/itv015278/lendingclubproject/cleaned_new/customers_parquet'
""")

In [119]:
# Declare an external parquet-backed table over the cleaned
# loans_defaulters_delinq directory. IF NOT EXISTS makes the cell re-runnable:
# without it a second execution fails because the table already exists.
spark.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS itv015278_lending_club.loans_defaulters_delinq_new
(member_id string,delinq_2yrs integer,delinq_amnt float,mths_since_last_delinq integer)
stored as parquet 
LOCATION '/user/itv015278/lendingclubproject/cleaned_new/loan_defaulters_delinq_parquet' """)


In [122]:
# Declare an external parquet-backed table over the cleaned
# loans_defaulters_detail_rec_enq directory. IF NOT EXISTS makes the cell
# re-runnable: without it a second execution fails because the table already
# exists.
spark.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS itv015278_lending_club.loans_defaulters_detail_rec_enq_new
(member_id string,public_record integer,public_record_bankruptcies integer,inquiries_last_6months integer)
stored as parquet 
LOCATION '/user/itv015278/lendingclubproject/cleaned_new/loan_defaulters_detail_rec_enq_parquet' """)
