# In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from optbinning import OptimalBinning
import numpy as np
import pandas as pd

# Start (or reuse) a Spark session named "OptBinningExample" and obtain the
# SparkContext, which is used below for parallelize() and broadcast().
spark = SparkSession.builder.appName("OptBinningExample").getOrCreate()
sc = SparkContext.getOrCreate()

# Seed NumPy's global RNG so the synthetic data is reproducible, then draw a
# binary target: 100 integers, each 0 or 1.
# NOTE: every random draw below consumes this seeded stream in order, so
# reordering the np.random calls changes the generated values.
np.random.seed(42)
target = np.random.randint(0, 2, 100)

# Build the synthetic dataset (100 rows):
#   var1-var5  : the target (negated for var3) plus Gaussian noise with a
#                different scale per column, so they carry signal.
#   var6-var10 : uniform noise in [0, 1), generated independently of the target.
#   target     : the binary label itself.
data = pd.DataFrame({
    'var1': target + 0.2 * np.random.randn(100),
    'var2': target + 0.3 * np.random.randn(100),
    'var3': -target + 0.2 * np.random.randn(100),
    'var4': target + 0.1 * np.random.randn(100),
    'var5': target + 0.25 * np.random.randn(100),
    'var6': np.random.rand(100),
    'var7': np.random.rand(100),
    'var8': np.random.rand(100),
    'var9': np.random.rand(100),
    'var10': np.random.rand(100),
    'target': target
})

# Distribute the rows as an RDD with one dict per row.
# NOTE(review): `rdd` is not referenced anywhere in the visible code; confirm
# whether it is still needed.
rdd = sc.parallelize(data.to_dict(orient='records'))

# Broadcast the whole dataset as a column-oriented dict
# ({column name: list of values}) so binning_function can look up a full
# column, and the target, by name via broadcast_data.value.
broadcast_data = sc.broadcast(data.to_dict(orient='list'))

def binning_function(var):
    """Fit optimal binning for one variable and summarize the result.

    The feature column and the target are read from the ``broadcast_data``
    broadcast variable by name, so only the column name has to be passed in.

    Args:
        var: Name of the column in the broadcast data to bin.

    Returns:
        A dict with three keys:
            ``'variable'``: the column name that was binned.
            ``'iv'``: the ``iv`` attribute of the built binning table.
            ``'splits'``: the ``splits`` attribute of the binning table.
    """
    columns = broadcast_data.value

    # Fit a numerical optimal binning of the full column against the target.
    optb = OptimalBinning(name=var, dtype="numerical", solver="cp")
    optb.fit(np.array(columns[var]), np.array(columns['target']))

    # Build the binning table before reading its summary attributes.
    binning_table = optb.binning_table
    binning_table.build()

    return {
        'variable': var,
        'iv': binning_table.iv,
        'splits': binning_table.splits,
    }

# Run binning_function once per feature column (every column except the
# target) through Spark and gather the per-variable results on the driver.
feature_columns = data.columns.difference(['target'])
binned_results = (
    sc.parallelize(feature_columns)
    .map(binning_function)
    .collect()
)

# Report the information value and split points found for each variable.
for result in binned_results:
    print(f"Variable: {result['variable']}, IV: {result['iv']}, Splits: {result['splits']}")