# Counterfactuals benchmark on tabular datasets

In [1]:
BASE_PATH = "./counterfactuals"
#set working directory Reconstruction

## Import and preprocessing

In [2]:
import os
import json
import time
import numpy as np
import pandas as pd
import pickle
from pprint import pprint
from datetime import datetime

from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler

# from tensorflow import randoms
from tensorflow.keras.utils import to_categorical # type: ignore
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import Dense, Add, Input, ActivityRegularization, Concatenate, Multiply # type: ignore
from tensorflow.keras import optimizers, Model, regularizers # type: ignore
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint # type: ignore

In [3]:
def ensure_directory(path):
    """Ensure the directory exists, if not, create it."""
    if not os.path.exists(path):
        os.makedirs(path)

ensure_directory(BASE_PATH)


date = datetime.now().strftime('%Y-%m-%d')
EXPERIMENT_PATH = f"{BASE_PATH}/German_{date}"
ensure_directory(EXPERIMENT_PATH)
#https://archive.ics.uci.edu/dataset/144/statlog+german+credit+data\        

In [4]:

# Set up
INITIAL_CLASS = 0
DESIRED_CLASS = 1
N_CLASSES = 2
target_column = "Outcome"
data_source = "pima" # "pima" or "german"

np.set_printoptions(precision=2) 
np.random.seed(2020)

# =========================================================
# Step 1: Define the Preprocessing Function
# =========================================================

def preprocess_data_german(df, target_column="Outcome"):
    """
    Preprocess the German Credit dataset by encoding categorical variables and splitting the data into 
    train, test, and user simulation sets.
    
    Returns a dictionary with processed train, test, and user datasets.
    """
    
    # Assign meaningful column names
    df.columns = [
        'Status', 'Month', 'Credit_History', 'Purpose', 'Credit_Amount',
        'Savings', 'Employment', 'Installment_Rate', 'Personal_Status', 'Other_Debtors',
        'Residence_Duration', 'Property', 'Age', 'Other_Installment_Plans', 'Housing',
        'Existing_Credits', 'Job', 'Num_Liable_People', 'Telephone', 'Foreign_Worker',
        'Outcome'
    ]
    
    # Mapping categorical features to more meaningful values
    status_mapping = { 'A11': '< 0 DM', 'A12': '0 <= ... < 200 DM', 'A13': '>= 200 DM / salary assignments for at least 1 year', 'A14': 'no checking account' }
    credit_history_mapping = { 'A30': 'no credits taken/ all credits paid back duly', 'A31': 'all credits at this bank paid back duly', 'A32': 'existing credits paid back duly till now', 'A33': 'delay in paying off in the past', 'A34': 'critical account/other credits existing' }
    savings_mapping = { 'A61': '< 100 DM', 'A62': '100 <= ... < 500 DM', 'A63': '500 <= ... < 1000 DM', 'A64': '>= 1000 DM', 'A65': 'unknown/no savings account' }
    employment_mapping = { 'A71': 'unemployed', 'A72': '< 1 year', 'A73': '1 <= ... < 4 years', 'A74': '4 <= ... < 7 years', 'A75': '>= 7 years' }
    personal_status_mapping = { 'A91': 'male: divorced/separated', 'A92': 'female: divorced/separated/married', 'A93': 'male: single', 'A94': 'male: married/widowed', 'A95': 'female: single' }
    other_debtors_mapping = { 'A101': 'none', 'A102': 'co-applicant', 'A103': 'guarantor' }
    property_mapping = { 'A121': 'real estate', 'A122': 'building society savings agreement/life insurance', 'A123': 'car or other, not in attribute 6', 'A124': 'unknown/no property' }
    other_installment_plans_mapping = { 'A141': 'bank', 'A142': 'stores', 'A143': 'none' }
    housing_mapping = { 'A151': 'rent', 'A152': 'own', 'A153': 'for free' }
    telephone_mapping = { 'A191': 'none', 'A192': 'yes, registered under the customer\'s name' }
    foreign_worker_mapping = { 'A201': 'yes', 'A202': 'no' }

    # Apply mappings
    df['Status'] = df['Status'].map(status_mapping)
    df['Credit_History'] = df['Credit_History'].map(credit_history_mapping)
    df['Savings'] = df['Savings'].map(savings_mapping)
    df['Employment'] = df['Employment'].map(employment_mapping)
    df['Personal_Status'] = df['Personal_Status'].map(personal_status_mapping)
    df['Other_Debtors'] = df['Other_Debtors'].map(other_debtors_mapping)
    df['Property'] = df['Property'].map(property_mapping)
    df['Other_Installment_Plans'] = df['Other_Installment_Plans'].map(other_installment_plans_mapping)
    df['Housing'] = df['Housing'].map(housing_mapping)
    df['Telephone'] = df['Telephone'].map(telephone_mapping)
    df['Foreign_Worker'] = df['Foreign_Worker'].map(foreign_worker_mapping)

    # Encode ordinal columns
    ordinal_cols = ['Status', 'Credit_History', 'Savings', 'Employment']
    le = LabelEncoder()
    for col in ordinal_cols:
        df[col] = le.fit_transform(df[col])

    # One-hot encode nominal columns
    nominal_columns = ['Purpose', 'Personal_Status', 'Other_Debtors', 'Property', 
                       'Other_Installment_Plans', 'Housing', 'Job', 'Telephone', 'Foreign_Worker']
    df = pd.get_dummies(df, columns=nominal_columns, drop_first=True)

    # Process target variable
    Y = df[target_column].replace(1, 0).replace(2, 1)
    X = df.drop(columns=[target_column])

    # Get final feature set
    # list all features
    immutable_features = set(X.columns) - set(['Status', 'Credit_History'])
    

    mutable_features = set(X.columns) - set(immutable_features)
    mutable_features = list(mutable_features)

    features = list(mutable_features) + list(immutable_features)

    return  X, Y, features, immutable_features, mutable_features
    
