In [11]:
import pandas as pd
import numpy as np
import math
from scipy.stats import norm

class BinningProcessor:
    """Monotonic binning of one feature column against a numeric label column.

    Throughout the class a bin is a four-item list ``[left, right, std, mean]``:
    ``left``/``right`` are inclusive positions into the feature-sorted rows and
    ``std``/``mean`` summarise the labels of those rows.  Labels are summed as
    event counts, so a 0/1 label is presumably expected (TODO confirm).
    """

    def __init__(self, df):
        """Keep a reference to the DataFrame whose columns will be binned."""
        self.df = df

    def binning(self, feature_col, label_col, p_value_threshold, min_bin_size, min_event_rate, sort_order):
        """Bin ``feature_col`` so the label mean decreases along the sort order.

        Parameters
        ----------
        feature_col, label_col : str
            Column names in ``self.df``.
        p_value_threshold : float
            Adjacent bins keep being merged while the largest pairwise score
            (p-value, plus 1 if either bin is undersized) exceeds this value.
        min_bin_size : int
            Bins with fewer rows than this are penalised towards merging.
        min_event_rate : number
            Bins whose label *sum* is below this are penalised towards merging
            (despite the name it is compared to a count, not a rate).
        sort_order : bool
            ``True`` sorts the feature ascending, ``False`` descending.  It is
            also forwarded to ``pd.cut`` as ``right``.

        Returns
        -------
        tuple
            ``((woe_table, iv_total), bins)`` where ``bins`` is the list of
            cut points, starting with ``-inf`` and ending with ``inf``.

        Raises
        ------
        ValueError
            If there are no rows to bin or the label column contains NaN.
        """
        sorted_df = self.df[[feature_col, label_col]].sort_values(by=[feature_col], ascending=sort_order)
        if sorted_df.empty:
            raise ValueError("Cannot bin an empty DataFrame")
        if sorted_df[label_col].isna().any():
            # A NaN mean compares False both ways, which used to stall the merge loop.
            raise ValueError("Label column contains NaN values")

        features = sorted_df[feature_col].tolist()
        labels = sorted_df[label_col].tolist()

        stack = self._initial_monotonic_bins(labels)
        stack = self._merge_insignificant_bins(stack, labels, p_value_threshold, min_bin_size, min_event_rate)
        bins = self._bin_edges(stack, features, sort_order)

        return self.calculate_woe_iv(sorted_df, feature_col, bins, label_col, sort_order), bins

    def _initial_monotonic_bins(self, labels):
        """Return bins over ``labels`` whose means strictly decrease left to right."""
        stack = []
        for position, label in enumerate(labels):
            single = [position, position, 0, label]
            if not stack or stack[-1][-1] > label:
                stack.append(single)
                continue

            # The new row does not lower the mean: absorb it into the last bin,
            # then keep folding the last two bins while the order is violated.
            stack.append(self.combine_bins(stack.pop(), single))
            while len(stack) > 1 and stack[-2][-1] <= stack[-1][-1]:
                last = stack.pop()
                before_last = stack.pop()
                stack.append(self.combine_bins(last, before_last))
        return stack

    def _merge_insignificant_bins(self, stack, labels, p_value_threshold, min_bin_size, min_event_rate):
        """Repeatedly merge the adjacent pair with the highest score above the threshold."""
        # Prefix sums give each bin's label total in O(1) instead of re-summing a slice.
        cumulative_events = np.concatenate(([0], np.cumsum(labels)))

        def is_undersized(candidate):
            left, right = candidate[0], candidate[1]
            too_few_rows = right - left + 1 < min_bin_size
            too_few_events = cumulative_events[right + 1] - cumulative_events[left] < min_event_rate
            return too_few_rows or too_few_events

        while len(stack) > 1:
            scores = []
            for previous, current in zip(stack, stack[1:]):
                score = self.calculate_p_value(previous, current, labels)
                if is_undersized(previous) or is_undersized(current):
                    # Pushes the pair above any genuine p-value so it merges first.
                    score = score + 1
                scores.append(score)

            # max() keeps the first of equal scores, like a strict ">" scan.
            best = max(range(len(scores)), key=scores.__getitem__)
            if not scores[best] > p_value_threshold:
                break
            stack[best:best + 2] = [self.combine_bins(stack[best], stack[best + 1])]
        return stack

    def _bin_edges(self, stack, features, sort_order):
        """Translate the final bins into ascending ``pd.cut`` edges framed by +/-inf.

        NOTE(review): in descending mode each edge is the largest feature value
        of the lower bin, yet ``calculate_woe_iv`` cuts with ``right=False``, so
        rows sitting exactly on an edge are counted in the bin above.  Confirm
        this is intended.
        """
        if sort_order:
            # Ascending: the upper end of every bin except the last one.
            interior = [features[b[1]] for b in stack[:-1]]
        else:
            # Descending: the upper end of every bin except the first (highest) one.
            interior = sorted(features[b[0]] for b in stack[1:])
        # Tied feature values straddling two bins would repeat an edge, which pd.cut rejects.
        interior = list(dict.fromkeys(interior))
        return [-np.inf, *interior, np.inf]

    def combine_bins(self, bin_1, bin_2):
        """Merge two bins into one ``[left, right, std, mean]`` list.

        The mean is the size-weighted mean of both bins.  For two single-row
        bins the std is the sample std (``ddof=1``) of the two values; otherwise
        it is the square root of the size-weighted average of the variances.

        NOTE(review): the weighted-variance branch leaves out the spread between
        the two bin means, so it can understate the merged std; confirm intent.
        """
        left_1, right_1, std_1, mean_1 = bin_1[0], bin_1[1], bin_1[-2], bin_1[-1]
        left_2, right_2, std_2, mean_2 = bin_2[0], bin_2[1], bin_2[-2], bin_2[-1]
        size_1 = right_1 - left_1 + 1
        size_2 = right_2 - left_2 + 1
        total_size = size_1 + size_2

        # One formula covers single-row bins too (their size is simply 1).
        combined_mean = (size_1 * mean_1 + size_2 * mean_2) / total_size

        if total_size == 2:
            combined_std = np.std([mean_1, mean_2], ddof=1)
        else:
            combined_std = np.sqrt((size_1 * (std_1 ** 2) + size_2 * (std_2 ** 2)) / total_size)

        return [min(left_1, left_2), max(right_1, right_2), combined_std, combined_mean]

    def calculate_p_value(self, bin_1, bin_2, labels):
        """One-sided z-test p-value that ``bin_1``'s mean exceeds ``bin_2``'s.

        Returns the sentinel ``2`` when no positive pooled variance is available
        (both bins hold one row, or both have zero spread).  ``binning`` merges
        the pair whenever its score exceeds the threshold, so the sentinel
        forces a merge for any threshold below 2.  ``labels`` is accepted for
        interface compatibility and is not read.
        """
        mean_1, mean_2 = bin_1[-1], bin_2[-1]
        size_1 = bin_1[1] - bin_1[0] + 1
        size_2 = bin_2[1] - bin_2[0] + 1
        std_1 = bin_1[-2]
        std_2 = bin_2[-2]

        degrees_of_freedom = size_1 + size_2 - 2
        if degrees_of_freedom <= 0:
            # Two single-row bins: the pooled variance would be 0 / 0.
            return 2

        pooled_variance = (size_1 * std_1 ** 2 + size_2 * std_2 ** 2) / degrees_of_freedom
        if pooled_variance > 0:
            z_value = (mean_1 - mean_2) / math.sqrt(pooled_variance * (1 / size_1 + 1 / size_2))
            return 1 - norm.cdf(z_value)
        return 2

    def calculate_woe_iv(self, df, feature_col, bins, target_col, sort_order):
        """Build the per-bin Weight of Evidence table and the total Information Value.

        ``WoE = ln(% of non events / % of events)`` and
        ``IV = WoE * (% of non events - % of events)`` per bin; the returned
        total is the sum of the per-bin IV column.  A bin with no events or no
        non-events gets an infinite WoE, which is left as is.

        Returns
        -------
        tuple
            ``(table, iv_total)`` with ``table`` holding one row per interval.
        """
        binned_df_x = pd.DataFrame(pd.cut(df[feature_col], bins, right=sort_order))
        binned_df_x[target_col] = df[target_col]

        # observed=False keeps empty intervals as rows regardless of the pandas default.
        group_df_x = binned_df_x.groupby(feature_col, observed=False).agg(
            events=(target_col, 'sum'),
            total=(target_col, 'count')
        ).reset_index()

        group_df_x['non events'] = group_df_x['total'] - group_df_x['events']
        good_sum = group_df_x['non events'].sum()
        bad_sum = group_df_x['events'].sum()
        group_df_x['% of events'] = group_df_x['events'] / bad_sum
        group_df_x['% of non events'] = group_df_x['non events'] / good_sum
        group_df_x['WoE'] = np.log(group_df_x['% of non events'] / group_df_x['% of events'])
        group_df_x['IV'] = group_df_x['WoE'] * (group_df_x['% of non events'] - group_df_x['% of events'])
        iv_val = group_df_x['IV'].sum()
        return group_df_x, iv_val


