In [93]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
# Build (or reuse) a YARN-backed Spark session with Hive metastore support.
# - spark.ui.port=0 asks Spark to bind its UI to any free port, avoiding clashes
#   when several users share the same gateway host.
# - spark.shuffle.useOldFetchProtocol=true — presumably required by the cluster's
#   external shuffle service; confirm before removing.
# - The warehouse dir is derived from the current OS user rather than hard-coded,
#   so the notebook works unchanged for another account.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

### Identifying The Bad Data

In [94]:
# Duplicate check on customers: rows per member_id, most frequent first.
# Any total above 1 means the member_id does not identify a single row.
spark.sql(
    "select member_id,count(member_id) as total "
    "from itv010130_Loan_Database.customers "
    "group by member_id "
    "order by total desc"
)

member_id,total
e4c167053d5418230...,5
76b577467eda5bdbc...,4
3f87585a20f702838...,4
ad8e5d384dae17e06...,4
22593a1870543b2db...,3
819453be77718d747...,3
498bb6b1f0099cb47...,3
f54295a60946dedad...,3
53789bea7edc660ed...,3
c563428cb58da48ff...,3


In [95]:
# Show every customers row for the most-duplicated member_id. The prefix is
# copied from the truncated output above, and LIKE matches on it.
spark.sql(
    "select * "
    "from itv010130_Loan_Database.customers "
    "where member_id like 'e4c167053d5418230%'"
)

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,110907.0,Individual,,,2024-07-15 16:44:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,129833.0,Individual,,,2024-07-15 16:44:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,138780.0,Individual,,,2024-07-15 16:44:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,207300.0,Individual,,,2024-07-15 16:44:...
e4c167053d5418230...,,6,MORTGAGE,55000.0,IL,604xx,USA,B,B5,Verified,171165.0,Individual,,,2024-07-15 16:44:...


In [96]:
# Duplicate check on defaulters_delinquent: the five member_ids with the most rows.
spark.sql(
    "select member_id,count(member_id) as total "
    "from itv010130_Loan_Database.defaulters_delinquent "
    "group by member_id "
    "order by total desc "
    "limit 5"
)

member_id,total
ee2d0dd6ad9e048b8...,3
793b618a7de10f813...,3
b67f4aa39e82954f2...,3
bca141343d9405a9a...,3
6f88761291d14b65e...,3


In [97]:
# Show all defaulters_delinquent rows for the top duplicated member_id (prefix match).
spark.sql(
    "select * "
    "from itv010130_Loan_Database.defaulters_delinquent "
    "where member_id like 'ee2d0dd6ad9e048b8%'"
)

member_id,delinq_in_2yrs,delinq_amount,months_since_last_delinq
ee2d0dd6ad9e048b8...,0,0.0,25.0
ee2d0dd6ad9e048b8...,0,0.0,66.0
ee2d0dd6ad9e048b8...,1,0.0,18.0


In [98]:
# Duplicate check on defaulters: the five member_ids with the most rows.
# NOTE(review): the top ID's prefix e3b0c44298fc1c149... matches SHA-256 of the
# empty string, which suggests these member_ids were hashed from a blank value
# upstream. Confirm against the ingestion step.
spark.sql(
    "select member_id,count(member_id) as total "
    "from itv010130_Loan_Database.defaulters "
    "group by member_id "
    "order by total desc "
    "limit 5"
)

member_id,total
e3b0c44298fc1c149...,33
e4c167053d5418230...,5
ad8e5d384dae17e06...,4
76b577467eda5bdbc...,4
3f87585a20f702838...,4


In [99]:
# Show all defaulters rows for a member_id that is also duplicated in customers.
spark.sql(
    "select * "
    "from itv010130_Loan_Database.defaulters "
    "where member_id like 'e4c167053d5418230%'"
)

member_id,pub_rec,pub_record_bankruptcies,inquiry_in_last_6mths
e4c167053d5418230...,0,0,0
e4c167053d5418230...,0,0,1
e4c167053d5418230...,0,0,0
e4c167053d5418230...,0,0,3
e4c167053d5418230...,0,0,2


### Creating DataFrames and Saving the Bad Data

