In [0]:
import re

from pyspark.sql.functions import when, col
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
)
from pyspark.ml import Pipeline
import mlflow
import mlflow.spark

In [0]:
# Raw German credit CSV from the Unity Catalog volume: first row is the header,
# column types are inferred by Spark from the data.
df_raw = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/Volumes/workspace/credit-risk/credit-risk/german_credit_data.csv")
)

df_raw.display()



In [0]:
def bronze_layer(df):
    # Drop index column if it exists
    if "_c0" in df.columns:
        df = df.drop("_c0")
    
    # Normalize column names
    df = df.toDF(*[
        c.lower().replace(" ", "_").replace("-", "_").replace("/", "_")
        for c in df.columns
    ])
    
    return df


In [0]:
df_bronze = bronze_layer(df_raw)
df_bronze.display()


In [0]:
def silver_layer(df):
    cols_with_na = ["saving_accounts", "checking_account"]

    for col_name in cols_with_na:
        if col_name in df.columns:
            df = df.withColumn(
                col_name,
                when(col(col_name) == "NA", "unknown").otherwise(col(col_name))
            )
    
    return df


In [0]:
# Silver: replace "NA" markers in the account columns with an explicit label.
df_silver = df_bronze.transform(silver_layer)
df_silver.display()


In [0]:
def gold_layer(df):
    # Identify columns
    categorical_cols = [c for c, t in df.dtypes if t == "string"]
    numeric_cols = [c for c, t in df.dtypes if t in ["int", "double"]]

    # Index categorical columns
    indexers = [
        StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
        for c in categorical_cols
    ]

    # One-hot encode indexed columns
    encoder = OneHotEncoder(
        inputCols=[f"{c}_idx" for c in categorical_cols],
        outputCols=[f"{c}_ohe" for c in categorical_cols]
    )

    # Assemble numeric columns
    numeric_assembler = VectorAssembler(
        inputCols=numeric_cols,
        outputCol="numeric_vector"
    )

    # Normalize numeric vector
    scaler = MinMaxScaler(
        inputCol="numeric_vector",
        outputCol="numeric_scaled"
    )

    # Final features
    final_features = [f"{c}_ohe" for c in categorical_cols] + ["numeric_scaled"]

    assembler = VectorAssembler(
        inputCols=final_features,
        outputCol="features"
    )

    # Pipeline
    pipeline = Pipeline(
        stages=indexers + [encoder, numeric_assembler, scaler, assembler]
    )
    model = pipeline.fit(df)
    transformed = model.transform(df)

    return transformed


In [0]:
# Gold: fit the encoding/scaling pipeline and inspect the resulting columns.
df_gold = df_silver.transform(gold_layer)
df_gold.display()
df_gold.printSchema()


In [0]:
# Spark ML model loaded via MLflow directly from its artifact directory in the volume.
best_model_path = "/Volumes/workspace/credit-risk/credit-risk/models/best_model"
best_model = mlflow.spark.load_model(model_uri=best_model_path)
