#### Listing secret scopes

In [0]:
secret_scope_name = 'databricks-secret-scope'

# First every secret scope available to the workspace, then the entries of the
# scope this notebook reads its credentials from.
display(dbutils.secrets.listScopes())
display(dbutils.secrets.list(secret_scope_name))


#### Connecting to ADLS using a service principal

In [0]:
service_credential = dbutils.secrets.get(
    scope="databricks-secret-scope",
    key="databricks-crdp-service-principle-secret",
)

storage_account = "crdpadlsdev"
application_id = "fe2198ac-ae05-4b49-96ea-f8ffd5a12c01"
directory_id = "18ea6cf2-8572-415d-9063-f620b0105a11"

# Session-level settings for authenticating to the ADLS Gen2 (dfs) endpoint of
# `storage_account` with the ClientCredsTokenProvider. Every setting key is
# suffixed with the account's dfs host name.
_dfs_host = f"{storage_account}.dfs.core.windows.net"
_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": application_id,
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
}
for _setting_key, _setting_value in _oauth_settings.items():
    spark.conf.set(f"{_setting_key}.{_dfs_host}", _setting_value)

#### Listing files in the bronze container of crdpadlsdev

In [0]:

bronze_container_uri = "abfss://bronze@crdpadlsdev.dfs.core.windows.net"
display(dbutils.fs.ls(bronze_container_uri))

#### Ingest Data from ADLS (Bronze Layer)

In [0]:

base_path = "abfss://bronze@crdpadlsdev.dfs.core.windows.net/"


