# Lending Club Project
## Step 101
- Mounting Storage (via Azure Key Vault-backed secret scope)
- Reading File
- Generating unique Hash Value
- Splitting into multiple files
- Writing these files to storage

### Mounting Storage

In [0]:
# Show which secrets are registered in the scope used by the mount step below.
secret_scope_name = "databricksazurestorage"
dbutils.secrets.list(secret_scope_name)

In [0]:
# Mount the `lendingclub` blob container at /mnt/Lendingclub, authenticating with
# the storage account key read from the "databricksazurestorage" secret scope.
lendingclub_mount_point = '/mnt/Lendingclub'

# dbutils.fs.mount raises if the mount point is already in use, which makes the
# notebook fail on a second run; only mount when the mount point is missing.
already_mounted = any(
    mount.mountPoint == lendingclub_mount_point for mount in dbutils.fs.mounts()
)

if not already_mounted:
    dbutils.fs.mount(
        source = 'wasbs://lendingclub@sparkprojectadls.blob.core.windows.net',
        mount_point = lendingclub_mount_point,
        extra_configs = {
            "fs.azure.account.key.sparkprojectadls.blob.core.windows.net": dbutils.secrets.get(
                scope = "databricksazurestorage",
                key = "databricks-storageAccount",
            )
        },
    )

In [0]:
# Cleanup helper, intentionally left disabled: uncomment and run to detach the
# /mnt/Lendingclub mount created by the previous cell.
# dbutils.fs.unmount("/mnt/Lendingclub")

### Reading the Raw File

In [0]:
# List the contents of the raw-file folder on the mounted container.
rawfile_dir = "/mnt/Lendingclub/Lendingclub/Lendingclub/rawfile"
dbutils.fs.ls(rawfile_dir)

In [0]:
# Load the raw accepted-loans CSV: the first row is the header and column types are inferred.
raw_csv_path = "dbfs:/mnt/Lendingclub/Lendingclub/Lendingclub/rawfile/accepted_2007_to_2018Q4.csv"

raw_reader = (
    spark.read
    .format("csv")
    .option("InferSchema", "true")
    .option("header", "true")
)
raw_df = raw_reader.load(raw_csv_path)

In [0]:
# Register the raw DataFrame as a temp view so it can be queried with Spark SQL.
raw_view_name = "lending_club_data"
raw_df.createOrReplaceTempView(raw_view_name)

In [0]:
# Preview the first ten rows of the raw view.
raw_preview_df = spark.sql("select * from lending_club_data").limit(10)
display(raw_preview_df)

### Adding a unique hash value to identify each row

In [0]:
from pyspark.sql.functions import sha2,concat_ws

In [0]:
# Columns whose values are joined with "||" and hashed (SHA-256) into `name_sha2`.
member_key_columns = [
    "emp_title",
    "emp_length",
    "home_ownership",
    "annual_inc",
    "zip_code",
    "addr_state",
    "grade",
    "sub_grade",
    "verification_status",
]

# NOTE(review): concat_ws skips NULL inputs, so rows that differ only in *which*
# of these columns is NULL can produce the same string and therefore the same
# hash - confirm whether that is acceptable for the downstream member_id.
member_key_string = concat_ws("||", *member_key_columns)
new_df = raw_df.withColumn("name_sha2", sha2(member_key_string, 256))

In [0]:
# Preview ten rows of the DataFrame that now carries the `name_sha2` column.
hashed_preview_df = new_df.limit(10)
display(hashed_preview_df)

In [0]:
# Count rows per hash value and show the ten most frequent ones, to see how
# often the same `name_sha2` repeats.
hash_counts_df = new_df.groupBy("name_sha2").count()
most_repeated_hashes_df = hash_counts_df.orderBy("count", ascending=False).limit(10)
display(most_repeated_hashes_df)

## Splitting the file into 4 new files

### 1. customers_data
member_id, emp_title, emp_length, home_ownership, annual_inc, addr_state, zip_code, country, grade, sub_grade, verification_status, tot_hi_cred_lim, application_type, annual_inc_joint, verification_status_joint

### 2. loans_data 
loan_id, member_id, loan_amnt, funded_amnt, term, int_rate, installment, issue_d, loan_status, purpose, title

### 3. loans_repayments
loan_id, total_rec_prncp, total_rec_int, total_rec_late_fee, total_pymnt, last_pymnt_amnt, last_pymnt_d, next_pymnt_d

