In [0]:
# Fetch the storage account key from the Databricks secret scope so it is
# never written into the notebook; the mount cells below pass it to dbutils.fs.mount.
secret_scope = 'databricks-scope'
accountkey = dbutils.secrets.get(secret_scope, 'accountkey')

In [0]:
# List the current mount points; as the cell's last expression the result is
# shown as the cell output, giving a quick view before the mount cells run.
dbutils.fs.mounts()

In [0]:
# Mount the "bronze" container at /mnt/bronze unless that mount point already exists.
bronze_mount_point = "/mnt/bronze"
already_mounted = any(
    mount.mountPoint == bronze_mount_point for mount in dbutils.fs.mounts()
)
if already_mounted:
    print("It is already mounted")
else:
    dbutils.fs.mount(
        source='wasbs://bronze@loanlendingdatalake.blob.core.windows.net',
        mount_point=bronze_mount_point,
        # The account key comes from the secret scope read at the top of the notebook.
        extra_configs={
            'fs.azure.account.key.loanlendingdatalake.blob.core.windows.net': accountkey
        },
    )
    print("Mount Created")

In [0]:
# Mount the "silver" container at /mnt/silver unless that mount point already exists.
silver_mount_point = "/mnt/silver"
already_mounted = any(
    mount.mountPoint == silver_mount_point for mount in dbutils.fs.mounts()
)
if already_mounted:
    print("It is already mounted")
else:
    dbutils.fs.mount(
        source='wasbs://silver@loanlendingdatalake.blob.core.windows.net',
        mount_point=silver_mount_point,
        # The account key comes from the secret scope read at the top of the notebook.
        extra_configs={
            'fs.azure.account.key.loanlendingdatalake.blob.core.windows.net': accountkey
        },
    )
    print("Mount Created")

In [0]:
# Mount the "gold" container at /mnt/gold unless that mount point already exists.
gold_mount_point = "/mnt/gold"
already_mounted = any(
    mount.mountPoint == gold_mount_point for mount in dbutils.fs.mounts()
)
if already_mounted:
    print("It is already mounted")
else:
    dbutils.fs.mount(
        source='wasbs://gold@loanlendingdatalake.blob.core.windows.net',
        mount_point=gold_mount_point,
        # The account key comes from the secret scope read at the top of the notebook.
        extra_configs={
            'fs.azure.account.key.loanlendingdatalake.blob.core.windows.net': accountkey
        },
    )
    print("Mount Created")

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType,FloatType,DateType
from pyspark.sql.functions import col, concat, current_timestamp,regexp_replace,lit,to_date,when,sha2

In [0]:
# Explicit schema for account_details.csv, declared as (name, type, nullable)
# rows so the column list reads like a table. The three id columns are
# declared non-nullable; every other column is nullable.
account_column_specs = [
    ("acc_id", StringType(), False),
    ("mem_id", StringType(), False),
    ("loan_id", StringType(), False),
    ("grade", StringType(), True),
    ("sub_grade", StringType(), True),
    ("emp_title", StringType(), True),
    ("emp_length", StringType(), True),
    ("home_ownership", StringType(), True),
    ("annual_inc", FloatType(), True),
    ("verification_status", StringType(), True),
    ("tot_hi_cred_lim", FloatType(), True),
    ("application_type", StringType(), True),
    ("annual_inc_joint", FloatType(), True),
    ("verification_status_joint", StringType(), True),
]

account_schema = StructType(
    fields=[
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in account_column_specs
    ]
)

In [0]:
# Load the raw account details CSV from the bronze layer using the explicit
# schema; the first line of the file is treated as the header row.
account_csv_path = "/mnt/bronze/lending_loan/account_details.csv"
account_df = spark.read.csv(account_csv_path, schema=account_schema, header=True)

In [0]:
# Render the freshly loaded account data for a visual check.
display(account_df)

In [0]:
# Inspect the distinct raw emp_length values to decide which clean-up rules are needed.
account_df.createOrReplaceTempView("temp")
# Keep the DataFrame in `unique_values` and show it separately: `.show()` returns
# None, so assigning its result would leave `unique_values` empty.
unique_values = spark.sql("select distinct emp_length from temp")
unique_values.show()

In [0]:
# Turn the "n/a" marker in emp_length into the text "null"; all other values
# pass through unchanged.
raw_emp_length = col("emp_length")
na_as_null_text = when(raw_emp_length == lit("n/a"), lit("null")).otherwise(raw_emp_length)
replace_value_na = account_df.withColumn("emp_length", na_as_null_text)
display(replace_value_na)

In [0]:
# Collapse the "< 1 year" bucket of emp_length to "1"; all other values pass
# through unchanged.
# NOTE(review): this maps "less than one year" to 1 — confirm that is the intended bucket.
emp_length_before_1yr = col("emp_length")
under_one_year_as_1 = (
    when(emp_length_before_1yr == lit("< 1 year"), lit("1"))
    .otherwise(emp_length_before_1yr)
)
replace_value_1yr = replace_value_na.withColumn("emp_length", under_one_year_as_1)
display(replace_value_1yr)

In [0]:
# Collapse the "10+ years" bucket of emp_length to "10"; all other values pass
# through unchanged.
emp_length_before_10yr = col("emp_length")
ten_plus_as_10 = (
    when(emp_length_before_10yr == lit("10+ years"), lit("10"))
    .otherwise(emp_length_before_10yr)
)
replace_value_10yr = replace_value_1yr.withColumn("emp_length", ten_plus_as_10)
display(replace_value_10yr)

