In [None]:
configs = {
    "fs.azure.account.auth.type": "CustomAccessToken",
    "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}


mount_point = "/mnt/raw-data"

try:
    dbutils.fs.unmount(mount_point)
    # Optional: Catch the exception if the mount point does not exist
except Exception as e:
    print(f"Could not unmount {mount_point}: {e}")

# Now proceed with the mounting
dbutils.fs.mount(
    source = "abfss://raw-data@datalake0012anee.dfs.core.windows.net/",
    mount_point = mount_point,
    extra_configs = configs
)



In [None]:
dbutils.fs.ls("/mnt/raw-data")

[FileInfo(path='dbfs:/mnt/raw-data/dbo.loan_raw.parquet', name='dbo.loan_raw.parquet', size=7275570, modificationTime=1710086103000)]

In [None]:
input_path = "/mnt/raw-data/dbo.loan_raw.parquet"

In [None]:
df = spark.read.format('parquet').options(header='True', inferSchema='True').load(input_path)

In [None]:
display(df)

In [None]:
from pyspark.sql.functions import mean, col, regexp_replace, StringType, lower, count, when, sum as sqlsum


In [None]:
print((df.count(), len(df.columns)))

(148670, 34)


In [None]:
df = df.dropDuplicates()

In [None]:
columns_to_drop = ['loan_limit', 'approv_in_adv', 'loan_type', 'loan_purpose', 'construction_type', 'Credit_Worthiness',
                   'open_credit', 'Neg_ammortization', 'interest_only', 'lump_sum_payment', 'occupancy_type', 'credit_type',
                   'co-applicant_credit_type', 'Security_Type', 'submission_of_application']
df = df.drop(*columns_to_drop)

In [None]:
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.lower())

In [None]:
numeric_columns = ["rate_of_interest", "interest_rate_spread", "upfront_charges", "property_value", "ltv", "dtir1", "income", "term"]
for col_name in numeric_columns:
    mean_val = df.select(mean(col(col_name))).collect()[0][0]
    df = df.na.fill({col_name: mean_val})

In [None]:
df = df.withColumn("age", regexp_replace("age", " ", "null"))
df = df.na.drop(subset=["age"])

In [None]:
string_columns = [field.name for field in df.schema.fields if field.dataType == StringType()]
for col_name in string_columns:
    df = df.withColumn(col_name, lower(col(col_name)))

In [None]:
null_counts = df.select([sqlsum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()

In [None]:
df.describe().show()


+-------+-----------------+------+-----------------+----------------------+------------------+-------------------+--------------------+-----------------+-----------------+------------------+----------+-----------+-----------------+------------------+------------------------+------+------------------+-------+-------------------+------------------+
|summary|               id|  year|           gender|business_or_commercial|       loan_amount|   rate_of_interest|interest_rate_spread|  upfront_charges|             term|    property_value|secured_by|total_units|           income|      credit_score|co_applicant_credit_type|   age|               ltv| region|             status|             dtir1|
+-------+-----------------+------+-----------------+----------------------+------------------+-------------------+--------------------+-----------------+-----------------+------------------+----------+-----------+-----------------+------------------+------------------------+------+------------------+-

In [None]:
output_path = "abfss://cleaned-data@datalake0012anee.dfs.core.windows.net"


df.coalesce(1).write.mode("overwrite").option("header", "true").parquet(output_path)
