In [None]:
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.cluster import KMeans

def get_bin_edges_csi(expected, bins, method="equal_width"):
    """
    Get bin edges for a numerical variable based on the selected binning method.

    Parameters:
    - expected (array-like): Baseline values the edges are derived from.
    - bins (int or sequence): Number of bins; for method='domain' the
      user-defined edges themselves.
    - method (str): 'equal_width', 'equal_freq', 'adaptive', 'kmeans' or 'domain'.

    Returns:
    - np.ndarray: Float bin edges. Every data-driven method includes the
      baseline minimum and maximum as the outer edges; 'domain' returns the
      supplied edges as a float array.

    Raises:
    - ValueError: If `method` is not one of the supported names.
    """
    if method not in ("equal_width", "equal_freq", "adaptive", "kmeans", "domain"):
        raise ValueError("Invalid binning method!")

    if method == "domain":
        # Cast to float so callers can overwrite the outer edges with +/-inf
        # (assigning inf into an integer array raises OverflowError).
        return np.asarray(bins, dtype=float)

    values = np.asarray(expected, dtype=float).ravel()
    lo, hi = values.min(), values.max()

    if method == "equal_width":
        return np.linspace(lo, hi, bins + 1)

    if method == "equal_freq":
        return np.percentile(values, np.linspace(0, 100, bins + 1))

    if method == "adaptive":
        # Fit a tree that predicts the quantile bucket of each value; its
        # split points become the interior edges.
        X = values.reshape(-1, 1)
        y = np.digitize(values, bins=np.percentile(values, np.linspace(0, 100, bins)))
        tree = DecisionTreeClassifier(max_leaf_nodes=bins)
        tree.fit(X, y)
        # Leaves carry a negative placeholder in `feature`; selecting split
        # nodes this way keeps legitimate thresholds that are <= 0.
        split_thresholds = tree.tree_.threshold[tree.tree_.feature >= 0]
        return np.unique(np.concatenate(([lo], split_thresholds, [hi])))

    # method == "kmeans"
    data = values.reshape(-1, 1)
    kmeans = KMeans(n_clusters=bins, random_state=42).fit(data)
    centers = np.sort(kmeans.cluster_centers_.flatten())
    # Points are assigned to the nearest centre, so the boundary between two
    # adjacent clusters is the midpoint of their centres, not the centres.
    boundaries = (centers[:-1] + centers[1:]) / 2.0
    return np.unique(np.concatenate(([lo], boundaries, [hi])))