In [102]:
# member_ids that occur on more than one customers row; every row carrying such
# an id is treated as bad data.
# HAVING filters the groups directly, so no (unaliased) derived table is needed.
# NULL ids are excluded on purpose: a NULL in an exclusion list makes any
# `NOT IN (...)` filter against that list return no rows at all.
customer_bad_data_df = spark.sql("""
select member_id
from itv010130_Loan_Database.customers
where member_id is not null
group by member_id
having count(*) > 1
""")

In [103]:
# Number of distinct member_ids flagged as duplicated in customers.
customer_bad_data_df.count()

3157

In [104]:
# member_ids that occur on more than one defaulters_delinquent row.
# HAVING filters the groups directly, so no (unaliased) derived table is needed.
# NULL ids are excluded on purpose: a NULL in an exclusion list makes any
# `NOT IN (...)` filter against that list return no rows at all.
defaulter_delinquent_bad_data_df = spark.sql("""
select member_id
from itv010130_Loan_Database.defaulters_delinquent
where member_id is not null
group by member_id
having count(*) > 1
""")

In [112]:
# Number of distinct member_ids flagged as duplicated in defaulters_delinquent.
defaulter_delinquent_bad_data_df.count()

939

In [106]:
# member_ids that occur on more than one defaulters row.
# HAVING filters the groups directly, so no (unaliased) derived table is needed.
# NULL ids are excluded on purpose: a NULL in an exclusion list makes any
# `NOT IN (...)` filter against that list return no rows at all.
defaulter_bad_data_df = spark.sql("""
select member_id
from itv010130_Loan_Database.defaulters
where member_id is not null
group by member_id
having count(*) > 1
""")

In [107]:
# Number of distinct member_ids flagged as duplicated in defaulters.
defaulter_bad_data_df.count()

3189