# =========================================================


def preprocess_data_pima(df, target_column="Outcome"):
    """
    Preprocess the Pima Indians Diabetes dataset:
    - Handle missing/zero values for certain features
    - Standardize numeric features
    - Return train-ready feature matrix, target, and feature info
    """

    # # Replace zeros with NaN for features where zero is not physiologically meaningful
    # zero_na_columns = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
    # df[zero_na_columns] = df[zero_na_columns].replace(0, np.nan)
    
    # # Optionally: impute missing values with median
    # df[zero_na_columns] = df[zero_na_columns].fillna(df[zero_na_columns].median())

    # Separate features and target
    X = df.drop(columns=[target_column])
    Y = df[target_column]

    # Normalize/scale the features
    scaler = StandardScaler()
    X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)

    immutable_features = {"Pregnancies", "DiabetesPedigreeFunction", "Age"}
    mutable_features = [col for col in X.columns if col not in immutable_features]

    features = mutable_features + list(immutable_features)


    return X_scaled, Y, features, immutable_features, mutable_features


# Split into train, test, and user sets
def split_data(X_features, Y_target, simul_size=0.05, test_size=0.21, random_state=2020):
    x_temp, x_user, y_temp, y_user = train_test_split(X_features, Y_target, test_size=simul_size, random_state=random_state)
    x_train, x_test, y_train, y_test = train_test_split(x_temp, y_temp, test_size=test_size, random_state=random_state)

    # Convert target variables to categorical
    y_train = to_categorical(y_train, num_classes=2)
    y_test = to_categorical(y_test, num_classes=2)
    y_user = to_categorical(y_user, num_classes=2)
    return x_train, x_test, x_user, y_train, y_test, y_user


def classify_and_filter_cases(classifier, cases, opposite_CLASS=DESIRED_CLASS):
    # Get the predicted probabilities
    predicted_probs = classifier.predict_proba(cases)
    
    # Convert probabilities to class predictions (0 or 1)
    predicted_classes = [np.argmax(p, axis=1) for p in predicted_probs][0]

    
    # Filter cases where the predicted class is the opposite class
    opposite_cases = cases[predicted_classes == opposite_CLASS]        
    
    return opposite_cases