def csi(expected, actual, bins=10, method="auto"):
    """
    Calculate Characteristic Stability Index (CSI) for a given variable.

    Parameters:
    - expected (array-like): Baseline dataset values.
    - actual (array-like): New dataset values.
    - bins (int or list): Number of bins, or explicit bin edges (used with
      method='domain').
    - method (str): Binning strategy for numerical data ('equal_width',
      'equal_freq', 'adaptive', 'kmeans', 'domain'), or 'auto' to pick one
      from the baseline sample.

    Returns:
    - dict: 'Binning Strategy' (resolved method, or 'categorical'),
      'CSI DataFrame' (per-bin table) and 'Final CSI' (sum of per-bin values).

    Raises:
    - ValueError: If either input is empty, or the method name is rejected by
      get_bin_edges_csi.
    """
    expected = np.asarray(expected)
    actual = np.asarray(actual)
    if expected.size == 0 or actual.size == 0:
        raise ValueError("Both expected and actual must contain at least one value.")

    # An integer `bins` is a bin count; anything else is a sequence of edges
    # and must not be compared against the number of unique values.
    bins_is_count = isinstance(bins, (int, np.integer)) and not isinstance(bins, bool)

    # Object, unicode, bytes and boolean arrays are always categorical;
    # numeric data is categorical only when it has fewer distinct values than bins.
    is_categorical = expected.dtype.kind in "OUSb" or (
        bins_is_count and len(np.unique(expected)) < bins
    )

    if is_categorical:
        # One "bin" per category seen in either dataset.
        unique_bins = np.unique(np.concatenate([expected, actual]))
        expected_counts = np.array([np.sum(expected == cat) for cat in unique_bins])
        actual_counts = np.array([np.sum(actual == cat) for cat in unique_bins])
        min_bins, max_bins = unique_bins, unique_bins  # Same for categorical values
    else:
        if method == "auto":
            # 'auto' is not a binning method itself; resolve it to a concrete one.
            if not bins_is_count:
                method = "domain"
            elif len(expected) > 1000:
                method = "adaptive"
            else:
                spread = np.std(expected)
                wide = spread > 0 and np.ptp(expected) / spread > 5
                method = "equal_freq" if wide else "equal_width"

        raw_edges = np.asarray(get_bin_edges_csi(expected, bins, method), dtype=float)
        bin_edges = np.unique(raw_edges[np.isfinite(raw_edges)])

        # Open the outer bins so every value of `actual` is counted. An outer
        # edge is replaced only when it already lies at/outside the baseline
        # range; otherwise it is a real cut point and an infinite edge is added.
        baseline_min, baseline_max = expected.min(), expected.max()
        if bin_edges.size and bin_edges[0] <= baseline_min:
            bin_edges[0] = -np.inf
        else:
            bin_edges = np.concatenate(([-np.inf], bin_edges))
        if bin_edges[-1] >= baseline_max:
            bin_edges[-1] = np.inf
        else:
            bin_edges = np.concatenate((bin_edges, [np.inf]))

        expected_counts, _ = np.histogram(expected, bins=bin_edges)
        actual_counts, _ = np.histogram(actual, bins=bin_edges)
        min_bins = bin_edges[:-1]
        max_bins = bin_edges[1:]

    # Normalize counts to proportions
    expected_perc = expected_counts / np.sum(expected_counts)
    actual_perc = actual_counts / np.sum(actual_counts)

    # Empty bins get a tiny proportion so the log ratio below stays finite
    epsilon = 1e-10
    expected_perc = np.where(expected_perc == 0, epsilon, expected_perc)
    actual_perc = np.where(actual_perc == 0, epsilon, actual_perc)

    # Per-bin contribution: (A - E) * ln(A / E)
    csi_values = (actual_perc - expected_perc) * np.log(actual_perc / expected_perc)

    csi_df = pd.DataFrame({
        "Min Bin": min_bins,
        "Max Bin": max_bins,
        "Expected Count": expected_counts,
        "Actual Count": actual_counts,
        "Expected %": expected_perc,
        "Actual %": actual_perc,
        "CSI Value": csi_values
    })

    total_csi = np.sum(csi_values)

    return {
        "Binning Strategy": method if not is_categorical else "categorical",
        "CSI DataFrame": csi_df,
        "Final CSI": total_csi
    }



In [None]:
# Example Usage
# Seed the global NumPy RNG so the printed CSI is the same on every re-run.
np.random.seed(42)
df_old = pd.DataFrame({'feature': np.random.normal(50, 10, 1000)})  # Baseline Data
df_new = pd.DataFrame({'feature': np.random.normal(55, 12, 1000)})  # New Data with drift

# csi() in this cell group returns a dict with the total and the per-bin table.
csi_result = csi(df_old['feature'], df_new['feature'], bins=10, method='equal_freq')
print(f"CSI Value: {csi_result['Final CSI']:.4f}")
print(csi_result["CSI DataFrame"])


In [None]:
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.cluster import KMeans

def get_bin_edges_csi(expected, bins, method="equal_width"):
    """
    Get bin edges for a numerical variable based on the selected binning method.

    Parameters:
    - expected (array-like): Baseline values the edges are derived from.
    - bins (int or sequence): Number of bins; for method='domain' the
      user-defined edges themselves.
    - method (str): 'equal_width', 'equal_freq', 'adaptive', 'kmeans' or 'domain'.

    Returns:
    - np.ndarray: Float bin edges. Every data-driven method includes the
      baseline minimum and maximum as the outer edges; 'domain' returns the
      supplied edges as a float array.

    Raises:
    - ValueError: If `method` is not one of the supported names.
    """
    if method not in ("equal_width", "equal_freq", "adaptive", "kmeans", "domain"):
        raise ValueError("Invalid binning method!")

    if method == "domain":
        # Cast to float so callers can overwrite the outer edges with +/-inf
        # (assigning inf into an integer array raises OverflowError).
        return np.asarray(bins, dtype=float)

    values = np.asarray(expected, dtype=float).ravel()
    lo, hi = values.min(), values.max()

    if method == "equal_width":
        return np.linspace(lo, hi, bins + 1)

    if method == "equal_freq":
        return np.percentile(values, np.linspace(0, 100, bins + 1))

    if method == "adaptive":
        # Fit a tree that predicts the quantile bucket of each value; its
        # split points become the interior edges.
        X = values.reshape(-1, 1)
        y = np.digitize(values, bins=np.percentile(values, np.linspace(0, 100, bins)))
        tree = DecisionTreeClassifier(max_leaf_nodes=bins)
        tree.fit(X, y)
        # Leaves carry a negative placeholder in `feature`; selecting split
        # nodes this way keeps legitimate thresholds that are <= 0.
        split_thresholds = tree.tree_.threshold[tree.tree_.feature >= 0]
        return np.unique(np.concatenate(([lo], split_thresholds, [hi])))

    # method == "kmeans"
    data = values.reshape(-1, 1)
    kmeans = KMeans(n_clusters=bins, random_state=42).fit(data)
    centers = np.sort(kmeans.cluster_centers_.flatten())
    # Points are assigned to the nearest centre, so the boundary between two
    # adjacent clusters is the midpoint of their centres, not the centres.
    boundaries = (centers[:-1] + centers[1:]) / 2.0
    return np.unique(np.concatenate(([lo], boundaries, [hi])))

