In [25]:
import sys
sys.path.append('../')

import numpy as np
import pandas as pd
import copy
from tqdm import tqdm
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from mlp_to_qbaf_converter.mlp_to_qbaf import MLPToQBAF
from sparx.sparx import LocalSpArX
import Uncertainpy.src.uncertainpy.gradual as grad
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report
from mlp_to_qbaf_converter.counterfactual_contestability_explanation import CounterfactualContestabilityExplanation
from mlp_to_qbaf_converter.utils import forward_pass_dataset, logistic
from scipy.optimize import minimize, differential_evolution

In [26]:
seed = 2025
data = pd.read_csv("../../data/diabetes.csv")
X = data.drop("Outcome", axis=1)
y = data["Outcome"]

input_feature_names = list(X.columns)
output_names = ["Diabetes?"]

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=seed,
)

X_train, X_test, y_train, y_test = (
    X_train.to_numpy(),
    X_test.to_numpy(),
    y_train.to_numpy(),
    y_test.to_numpy(),
)

smote = SMOTE(random_state=seed, sampling_strategy="minority")
X_train, y_train = smote.fit_resample(X_train, y_train)

# Feature Scaling (Use StandardScaler instead of MinMaxScaler)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [27]:
size = (5,)
min_accuracy = 0.7

alphas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
solvers = ["adam", "sgd", "lbfgs"]

best_mlp = None
best_score = 0
params = {"alpha": alphas, "solver": solvers}
grid_search = GridSearchCV(
    MLPClassifier(
        hidden_layer_sizes=size,
        activation="logistic",
        random_state=seed,
        max_iter=5000,
        early_stopping=True,
    ),
    params,
    cv=3,
    n_jobs=-1,
    verbose=2,
)

grid_search.fit(X_train, y_train)
best_mlp = grid_search.best_estimator_
best_score = grid_search.best_score_

if best_score < min_accuracy:
    msg = "Model accuracy is too low"
    raise ValueError(msg)

mlp = best_mlp

Fitting 3 folds for each of 27 candidates, totalling 81 fits