# =========================================================
# Step 4: Load the Dataset
# =========================================================
if data_source == "pima":
    # Load the Pima Indians Diabetes dataset
    df = pd.read_csv('pima-indians-diabetes-database/diabetes.csv')
    X, Y, features, immutable_features, mutable_features = preprocess_data_pima(df)
elif data_source == "german":

    df = pd.read_csv('statlog_german_credit_data/german.data', delim_whitespace=True, skiprows=1, header=None)
    X, Y, features, immutable_features, mutable_features = preprocess_data_german(df)


# =========================================================
# Step 5: Preprocess the Data
# =========================================================



x_train, x_test, x_user, y_train, y_test, y_user = split_data(X, Y)




In [5]:
def compute_reconstruction_error(x, autoencoder):
    """Compute the reconstruction error for a given autoencoder and data points."""
    preds = autoencoder.predict(x)
    preds_flat = preds.reshape((preds.shape[0], -1))
    x_flat = x.reshape((x.shape[0], -1))
    return np.linalg.norm(x_flat - preds_flat, axis=1)

def format_metric(metric):
    """Return a formatted version of a metric, with the confidence interval."""
    mean_val = metric.mean()
    std_val = metric.std()
    confidence_interval = 1.96 * std_val / np.sqrt(len(metric))
    return f"{mean_val:.3f} ± {confidence_interval:.3f}"


def compute_metrics (samples, counterfactuals, latencies, classifier, autoencoder, batch_latency=None):
    """Summarize the relevant metrics of root to leave method in a dictionary."""           
    
    #  Ensure predict_proba returns a NumPy-compatible structure
    sample_preds = np.array(classifier.predict_proba(samples))[:, DESIRED_CLASS]
    counterfactual_preds = np.array(classifier.predict_proba(counterfactuals))[:, DESIRED_CLASS]

 
    # Compute the metrics
    reconstruction_error = compute_reconstruction_error(counterfactuals, autoencoder)
    delta = np.abs(samples - counterfactuals)
    l1_distances = delta.reshape(delta.shape[0], -1).sum(axis=1)
    prediction_gain = counterfactual_preds - sample_preds

    # Store results
    metrics = {
        "reconstruction_error": format_metric(reconstruction_error),
        "prediction_gain": format_metric(prediction_gain),
        "sparsity": format_metric(l1_distances),
        "latency": format_metric(latencies),
        "latency_batch": f"{(batch_latency or sum(latencies)):.3f}",
    }
    
    return metrics




def save_experiment(method_name, samples, counterfactuals, latencies,
                    classifier, autoencoder, batch_latency=None):
    """Create an experiment folder and save counterfactuals, latencies, and metrics."""
    method_path = f"{EXPERIMENT_PATH}/{method_name}"
    ensure_directory(method_path)

    # Save results
    np.save(f"{method_path}/counterfactuals.npy", counterfactuals)
    np.save(f"{method_path}/latencies.npy", latencies)

    # Compute and save metrics
    metrics = compute_metrics(samples, counterfactuals, latencies, classifier, autoencoder)
    with open(f"{method_path}/metrics.json", "w") as f:
        json.dump(metrics, f)
    pprint(metrics)
    

## Train classifier

In [6]:
# Set the seed for reproducibility
np.random.seed(2020)

# Convert data to float32
x_train = np.array(x_train, dtype=np.float32)
y_train = np.array(y_train, dtype=np.float32)
x_test = np.array(x_test, dtype=np.float32)
y_test = np.array(y_test, dtype=np.float32)



# Create the classifier
classifier = DecisionTreeClassifier(max_depth=5, random_state=2020)


# Train the model

training = classifier.fit(x_train, y_train)   

# calculate the accuracy
accuracy = classifier.score(x_test, y_test)
print("Accuracy: ", accuracy)

# Save the decision tree model