def csi(expected, actual, bins=10, method="auto"):
    """
    Calculate Characteristic Stability Index (CSI) for a given variable.

    Parameters:
    - expected (array-like): Baseline dataset values.
    - actual (array-like): New dataset values.
    - bins (int or list): Number of bins, or explicit bin edges (used with
      method='domain').
    - method (str): 'equal_width', 'equal_freq', 'adaptive', 'kmeans',
      'domain', or 'auto' to pick one from the baseline sample.

    Returns:
    - tuple: (total CSI, per-bin CSI DataFrame).

    Raises:
    - ValueError: If either input is empty, or the method name is rejected by
      get_bin_edges_csi.
    """
    expected = np.asarray(expected)
    actual = np.asarray(actual)
    if expected.size == 0 or actual.size == 0:
        raise ValueError("Both expected and actual must contain at least one value.")

    # An integer `bins` is a bin count; anything else is a sequence of edges
    # and must not be compared against the number of unique values.
    bins_is_count = isinstance(bins, (int, np.integer)) and not isinstance(bins, bool)

    # Object, unicode, bytes and boolean arrays are always categorical;
    # numeric data is categorical only when it has fewer distinct values than bins.
    is_categorical = expected.dtype.kind in "OUSb" or (
        bins_is_count and len(np.unique(expected)) < bins
    )

    if is_categorical:
        # One "bin" per category seen in either dataset.
        unique_bins = np.unique(np.concatenate([expected, actual]))
        expected_counts = np.array([np.sum(expected == cat) for cat in unique_bins])
        actual_counts = np.array([np.sum(actual == cat) for cat in unique_bins])
        min_bins, max_bins = unique_bins, unique_bins  # Same for categorical values
    else:
        if method == "auto":
            # 'auto' is not a binning method itself; resolve it to a concrete one.
            if not bins_is_count:
                method = "domain"
            elif len(expected) > 1000:
                method = "adaptive"
            else:
                spread = np.std(expected)
                wide = spread > 0 and np.ptp(expected) / spread > 5
                method = "equal_freq" if wide else "equal_width"

        raw_edges = np.asarray(get_bin_edges_csi(expected, bins, method), dtype=float)
        bin_edges = np.unique(raw_edges[np.isfinite(raw_edges)])

        # Open the outer bins so values of `actual` outside the baseline range
        # are counted instead of being dropped by np.histogram. An outer edge
        # is replaced only when it already lies at/outside the baseline range;
        # otherwise it is a real cut point and an infinite edge is added.
        baseline_min, baseline_max = expected.min(), expected.max()
        if bin_edges.size and bin_edges[0] <= baseline_min:
            bin_edges[0] = -np.inf
        else:
            bin_edges = np.concatenate(([-np.inf], bin_edges))
        if bin_edges[-1] >= baseline_max:
            bin_edges[-1] = np.inf
        else:
            bin_edges = np.concatenate((bin_edges, [np.inf]))

        expected_counts, _ = np.histogram(expected, bins=bin_edges)
        actual_counts, _ = np.histogram(actual, bins=bin_edges)
        min_bins = bin_edges[:-1]
        max_bins = bin_edges[1:]

    # Normalize counts to proportions
    expected_perc = expected_counts / np.sum(expected_counts)
    actual_perc = actual_counts / np.sum(actual_counts)

    # Empty bins get a tiny proportion so the log ratio below stays finite
    epsilon = 1e-10
    expected_perc = np.where(expected_perc == 0, epsilon, expected_perc)
    actual_perc = np.where(actual_perc == 0, epsilon, actual_perc)

    # Per-bin contribution: (A - E) * ln(A / E)
    csi_values = (actual_perc - expected_perc) * np.log(actual_perc / expected_perc)

    csi_df = pd.DataFrame({
        "Min Bin": min_bins,
        "Max Bin": max_bins,
        "Expected Count": expected_counts,
        "Actual Count": actual_counts,
        "Expected %": expected_perc,
        "Actual %": actual_perc,
        "CSI Value": csi_values
    })

    total_csi = np.sum(csi_values)

    return total_csi, csi_df