[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.0s
[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.0s
[CV] END .............................alpha=0.1, solver=adam; total time=   0.0s
[CV] END .............................alpha=0.1, solver=adam; total time=   0.0s
[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.1s
[CV] END .............................alpha=0.1, solver=adam; total time=   0.1s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.0s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.0s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.0s
[CV] END ..............................alpha=0.2, solver=sgd; total time=   0.0s
[CV] END ..............................alpha=0.2, solver=sgd; total time=   0.0s
[CV] END ..............................alpha=0.2, solver=sgd; total time=   0.0s
[CV] END ...................

In [28]:
print(classification_report(y_test, mlp.predict(X_test)))

              precision    recall  f1-score   support

           0       0.82      0.74      0.78       141
           1       0.64      0.74      0.69        90

    accuracy                           0.74       231
   macro avg       0.73      0.74      0.73       231
weighted avg       0.75      0.74      0.74       231



\begin{array}{l}
\operatorname{Agg}_{x}^{e}\left(C_{1}, C_{2}\right)=
\quad \sum_{x^{\prime} \in \Delta^{\prime}} \pi_{x^{\prime}, x} \sum_{v_{l, i} \in C_{1}} \frac{1}{\left|C_{2}\right| \cdot \mathcal{O}_{x^{\prime}}^{\mu}\left(v_{C_{1}}\right)} \sum_{v_{l+1, j} \in C_{2}} W_{i, j}^{l} \mathcal{O}_{x^{\prime}}^{\mathcal{M}}\left(v_{l, i}\right)
\end{array}


In [29]:
def merge_weights(partial_weights, n_clusters, layer, cluster_labels):

    merged_weights = [np.mean(partial_weights.T[cluster_labels[layer] == label], axis=0) for label in range(n_clusters)]

    return np.asarray(merged_weights).T

def update_partial_weights(W, layer, sp, merged_weights, merged_biases, n_clusters, activations_dataset):
    new_partial_weights = []
    partial_activations = forward_pass_dataset(
        sp.local_dataset[:, :-1],
        merged_weights, 
        merged_biases,
        logistic
    )[layer + 1]

    new_example_weights = sp.example_weights / np.sum(sp.example_weights)

    for label in range(n_clusters):
        h_star = partial_activations[:, label]
        all_hidden_activations = activations_dataset[layer + 1][
            :,
            sp.cluster_labels[layer] == label,
        ]

        h_star = np.array([1 if hs == 0 else hs for hs in list(h_star)])

        normalised_activations = np.sum(
            np.multiply(all_hidden_activations.T, new_example_weights / h_star).T,
            axis=0,
        )

        new_partial_weights.append(
            np.dot(
                normalised_activations,
                W[sp.cluster_labels[layer] == label],
            ),
        )

    return np.asarray(new_partial_weights)

def merge_layer(W, shape, layer, n_clusters, sp: LocalSpArX, merged_weights, merged_biases, activations_dataset, num_hidden_layers, sparse_weights):

    W = W.reshape(shape)
    
    if layer == 0:
        return np.linalg.norm(merge_weights(W, n_clusters, layer, sp.cluster_labels) - sparse_weights[layer])
    
    new_partial_weights = update_partial_weights(
        W, 
        layer - 1,
        sp,
        merged_weights,
        merged_biases,
        n_clusters,
        activations_dataset
    )

    if layer == num_hidden_layers:
        return np.linalg.norm(new_partial_weights - sparse_weights[layer])

    return np.linalg.norm(merge_weights(new_partial_weights, n_clusters, layer, sp.cluster_labels) - sparse_weights[layer])
    

def compute_new_weights(sp: LocalSpArX, original_weights, original_biases, sparse_weights, sparse_biases):

    computed_weights = []

    num_hidden_layers = len(sp.sparsified_weights) - 1

    activations_dataset = []
        

    for i in range(num_hidden_layers + 1):
        print(f"{i}/{num_hidden_layers}")
        n_clusters = max(sp.cluster_labels[i]) + 1
        W_init = original_weights[i]
        shape = W_init.shape
        W_init = W_init.flatten()
        bounds = [(-1, 1) for _ in W_init]
        sol = minimize(merge_layer, W_init, args=(shape, i, n_clusters, sp, sparse_weights[:i], sparse_biases[:i], activations_dataset, num_hidden_layers, sparse_weights), bounds=bounds)
        computed_weights.append(sol.x.reshape(shape))
        activations_dataset = forward_pass_dataset(
            sp.local_dataset[:, :-1],
            computed_weights,
            original_biases[:i+1],
            logistic,
        )
    
    return computed_weights

def create_new_qbaf(  # noqa: PLR0913
    original_qbaf: grad.BAG,
    original_mlp: tuple[np.ndarray, np.ndarray],
    sparse_qbaf: grad.BAG,
    sp: LocalSpArX,
    counterfactual_dict: dict[tuple[str, str], float],
    input_feature_names: list[str],
    output_names: list[str],
    example: np.ndarray,
) -> grad.BAG:
    """Calculate the change in weights for the original QBAF.

    Based on the difference in weights between the sparse and its counterfactual.

    :param original_qbaf: The original QBAF.
    :param original_mlp: The original MLP weights and biases.
    :param sparse_qbaf: The sparse QBAF.
    :param sp: The LocalSpArX object.
    :param counterfactual_dict: The weights for the sparse QBAF counterfactual.
    :param input_feature_names: The names of the input features.
    :param output_names: The names of the output features.
    :param example: The example to explain.

    :return: A dictionary containing the change in weights for the original QBAF.

    """
    original_weights, original_biases = original_mlp
    sparse_weights, sparse_biases = sp.get_sparsified_mlp()

    for relation in counterfactual_dict:
        inital = relation[0]
        final = relation[1]

        if sparse_qbaf.arguments[inital] in sparse_qbaf.arguments[final].attackers:
            attack = True
        else:
            attack = False

        if inital in input_feature_names:
            inital_layer = 0
            inital_neuron = input_feature_names.index(inital)
        else:
            split_name = inital.split(" ")
            inital_layer = int(split_name[1])
            inital_neuron = int(split_name[3]) - 1
        if final in output_names:
            final_neuron = output_names.index(final)
        else:
            split_name = final.split(" ")
            final_neuron = int(split_name[3]) - 1

        if attack:
            sparse_weights[inital_layer][inital_neuron][final_neuron] = -counterfactual_dict[relation]
        else:
            sparse_weights[inital_layer][inital_neuron][final_neuron] = counterfactual_dict[relation]

    new_weights = compute_new_weights(sp, original_weights, original_biases, sparse_weights, sparse_biases)
    

    neurons_per_layer = [len(input_feature_names)] + [len(bias) for bias in original_biases]
    new_qbaf = MLPToQBAF(
        neurons_per_layer,
        new_weights,
        original_biases,
        "logistic",
        input_feature_names,
        output_names,
        example
    ).get_qbaf()

    counterfactual_qbaf = MLPToQBAF(
        sp.get_sparsified_shape(),
        sparse_weights,
        sparse_biases,
        "logistic",
        input_feature_names,
        output_names,
        example
    ).get_qbaf()

    return new_qbaf, counterfactual_qbaf

In [30]:
def check_validity(
    new_qbaf: grad.BAG,
    desired_strength: float,
    delta: float,
    topic_arg_name: str,
) -> tuple[bool, float]:
    """Check if the new QBAF is valid based on the desired strength."""
    # Get the strength of the topic argument
    topic_arg = new_qbaf.arguments[topic_arg_name]
    topic_strength = topic_arg.strength

    # Check if the strength is greater than or equal to the desired strength
    dist = abs(desired_strength - topic_strength)
    return (dist < delta, dist)

# TODO: FIX CE class to given assign functions

In [31]:
sparsification = 10

X_test = np.clip(X_test, 0, 1)

neurons_per_layer = [
    mlp.n_features_in_,
    *list(mlp.hidden_layer_sizes),
    mlp.n_outputs_,
]
train_set = np.column_stack((X_train, y_train))

example_num = 50

example = X_test[example_num]
example_row = np.append(X_test[example_num], y_test[example_num])

original_qbaf = MLPToQBAF(
    neurons_per_layer,
    mlp.coefs_,
    mlp.intercepts_,
    "logistic",
    input_feature_names,
    output_names,
    example,
).get_qbaf()

sp = LocalSpArX(
    mlp.coefs_,
    mlp.intercepts_,
    "logistic",
    sparsification,
    example_row,
    train_set,
    np.sqrt(X_test.shape[1]) * 0.75,
)

sp_weights, sp_biases = sp.get_sparsified_mlp()

sparse_qbaf = MLPToQBAF(
    sp.get_sparsified_shape(),
    sp_weights,
    sp_biases,
    "logistic",
    input_feature_names,
    output_names,
    example,
).get_qbaf()

cf_obj = CounterfactualContestabilityExplanation(
    sparse_qbaf,
    grad.SumAggregation(),
    grad.MLPBasedInfluence(),
    output_names[0],
    seed,
)

sparse_counterfactual = cf_obj.get_ce_explanation()

for relation, val in sparse_counterfactual.items():
    inital = sparse_qbaf.arguments[relation[0]]
    final = sparse_qbaf.arguments[relation[1]]

    if inital in final.attackers:
        final.attackers[inital] = val
    else:
        final.supporters[inital] = val
    
grad.computeStrengthValues(sparse_qbaf, grad.SumAggregation(), grad.MLPBasedInfluence())

print(f"Counterfactual sparse QBAF: {check_validity(sparse_qbaf, cf_obj.desired_strength, cf_obj.delta, output_names[0])}")

    
new_estimated_qbaf, counterfactual_qbaf = create_new_qbaf(
    original_qbaf,
    (mlp.coefs_, mlp.intercepts_),
    sparse_qbaf,
    sp,
    sparse_counterfactual,
    input_feature_names,
    output_names,
    example,
)

print(f"Function counterfactual sparse QBAF: {check_validity(counterfactual_qbaf, cf_obj.desired_strength, cf_obj.delta, output_names[0])}")

is_valid, dist = check_validity(
    new_estimated_qbaf,
    cf_obj.desired_strength,
    cf_obj.delta,
    output_names[0],
)

print(f"Validity: {is_valid}")
print(f"Distance: {dist}")
print(f"Desired strength: {cf_obj.desired_strength}")
print(f"Original Strength: {original_qbaf.arguments[output_names[0]].strength}")
print(f"New Strength: {new_estimated_qbaf.arguments[output_names[0]].strength}")

Counterfactual sparse QBAF: (True, 0.009620286010456891)
0/1
1/1
Function counterfactual sparse QBAF: (True, 0.009620286010456891)
Validity: False
Distance: 0.10772967268538941
Desired strength: 0.27409607462665897
Original Strength: 0.0852817418684515
New Strength: 0.16636640194126956


In [32]:
# from mlp_to_qbaf_converter.utils import forward_pass

# def sparsify_and_compare(
#         weights_guess,
#         sizes,
#         original_biases,
#         sparse_activations_example,
#         example,
#         sparse_weights,
#         sp,
#         train_set,
#     ):

#     weights_guess_reshaped = []

#     for size in sizes:
#         weights_guess_reshaped.append(np.reshape(weights_guess[:np.prod(size)], size))
#         weights_guess = weights_guess[np.prod(size):]
    
#     sparse_approx = LocalSpArX(
#         weights_guess_reshaped,
#         original_biases,
#         "logistic",
#         sp.shrinkage_percentage,
#         sp.example_row,
#         train_set,
#         sp.kernel_width,
#     )

#     approx_sparse_weights, approx_sparse_biases = sparse_approx.get_sparsified_mlp()
#     approx_sparse_activations_example = forward_pass(
#         example,
#         approx_sparse_weights,
#         approx_sparse_biases,
#         logistic,
#     )
    
#     return np.linalg.norm(sparse_activations_example[-1] - approx_sparse_activations_example[-1]) 
        
# def create_new_qbaf(
#         original_mlp,
#         sparse_qbaf,
#         sp,
#         counterfactual_dict,
#         input_feature_names,
#         output_names,
#         example,
#         train_set,
#         ):
    
#     original_weights, original_biases = original_mlp

#     sparse_weights, sparse_biases = sp.get_sparsified_mlp()
    
#     # Apply the counterfactual dict to get new sparse weights
#     for relation in counterfactual_dict:
#         inital = relation[0]
#         final = relation[1]

#         if sparse_qbaf.arguments[inital] in sparse_qbaf.arguments[final].attackers:
#             attack = True
#         else:
#             attack = False

#         if inital in input_feature_names:
#             inital_layer = 0
#             inital_neuron = input_feature_names.index(inital)
#         else:
#             split_name = inital.split(" ")
#             inital_layer = int(split_name[1])
#             inital_neuron = int(split_name[3]) - 1
#         if final in output_names:
#             final_neuron = output_names.index(final)
#         else:
#             split_name = final.split(" ")
#             final_neuron = int(split_name[3]) - 1

#         if attack:
#             sparse_weights[inital_layer][inital_neuron][final_neuron] = -counterfactual_dict[relation]
#         else:
#             sparse_weights[inital_layer][inital_neuron][final_neuron] = counterfactual_dict[relation]
    
#     sparse_activations_example = forward_pass(
#         example,
#         sparse_weights,
#         sparse_biases,
#         logistic,        
#     )

#     weights_list = []
#     sizes = []
    
#     for i in range(len(original_weights)):
#         sizes.append(original_weights[i].shape)
#         weights_list += list(original_weights[i].flatten())
    
#     sol = differential_evolution(
#         sparsify_and_compare,
#         args=(sizes, original_biases, sparse_activations_example, example, sparse_weights, sp, train_set),
#         bounds=[(-1, 1) for _ in range(len(weights_list))],
#         strategy="best1bin",
#         maxiter=1000,
#         disp=True,
#     )

#     new_weights = sol.x

#     new_weights_reshaped = []

#     for size in sizes:
#         new_weights_reshaped.append(np.reshape(new_weights[:np.prod(size)], size))
#         new_weights = new_weights[np.prod(size):]

#     neurons_per_layer = [len(input_feature_names)] + [len(bias) for bias in original_biases]
    
#     estimated_qbaf = MLPToQBAF(
#         neurons_per_layer,
#         new_weights_reshaped,
#         original_biases,
#         "logistic",
#         input_feature_names,
#         output_names,
#         example,
#     ).get_qbaf()

#     counterfactual_qbaf = MLPToQBAF(
#         sp.get_sparsified_shape(),
#         sparse_weights,
#         sparse_biases,
#         "logistic",
#         input_feature_names,
#         output_names,
#         example
#     ).get_qbaf()
        
#     return estimated_qbaf, counterfactual_qbaf



In [50]:
seed = 2025
data = pd.read_csv("../../data/diabetes.csv")
X = data.drop("Outcome", axis=1)
y = data["Outcome"]

input_feature_names = list(X.columns)
output_names = ["Diabetes?"]

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=seed,
)

X_train, X_test, y_train, y_test = (
    X_train.to_numpy(),
    X_test.to_numpy(),
    y_train.to_numpy(),
    y_test.to_numpy(),
)

smote = SMOTE(random_state=seed, sampling_strategy="minority")
X_train, y_train = smote.fit_resample(X_train, y_train)

# Feature Scaling (Use StandardScaler instead of MinMaxScaler)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

size = (10,)
min_accuracy = 0.7

alphas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
solvers = ["adam", "sgd", "lbfgs"]

best_mlp = None
best_score = 0
params = {"alpha": alphas, "solver": solvers}
grid_search = GridSearchCV(
    MLPClassifier(
        hidden_layer_sizes=size,
        activation="logistic",
        random_state=seed,
        max_iter=5000,
        early_stopping=True,
    ),
    params,
    cv=3,
    n_jobs=-1,
    verbose=2,
)

grid_search.fit(X_train, y_train)
best_mlp = grid_search.best_estimator_
best_score = grid_search.best_score_

if best_score < min_accuracy:
    msg = "Model accuracy is too low"
    raise ValueError(msg)

mlp = best_mlp


sparsification = 10

X_test = np.clip(X_test, 0, 1)

neurons_per_layer = [
    mlp.n_features_in_,
    *list(mlp.hidden_layer_sizes),
    mlp.n_outputs_,
]
train_set = np.column_stack((X_train, y_train))

example_num = 50

example = X_test[example_num]
example_row = np.append(X_test[example_num], y_test[example_num])

original_qbaf = MLPToQBAF(
    neurons_per_layer,
    mlp.coefs_,
    mlp.intercepts_,
    "logistic",
    input_feature_names,
    output_names,
    example,
).get_qbaf()

sp = LocalSpArX(
    mlp.coefs_,
    mlp.intercepts_,
    "logistic",
    sparsification,
    example_row,
    train_set,
    np.sqrt(X_test.shape[1]) * 0.75,
)

sp_weights, sp_biases = sp.get_sparsified_mlp()

sparse_qbaf = MLPToQBAF(
    sp.get_sparsified_shape(),
    sp_weights,
    sp_biases,
    "logistic",
    input_feature_names,
    output_names,
    example,
).get_qbaf()



Fitting 3 folds for each of 27 candidates, totalling 81 fits
[CV] END .............................alpha=0.1, solver=adam; total time=   0.1s
[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.1s
[CV] END .............................alpha=0.1, solver=adam; total time=   0.1s
[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.1s
[CV] END .............................alpha=0.1, solver=adam; total time=   0.1s
[CV] END ..............................alpha=0.1, solver=sgd; total time=   0.1s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.0s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.0s
[CV] END ..............................alpha=0.2, solver=sgd; total time=   0.0s
[CV] END .............................alpha=0.2, solver=adam; total time=   0.1s
[CV] END ..............................alpha=0.2, solver=sgd; total time=   0.0s
[CV] END ..............................alpha=0.2

In [51]:

cf_obj_orig = CounterfactualContestabilityExplanation(
    original_qbaf,
    grad.SumAggregation(),
    grad.MLPBasedInfluence(),
    output_names[0],
    seed,
)

orig_counterfactual = cf_obj_orig.get_ce_explanation()

print(len(orig_counterfactual))

np.mean(
    [abs(val) for val in orig_counterfactual.values()]
)  # noqa: T001

90


np.float64(0.4936048257898619)

In [52]:
cf_obj_sparse = CounterfactualContestabilityExplanation(
    sparse_qbaf,
    grad.SumAggregation(),
    grad.MLPBasedInfluence(),
    output_names[0],
    seed,
)

sparse_counterfactual = cf_obj_sparse.get_ce_explanation()

print(len(sparse_counterfactual))

np.mean(
    [abs(val) for val in sparse_counterfactual.values()]
)  # noqa: T001

81


np.float64(0.49876833311891816)