filename = f"{EXPERIMENT_PATH}/classifier.sav"
pickle.dump(classifier, open(filename, 'wb'))


Accuracy:  0.7467532467532467


## Estimate density with the reconstruction error of a (denoising) autoencoder


In [7]:
# Add noise with dynamic scaling based on the standard deviation of the input
def add_noise(x, noise_factor=1e-6):
    noise_factor = np.std(x) * noise_factor  # Scale noise by the data's standard deviation
    x_noisy = x + (noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x.shape))
    return x_noisy


def create_autoencoder(in_shape=(x_train.shape[1],)):
    """Define and compile the autoencoder model with L2 regularization."""
    input_ = Input(shape=in_shape)

    x = Dense(32, activation="relu", kernel_regularizer=regularizers.l2(0.01))(input_)  # L2 regularization
    encoded = Dense(8)(x)
    x = Dense(32, activation="relu", kernel_regularizer=regularizers.l2(0.01))(encoded)  # L2 regularization
    decoded = Dense(in_shape[0], activation="tanh")(x)

    autoencoder = Model(input_, decoded)
    optimizer = optimizers.Nadam()
    autoencoder.compile(optimizer, loss="mse")
    return autoencoder

# Create and train the autoencoder
autoencoder = create_autoencoder(in_shape=(x_train.shape[1],))
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
model_checkpoint = ModelCheckpoint(f"{EXPERIMENT_PATH}/best_autoencoder.keras", save_best_only=True, monitor='val_loss', mode='min')

# Train the model with early stopping and model checkpoint callbacks
training = autoencoder.fit( 
    x_train, add_noise(x_train), epochs=100, batch_size=32, shuffle=True,
    validation_data=(add_noise(x_test), x_test), verbose=0,
    callbacks=[early_stopping, model_checkpoint]
)

# Print the final training and validation loss
print(f"Training loss: {training.history['loss'][-1]:.4f}")
print(f"Validation loss: {training.history['val_loss'][-1]:.4f}")

# Compute the reconstruction error for noise data
n_samples = 1000
r_samples = np.random.randn(n_samples, x_train.shape[1])
reconstruction_error_noise = compute_reconstruction_error(r_samples, autoencoder)

# Compute the reconstruction error for the test data
reconstruction_error = compute_reconstruction_error(x_test, autoencoder)

# Save and print the autoencoder metrics
autoencoder_metrics = {
    "reconstruction_error": format_metric(reconstruction_error),
    "reconstruction_error_noise": format_metric(reconstruction_error_noise),
}
json.dump(autoencoder_metrics, open(f"{EXPERIMENT_PATH}/autoencoder_metrics.json", "w"))
pprint(autoencoder_metrics)

# Save the model
autoencoder.save(f"{EXPERIMENT_PATH}/autoencoder.keras")

Training loss: 0.2470
Validation loss: 0.3038
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
{'reconstruction_error': '1.045 ± 0.164',
 'reconstruction_error_noise': '1.018 ± 0.033'}


## GAN-based counterfactual search

In [8]:
def generate_fake_samples(x, generator):
    """Use the input generator to generate samples."""
    return generator.predict(x)

def data_stream(x, y=None, batch_size=500):
    """Generate batches until exhaustion of the input data."""
    n_train = len(x)  

    if y is not None:
        assert n_train == len(y)
    n_complete_batches, leftover = divmod(n_train, batch_size)
    n_batches = n_complete_batches + bool(leftover)

    perm = np.random.permutation(n_train)
    for i in range(n_batches):
        batch_idx = perm[i * batch_size:(i + 1) * batch_size]
        if y is not None:
            output = (x[batch_idx], y[batch_idx])
        else:
            output = x[batch_idx]
        yield output


def infinite_data_stream(x, y=None, batch_size=500):
    """Infinite batch generator."""
    batches = data_stream(x, y, batch_size=batch_size)
    while True:
        try:
            yield next(batches)
        except StopIteration:
            batches = data_stream(x, y, batch_size=batch_size)
            yield next(batches)