def calculate_csi_for_dataframe(df_old, df_new, bins=10, method="auto"):
    """
    Compute CSI for all columns shared by the two dataframes.

    Parameters:
    - df_old (DataFrame): Baseline dataset.
    - df_new (DataFrame): New dataset.
    - bins (int, list or dict): Bin specification forwarded to csi(). A dict
      maps column name -> bin specification; columns missing from the dict
      fall back to 10 bins.
    - method (str): Binning method ('auto', 'equal_width', 'equal_freq',
      'adaptive', 'kmeans', 'domain').

    Returns:
    - DataFrame with columns 'Feature' and 'CSI', sorted by CSI descending
      (empty, but with both columns, when no column could be scored).
    - Dictionary of detailed CSI DataFrames for each scored column.

    Raises:
    - ValueError: If either input is not a pandas DataFrame.
    """
    if not (isinstance(df_old, pd.DataFrame) and isinstance(df_new, pd.DataFrame)):
        raise ValueError("Both inputs should be pandas DataFrames.")

    csi_results = []
    detailed_csi_dfs = {}

    for col in df_old.columns:
        if col not in df_new.columns:
            continue  # Skip columns missing in new dataset

        expected = df_old[col].dropna()
        actual = df_new[col].dropna()

        # Skip if column is empty in either dataset
        if len(expected) == 0 or len(actual) == 0:
            continue

        col_bins = bins.get(col, 10) if isinstance(bins, dict) else bins

        # Best effort: a column that cannot be scored is reported and skipped
        # so one bad feature does not abort the whole run.
        try:
            csi_value, csi_df = csi(expected, actual, bins=col_bins, method=method)
        except Exception as e:
            print(f"Skipping column {col} due to error: {e}")
            continue

        csi_results.append({"Feature": col, "CSI": csi_value})
        detailed_csi_dfs[col] = csi_df

    # Explicit columns keep sort_values from raising KeyError on an empty result.
    summary = pd.DataFrame(csi_results, columns=["Feature", "CSI"])
    return summary.sort_values(by="CSI", ascending=False), detailed_csi_dfs

# Example Usage
# Seed the global NumPy RNG so the printed summary is the same on every re-run.
np.random.seed(42)

df_old = pd.DataFrame({
    'numerical_feature': np.random.normal(50, 10, 1000),
    'categorical_feature': np.random.choice(['A', 'B', 'C'], 1000)
})

df_new = pd.DataFrame({
    'numerical_feature': np.random.normal(55, 12, 1000),
    'categorical_feature': np.random.choice(['A', 'B', 'C'], 1000, p=[0.7, 0.2, 0.1])  # Shifted distribution
})

csi_summary, detailed_csi = calculate_csi_for_dataframe(df_old, df_new, bins=10, method='equal_freq')

print("CSI Summary:")
print(csi_summary)

# To view detailed CSI for a particular feature
feature_name = 'numerical_feature'
if feature_name in detailed_csi:
    print(f"\nDetailed CSI for {feature_name}:")
    print(detailed_csi[feature_name])


