### CSI Calculation

#### Numeric variable

In [None]:
# VDI/CSI for one continuous column.
# Names expected to be in scope already: baseline_df, validation_df,
# continuous_field (column name), VDI_N_BINS (number of bins), F, Window.

# Step 1: Split the baseline into VDI_N_BINS equal-count bins (NTILE numbers them from 1).
# NOTE: a window without partitionBy moves every baseline row into one partition.
window_spec = Window.orderBy(F.col(continuous_field))
baseline_binned = baseline_df.withColumn(
    "bin", F.ntile(VDI_N_BINS).over(window_spec)
)

# Overall baseline range (used for the outlier bins), fetched in a single job
baseline_range = baseline_binned.agg(
    F.max(continuous_field).alias("max_value"),
    F.min(continuous_field).alias("min_value"),
).first()
max_value_in_baseline = baseline_range["max_value"]
min_value_in_baseline = baseline_range["min_value"]

# Step 2: Summarize baseline bins (observed value range and row count per bin)
baseline_summary = (
    baseline_binned.groupBy("bin")
    .agg(
        F.min(continuous_field).alias("MIN_VALUE"),
        F.max(continuous_field).alias("MAX_VALUE"),
        F.count("*").alias("n_baseline"),
    )
)

# Step 3: Assign every validation row to exactly one baseline bin.
# Each bin covers the half-open range [MIN_VALUE, next bin's MIN_VALUE); the last
# bin is open-ended. A closed [MIN_VALUE, MAX_VALUE] match would (a) match a row
# twice when NTILE splits tied values across adjacent bins (MAX of one bin equals
# MIN of the next), inflating n_validation, and (b) match nothing for values that
# fall in the gap between one bin's MAX_VALUE and the next bin's MIN_VALUE.
bin_ranges = baseline_summary.withColumn(
    "NEXT_MIN_VALUE", F.lead("MIN_VALUE").over(Window.orderBy("bin"))
)
validation_value = F.col(continuous_field)
falls_in_bin = (validation_value >= F.col("MIN_VALUE")) & (
    F.col("NEXT_MIN_VALUE").isNull()
    | (validation_value < F.col("NEXT_MIN_VALUE"))
)
validation_with_bins = (
    validation_df.join(bin_ranges, falls_in_bin, "left")
    .withColumn(
        "bin",
        F.when(validation_value > max_value_in_baseline, VDI_N_BINS)  # Upper outliers go to the top bin
        .when(validation_value < min_value_in_baseline, 0)  # Lower outliers get their own bin 0
        .otherwise(F.col("bin")),  # In-range rows keep the matched bin (NULL values stay NULL)
    )
    .drop("NEXT_MIN_VALUE")
)

# Step 4: Summarize the validation bins
validation_with_bins_summary = validation_with_bins.groupBy("bin").agg(
    F.count("*").alias("n_validation")
)

# Step 5: Calculate VDI metrics.
# Bins with a zero share on either side get Ratio = 0 and Weight_of_Evidence = 0,
# so they contribute nothing to the total (this includes the lower-outlier bin 0,
# which has no baseline rows).
whole_table = Window.partitionBy()
vdi_table = (
    baseline_summary.join(validation_with_bins_summary, "bin", "outer")
    .fillna(0, subset=["n_baseline", "n_validation"])
    .withColumn("p_baseline", F.col("n_baseline") / F.sum("n_baseline").over(whole_table))
    .withColumn("p_validation", F.col("n_validation") / F.sum("n_validation").over(whole_table))
    .withColumn("Difference", F.col("p_baseline") - F.col("p_validation"))
    .withColumn("Ratio", F.when(F.col("p_validation") != 0, F.col("p_baseline") / F.col("p_validation")).otherwise(0))
    .withColumn("Weight_of_Evidence", F.when(F.col("Ratio") > 0, F.log(F.col("Ratio"))).otherwise(0))
    .withColumn("Contribution", F.col("Difference") * F.col("Weight_of_Evidence"))
)

# Step 6: Total VDI/CSI is the sum of the per-bin contributions
total_vdi = vdi_table.agg(F.sum("Contribution").alias("VDI")).collect()[0]["VDI"]

# Step 7: Display the results
print("VDI (Continuous):", total_vdi)
vdi_table.show()


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# VDI/CSI for one continuous column, with a readable "MIN-MAX" interval per bin.
# Names expected to be in scope already: baseline_df, validation_df,
# continuous_field (column name), VDI_N_BINS (number of bins).

# Step 1: Split the baseline into VDI_N_BINS equal-count bins (NTILE numbers them from 1).
# NOTE: a window without partitionBy moves every baseline row into one partition.
window_spec = Window.orderBy(F.col(continuous_field))
baseline_binned = baseline_df.withColumn(
    "bin", F.ntile(VDI_N_BINS).over(window_spec)
)