def create_generator(in_shape, mutable_features, residuals=True):
    generator_input = Input(shape=in_shape, name='generator_input')
    mutable_mask_input = Input(shape=(len(mutable_features),), name='mutable_mask_input')
    
    # Apply the mutable mask to the generator input
    masked_input = Multiply()([generator_input[:, :len(mutable_features)], mutable_mask_input])
    
    generator = Dense(64, activation='relu')(masked_input)
    generator = Dense(32, activation='relu')(generator)
    generator = Dense(64, activation='relu')(generator)
    generator = Dense(len(mutable_features), activation='tanh')(generator)
    generator_output = ActivityRegularization(l1=0., l2=1e-6)(generator)

    if residuals:
        # Only add the mutable features part
        generator_output = Add(name="output")([generator_input[:, :len(mutable_features)], generator_output])

    # Concatenate the immutable features part back to the output
    immutable_features_part = generator_input[:, len(mutable_features):]
    generator_output = Concatenate()([generator_output, immutable_features_part])

    return Model(inputs=[generator_input, mutable_mask_input], outputs=generator_output)


def create_discriminator(in_shape):
    model = Sequential()
    model.add(Dense(64, activation='relu', input_shape=in_shape))
    model.add(Dense(1, activation='sigmoid'))
    
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    
    return model  # Ensure this is present!



def define_countergan(generator, discriminator,mutable_features):
    input_shape = (generator.input_shape[0][1],)  # Ensure input_shape is a tuple
    countergan_input = Input(shape=input_shape, name='countergan_input')
    mutable_mask_input = Input(shape=(len(mutable_features),), name='mutable_mask_input')

    x_generated = generator([countergan_input, mutable_mask_input])
    discriminator_output = discriminator(x_generated)

    countergan = Model(inputs=[countergan_input, mutable_mask_input], outputs=discriminator_output)

    optimizer = optimizers.RMSprop(learning_rate=2e-4)
    countergan.compile(optimizer, "binary_crossentropy")

    return countergan


In [9]:
def train_countergan(n_discriminator_steps, n_generator_steps, n_training_iterations, classifier, discriminator, generator, batches):

    def check_divergence(x_generated):
        return np.any(np.isnan(x_generated))

    def print_training_information(generator, classifier, x_test, iteration):
        X_gen = generator.predict([x_test, np.ones((x_test.shape[0], len(features)))])
        clf_pred_test = np.array(classifier.predict_proba(x_test))[:, DESIRED_CLASS]
        clf_pred = np.array(classifier.predict_proba(X_gen))[:, DESIRED_CLASS]
        delta_clf_pred = clf_pred - clf_pred_test
        print('='*88)
        print(f"Training iteration {iteration} at {datetime.now()}")
        reconstruction_error = np.mean(np.abs(autoencoder.predict(X_gen) - X_gen))
        print(f"Autoencoder reconstruction error (infinity to 0): {reconstruction_error:.3f}")
        print(f"Counterfactual prediction gain (0 to 1): {delta_clf_pred.mean():.3f}")
        print(f"Sparsity (L1, infinity to 0): {np.mean(np.abs(X_gen - x_test)):.3f}")

    countergan = define_countergan(generator, discriminator, mutable_features=features)

    for iteration in range(n_training_iterations):
        print(f"Training iteration {iteration} at {datetime.now()}")
        if iteration > 0:
            x_generated = generator.predict([x_fake_input, np.ones((x_fake_input.shape[0], len(features)))])
            if check_divergence(x_generated):
                print("Training diverged, stopping early.")
                break

        if (iteration % 1000 == 0) or (iteration == n_training_iterations - 1):
            print_training_information(generator, classifier, x_test, iteration)
        
        # Train the discriminator
        discriminator.trainable = True
        for _ in range(n_discriminator_steps):
            x_fake_input, _ = next(batches)
            x_fake = generator.predict([x_fake_input, np.ones((x_fake_input.shape[0], len(features)))])
            x_real = x_fake_input
            x_batch = np.concatenate([x_real, x_fake])
            y_batch = np.concatenate([np.ones(len(x_real)), np.zeros(len(x_fake))])
            p = np.random.permutation(len(y_batch))
            x_batch, y_batch = x_batch[p], y_batch[p]

            y_batch = y_batch.reshape(-1, 1)  # Ensure it has shape (512, 1)

            discriminator.compile(
                optimizer= optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                loss='binary_crossentropy',
                metrics=['accuracy']
            )

            discriminator.train_on_batch(x_batch, y_batch)

        discriminator.trainable = False
        for _ in range(n_generator_steps):
            x_fake_input, _ = next(batches)
            y_fake = np.ones(len(x_fake_input))
            countergan.train_on_batch([x_fake_input, np.ones((x_fake_input.shape[0], len(features)))], y_fake)

    return countergan