In [5]:
def calculate_csi_for_dataframe(df_old, df_new, bins=10, method="auto"):
    """
    Compute CSI for all columns shared by the two dataframes.

    Parameters:
    - df_old (DataFrame): Baseline dataset.
    - df_new (DataFrame): New dataset.
    - bins (int, list or dict): Bin specification forwarded to csi(). A dict
      maps column name -> bin specification; columns missing from the dict
      fall back to 10 bins.
    - method (str): Binning method ('auto', 'equal_width', 'equal_freq',
      'adaptive', 'kmeans', 'domain').

    Returns:
    - DataFrame with columns 'Feature' and 'CSI', sorted by CSI descending
      (empty, but with both columns, when no column could be scored).
    - Dictionary of detailed CSI DataFrames for each scored column.

    Raises:
    - ValueError: If either input is not a pandas DataFrame.
    """
    if not (isinstance(df_old, pd.DataFrame) and isinstance(df_new, pd.DataFrame)):
        raise ValueError("Both inputs should be pandas DataFrames.")

    csi_results = []
    detailed_csi_dfs = {}

    for col in df_old.columns:
        if col not in df_new.columns:
            continue  # Skip columns missing in new dataset

        expected = df_old[col].dropna()
        actual = df_new[col].dropna()

        # Skip if column is empty in either dataset
        if len(expected) == 0 or len(actual) == 0:
            continue

        col_bins = bins.get(col, 10) if isinstance(bins, dict) else bins

        # Best effort: a column that cannot be scored is reported and skipped
        # so one bad feature does not abort the whole run.
        try:
            csi_value, csi_df = csi(expected, actual, bins=col_bins, method=method)
        except Exception as e:
            print(f"Skipping column {col} due to error: {e}")
            continue

        csi_results.append({"Feature": col, "CSI": csi_value})
        detailed_csi_dfs[col] = csi_df

    # Explicit columns keep sort_values from raising KeyError on an empty result.
    summary = pd.DataFrame(csi_results, columns=["Feature", "CSI"])
    return summary.sort_values(by="CSI", ascending=False), detailed_csi_dfs





def get_bin_edges_csi(expected, bins, method="equal_width"):
    """
    Get bin edges for a numerical variable based on the selected binning method.

    Parameters:
    - expected (array-like): Baseline values the edges are derived from.
    - bins (int or sequence): Number of bins; for method='domain' the
      user-defined edges themselves.
    - method (str): 'equal_width', 'equal_freq', 'adaptive', 'kmeans' or 'domain'.

    Returns:
    - np.ndarray: Float bin edges. Every data-driven method includes the
      baseline minimum and maximum as the outer edges; 'domain' returns the
      supplied edges as a float array.

    Raises:
    - ValueError: If `method` is not one of the supported names.
    """
    if method not in ("equal_width", "equal_freq", "adaptive", "kmeans", "domain"):
        raise ValueError("Invalid binning method!")

    if method == "domain":
        # Cast to float so callers can overwrite the outer edges with +/-inf
        # (assigning inf into an integer array raises OverflowError).
        return np.asarray(bins, dtype=float)

    values = np.asarray(expected, dtype=float).ravel()
    lo, hi = values.min(), values.max()

    if method == "equal_width":
        return np.linspace(lo, hi, bins + 1)

    if method == "equal_freq":
        return np.percentile(values, np.linspace(0, 100, bins + 1))

    if method == "adaptive":
        # Fit a tree that predicts the quantile bucket of each value; its
        # split points become the interior edges.
        X = values.reshape(-1, 1)
        y = np.digitize(values, bins=np.percentile(values, np.linspace(0, 100, bins)))
        tree = DecisionTreeClassifier(max_leaf_nodes=bins)
        tree.fit(X, y)
        # Leaves carry a negative placeholder in `feature`; selecting split
        # nodes this way keeps legitimate thresholds that are <= 0.
        split_thresholds = tree.tree_.threshold[tree.tree_.feature >= 0]
        return np.unique(np.concatenate(([lo], split_thresholds, [hi])))

    # method == "kmeans"
    data = values.reshape(-1, 1)
    kmeans = KMeans(n_clusters=bins, random_state=42).fit(data)
    centers = np.sort(kmeans.cluster_centers_.flatten())
    # Points are assigned to the nearest centre, so the boundary between two
    # adjacent clusters is the midpoint of their centres, not the centres.
    boundaries = (centers[:-1] + centers[1:]) / 2.0
    return np.unique(np.concatenate(([lo], boundaries, [hi])))




        
