In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()

# Create (or reuse) a YARN-backed SparkSession with Hive support.
# - spark.shuffle.useOldFetchProtocol: per the Spark docs this is for compatibility
#   with an older external shuffle service; presumably required on this cluster
#   (TODO confirm).
# - spark.sql.warehouse.dir: per-user warehouse derived from `username` instead of
#   a hard-coded account (the original f-string had no placeholder, so `username`
#   was computed but never used).
spark = (
    SparkSession.builder
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [2]:
# DDL schema string for the raw loans-defaulters CSV: member_id as string, every
# other column as float.
# NOTE(review): several names look like misspellings ("deling", "bankruptacies",
# "mnths"); they are kept verbatim because all later queries and saved datasets
# refer to these exact column names.
loans_defaulter_schema = ", ".join([
    "member_id string",
    "deling_2yrs float",
    "deling_amount float",
    "pub_rec float",
    "pub_rec_bankruptacies float",
    "inq_last_6mnths float",
    "total_rec_late_fee float",
    "mnths_since_last_deling float",
    "mnths_since_last_record float",
])

In [3]:
# Read the raw defaulters CSV using the explicit DDL schema (so Spark does not
# infer column types) and treat the first line of each file as a header.
loans_def_df = spark.read.csv(
    "/public/trendytech/lendingclubproject/raw/loans_defaulters_csv",
    schema=loans_defaulter_schema,
    header=True,
)

In [4]:
# Expose the raw (uncleaned) frame to Spark SQL under the view name "defaulters".
loans_def_df.createOrReplaceTempView(name="defaulters")

In [5]:
# Frequency of each distinct deling_2yrs value in the raw view, most common first.
raw_deling_profile_sql = "select deling_2yrs, count(*) as total from defaulters group by deling_2yrs order by total desc "
spark.sql(raw_deling_profile_sql).show(40)

+-----------+-------+
|deling_2yrs|  total|
+-----------+-------+
|        0.0|1838878|
|        1.0| 281335|
|        2.0|  81285|
|        3.0|  29539|
|        4.0|  13179|
|        5.0|   6599|
|        6.0|   3717|
|        7.0|   2062|
|        8.0|   1223|
|        9.0|    818|
|       10.0|    556|
|       11.0|    363|
|       12.0|    264|
|       null|    261|
|       13.0|    165|
|       14.0|    120|
|       15.0|     87|
|       16.0|     55|
|       17.0|     30|
|       18.0|     30|
|       19.0|     23|
|       20.0|     17|
|       21.0|     12|
|       22.0|      5|
|       24.0|      4|
|       26.0|      3|
|       23.0|      2|
|       25.0|      2|
|       3.44|      2|
|       30.0|      2|
|       29.0|      2|
|      22.62|      1|
|       1.37|      1|
|      26.24|      1|
|      21.72|      1|
|      22.95|      1|
|       9.44|      1|
|       3.45|      1|
|       14.1|      1|
|       5.52|      1|
+-----------+-------+
only showing top 40 rows



In [7]:
from pyspark.sql.functions import col

In [8]:
# Cleaned copy of the raw frame: deling_2yrs cast to integer, nulls replaced by 0.
# NOTE(review): the raw profile output shows null rows and non-integral values
# (e.g. 3.44, 22.62) in this count column; the cast does not reject them --
# confirm whether those rows are malformed before trusting the counts.
defaulter_clean = (
    loans_def_df
    .withColumn("deling_2yrs", col("deling_2yrs").cast("integer"))
    .na.fill(0, subset=["deling_2yrs"])
)

In [9]:
# Register the cleaned frame as the "defaulter" view queried below.
# createOrReplaceTempView returns None, so its result is not assigned to a variable.
defaulter_clean.createOrReplaceTempView("defaulter")

In [None]:
# Same frequency profile, now on the cleaned "defaulter" view (integer, nulls -> 0).
clean_deling_profile_sql = "select deling_2yrs, count(*) as total from defaulter group by deling_2yrs order by total desc "
spark.sql(clean_deling_profile_sql).show(50)

In [10]:
# Members from the cleaned "defaulter" view with deling_2yrs > 0 or
# mnths_since_last_deling > 0 (the WHERE clause compares the source float column).
# The integer cast is given an explicit alias so the output column name -- and
# therefore the schema of the parquet written from this frame -- does not depend
# on Spark's auto-generated name for a function-style cast.
defaulters_only = spark.sql("""
    select member_id,
           deling_2yrs,
           deling_amount,
           int(mnths_since_last_deling) as mnths_since_last_deling
    from defaulter
    where deling_2yrs > 0 or mnths_since_last_deling > 0
""")

In [11]:
# Bare last expression: the notebook displays the DataFrame (saved output below shows a preview).
defaulters_only

member_id,deling_2yrs,deling_amount,mnths_since_last_deling
9cb79aa7323e81be1...,2,0.0,11
aac68850fdac09fd0...,1,0.0,21
c89986155a070db2e...,1,0.0,5
6e8d94bf446e97025...,0,0.0,36
42f73fd8a01f1c475...,0,0.0,46
1eef79a0e79b72c7a...,1,0.0,21
1dd1d1b51473d4993...,0,0.0,44
ec1953dba2cfb89ad...,2,0.0,13
8241a6bb3a9350fb8...,0,0.0,57
cdc94fa1c29a6a70a...,0,0.0,44


In [11]:
# Members from the raw "defaulters" view with pub_rec > 0, pub_rec_bankruptacies > 0
# or inq_last_6mnths > 0 (float comparisons; null values never satisfy them).
# NOTE: despite its name, `no_pub_rec` holds the members that DO match one of these
# conditions. The descriptive name is the primary binding; `no_pub_rec` is kept
# because later cells still refer to it.
members_with_pub_rec_or_inq = spark.sql("""
    select member_id
    from defaulters
    where pub_rec > 0.0
       or pub_rec_bankruptacies > 0.0
       or inq_last_6mnths > 0.0
""")
no_pub_rec = members_with_pub_rec_or_inq

In [13]:
# Preview no_pub_rec: member_ids matching the pub_rec / bankruptcy / inquiry filter.
no_pub_rec

member_id
0dd2bbc517e3c8f9e...
458458599d3df3bfc...
f1efcf7dfbfef21be...
c89986155a070db2e...
e88945f86a96f8d71...
4e1c30a5dfe9f1e20...
76cbefe31f7834f47...
47d002f59a274c6f2...
09a1c6855801dad88...
56d4375718ad6940d...


In [14]:
# Persist defaulters_only as parquet, replacing any previous output.
# No "header" option: it only affects CSV and is ignored by the parquet writer.
# The location is under /user/<current user> rather than a hard-coded account.
(
    defaulters_only.write
    .format("parquet")
    .mode("overwrite")
    .option("path", f"/user/{username}/lendingclubproject/cleaned/loans_defaulters_deling")
    .save()
)

In [15]:
# Persist the pub_rec / bankruptcy / inquiry member list as parquet, replacing any
# previous output. No "header" option: it only affects CSV, not parquet.
# The location is under /user/<current user> rather than a hard-coded account.
(
    no_pub_rec.write
    .format("parquet")
    .mode("overwrite")
    .option("path", f"/user/{username}/lendingclubproject/cleaned/loans_inquiry_deling")
    .save()
)

In [14]:
# loans_def_detail_records_enq_df is built from the cleaned "loan_defaulters" view
# further down. Building it here from the raw "defaulters" view was dead work:
# the value was overwritten before any cell used it.

In [17]:
# Step 1 of the column-by-column cleaning chain: deling_2yrs -> integer, nulls -> 0.
# (Same transformation as `defaulter_clean`, re-derived from the raw frame.)
loans_def_processed_df = (
    loans_def_df
    .withColumn("deling_2yrs", col("deling_2yrs").cast("integer"))
    .fillna(value=0, subset=["deling_2yrs"])
)


In [18]:
# Step 2: pub_rec -> integer, nulls -> 0.
loans_def_p_pub_rec_df = (
    loans_def_processed_df
    .withColumn("pub_rec", col("pub_rec").cast("integer"))
    .fillna(value=0, subset=["pub_rec"])
)

In [20]:
# Step 3: pub_rec_bankruptacies -> integer, nulls -> 0.
loans_def_p_pub_rec_bankruptcies_df = (
    loans_def_p_pub_rec_df
    .withColumn("pub_rec_bankruptacies", col("pub_rec_bankruptacies").cast("integer"))
    .fillna(value=0, subset=["pub_rec_bankruptacies"])
)

In [21]:
# Step 4: inq_last_6mnths -> integer, nulls -> 0.
loans_def_p_inq_last_6mths_df = (
    loans_def_p_pub_rec_bankruptcies_df
    .withColumn("inq_last_6mnths", col("inq_last_6mnths").cast("integer"))
    .fillna(value=0, subset=["inq_last_6mnths"])
)

In [22]:
# Register the frame with all four cleaned columns as the "loan_defaulters" view.
loans_def_p_inq_last_6mths_df.createOrReplaceTempView(name="loan_defaulters")

In [24]:
# Public-record / inquiry detail columns taken from the cleaned "loan_defaulters" view.
detail_records_sql = "select member_id, pub_rec, pub_rec_bankruptacies, inq_last_6mnths from loan_defaulters"
loans_def_detail_records_enq_df = spark.sql(detail_records_sql)


In [25]:
# Preview the cleaned detail records (integer columns, nulls already replaced by 0).
loans_def_detail_records_enq_df

member_id,pub_rec,pub_rec_bankruptacies,inq_last_6mnths
9cb79aa7323e81be1...,0,0,0
0dd2bbc517e3c8f9e...,1,1,3
458458599d3df3bfc...,1,1,1
05ea141ec28b5c7f7...,0,0,0
aac68850fdac09fd0...,0,0,0
3a423e4589e89f429...,0,0,0
f1efcf7dfbfef21be...,0,0,1
c89986155a070db2e...,0,0,1
118dc629b6e134419...,0,0,0
a86fa4b7493708333...,0,0,0


In [26]:
# Persist the cleaned detail records as parquet, replacing any previous output.
# The location is under /user/<current user> rather than a hard-coded account.
(
    loans_def_detail_records_enq_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", f"/user/{username}/lendingclubproject/cleaned/loans_defaulters_detail_records_enq_parquet")
    .save()
)

In [27]:
# Stop the SparkSession (and its underlying SparkContext) at the end of the notebook.
spark.stop()