def _read_bronze_csv(file_name, root=base_path):
    """Read one headered CSV file from the bronze container into a Spark DataFrame.

    Parameters:
    file_name (str): File name, appended directly to ``root``.
    root (str): Directory URI the file lives in (defaults to ``base_path``).

    Returns:
    DataFrame: Spark DataFrame whose column types are inferred by Spark.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(root + file_name)
    )


# Read each CSV file (identical reader options for all of them)
df_description = _read_bronze_csv("HomeCredit_columns_description.csv")
df_pos_cash = _read_bronze_csv("POS_CASH_balance.csv")
df_application_train = _read_bronze_csv("application_train.csv")
df_bureau_balance = _read_bronze_csv("bureau_balance.csv")
df_credit_card_balance = _read_bronze_csv("credit_card_balance.csv")
df_installments_payments = _read_bronze_csv("installments_payments.csv")
df_previous_application = _read_bronze_csv("previous_application.csv")


#### Ingest Data from Azure SQL Server

In [0]:
import pandas as pd
import pymssql

def get_sql_connection(server, database, username, password):
    """Open a connection to a SQL Server database with pymssql.

    Parameters:
    server (str): SQL Server host.
    database (str): Database to connect to.
    username (str): Login user name.
    password (str): Login password.

    Returns:
    The connection object returned by ``pymssql.connect``. The caller owns it
    and is responsible for closing it.
    """
    connect_kwargs = dict(
        server=server,
        user=username,
        password=password,
        database=database,
    )
    return pymssql.connect(**connect_kwargs)

def read_table_to_pandas(conn, table_name):
    """Load every row of ``table_name`` into a pandas DataFrame.

    Parameters:
    conn: An open DB-API connection (e.g. the one returned by ``get_sql_connection``).
    table_name (str): Table name, optionally schema-qualified (``schema.table``).
        Each dotted part must be a plain SQL identifier (letters, digits, underscore,
        not starting with a digit).

    Returns:
    pandas.DataFrame: All rows and columns of the table.

    Raises:
    ValueError: If ``table_name`` is not a plain (optionally dotted) identifier.
    """
    import re

    # Identifiers cannot be passed as bound query parameters, so validate the
    # name before interpolating it; this keeps arbitrary SQL out of the statement.
    identifier_pattern = r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*"
    if not isinstance(table_name, str) or re.fullmatch(identifier_pattern, table_name) is None:
        raise ValueError(f"Invalid table name: {table_name!r}")

    query = f"SELECT * FROM {table_name}"
    return pd.read_sql(query, conn)

def pandas_to_spark(spark, pandas_df):
    """Convert a pandas DataFrame to a Spark DataFrame via ``spark.createDataFrame``.

    Parameters:
    spark: Active Spark session.
    pandas_df (pandas.DataFrame): Data to convert.

    Returns:
    DataFrame: The Spark DataFrame produced by the session.
    """
    spark_df = spark.createDataFrame(pandas_df)
    return spark_df


In [0]:
# Connection details
# TODO: do not keep credentials inline in the notebook; load them with
# dbutils.secrets.get(...) the same way the ADLS service principal secret is loaded.
server = ''
database = ''
username = ''
password = ''

# Connect to Azure SQL. The connection is closed as soon as the table has been
# read, so it is not left open for the rest of the session.
conn = get_sql_connection(server, database, username, password)
try:
    # Load table as pandas
    pdf = read_table_to_pandas(conn, 'bureau')
finally:
    conn.close()

# Convert to Spark DataFrame (built from the in-memory pandas frame, so the
# closed connection is no longer needed)
df_bureau = pandas_to_spark(spark, pdf)


#### Data Profiling & Schema Overview

In [0]:
from pyspark.sql.functions import col, count, when, count

def profile_dataframe(df, df_name, num_rows=5):
    """Print and display a quick profile of a Spark DataFrame.

    The profile contains the schema, the first ``num_rows`` rows, the number of
    nulls in every column, and ``describe()`` summary statistics.

    Parameters:
    df (DataFrame): Spark DataFrame to profile.
    df_name (str): Label printed in the section header.
    num_rows (int): Number of sample rows to display (default 5).
    """
    print(f"\n Profiling: {df_name} -->")

    print("\n Schema:")
    df.printSchema()

    print("\n Top Rows:")
    display(df.limit(num_rows))

    print("\n Null Count per Column:")
    # when() without otherwise() is null unless the column value is null, and
    # count() skips nulls, so each aggregate counts one column's null cells.
    null_count_exprs = []
    for column_name in df.columns:
        null_marker = when(col(column_name).isNull(), column_name)
        null_count_exprs.append(count(null_marker).alias(column_name))
    display(df.select(null_count_exprs))

    print("\n Summary Statistics:")
    display(df.describe())


In [0]:
for frame_to_profile, frame_label in (
    (df_application_train, "application_train"),
    (df_previous_application, "previous_application"),
    (df_bureau, "bureau"),
):
    profile_dataframe(frame_to_profile, frame_label)

#### Exploratory Data Analysis

Number of distinct client IDs (SK_ID_CURR) in df_previous_application, df_application_train and df_bureau

In [0]:
# Number of distinct clients (SK_ID_CURR) present in each dataset.
for dataset_name, dataset_df in (
    ("previous_application", df_previous_application),
    ("application_train", df_application_train),
    ("bureau", df_bureau),
):
    unique_client_count = dataset_df.select(col("SK_ID_CURR")).distinct().count()
    print(f"{dataset_name} dataset unique client ID count :-", unique_client_count)

Distinct CREDIT_ACTIVE and CREDIT_CURRENCY values in bureau table


In [0]:
for categorical_column in ('CREDIT_ACTIVE', 'CREDIT_CURRENCY'):
    df_bureau.select(categorical_column).distinct().display()

Distribution of bureau records by CREDIT_CURRENCY. Since almost all records belong to 'currency 1', we later keep only that category and drop the others. This keeps future amount calculations consistent and avoids noise from mixing currencies.

In [0]:
# Number of bureau rows per CREDIT_CURRENCY value
df_bureau.groupBy('CREDIT_CURRENCY').count().display()


Exploration of the credit_card_balance dataset

Customer Credit Behavior Summary (SK_ID_CURR = 378907):

The customer had a credit account (SK_ID_PREV = 2562384) that was paid off around month -20. A new credit cycle began around month -11 under the same account ID, indicating revolving credit behavior. AMT_BALANCE and AMT_TOTAL_RECEIVABLE show how the outstanding principal and total dues (including interest/fees) evolved over time. This suggests the customer reuses the same credit line, typical of credit card or revolving loan products.



In [0]:
customer_378907_rows = df_credit_card_balance.where(col('SK_ID_CURR') == 378907)
customer_378907_rows.orderBy(col('MONTHS_BALANCE').desc()).display()


Customer Credit Behavior Summary (SK_ID_CURR = 260530):

This customer had a credit account (SK_ID_PREV = 2332438) that became active around month -11 with a large ATM drawing (₹247,500), followed by partial repayment in month -10. From month -9 onward, the balance dropped to zero. However, SK_DPD and SK_DPD_DEF show high values (up to 189), indicating significant payment delays or defaults prior to loan closure. Despite the final NAME_CONTRACT_STATUS being “Completed,” the historical overdue days suggest prior delinquency or late repayment behavior.



In [0]:
customer_260530_rows = df_credit_card_balance.where(col('SK_ID_CURR') == 260530)
customer_260530_rows.orderBy(col('MONTHS_BALANCE').desc()).display()


#### Feature Elimination: Dropping Unnecessary Columns

In [0]:

def extract_features_application_train(df):
    """
    Keep only the application_train columns used downstream.

    Parameters:
    df (DataFrame): The application_train DataFrame.

    Returns:
    DataFrame: ``df`` restricted to the listed columns, in the listed order.
    """
    selected_columns = (
        # identifiers / target
        "SK_ID_CURR", "TARGET",
        # contract type and applicant flags
        "NAME_CONTRACT_TYPE", "CODE_GENDER", "FLAG_OWN_CAR", "FLAG_OWN_REALTY",
        "CNT_CHILDREN",
        # AMT_* amounts
        "AMT_INCOME_TOTAL", "AMT_CREDIT", "AMT_ANNUITY", "AMT_GOODS_PRICE",
        # NAME_* categorical descriptors
        "NAME_TYPE_SUITE", "NAME_INCOME_TYPE", "NAME_EDUCATION_TYPE",
        "NAME_FAMILY_STATUS", "NAME_HOUSING_TYPE",
        "DAYS_BIRTH", "DAYS_EMPLOYED",
        # contact FLAG_* columns
        "FLAG_MOBIL", "FLAG_WORK_PHONE", "FLAG_PHONE", "FLAG_EMAIL",
        "CNT_FAM_MEMBERS",
        # EXT_SOURCE_* columns
        "EXT_SOURCE_1", "EXT_SOURCE_2", "EXT_SOURCE_3",
        "DAYS_REGISTRATION", "DAYS_ID_PUBLISH",
    )
    return df.select(*selected_columns)


from pyspark.sql import DataFrame

def extract_features_credit_card_balance(df):
    """
    Keep only the credit_card_balance columns of interest.

    Parameters:
    df (DataFrame): The credit_card_balance DataFrame.

    Returns:
    DataFrame: ``df`` restricted to the listed columns, in the listed order.
    """
    ids_and_time = ['SK_ID_CURR',      # Application ID
                    'SK_ID_PREV',      # Previous credit ID
                    'MONTHS_BALANCE']  # Time axis
    amounts = ['AMT_BALANCE',              # Current balance
               'AMT_CREDIT_LIMIT_ACTUAL',  # Credit limit
               'AMT_DRAWINGS_CURRENT',     # Total current drawings
               'AMT_PAYMENT_CURRENT']      # Latest payment made
    delinquency = ['SK_DPD']               # Days past due

    return df.select(*(ids_and_time + amounts + delinquency))


def extract_features_previous_application(df):
    """
    Keep only the previous_application columns used downstream.

    Parameters:
    df (DataFrame): The previous_application DataFrame.

    Returns:
    DataFrame: ``df`` restricted to the listed columns, in the listed order.
    """
    # Whitespace-separated column names, split into a list in reading order.
    selected_columns = """
        SK_ID_CURR SK_ID_PREV
        NAME_CONTRACT_TYPE
        AMT_APPLICATION AMT_CREDIT AMT_DOWN_PAYMENT AMT_ANNUITY
        NAME_CASH_LOAN_PURPOSE NAME_CLIENT_TYPE NAME_CONTRACT_STATUS
        DAYS_DECISION CNT_PAYMENT
    """.split()

    return df.select(*selected_columns)




In [0]:
# NOTE(review): credit_card_balance is only inspected in the exploration cells;
# its column extraction is disabled here. Confirm this is intended before re-enabling:
# df_credit_card_balance = extract_features_credit_card_balance(df_credit_card_balance)
df_application_train, df_previous_application = (
    extract_features_application_train(df_application_train),
    extract_features_previous_application(df_previous_application),
)

#### Data Transformation (Silver Layer Preparation)


#### Clean and handle null values

**Note:** In the bureau dataset, over 99% of records have CREDIT_CURRENCY = 'currency 1', so the records in other currencies are filtered out.

In [0]:
from pyspark.sql.functions import col, when, lit
from pyspark.sql.functions import median as _median

def clean_bureau(df):
    """Clean the bureau DataFrame before aggregation.

    Steps, applied in this order:
    1. keep only rows whose CREDIT_CURRENCY is 'currency 1';
    2. drop the AMT_ANNUITY column (excessive nulls);
    3. replace nulls with 0 in DAYS_CREDIT_ENDDATE, DAYS_ENDDATE_FACT,
       AMT_CREDIT_MAX_OVERDUE, AMT_CREDIT_SUM_DEBT and AMT_CREDIT_SUM_LIMIT;
    4. discard rows whose AMT_CREDIT_SUM is null (only 13 rows).

    Parameters:
    df (DataFrame): The bureau DataFrame.

    Returns:
    DataFrame: The cleaned bureau DataFrame.
    """
    zero_fill = {
        column_name: 0
        for column_name in (
            "DAYS_CREDIT_ENDDATE",
            "DAYS_ENDDATE_FACT",
            "AMT_CREDIT_MAX_OVERDUE",
            "AMT_CREDIT_SUM_DEBT",
            "AMT_CREDIT_SUM_LIMIT",
        )
    }
    # NOTE(review): if the DAYS_* columns are day offsets, 0 is itself a valid
    # value, so imputed and genuine zeros become indistinguishable. Confirm intended.
    return (
        df.filter(col("CREDIT_CURRENCY") == "currency 1")
        .drop("AMT_ANNUITY")
        .fillna(zero_fill)
        .filter(col("AMT_CREDIT_SUM").isNotNull())
    )


def clean_previous_application(df):
    """Impute and filter missing values in the previous_application DataFrame.

    Nulls in AMT_DOWN_PAYMENT, AMT_ANNUITY and CNT_PAYMENT become 0. Rows with a
    null AMT_CREDIT (only 1 row) are then discarded.

    Parameters:
    df (DataFrame): The previous_application DataFrame.

    Returns:
    DataFrame: The cleaned DataFrame.
    """
    zero_filled = df.fillna(
        {column_name: 0 for column_name in ("AMT_DOWN_PAYMENT", "AMT_ANNUITY", "CNT_PAYMENT")}
    )
    return zero_filled.filter(col("AMT_CREDIT").isNotNull())

def clean_application_train(df):
    """Impute missing values in the application_train DataFrame.

    NAME_TYPE_SUITE nulls become 'Unknown'. Nulls in each numeric column listed in
    ``median_columns`` are replaced by that column's approximate median
    (``percentile_approx`` at 0.5), computed over ``df`` itself.

    Parameters:
    df (DataFrame): application_train DataFrame containing the columns below.

    Returns:
    DataFrame: The DataFrame with imputed values.
    """
    # Fill categorical with 'Unknown'
    df = df.fillna({"NAME_TYPE_SUITE": "Unknown"})

    median_columns = [
        "AMT_ANNUITY",
        "AMT_GOODS_PRICE",
        "CNT_FAM_MEMBERS",
        "EXT_SOURCE_1",
        "EXT_SOURCE_2",
        "EXT_SOURCE_3",
    ]

    # Compute all medians in one query; each result column is aliased to the
    # source column name so the row can be indexed by it.
    medians = df.selectExpr(
        *[f"percentile_approx(`{c}`, 0.5) AS `{c}`" for c in median_columns]
    ).collect()[0]

    # percentile_approx yields null when a column has no non-null values, and
    # fillna does not accept None as a replacement. Skip those columns instead
    # of failing the whole cleaning step.
    fill_values = {c: medians[c] for c in median_columns if medians[c] is not None}
    if fill_values:
        df = df.fillna(fill_values)

    return df




In [0]:
# Apply the per-dataset cleaning functions defined above, in the same order.
df_application_train, df_previous_application, df_bureau = (
    clean_application_train(df_application_train),
    clean_previous_application(df_previous_application),
    clean_bureau(df_bureau),
)

Bureau Data Aggregation (**df_bureau_agg**), Previous Application Data Aggregation (**df_prev_agg**) and Feature Engineering

In [0]:
from pyspark.sql.functions import col, avg, sum, max, count, when

# Per-client aggregates of the cleaned bureau records.
df_bureau_agg = df_bureau.groupBy("SK_ID_CURR").agg(
    count("*").alias("bureau_record_count"),
    sum("AMT_CREDIT_SUM").alias("total_credit_sum"),
    sum("AMT_CREDIT_SUM_DEBT").alias("total_credit_debt"),
    avg("AMT_CREDIT_SUM_DEBT").alias("avg_credit_debt"),
    avg("DAYS_CREDIT").alias("avg_days_credit"),
    max("AMT_CREDIT_MAX_OVERDUE").alias("max_overdue")
).withColumn(
    # Debt / credit ratio. When total_credit_sum is 0 the ratio is left null
    # rather than dividing by zero, which raises DIVIDE_BY_ZERO when Spark runs
    # in ANSI mode (non-ANSI mode already returns null in that case).
    "credit_utilization_ratio",
    when(
        col("total_credit_sum") != 0,
        col("total_credit_debt") / col("total_credit_sum")
    )
)

# Per-client aggregates of the cleaned previous applications.
df_prev_agg = df_previous_application.groupBy("SK_ID_CURR").agg(
    count("*").alias("previous_app_count"),
    sum("AMT_CREDIT").alias("total_prev_credit"),
    sum("AMT_DOWN_PAYMENT").alias("total_down_payment"),
    avg("AMT_ANNUITY").alias("avg_prev_annuity"),
    avg("CNT_PAYMENT").alias("avg_cnt_payment")
)


Join Aggregated Data with Application Train

In [0]:
# Left joins keep every application_train row, even clients with no bureau
# records or no previous applications.
applicants_aliased = df_application_train.alias("app")
bureau_agg_aliased = df_bureau_agg.alias("bur")
prev_agg_aliased = df_prev_agg.alias("prev")

df_applicant_profiles_final = (
    applicants_aliased
    .join(bureau_agg_aliased, on="SK_ID_CURR", how="left")
    .join(prev_agg_aliased, on="SK_ID_CURR", how="left")
)


In [0]:
display(df_applicant_profiles_final)

#### Save to Silver Layer (Parquet)

In [0]:
silver_applicant_profiles_path = "abfss://silver@crdpadlsdev.dfs.core.windows.net/applicant_profiles/"
# Replace whatever is currently at the target path with this run's output.
df_applicant_profiles_final.write.mode("overwrite").parquet(silver_applicant_profiles_path)