# Overall baseline range (used for the outlier bins), fetched in a single job
baseline_range = baseline_binned.agg(
    F.max(continuous_field).alias("max_value"),
    F.min(continuous_field).alias("min_value"),
).first()
max_value_in_baseline = baseline_range["max_value"]
min_value_in_baseline = baseline_range["min_value"]

# Step 2: Summarize baseline bins (observed value range and row count per bin)
baseline_summary = (
    baseline_binned.groupBy("bin")
    .agg(
        F.min(continuous_field).alias("MIN_VALUE"),
        F.max(continuous_field).alias("MAX_VALUE"),
        F.count("*").alias("n_baseline"),
    )
)

# Step 3: Assign every validation row to exactly one baseline bin.
# Each bin covers the half-open range [MIN_VALUE, next bin's MIN_VALUE); the last
# bin is open-ended. A closed [MIN_VALUE, MAX_VALUE] match would (a) match a row
# twice when NTILE splits tied values across adjacent bins (MAX of one bin equals
# MIN of the next), inflating n_validation, and (b) match nothing for values that
# fall in the gap between one bin's MAX_VALUE and the next bin's MIN_VALUE.
bin_ranges = baseline_summary.withColumn(
    "NEXT_MIN_VALUE", F.lead("MIN_VALUE").over(Window.orderBy("bin"))
)
validation_value = F.col(continuous_field)
falls_in_bin = (validation_value >= F.col("MIN_VALUE")) & (
    F.col("NEXT_MIN_VALUE").isNull()
    | (validation_value < F.col("NEXT_MIN_VALUE"))
)
validation_with_bins = (
    validation_df.join(bin_ranges, falls_in_bin, "left")
    .withColumn(
        "bin",
        F.when(validation_value > max_value_in_baseline, VDI_N_BINS)  # Upper outlier bin
        .when(validation_value < min_value_in_baseline, 0)  # Lower outlier bin
        .otherwise(F.col("bin")),  # Normal binning (NULL values stay NULL)
    )
    .drop("NEXT_MIN_VALUE")
)

# Step 4: Add interval column (e.g., "0-10").
# The range join above already carries MIN_VALUE / MAX_VALUE, so no second join
# is needed (joining baseline_summary again would duplicate those columns and
# make F.col("MIN_VALUE") ambiguous). Rows without a matched baseline range get
# a NULL interval.
interval_label = F.when(
    F.col("MIN_VALUE").isNotNull(),
    F.concat_ws("-", F.col("MIN_VALUE").cast("string"), F.col("MAX_VALUE").cast("string")),
)
validation_with_bins = validation_with_bins.withColumn("interval", interval_label)

# Step 5: Summarize the validation bins
validation_with_bins_summary = validation_with_bins.groupBy("bin").agg(
    F.count("*").alias("n_validation")
)

# Step 6: Calculate VDI metrics.
# The interval is added here as well because the final display selects it from
# vdi_table. Bins with a zero share on either side get Ratio = 0 and
# Weight_of_Evidence = 0, so they contribute nothing to the total.
whole_table = Window.partitionBy()
vdi_table = (
    baseline_summary.join(validation_with_bins_summary, "bin", "outer")
    .fillna(0, subset=["n_baseline", "n_validation"])
    .withColumn("interval", interval_label)
    .withColumn("p_baseline", F.col("n_baseline") / F.sum("n_baseline").over(whole_table))
    .withColumn("p_validation", F.col("n_validation") / F.sum("n_validation").over(whole_table))
    .withColumn("Difference", F.col("p_baseline") - F.col("p_validation"))
    .withColumn("Ratio", F.when(F.col("p_validation") != 0, F.col("p_baseline") / F.col("p_validation")).otherwise(0))
    .withColumn("Weight_of_Evidence", F.when(F.col("Ratio") > 0, F.log(F.col("Ratio"))).otherwise(0))
    .withColumn("Contribution", F.col("Difference") * F.col("Weight_of_Evidence"))
)

# Step 7: Total VDI/CSI is the sum of the per-bin contributions
total_vdi = vdi_table.agg(F.sum("Contribution").alias("VDI")).collect()[0]["VDI"]

# Step 8: Display the results, one row per bin in bin order
print("VDI (Continuous):", total_vdi)
vdi_table.select(
    "bin", "interval", "p_baseline", "p_validation",
    "Difference", "Ratio", "Weight_of_Evidence", "Contribution",
).orderBy("bin").show()


#### Categorical variable