def csi(expected, actual, bins=10, method="auto"):
    """
    Calculate Characteristic Stability Index (CSI) for a given variable.

    Parameters:
    - expected (array-like): Baseline dataset values.
    - actual (array-like): New dataset values.
    - bins (int or list): Number of bins, or explicit bin edges (used with
      method='domain').
    - method (str): 'equal_width', 'equal_freq', 'adaptive', 'kmeans',
      'domain', or 'auto' to pick one from the baseline sample.

    Returns:
    - tuple: (total CSI, per-bin CSI DataFrame).

    Raises:
    - ValueError: If either input is empty, or the method name is rejected by
      get_bin_edges_csi.
    """
    expected = np.asarray(expected)
    actual = np.asarray(actual)
    if expected.size == 0 or actual.size == 0:
        raise ValueError("Both expected and actual must contain at least one value.")

    # An integer `bins` is a bin count; anything else is a sequence of edges
    # and must not be compared against the number of unique values.
    bins_is_count = isinstance(bins, (int, np.integer)) and not isinstance(bins, bool)

    # Object, unicode, bytes and boolean arrays are always categorical;
    # numeric data is categorical only when it has fewer distinct values than bins.
    is_categorical = expected.dtype.kind in "OUSb" or (
        bins_is_count and len(np.unique(expected)) < bins
    )

    if is_categorical:
        # One "bin" per category seen in either dataset; `method` is unused here.
        unique_bins = np.unique(np.concatenate([expected, actual]))
        expected_counts = np.array([np.sum(expected == cat) for cat in unique_bins])
        actual_counts = np.array([np.sum(actual == cat) for cat in unique_bins])
        min_bins, max_bins = unique_bins, unique_bins
    else:
        # Auto-select a concrete binning method from the baseline sample
        if method == "auto":
            if not bins_is_count:
                method = "domain"  # Explicit edges were supplied
            elif len(expected) > 1000:  # Large dataset -> Adaptive binning
                method = "adaptive"
            else:
                spread = np.std(expected)
                # Guard spread == 0 so constant data does not divide by zero
                wide = spread > 0 and np.ptp(expected) / spread > 5
                method = "equal_freq" if wide else "equal_width"

        raw_edges = np.asarray(get_bin_edges_csi(expected, bins, method), dtype=float)
        bin_edges = np.unique(raw_edges[np.isfinite(raw_edges)])

        # Open the outer bins so every value of `actual` is counted. An outer
        # edge is replaced only when it already lies at/outside the baseline
        # range; otherwise it is a real cut point and an infinite edge is added.
        baseline_min, baseline_max = expected.min(), expected.max()
        if bin_edges.size and bin_edges[0] <= baseline_min:
            bin_edges[0] = -np.inf
        else:
            bin_edges = np.concatenate(([-np.inf], bin_edges))
        if bin_edges[-1] >= baseline_max:
            bin_edges[-1] = np.inf
        else:
            bin_edges = np.concatenate((bin_edges, [np.inf]))

        expected_counts, _ = np.histogram(expected, bins=bin_edges)
        actual_counts, _ = np.histogram(actual, bins=bin_edges)
        min_bins = bin_edges[:-1]
        max_bins = bin_edges[1:]

    # Normalize counts to proportions
    expected_perc = expected_counts / np.sum(expected_counts)
    actual_perc = actual_counts / np.sum(actual_counts)

    # Empty bins get a tiny proportion so the log ratio below stays finite
    epsilon = 1e-10
    expected_perc = np.where(expected_perc == 0, epsilon, expected_perc)
    actual_perc = np.where(actual_perc == 0, epsilon, actual_perc)

    # Per-bin contribution: (A - E) * ln(A / E)
    csi_values = (actual_perc - expected_perc) * np.log(actual_perc / expected_perc)

    csi_df = pd.DataFrame({
        "Min Bin": min_bins,
        "Max Bin": max_bins,
        "Expected Count": expected_counts,
        "Actual Count": actual_counts,
        "Expected %": expected_perc,
        "Actual %": actual_perc,
        "CSI Value": csi_values
    })

    total_csi = np.sum(csi_values)

    return total_csi, csi_df


In [6]:
import numpy as np
import pandas as pd

# Fix the global NumPy seed; the draw order below must stay as-is so the
# generated samples are reproducible.
np.random.seed(42)

# Baseline numerical features
age_old = np.random.normal(loc=40, scale=10, size=1000).astype(int)
income_old = np.random.normal(loc=50000, scale=15000, size=1000)

# Drifted numerical features: higher mean and wider spread
age_new = np.random.normal(loc=42, scale=12, size=1000).astype(int)
income_new = np.random.normal(loc=52000, scale=16000, size=1000)

# Baseline categorical features
gender_old = np.random.choice(['Male', 'Female'], size=1000, p=[0.6, 0.4])
marital_status_old = np.random.choice(
    ['Single', 'Married', 'Divorced'], size=1000, p=[0.4, 0.5, 0.1]
)

# Drifted categorical features: balanced gender, slightly more married
gender_new = np.random.choice(['Male', 'Female'], size=1000, p=[0.5, 0.5])
marital_status_new = np.random.choice(
    ['Single', 'Married', 'Divorced'], size=1000, p=[0.35, 0.55, 0.1]
)