In [108]:
# Save the duplicated customer member_ids as a single CSV part file (one
# partition) with a header row, replacing any output from a previous run.
(
    customer_bad_data_df
    .repartition(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv("/user/itv010130/loanproject/bad/customer_bad_data_csv")
)

In [58]:
# Confirm the customer bad-data directory now exists under loanproject/bad.
!hadoop fs -ls /user/itv010130/loanproject/bad

Found 1 items
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:02 /user/itv010130/loanproject/bad/customer_bad_data_csv


In [109]:
# Save the duplicated defaulters_delinquent member_ids as one headered CSV part
# file, replacing any previous output.
(
    defaulter_delinquent_bad_data_df
    .repartition(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv("/user/itv010130/loanproject/bad/defaulter_delinquent_bad_data_csv")
)

In [110]:
# Save the duplicated defaulters member_ids as one headered CSV part file,
# replacing any previous output.
(
    defaulter_bad_data_df
    .repartition(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv("/user/itv010130/loanproject/bad/defaulter_bad_data_csv")
)

In [61]:
# All three per-table bad-data directories should now be listed.
!hadoop fs -ls /user/itv010130/loanproject/bad

Found 3 items
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:02 /user/itv010130/loanproject/bad/customer_bad_data_csv
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:03 /user/itv010130/loanproject/bad/defaulter_bad_data_csv
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:03 /user/itv010130/loanproject/bad/defaulter_delinquent_bad_data_csv


### Combining and Storing All Bad Data

In [113]:
# Stack the bad member_ids from all three tables into one single-column
# DataFrame. union() matches columns by position and keeps duplicates, so IDs
# flagged in more than one table are not collapsed here.
all_bad_data = (
    customer_bad_data_df.select("member_id")
    .union(defaulter_delinquent_bad_data_df.select("member_id"))
    .union(defaulter_bad_data_df.select("member_id"))
)

In [116]:
# Collapse IDs flagged by several tables so each bad member_id appears once
# (dropDuplicates() with no column subset compares whole rows, like distinct()).
distinct_all_bad_data = all_bad_data.dropDuplicates()

In [117]:
# Distinct bad member_ids across all three tables. In this run the result equals
# the defaulters-only count, so the customer and delinquent sets add no new IDs.
distinct_all_bad_data.count()

3189

In [118]:
# Persist the combined, de-duplicated bad member_ids as a single headered CSV
# part file, replacing any previous output.
(
    distinct_all_bad_data
    .repartition(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv("/user/itv010130/loanproject/bad/customer_final_bad_data_csv")
)

In [67]:
# customer_final_bad_data_csv should now appear alongside the per-table directories.
!hadoop fs -ls /user/itv010130/loanproject/bad

Found 4 items
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:02 /user/itv010130/loanproject/bad/customer_bad_data_csv
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:12 /user/itv010130/loanproject/bad/customer_final_bad_data_csv
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:03 /user/itv010130/loanproject/bad/defaulter_bad_data_csv
drwxr-xr-x   - itv010130 supergroup          0 2024-07-17 17:03 /user/itv010130/loanproject/bad/defaulter_delinquent_bad_data_csv


In [68]:
# Expect a _SUCCESS marker plus one part file, because the data was written from
# a single partition.
!hadoop fs -ls /user/itv010130/loanproject/bad/customer_final_bad_data_csv

Found 2 items
-rw-r--r--   3 itv010130 supergroup          0 2024-07-17 17:12 /user/itv010130/loanproject/bad/customer_final_bad_data_csv/_SUCCESS
-rw-r--r--   3 itv010130 supergroup     207295 2024-07-17 17:12 /user/itv010130/loanproject/bad/customer_final_bad_data_csv/part-00000-4350abec-f3d2-49a8-9d91-12aae61b23fa-c000.csv


In [69]:
!hadoop fs -cat /user/itv010130/loanproject/bad/customer_final_bad_data_csv/part-00000-4350abec-f3d2-49a8-9d91-12aae61b23fa-c000.csv | wc -l

3190


### Filtering Out Bad Data and Saving the Cleaned Files

In [119]:
# Register the de-duplicated bad IDs as the session-scoped view `all_bad_data`
# so the SQL filters in the following cells can reference them.
distinct_all_bad_data.createOrReplaceTempView("all_bad_data")

In [120]:
# Customers rows whose member_id is not in the bad-data set.
# A LEFT ANTI JOIN is used rather than `member_id NOT IN (subquery)`. Under SQL
# three-valued logic, a single NULL in all_bad_data makes NOT IN evaluate to
# NULL for every row, silently producing an empty result.
# The explicit IS NOT NULL keeps rows without a member_id out of the cleaned
# data, as NOT IN effectively did.
new_customer_df = spark.sql("""
select c.*
from itv010130_Loan_Database.customers c
left anti join all_bad_data b
    on c.member_id = b.member_id
where c.member_id is not null
""")

In [123]:
# Write the cleaned customers as parquet; overwrite makes re-runs replace the
# previous output instead of failing.
new_customer_df.write.mode("overwrite").parquet(
    "/user/itv010130/loanproject/new_cleaned/customers_data_parquet"
)

In [124]:
# Defaulters rows whose member_id is not in the bad-data set.
# A LEFT ANTI JOIN is used rather than `member_id NOT IN (subquery)`. A single
# NULL in all_bad_data would make NOT IN evaluate to NULL for every row and
# silently empty the result.
# IS NOT NULL keeps rows without a member_id out of the cleaned data.
new_defaulters_df = spark.sql("""
select d.*
from itv010130_Loan_Database.defaulters d
left anti join all_bad_data b
    on d.member_id = b.member_id
where d.member_id is not null
""")

In [125]:
# Write the cleaned defaulters as parquet, replacing any previous output.
new_defaulters_df.write.mode("overwrite").parquet(
    "/user/itv010130/loanproject/new_cleaned/defaulters_data_parquet"
)

In [126]:
# defaulters_delinquent rows whose member_id is not in the bad-data set.
# A LEFT ANTI JOIN is used rather than `member_id NOT IN (subquery)`. A single
# NULL in all_bad_data would make NOT IN evaluate to NULL for every row and
# silently empty the result.
# IS NOT NULL keeps rows without a member_id out of the cleaned data.
new_defaulters_delinquent_df = spark.sql("""
select dd.*
from itv010130_Loan_Database.defaulters_delinquent dd
left anti join all_bad_data b
    on dd.member_id = b.member_id
where dd.member_id is not null
""")

In [127]:
# Write the cleaned defaulters_delinquent rows as parquet, replacing any previous output.
new_defaulters_delinquent_df.write.mode("overwrite").parquet(
    "/user/itv010130/loanproject/new_cleaned/defaulters_delinquent_data_parquet"
)

### Creating New External Tables

In [131]:
# Register the cleaned customers parquet directory as an external Hive table.
# IF NOT EXISTS makes the cell safe to re-run; without it a second run fails
# because the table already exists. Because the table is EXTERNAL, dropping it
# later leaves the parquet files at LOCATION in place.
# NOTE(review): the declared column types must match the parquet schema, which
# is inherited from itv010130_Loan_Database.customers. Confirm this; a mismatch
# only surfaces when the affected column is actually read.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS itv010130_Loan_Database.customers_new (
    member_id string,
    emp_title string,
    emp_length int,
    home_ownership string,
    annual_income float,
    address_state string,
    address_zipcode string,
    address_country string,
    grade string,
    sub_grade string,
    verification_status string,
    total_high_credit_limit float,
    application_type string,
    join_annual_income float,
    verification_status_joint string,
    ingestion_date timestamp
)
STORED AS PARQUET
LOCATION '/user/itv010130/loanproject/new_cleaned/customers_data_parquet'
""")

In [133]:
# Register the cleaned defaulters parquet directory as an external Hive table.
# IF NOT EXISTS makes the cell safe to re-run; without it a second run fails
# because the table already exists.
# NOTE(review): the column types must match the parquet schema inherited from
# itv010130_Loan_Database.defaulters. Confirm this.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS itv010130_Loan_Database.defaulters_new (
    member_id string,
    pub_rec int,
    pub_record_bankruptcies int,
    inquiry_in_last_6mths int
)
STORED AS PARQUET
LOCATION '/user/itv010130/loanproject/new_cleaned/defaulters_data_parquet'
""")

In [134]:
# Register the cleaned defaulters_delinquent parquet directory as an external
# Hive table. IF NOT EXISTS makes the cell safe to re-run; without it a second
# run fails because the table already exists.
# NOTE(review): the column types must match the parquet schema inherited from
# itv010130_Loan_Database.defaulters_delinquent. Confirm this.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS itv010130_Loan_Database.defaulters_delinquent_new (
    member_id string,
    delinq_in_2yrs int,
    delinq_amount float,
    months_since_last_delinq float
)
STORED AS PARQUET
LOCATION '/user/itv010130/loanproject/new_cleaned/defaulters_delinquent_data_parquet'
""")

In [135]:
# Verify customers_new: if the bad IDs were removed, the largest per-member_id
# count is 1.
spark.sql(
    "select member_id, count(member_id) as total "
    "from itv010130_Loan_Database.customers_new "
    "group by member_id "
    "order by total desc "
    "limit 10"
)

member_id,total
60f71322249c5f3ce...,1
55a7b0d5e9ffaff74...,1
4d23236b420459814...,1
191062375281c4b23...,1
8f712742d8569edf4...,1
f1d6af0ceb261763a...,1
b005861ee8ed595f1...,1
d147010c35a2b6285...,1
c106edf445b61dbae...,1
babfc3aa487b63d5b...,1


In [136]:
# Verify defaulters_new: the largest per-member_id count should be 1.
spark.sql(
    "select member_id, count(member_id) as total "
    "from itv010130_Loan_Database.defaulters_new "
    "group by member_id "
    "order by total desc "
    "limit 10"
)

member_id,total
3df3cdeddb74a8712...,1
0b6beb388edd047d8...,1
c02f5f84058c5f952...,1
0b9083d10c1a1db0b...,1
57cf658ad87be221b...,1
576209768213e3f3b...,1
dfcbd6434a458f952...,1
c2da27a4d1f99fba9...,1
38c9e9077eac24f54...,1
a85ccb7523cd134c1...,1


In [137]:
# Verify defaulters_delinquent_new: the largest per-member_id count should be 1.
spark.sql(
    "select member_id, count(member_id) as total "
    "from itv010130_Loan_Database.defaulters_delinquent_new "
    "group by member_id "
    "order by total desc "
    "limit 10"
)

member_id,total
dfcbd6434a458f952...,1
38c9e9077eac24f54...,1
37e30f66471454fba...,1
b7ae8015bb4d13156...,1
3082229e95dbf3a24...,1
3b69b138136f83111...,1
6c92f938d9f6ce989...,1
1ce674fddf9f7c11e...,1
fc0a58f165d67a681...,1
e17ac2a96a5776fae...,1


In [138]:
# Shut down the Spark session and release its YARN resources.
spark.stop()