<!-- # Step 2: Define a function to calculate VDI for a single categorical variable
def calculate_vdi_for_category(variable):
    # Get distinct categories from both datasets
    distinct_categories = (
        baseline_df.select(F.col(variable)).union(validation_df.select(F.col(variable)))
        .distinct()
    )

    # Join distinct categories with baseline counts
    baseline_summary = (
        distinct_categories.join(
            baseline_df.groupBy(variable).agg(F.count("*").alias("n_baseline")),
            variable,
            "left"
        )
        .fillna(0, subset=["n_baseline"])
        .withColumn("p_baseline", F.col("n_baseline") / F.sum("n_baseline").over(Window.partitionBy()))
    )

    # Join distinct categories with validation counts
    validation_summary = (
        distinct_categories.join(
            validation_df.groupBy(variable).agg(F.count("*").alias("n_validation")),
            variable,
            "left"
        )
        .fillna(0, subset=["n_validation"])
        .withColumn("p_validation", F.col("n_validation") / F.sum("n_validation").over(Window.partitionBy()))
    )

    # Combine baseline and validation summaries
    vdi_table = (
        baseline_summary.join(validation_summary, variable, "outer")
        .fillna(0, subset=["n_baseline", "n_validation"])  # Handle missing counts
        .withColumn("Difference", F.col("p_baseline") - F.col("p_validation"))
        .withColumn("Ratio", F.when(F.col("p_validation") != 0, F.col("p_baseline") / F.col("p_validation")).otherwise(0))
        .withColumn("Weight_of_Evidence", F.when(F.col("Ratio") > 0, F.log(F.col("Ratio"))).otherwise(0))
        .withColumn("Contribution", F.col("Difference") * F.col("Weight_of_Evidence"))
    )

    # Calculate the total VDI
    total_vdi = vdi_table.agg(F.sum("Contribution").alias("VDI")).collect()[0]["VDI"]
    return total_vdi, vdi_table -->

In [None]:
# Step 2: Define a function to calculate VDI for a single categorical variable
def calculate_vdi_for_category(variable):
    """Compute the VDI of one categorical column between baseline and validation.

    Reads the module-level ``baseline_df`` and ``validation_df`` and compares
    the share of rows per category of ``variable`` in each.

    Both counts are produced by one union + ``groupBy`` rather than by
    equi-joins on the category column. Join keys never match on NULL, so a
    join-based count drops the rows of a NULL category (inflating every other
    category's share) and emits the NULL category twice; ``groupBy`` treats
    NULL as an ordinary group.

    Categories with a zero share on either side get ``Ratio = 0`` and
    ``Weight_of_Evidence = 0`` and therefore contribute nothing to the total.

    Args:
        variable: Name of the categorical column; it must exist with a
            compatible type in both DataFrames.

    Returns:
        A tuple ``(total_vdi, vdi_table)``: ``total_vdi`` is the sum of the
        ``Contribution`` column (``None`` if that sum is NULL), and
        ``vdi_table`` is a DataFrame with one row per category and the columns
        ``variable``, ``n_baseline``, ``p_baseline``, ``n_validation``,
        ``p_validation``, ``Difference``, ``Ratio``, ``Weight_of_Evidence``
        and ``Contribution``.
    """
    # Tag each row with the dataset it came from so a single aggregation
    # yields both counts per category.
    baseline_tagged = baseline_df.select(
        F.col(variable),
        F.lit(1).alias("n_baseline"),
        F.lit(0).alias("n_validation"),
    )
    validation_tagged = validation_df.select(
        F.col(variable),
        F.lit(0).alias("n_baseline"),
        F.lit(1).alias("n_validation"),
    )
    category_counts = (
        baseline_tagged.union(validation_tagged)
        .groupBy(variable)
        .agg(
            F.sum("n_baseline").alias("n_baseline"),
            F.sum("n_validation").alias("n_validation"),
        )
    )

    # Shares are taken over the whole table (empty partition spec)
    whole_table = Window.partitionBy()
    vdi_table = (
        category_counts
        .withColumn("p_baseline", F.col("n_baseline") / F.sum("n_baseline").over(whole_table))
        .withColumn("p_validation", F.col("n_validation") / F.sum("n_validation").over(whole_table))
        .withColumn("Difference", F.col("p_baseline") - F.col("p_validation"))
        .withColumn("Ratio", F.when(F.col("p_validation") != 0, F.col("p_baseline") / F.col("p_validation")).otherwise(0))
        .withColumn("Weight_of_Evidence", F.when(F.col("Ratio") > 0, F.log(F.col("Ratio"))).otherwise(0))
        .withColumn("Contribution", F.col("Difference") * F.col("Weight_of_Evidence"))
        # Keep the column order callers previously received
        .select(
            variable,
            "n_baseline",
            "p_baseline",
            "n_validation",
            "p_validation",
            "Difference",
            "Ratio",
            "Weight_of_Evidence",
            "Contribution",
        )
    )

    # Calculate the total VDI
    total_vdi = vdi_table.agg(F.sum("Contribution").alias("VDI")).collect()[0]["VDI"]
    return total_vdi, vdi_table