### 4. loans_defaulters
member_id, delinq_2yrs, delinq_amnt, pub_rec, pub_rec_bankruptcies, inq_last_6mths, total_rec_late_fee, mths_since_last_delinq, mths_since_last_record





In [0]:
# Register the hashed DataFrame as a temp view; the split queries below read from it.
hashed_view_name = "newtable"
new_df.createOrReplaceTempView(hashed_view_name)

In [0]:
# Project the customer columns (with the hash exposed as member_id and a constant
# 'USA' country) and write them as a single-partition CSV with a header row,
# overwriting any previous output.
customers_query = """select name_sha2 as member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,'USA' as country,grade,sub_grade,
verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint from newtable
"""
customers_single_partition_df = spark.sql(customers_query).repartition(1)

(
    customers_single_partition_df.write
    .option("header", "true")
    .format("csv")
    .mode("overwrite")
    .option("path", "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/customers_data_csv")
    .save()
)

In [0]:
# Read the customers CSV output back, with a header row and inferred column types.
customers_csv_dir = "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/customers_data_csv"

customers_reader = (
    spark.read
    .format("csv")
    .option("InferSchema", "true")
    .option("header", "true")
)
customers_df = customers_reader.load(customers_csv_dir)

In [0]:
# Preview ten rows of the customers data that was just read back.
customers_preview_df = customers_df.limit(10)
display(customers_preview_df)

In [0]:
# Project the loan columns (id as loan_id, hash as member_id) and write them as a
# single-partition CSV with a header row, overwriting any previous output.
loans_query = """select id as loan_id, name_sha2 as member_id,loan_amnt,funded_amnt,term,int_rate,installment,issue_d,loan_status,purpose,
title from newtable"""
loans_single_partition_df = spark.sql(loans_query).repartition(1)

(
    loans_single_partition_df.write
    .option("header", True)
    .format("csv")
    .mode("overwrite")
    .option("path", "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_data_csv")
    .save()
)

In [0]:
# Read the loans CSV output back, with a header row and inferred column types.
loans_csv_dir = "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_data_csv"

loans_reader = (
    spark.read
    .format("csv")
    .option("InferSchema", "true")
    .option("header", "true")
)
loans_df = loans_reader.load(loans_csv_dir)

In [0]:
# Preview ten rows of the loans data that was just read back.
loans_preview_df = loans_df.limit(10)
display(loans_preview_df)

In [0]:
# Project the repayment columns keyed by loan_id and write them as a
# single-partition CSV with a header row, overwriting any previous output.
loans_repayments_query = """select id as loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d from newtable"""
loans_repayments_single_partition_df = spark.sql(loans_repayments_query).repartition(1)

(
    loans_repayments_single_partition_df.write
    .option("header", True)
    .format("csv")
    .mode("overwrite")
    .option("path", "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_repayments_csv")
    .save()
)

In [0]:
# Read the loan-repayments CSV output back, with a header row and inferred column types.
loans_repayments_csv_dir = "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_repayments_csv"

loans_repayments_reader = (
    spark.read
    .format("csv")
    .option("InferSchema", "true")
    .option("header", "true")
)
loans_repayments_df = loans_repayments_reader.load(loans_repayments_csv_dir)

In [0]:
# Preview ten rows of the loan-repayments data that was just read back.
loans_repayments_preview_df = loans_repayments_df.limit(10)
display(loans_repayments_preview_df)

In [0]:
# Project the delinquency / public-record columns keyed by member_id (the hash)
# and write them as a single-partition CSV with a header row, overwriting any
# previous output.
loans_defaulters_query = """select name_sha2 as member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record from newtable"""
loans_defaulters_single_partition_df = spark.sql(loans_defaulters_query).repartition(1)

(
    loans_defaulters_single_partition_df.write
    .option("header", True)
    .format("csv")
    .mode("overwrite")
    .option("path", "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_defaulters_csv")
    .save()
)

In [0]:
# Read the loan-defaulters CSV output back, with a header row and inferred column types.
loans_defaulters_csv_dir = "/mnt/Lendingclub/Lendingclub/Lendingclub/raw/loans_defaulters_csv"

loans_defaulters_reader = (
    spark.read
    .format("csv")
    .option("InferSchema", "true")
    .option("header", "true")
)
loans_defaulters_df = loans_defaulters_reader.load(loans_defaulters_csv_dir)


In [0]:
# Preview ten rows of the loan-defaulters data that was just read back.
loans_defaulters_preview_df = loans_defaulters_df.limit(10)
display(loans_defaulters_preview_df)