## Counterfactual search 

In [10]:
def train_gan(method_name, residuals):
    samples = classify_and_filter_cases(classifier, x_test, opposite_CLASS=DESIRED_CLASS)
    samples = np.array(samples, dtype=np.float32)  # Ensure x_test is a NumPy array with a consistent data type

    discriminator = create_discriminator(in_shape=(x_train.shape[1],))
    discriminator.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
       
    generator = create_generator(in_shape=(x_train.shape[1],), mutable_features=features, residuals=residuals)
    
    batches = infinite_data_stream(x_train, y_train, batch_size=256)

    countergan = train_countergan(2, 4, 10, classifier, discriminator, generator, batches)
   
    
    # Measure batch latency
    t_initial = time.time()
    mutable_mask = np.ones((samples.shape[0], len(features)))  # Create a mutable mask
    print(mutable_mask)
    
    counterfactuals = generator.predict([samples, mutable_mask])
    batch_latency = 1000 * (time.time() - t_initial)

    # Measure latency per sample
    latencies = np.zeros(len(samples))
    # for i, x in enumerate(samples): ## casing error <shapes [1, 2] and [1, 37]>
    #     t_initial = time.time()
    #     _ = generator.predict([np.expand_dims(x, axis=0), np.ones(1, len(mutable_features))])
    #     latencies[i] = 1000 * (time.time() - t_initial)

    save_experiment(method_name, samples, counterfactuals, latencies, classifier, autoencoder, batch_latency)

    generator.save(f"{EXPERIMENT_PATH}/{method_name}/generator.keras")
    discriminator.save(f"{EXPERIMENT_PATH}/{method_name}/discriminator.keras")
    countergan.save(f"{EXPERIMENT_PATH}/{method_name}/countergan.keras")


In [11]:
train_gan(method_name="regular_gan", residuals=False)
train_gan(method_name="countergan", residuals=True)