In [14]:
# Load the training data.
# NOTE(review): absolute, machine-specific path; this cell will not run elsewhere as written.
df = pd.read_csv('C:/Users/adity/OneDrive/Documents/Monotonic Binning/train.csv')

# Wrap the frame so its columns can be binned
processor = BinningProcessor(df)

# Which columns to use
feature_col, label_col = 'V2', 'Class'

# Merge controls
p_value_threshold = 0.05  # adjacent bins merge while their score stays above this
min_bin_size = 10000      # bins with fewer rows are pushed towards merging
min_event_rate = 1        # bins whose label sum is below this are pushed towards merging
sort_order = False        # True sorts the feature ascending, False descending

# Run the binning; keyword arguments keep the six parameters unambiguous
result = processor.binning(
    feature_col=feature_col,
    label_col=label_col,
    p_value_threshold=p_value_threshold,
    min_bin_size=min_bin_size,
    min_event_rate=min_event_rate,
    sort_order=sort_order,
)

In [13]:
# Display the binning output: ((per-bin WoE/IV table, total IV), list of bin edges)
result

((                V2  events  total  non events  % of events  % of non events  \
  0   [-inf, -0.313)     104  75115       75011     0.221748         0.343049   
  1  [-0.313, 0.357)      94  59444       59350     0.200426         0.271426   
  2   [0.357, 1.047)      84  50245       50161     0.179104         0.229402   
  3     [1.047, inf)     187  34325       34138     0.398721         0.156124   
  
          WoE        IV  
  0  0.436329  0.052927  
  1  0.303242  0.021530  
  2  0.247506  0.012449  
  3 -0.937613  0.227462  ,
  0.3143677131983261),
 [-inf, -0.313137858145954, 0.356628999499892, 1.04663468333539, inf])