# Assemble the two frames with identical column order
df_old = pd.DataFrame(dict(
    age=age_old,
    income=income_old,
    gender=gender_old,
    marital_status=marital_status_old,
))

df_new = pd.DataFrame(dict(
    age=age_new,
    income=income_new,
    gender=gender_new,
    marital_status=marital_status_new,
))

# Preview the first rows of each dataset
print("Old Dataset Sample:")
print(df_old.head())

print("\nNew Dataset Sample:")
print(df_new.head())


Old Dataset Sample:
   age        income  gender marital_status
0   44  70990.331549    Male         Single
1   38  63869.505244    Male        Married
2   46  50894.455549  Female         Single
3   55  40295.948334    Male         Single
4   37  60473.349704  Female         Single

New Dataset Sample:
   age        income  gender marital_status
0   33  21475.079074  Female        Married
1   40  38233.839828    Male        Married
2   32  45382.311465  Female        Married
3   38  82203.002517    Male         Single
4   19  60904.849993  Female        Married


In [7]:
# Score every shared column, letting the 'auto' method choose the binning
csi_summary, detailed_csi = calculate_csi_for_dataframe(
    df_old,
    df_new,
    bins=10,
    method='auto',
)

# Per-feature totals
print("\nCSI Summary:")
print(csi_summary)

# Bin-level breakdown for one feature, shown only if that feature was scored
feature_name = 'income'
if feature_name in detailed_csi:
    print(f"\nDetailed CSI for {feature_name}:")
    print(detailed_csi[feature_name])



CSI Summary:
          Feature       CSI
0             age  0.078715
1          income  0.035361
2          gender  0.026087
3  marital_status  0.021375

Detailed CSI for income:
        Min Bin       Max Bin  Expected Count  Actual Count  Expected %  \
0          -inf  3.209502e+04             100           121         0.1   
1  3.209502e+04  3.877809e+04             100           104         0.1   
2  3.877809e+04  4.350830e+04             100            88         0.1   
3  4.350830e+04  4.743257e+04             100            86         0.1   
4  4.743257e+04  5.094616e+04             100            81         0.1   
5  5.094616e+04  5.482240e+04             100            82         0.1   
6  5.482240e+04  5.842725e+04             100            90         0.1   
7  5.842725e+04  6.387912e+04             100           119         0.1   
8  6.387912e+04  6.994130e+04             100            88         0.1   
9  6.994130e+04           inf             100           141         0.

In [8]:
# Display the whole dict of per-feature CSI detail tables (keyed by column name).
detailed_csi