Training iteration 0 at 2025-05-06 17:13:52.445336
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Training iteration 0 at 2025-05-06 17:13:52.571345
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step 
Autoencoder reconstruction error (infinity to 0): 0.028
Counterfactual prediction gain (0 to 1): 0.000
Sparsity (L1, infinity to 0): 0.783
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Training iteration 1 at 2025-05-06 17:13:54.012791
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Training iteration 2 at 2025-05-06 17:13:54.755457
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Training iteration 3 at 2025-05-06 17:13:55.487526
[1m2/2[

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step
Autoencoder reconstruction error (infinity to 0): 0.226
Counterfactual prediction gain (0 to 1): 0.000
Sparsity (L1, infinity to 0): 0.157
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
Training iteration 1 at 2025-05-06 17:14:02.820693
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Training iteration 2 at 2025-05-06 17:14:03.604311
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Training iteration 3 at 2025-05-06 17:14:04.371814
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m

## Root to leaf counterfactuals

In [12]:
def tree_branches(model):
  n_nodes = model.tree_.node_count
  children_left = model.tree_.children_left
  children_right = model.tree_.children_right
  value = model.tree_.value

  all_branches = list(retrieve_branches(n_nodes, children_left, children_right))

  # convert all branches to an array with 297 columns (nodes) and 149 rows (branches) with values 0 or 1
  branches = np.zeros((len(all_branches), n_nodes))
  for index, branch in enumerate(all_branches):
      branches[index, branch] = 1

  # create an array with leaf nodes and defin the class of each leaf node
  leaf_nodes = np.zeros((n_nodes, 2))
  for index, branch in enumerate(all_branches):
      leaf_nodes[branch[-1], 0] = branch[-1]
      if value[branch[-1]][0][0] > value[branch[-1]][0][1]:
          leaf_nodes[branch[-1], 1] = 11
      else:
          leaf_nodes[branch[-1], 1] = 12

  return [branches,leaf_nodes]

#Retrieve all decision tree branches
def retrieve_branches(number_nodes, children_left_list, children_right_list):

    # Calculate if a node is a leaf
    is_leaves_list = [(False if cl != cr else True) for cl, cr in zip(children_left_list, children_right_list)]

    # Store the branches paths
    paths = []

    for i in range(number_nodes):
        if is_leaves_list[i]:
            # Search leaf node in previous paths
            end_node = [path[-1] for path in paths]

            # If it is a leave node yield the path
            if i in end_node:
                output = paths.pop(np.argwhere(i == np.array(end_node))[0][0])
                yield output

        else:

            # Origin and end nodes
            origin, end_l, end_r = i, children_left_list[i], children_right_list[i]

            # Iterate over previous paths to add nodes
            for index, path in enumerate(paths):
                if origin == path[-1]:
                    paths[index] = path + [end_l]
                    paths.append(path + [end_r])

            # Initialize path in first iteration
            if i == 0:
                paths.append([i, children_left_list[i]])
                paths.append([i, children_right_list[i]])
    return paths


In [13]:
def get_branch(classifier, test_case):
    """
    Retrieves the branch (decision path) for a given test case in the classifier.
    """
    node_indicator = classifier.decision_path(test_case.reshape(1, -1))
    branch = node_indicator.indices  # Get node indices in the path
    return branch

def retrieve_branches_recursive(node, path, children_left, children_right):
    """
    Recursively retrieve all branches in the decision tree.

    Parameters:
        node (int): Current node index.
        path (list): Path of node indices from the root to the current node.
        children_left (array): Left child indices for all nodes.
        children_right (array): Right child indices for all nodes.

    Yields:
        list: A branch represented as a list of node indices.
    """
    # Check if the current node is a leaf node (i.e., both left and right children are equal)
    if children_left[node] == children_right[node]:  # Leaf node
        yield path + [node]  # Yield the path including this leaf node
    else:
        # Recurse into the left child if it exists
        if children_left[node] != -1:
            yield from retrieve_branches_recursive(children_left[node], path + [node], children_left, children_right)
        # Recurse into the right child if it exists
        if children_right[node] != -1:
            yield from retrieve_branches_recursive(children_right[node], path + [node], children_left, children_right)


def find_similar_branch(branches, test_branch, desired_class_branches):
    """
    Finds the most similar branch to the given test branch among desired class branches.
    """
    # Example similarity: number of overlapping nodes
    def similarity_score(branch1, branch2):
        return len(set(branch1) & set(branch2))  # Intersection of nodes

    # Compare test branch with each desired class branch
    most_similar_branch = max(desired_class_branches, key=lambda b: similarity_score(test_branch, b))
    return most_similar_branch

def generate_synthetic_case(new_branch, original_case, classifier):
    """
    Generates a synthetic test case based on the new branch and the original test case.
    """
    synthetic_case = original_case.copy()
    feature = classifier.tree_.feature
    threshold = classifier.tree_.threshold

    # Update synthetic case based on new branch conditions
    for node in new_branch:
        if feature[node] != -2:  # Ignore leaf nodes
            if original_case[feature[node]] <= threshold[node]:
                synthetic_case[feature[node]] = threshold[node] - 0.1
            else:
                synthetic_case[feature[node]] = threshold[node] + 0.1

    return synthetic_case


In [14]:
def root_to_laef(classifier, x_test, INITIAL_CLASS, DESIRED_CLASS):
    # Step 1: Shortlist test cases that are classified as the initial class
    test_cases = classify_and_filter_cases(classifier, x_test, INITIAL_CLASS)

    # Step 2: Retrieve branches for all desired class cases
    all_branches = list(retrieve_branches_recursive(0, [], classifier.tree_.children_left, classifier.tree_.children_right))
    desired_class_branches = [branch for branch in all_branches if np.argmax(classifier.tree_.value[branch[-1]][0]) == DESIRED_CLASS]

    # Step 3: Process each opposite class case
    synthetic_cases = []
    
    latencies = np.zeros(len(test_cases))    
    
    for i, test_case in enumerate(test_cases):
        t_initial = time.time()
        test_branch = get_branch(classifier, test_case)
        similar_branch = find_similar_branch(all_branches, test_branch, desired_class_branches)
        synthetic_case = generate_synthetic_case(similar_branch, test_case, classifier)
        synthetic_cases.append(synthetic_case)
        latencies[i] = 1000*(time.time() - t_initial)

    

    return test_cases,np.array(synthetic_cases),latencies


In [15]:
method_name = "root_to_leaf"
samples, counterfactuals, latencies = root_to_laef(classifier, x_test, INITIAL_CLASS, DESIRED_CLASS)

save_experiment(method_name, samples, counterfactuals, latencies, classifier, autoencoder)


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
{'latency': '0.105 ± 0.089',
 'latency_batch': '5.971',
 'prediction_gain': '0.000 ± 0.000',
 'reconstruction_error': '2.614 ± 0.335',
 'sparsity': '4.517 ± 0.584'}


## Generate the benchmark table

In [16]:
METHODS = ["regular_gan","countergan", "root_to_leaf"]
# METHODS = ["root_to_leaf"]
METRIC_NAMES = [
    "reconstruction_error", "prediction_gain", "sparsity", "latency", "latency_batch"
]

metrics = dict()
results = dict()
for method in METHODS:
    method_metrics = json.load(open(f"{EXPERIMENT_PATH}/{method}/metrics.json", "r"))
    method_metrics = {k: v for k, v in method_metrics.items() if k in METRIC_NAMES}
    metrics[method] = method_metrics

    results[method] = np.load(f"{EXPERIMENT_PATH}/{method}/counterfactuals.npy")
    


metrics = pd.DataFrame(metrics)


metrics.index = [
    "↓ reconstruction_error (Realism)",
    "↑ Prediction gain",
    "↓ Sparsity (Actionability)",
    "↓ Latency (ms)",
    "↓ Batch latency (ms)",
]

metrics

# Compute the difference between the original test data and the results for each case instanse in the test data 
def compute_difference(x_test, results):
    differences = []
    for method, result in results.items():
        diff = np.abs(x_test - result)
        differences.append(diff)
    return differences

# differences = compute_difference(x_test, results)
# dif1 = pd.DataFrame(differences[0])
# dif2 = pd.DataFrame(differences[1])

# classifiy dif2 to the desired class
dif2_classified = classifier.predict(results["countergan"])
dif2_classified = to_categorical(dif2_classified, num_classes=2)
dif2_classified = pd.DataFrame(dif2_classified[:,1])

dif1_classified = classifier.predict(results["regular_gan"])
dif1_classified = to_categorical(dif1_classified, num_classes=2)
dif1_classified = pd.DataFrame(dif1_classified[:,1])



metrics

Unnamed: 0,regular_gan,countergan,root_to_leaf
↓ reconstruction_error (Realism),0.102 ± 0.004,0.948 ± 0.207,2.614 ± 0.335
↑ Prediction gain,0.000 ± 0.876,0.000 ± 0.000,0.000 ± 0.000
↓ Sparsity (Actionability),6.158 ± 0.469,1.300 ± 0.088,4.517 ± 0.584
↓ Latency (ms),0.000 ± 0.000,0.000 ± 0.000,0.105 ± 0.089
↓ Batch latency (ms),0.000,0.000,5.971