In [0]:
# Drop the plural unit "years" from emp_length (the singular "year" is handled
# in the next step).
string_to_remove = "years"
emp_length_without_plural = regexp_replace(col("emp_length"), string_to_remove, "")
replace_value_years = replace_value_10yr.withColumn("emp_length", emp_length_without_plural)

# Show the intermediate result
display(replace_value_years)

In [0]:
string_to_remove = "year"
# Remove the singular unit "year" from emp_length.
emp_length_without_unit = regexp_replace(
    replace_value_years["emp_length"], string_to_remove, ""
)
# Removing only the word leaves the space that preceded it (e.g. "1 year" -> "1 ",
# "2 years" -> "2 "), which would make those values differ from the "1" and "10"
# produced by the earlier literal replacements. Strip leading/trailing whitespace
# so every cleaned value has the same form.
clean_df = replace_value_years.withColumn(
    "emp_length", regexp_replace(emp_length_without_unit, r"^\s+|\s+$", "")
)

# Display the resulting dataframe
display(clean_df)

In [0]:
# Sanity check: list the distinct emp_length values left after cleaning.
distinct_emp_length_sql = "select distinct emp_length from temp "
clean_df.createOrReplaceTempView("temp")
display_df = spark.sql(distinct_emp_length_sql)
display(display_df)

In [0]:
# Sanity check: show the rows whose emp_length still holds the text 'null'.
null_text_rows_sql = "select * from temp where emp_length='null' "
clean_df.createOrReplaceTempView("temp")
display_df = spark.sql(null_text_rows_sql)
display(display_df)

#### Replace the "null" strings with null values

In [0]:
# Convert the placeholder text "null" into real nulls. No column subset is
# given, so the replacement is not limited to emp_length.
null_placeholder = "null"
final_clean_df = clean_df.replace(null_placeholder, None)
display(final_clean_df)
 

In [0]:
# Sanity check: show the rows that have no tot_hi_cred_lim value.
missing_credit_limit_sql = "select * from temp where tot_hi_cred_lim is null "
final_clean_df.createOrReplaceTempView("temp")
display_df = spark.sql(missing_credit_limit_sql)
display(display_df)

#### Add the ingestion date to the dataframe


In [0]:
# Stamp every row with the time of ingestion.
ingest_timestamp = current_timestamp()
account_df_ingestDate = final_clean_df.withColumn("ingest_date", ingest_timestamp)
display(account_df_ingestDate)

#### Add a surrogate key to the dataframe


In [0]:
# Build the surrogate key as the SHA-256 hash of acc_id, mem_id and loan_id
# concatenated in that order.
# NOTE(review): the ids are joined without a separator, so different id
# combinations could produce the same concatenated text — confirm this cannot
# happen with the real id formats before relying on key uniqueness.
business_key_columns = [col(key_name) for key_name in ("acc_id", "mem_id", "loan_id")]
hashed_business_key = sha2(concat(*business_key_columns), 256)
account_df_key = account_df_ingestDate.withColumn("account_key", hashed_business_key)
display(account_df_key)

#### Rename columns in the dataframe

In [0]:
# Replace the abbreviated source column names with descriptive ones.
# Columns not listed here keep their current names.
column_renames = {
    "acc_id": "account_id",
    "mem_id": "member_id",
    "emp_title": "employee_designation",
    "emp_length": "employee_experience",
    "annual_inc": "annual_income",
    "tot_hi_cred_lim": "total_high_credit_limit",
    "annual_inc_joint": "annual_income_joint",
}

account_df_rename = account_df_key
for source_name, target_name in column_renames.items():
    account_df_rename = account_df_rename.withColumnRenamed(source_name, target_name)

In [0]:
# Sanity check: show the rows stamped with today's ingestion date.
account_df_rename.createOrReplaceTempView("temp")
# Compare against current_date() rather than a fixed calendar date, so the
# check still returns rows when the notebook is re-run on a later day.
df = spark.sql("select * from temp where date(ingest_date) = current_date()")
display(df)

In [0]:
# Put the columns in their final order: surrogate key and ingest timestamp
# first, then the business columns.
final_column_order = [
    "account_key",
    "ingest_date",
    "account_id",
    "member_id",
    "loan_id",
    "grade",
    "sub_grade",
    "employee_designation",
    "employee_experience",
    "home_ownership",
    "annual_income",
    "verification_status",
    "total_high_credit_limit",
    "application_type",
    "annual_income_joint",
    "verification_status_joint",
]

account_df_rename.createOrReplaceTempView("temp_table")
final_select_sql = "select " + ",".join(final_column_order) + " from temp_table "
final_df = spark.sql(final_select_sql)
display(final_df)

#### Write the cleaned dataframe into the data lake

In [0]:
# Persist the cleaned accounts to the silver layer as Parquet.
# Append mode adds to whatever is already stored at this path, so re-running
# this cell writes the same rows again.
# NOTE(review): `header` looks like a CSV-oriented option and is probably
# unnecessary for a Parquet write — confirm before removing.
silver_account_path = "/mnt/silver/lending_loan/account_details"
silver_writer = final_df.write.options(header='True').mode("append")
silver_writer.parquet(silver_account_path)

In [0]:
# Read the Parquet data back from the silver path and display it to confirm the write.
display(spark.read.parquet("/mnt/silver/lending_loan/account_details"))