{'age':    Min Bin  Max Bin  Expected Count  Actual Count  Expected %  Actual %  \
 0     -inf     27.0              89            98       0.089     0.098   
 1     27.0     31.0              83            78       0.083     0.078   
 2     31.0     34.0              92            66       0.092     0.066   
 3     34.0     37.0             115            96       0.115     0.096   
 4     37.0     40.0             111           105       0.111     0.105   
 5     40.0     42.0              85            57       0.085     0.057   
 6     42.0     45.0             119            87       0.119     0.087   
 7     45.0     48.0             103           107       0.103     0.107   
 8     48.0     53.0             102           142       0.102     0.142   
 9     53.0      inf             101           164       0.101     0.164   
 
    CSI Value  
 0   0.000867  
 1   0.000311  
 2   0.008635  
 3   0.003431  
 4   0.000333  
 5   0.011189  
 6   0.010023  
 7   0.000152  
 8   0.0132

In [9]:
# Display the per-feature CSI summary table (sorted by CSI, highest first).
csi_summary

Unnamed: 0,Feature,CSI
0,age,0.078715
1,income,0.035361
2,gender,0.026087
3,marital_status,0.021375


In [10]:
import pandas as pd

def save_csi_report_html(summary_df, detailed_dfs, file_path="csi_report.html"):
    """
    Save the summary and detailed CSI DataFrames into a single HTML file.

    The file is written as UTF-8 and feature names are HTML-escaped, so
    column names containing non-ASCII characters or markup characters such
    as '<' or '&' render correctly.

    Parameters:
    - summary_df (DataFrame): DataFrame containing summary CSI values.
    - detailed_dfs (dict): Dictionary of detailed CSI DataFrames.
    - file_path (str): Output HTML file path.
    """
    import html  # local import so this cell does not depend on another cell

    with open(file_path, "w", encoding="utf-8") as f:
        # Start of HTML
        f.write("<html><head><meta charset=\"utf-8\"><title>CSI Report</title>")
        f.write("""
        <style>
            body { font-family: Arial, sans-serif; padding: 20px; }
            h2 { color: #2a6592; }
            table { border-collapse: collapse; width: 100%; margin-bottom: 40px; }
            th, td { border: 1px solid #ccc; padding: 8px; text-align: right; }
            th { background-color: #f2f2f2; }
            tr:nth-child(even) { background-color: #f9f9f9; }
        </style>
        """)
        f.write("</head><body>")

        # Summary Table (to_html escapes cell contents itself)
        f.write("<h2>CSI Summary</h2>")
        f.write(summary_df.to_html(index=False, float_format="%.4f", border=0))

        # Detailed Tables per Feature
        for feature, df in detailed_dfs.items():
            f.write(f"<h2>CSI Detail for Feature: {html.escape(str(feature))}</h2>")
            f.write(df.to_html(index=False, float_format="%.4f", border=0))

        # End of HTML
        f.write("</body></html>")

    print(f"CSI report saved to: {file_path}")


In [12]:
# Recompute CSI with the default bins/method, then export the HTML report
summary, detailed = calculate_csi_for_dataframe(df_old=df_old, df_new=df_new)
save_csi_report_html(
    summary_df=summary,
    detailed_dfs=detailed,
    file_path="csi_report.html",
)


CSI report saved to: csi_report.html


In [13]:
def save_csi_report_html(summary_df, detailed_dfs, file_path="csi_report.html", csi_threshold=0.25):
    """
    Save the summary and detailed CSI DataFrames into a single HTML file with Pass/Fail test.

    A feature passes when its CSI is <= csi_threshold; anything else
    (including NaN) is reported as Fail. The file is written as UTF-8 and
    feature names are HTML-escaped. `summary_df` is not modified.

    Parameters:
    - summary_df (DataFrame): Summary CSI values per feature; must have
      'Feature' and 'CSI' columns.
    - detailed_dfs (dict): Dictionary of detailed CSI DataFrames.
    - file_path (str): Output HTML file path.
    - csi_threshold (float): CSI threshold for test pass/fail.
    """
    import html  # local import so this cell does not depend on another cell

    with open(file_path, "w", encoding="utf-8") as f:
        f.write("<html><head><meta charset=\"utf-8\"><title>CSI Report</title>")
        f.write("""
        <style>
            body { font-family: Arial, sans-serif; padding: 20px; }
            h2 { color: #2a6592; }
            table { border-collapse: collapse; width: 100%; margin-bottom: 40px; }
            th, td { border: 1px solid #ccc; padding: 8px; text-align: center; }
            th { background-color: #f2f2f2; }
            tr:nth-child(even) { background-color: #f9f9f9; }
            .pass { background-color: #d4edda; color: #155724; font-weight: bold; }
            .fail { background-color: #f8d7da; color: #721c24; font-weight: bold; }
        </style>
        """)
        f.write("</head><body>")

        # Summary Table with conditional formatting
        f.write("<h2>CSI Summary</h2>")
        f.write("<table>")
        f.write("<tr><th>Feature</th><th>CSI</th><th>Test Result</th></tr>")
        for feature, csi_value in zip(summary_df["Feature"], summary_df["CSI"]):
            passed = csi_value <= csi_threshold
            verdict = "Pass" if passed else "Fail"
            css_class = "pass" if passed else "fail"
            f.write(
                f"<tr>"
                f"<td>{html.escape(str(feature))}</td>"
                f"<td>{csi_value:.4f}</td>"
                f"<td class='{css_class}'>{verdict}</td>"
                f"</tr>"
            )
        f.write("</table>")

        # Detailed CSI per feature (to_html escapes cell contents itself)
        for feature, df in detailed_dfs.items():
            f.write(f"<h2>CSI Detail for Feature: {html.escape(str(feature))}</h2>")
            f.write(df.to_html(index=False, float_format="%.4f", border=0))

        f.write("</body></html>")

    print(f"CSI report with results saved to: {file_path}")


In [15]:
# Recompute CSI with the default bins/method, then export the report using a
# stricter pass/fail cut-off than the function default
summary, detailed = calculate_csi_for_dataframe(df_old=df_old, df_new=df_new)
save_csi_report_html(
    summary_df=summary,
    detailed_dfs=detailed,
    file_path="csi_report.html",
    csi_threshold=0.05,
)


CSI report with test results saved to: csi_report.html
