In [1]:
import docplex
import pandas as pd
import tensorflow as tf
import numpy as np
import utility
import copy
import mlp_explainer
import mymetrics
import time
import os
from sklearn.preprocessing import MinMaxScaler
from sklearn import datasets
from sklearn.model_selection import train_test_split
from milp import codify_network
from teste import get_minimal_explanation
from sklearn.metrics import classification_report
import pickle

In [2]:
def load_glass():
    df = pd.read_csv('./datasets/glass.csv')
    # Avoid to_categorical error due to missing class '4'
    unique_labels = sorted(df['target'].unique())  # Ensure sorted order
    label_map = {original: new for new, original in enumerate(unique_labels)}
    
    df['target'] = df['target'].map(label_map)
    return df
#Glass Dataset
df = load_glass()

scaler = MinMaxScaler()
scaler.fit(df.values[:, :-1])
scaled_df = scaler.transform(df.values[:, :-1])
lower_bound = scaled_df.min()
upper_bound = scaled_df.max()
print(lower_bound, upper_bound)
df_scaled = pd.DataFrame(scaled_df, columns=df.columns[:-1])
#targets = (utility.check_targets_0_1(df.values[:,-1])).astype(np.int32)
targets = df['target'].values
df_scaled['target'] = targets
columns = df_scaled.columns
dataset_name = 'Glass'
result_path = f'{dataset_name}_results'
if not os.path.exists(result_path):
    os.makedirs(result_path)
    print(f"Created directory: {result_path}")
else:
    print(f"Directory already exists: {result_path}")
display(df_scaled)

0.0 1.0
Directory already exists: Glass_results


Unnamed: 0,RI,Na,Mg,Al,Si,K,Ca,Ba,Fe,target
0,0.432836,0.437594,1.000000,0.252336,0.351786,0.009662,0.308550,0.000000,0.0,0
1,0.283582,0.475188,0.801782,0.333333,0.521429,0.077295,0.223048,0.000000,0.0,0
2,0.220808,0.421053,0.790646,0.389408,0.567857,0.062802,0.218401,0.000000,0.0,0
3,0.285777,0.372932,0.821826,0.311526,0.500000,0.091787,0.259294,0.000000,0.0,0
4,0.275241,0.381955,0.806236,0.295950,0.583929,0.088567,0.245353,0.000000,0.0,0
...,...,...,...,...,...,...,...,...,...,...
209,0.223003,0.512782,0.000000,0.806854,0.500000,0.012882,0.348513,0.336508,0.0,5
210,0.250219,0.630075,0.000000,0.529595,0.580357,0.000000,0.276022,0.504762,0.0,5
211,0.417032,0.545865,0.000000,0.538941,0.644643,0.000000,0.279740,0.520635,0.0,5
212,0.235294,0.548872,0.000000,0.514019,0.678571,0.000000,0.283457,0.498413,0.0,5


In [3]:
np.random.seed(50)
X_train, X_test, y_train, y_test = train_test_split(scaled_df, targets, test_size=0.75,random_state=50,stratify=targets)
X = np.concatenate((X_train,X_test),axis=0)
y = np.concatenate((y_train,y_test),axis=0)

training_data = pd.DataFrame(X_train, columns = columns[:-1])
training_data[columns[-1]] = y_train
testing_data = pd.DataFrame(X_test, columns = columns[:-1])
testing_data[columns[-1]] = y_test
dataframe = pd.concat([training_data, testing_data])
data = dataframe.to_numpy()
n_classes = dataframe['target'].nunique()

original_bounds = [[dataframe[dataframe.columns[i]].min(),dataframe[dataframe.columns[i]].max()] for i in range(len(dataframe.columns[:-1]))]
keras_model = tf.keras.models.load_model(f'new_models/{dataset_name}.h5')



In [4]:
original_bounds

[[0.0, 1.0],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 0.9999999999999999],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 0.9999999999999999],
 [0.0, 1.0]]

In [5]:
mp_model, output_bounds = codify_network(keras_model, dataframe, 'fischetti', relax_constraints=False)
with open(f'{result_path}/{dataset_name}_mp_model.pkl', 'wb') as f:
    pickle.dump(mp_model, f)

with open(f'bounds/{dataset_name}_output_bounds.pkl', 'wb') as f:
    pickle.dump(output_bounds, f)

In [6]:
predictions = []
possible_classes = np.unique(y_test)
class_indexes = []
class_predictions = []
for i in range(n_classes):
    class_indexes.append([])
    class_predictions.append([])
possible_classes, class_indexes, class_predictions
data = testing_data.to_numpy()
for i in range(len(data)):
    predictions.append(mlp_explainer.model_classification_output(k_model=keras_model, net_input=data[i, :-1])[1].numpy())    
    for j,p_class in enumerate(possible_classes):
        if predictions[-1] == p_class:
            class_indexes[j].append(i)
            class_predictions[j].append(data[i, :-1])
print("Metrics:", classification_report(testing_data.to_numpy()[:, -1], predictions,digits=4))

Metrics:               precision    recall  f1-score   support

         0.0     0.6452    0.3846    0.4819        52
         1.0     0.4946    0.8070    0.6133        57
         2.0     0.0000    0.0000    0.0000        13
         3.0     0.4545    0.5000    0.4762        10
         4.0     0.0000    0.0000    0.0000         7
         5.0     0.7308    0.8636    0.7917        22

    accuracy                         0.5590       161
   macro avg     0.3875    0.4259    0.3939       161
weighted avg     0.5116    0.5590    0.5106       161



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [7]:
cols = list(testing_data.columns)
if 'target' not in cols:
    cols.append('target')
predicted_dataset = []
for i,pos_class in enumerate(np.unique(y_test)):
    for instance in (testing_data.to_numpy()[:, :-1][class_indexes[i]]):
        instance = np.append(instance, pos_class.astype('int'))
        predicted_dataset.append(instance)
predicted_dataset = np.asarray(predicted_dataset)
pred_dataset_df = pd.DataFrame(predicted_dataset, columns=cols)
pred_dataset_df['target'] = pred_dataset_df['target'].astype('int')

In [8]:
# Save Bounds to CSV
np.savez(f'bounds/{dataset_name}_data_bounds.npz', original_bounds=original_bounds)
# Save Testing Set to CSV
pred_dataset_df.to_csv(f'{dataset_name}_results/{dataset_name}_X_test.csv', index=False)

In [9]:
metrics_dataframes = []
times_onestep = []
sizes_onestep = []
rsum_onestep = []
coverage_onestep = []
pos_exp_onestep = []
neg_exp_onestep = []
onestep_explanations = []

In [10]:
original_bounds

[[0.0, 1.0],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 0.9999999999999999],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 1.0],
 [0.0, 0.9999999999999999],
 [0.0, 1.0]]

In [11]:
def compute_mean_std(arr):
    return np.mean(arr), np.std(arr)

def relative_percentage_diff(new, old):
    if np.any(old == 0):
        print(f'Warning: found possible division by zero')
        return np.where(old != 0, ((new - old) / old) * 100, np.nan)
    return ((new - old) / old) * 100

p_value = 0.75
print(f"p = {p_value}")
times_twostep = []
sizes_twostep = []
rsum_twostep = []
coverage_twostep = []
twostep_explanations = []
for j in (pred_dataset_df['target'].unique()):
    for i, sample in enumerate((testing_data.to_numpy()[:, :-1][class_indexes[j]])):
        start = time.perf_counter()
        
        explanation, minimal = mlp_explainer.run_explanation_doublestep(sample = sample, n_classes=n_classes, kmodel=keras_model, model=mp_model, output_bounds=output_bounds, og_bounds=original_bounds, p=p_value)
        end = time.perf_counter()
        twostep_explanations.append(explanation)
        times_twostep.append(end-start)
        sizes_twostep.append(len(minimal))
        rsum_twostep.append(mymetrics.range_sum(twostep_explanations[-1]))
        
        coverage_twostep.append(len(mymetrics.calculate_coverage(testing_data, twostep_explanations[-1])))

        start = time.perf_counter()
        explanation, minimal = mlp_explainer.run_explanation(sample = sample, n_classes=n_classes, kmodel=keras_model, model=mp_model, output_bounds=output_bounds, og_bounds=original_bounds, enable_log=False,
                                                             )
        end = time.perf_counter()
        onestep_explanations.append(explanation)
        times_onestep.append(end-start)
        sizes_onestep.append(len(minimal))
        rsum_onestep.append(mymetrics.range_sum(onestep_explanations[-1]))
        coverage_onestep.append(len(mymetrics.calculate_coverage(testing_data, onestep_explanations[-1])))
    #print(j, len(onestep_explanations))
        
times_onestep = np.array(times_onestep)
times_twostep = np.array(times_twostep)
sizes_onestep = np.array(sizes_onestep)
sizes_twostep = np.array(sizes_twostep)
rsum_onestep = np.array(rsum_onestep)
rsum_twostep = np.array(rsum_twostep)
coverage_onestep = np.array(coverage_onestep)
coverage_twostep = np.array(coverage_twostep)

# Compute means and standard deviations
(time_mean_onestep, time_std_onestep) = compute_mean_std(times_onestep)
(time_mean_twostep, time_std_twostep) = compute_mean_std(times_twostep)

(sizes_mean_onestep, sizes_std_onestep) = compute_mean_std(sizes_onestep)
(sizes_mean_twostep, sizes_std_twostep) = compute_mean_std(sizes_twostep)

(rsum_mean_onestep, rsum_std_onestep) = compute_mean_std(rsum_onestep)
(rsum_mean_twostep, rsum_std_twostep) = compute_mean_std(rsum_twostep)

(coverage_mean_onestep, coverage_std_onestep) = compute_mean_std(coverage_onestep)
(coverage_mean_twostep, coverage_std_twostep) = compute_mean_std(coverage_twostep)

# Compute relative percentage differences (Mean & Std)
time_mean_diff = relative_percentage_diff(time_mean_twostep, time_mean_onestep)
sizes_mean_diff = relative_percentage_diff(sizes_mean_twostep, sizes_mean_onestep)
rsum_mean_diff = relative_percentage_diff(rsum_mean_twostep, rsum_mean_onestep)
coverage_mean_diff = relative_percentage_diff(coverage_mean_twostep, coverage_mean_onestep)

time_std_diff = relative_percentage_diff(time_std_twostep, time_std_onestep)
sizes_std_diff = relative_percentage_diff(sizes_std_twostep, sizes_std_onestep)
rsum_std_diff = relative_percentage_diff(rsum_std_twostep, rsum_std_onestep)
coverage_std_diff = relative_percentage_diff(coverage_std_twostep, coverage_std_onestep)

# Compute pointwise relative differences
time_relative_pointwise = relative_percentage_diff(times_twostep, times_onestep)
sizes_relative_pointwise = relative_percentage_diff(sizes_twostep, sizes_onestep)
rsum_relative_pointwise = relative_percentage_diff(rsum_twostep, rsum_onestep)
coverage_relative_pointwise = relative_percentage_diff(coverage_twostep, coverage_onestep)

# Compute pointwise means
time_relative_mean = np.mean(time_relative_pointwise) 
sizes_relative_mean = np.mean(sizes_relative_pointwise)
rsum_relative_mean = np.mean(rsum_relative_pointwise)
coverage_relative_mean = np.mean(coverage_relative_pointwise)

# Compute pointwise standard deviations
time_relative_std = np.std(time_relative_pointwise) 
sizes_relative_std = np.std(sizes_relative_pointwise)
rsum_relative_std = np.std(rsum_relative_pointwise)
coverage_relative_std = np.std(coverage_relative_pointwise)

# Organize Data
all_metrics_data = {
    'Metric': ['Time', 'Size', 'Ranges_Sum', 'Coverage'],
    'ONESTEP_MEAN': [time_mean_onestep, sizes_mean_onestep, rsum_mean_onestep, coverage_mean_onestep],
    'ONESTEP_STD': [time_std_onestep, sizes_std_onestep, rsum_std_onestep, coverage_std_onestep],
    'TWOSTEP_MEAN': [time_mean_twostep, sizes_mean_twostep, rsum_mean_twostep, coverage_mean_twostep],
    'TWOSTEP_STD': [time_std_twostep, sizes_std_twostep, rsum_std_twostep, coverage_std_twostep],
    'MEAN_DIFF_%': [time_mean_diff, sizes_mean_diff, rsum_mean_diff, coverage_mean_diff],
    'STD_DIFF_%': [time_std_diff, sizes_std_diff, rsum_std_diff, coverage_std_diff],
    'POINTWISE_MEAN_%': [time_relative_mean, sizes_relative_mean, rsum_relative_mean, coverage_relative_mean],
    'POINTWISE_STD_%': [time_relative_std, sizes_relative_std, rsum_relative_std, coverage_relative_std]
}
# Display and save
all_metrics_df = pd.DataFrame(all_metrics_data)
display(all_metrics_df)
all_metrics_df.to_csv(f'{result_path}/results_{p_value}.csv', index=False)

#Save Raw Metric Data
raw_df = pd.DataFrame({
    "times_onestep": times_onestep, 
    "times_twostep": times_twostep,
    "sizes_onestep": sizes_onestep, 
    "sizes_twostep": sizes_twostep,
    "rsum_onestep": rsum_onestep, 
    "rsum_twostep": rsum_twostep,
    "coverage_onestep": coverage_onestep, 
    "coverage_twostep": coverage_twostep,
    "time_relative_%": time_relative_pointwise,
    "sizes_relative_%": sizes_relative_pointwise,
    "rsum_relative_%": rsum_relative_pointwise,
    "coverage_relative_%": coverage_relative_pointwise
})

# Save to CSV
raw_df.to_csv(f"{result_path}/raw_metric_data_{p_value}.csv", index=False)

# Save onestep explanations
np.savez(f'{result_path}/onestep_explanations_{p_value}.npz', 
         onestep_explanations=onestep_explanations)

# Save twostep explanations
np.savez(f'{result_path}/twostep_explanations{p_value}.npz', 
         twostep_explanations=twostep_explanations)

p = 0.75
0 31
1 124
3 135
5 161


Unnamed: 0,Metric,ONESTEP_MEAN,ONESTEP_STD,TWOSTEP_MEAN,TWOSTEP_STD,MEAN_DIFF_%,STD_DIFF_%,POINTWISE_MEAN_%,POINTWISE_STD_%
0,Time,0.340337,0.08527,0.510757,0.146042,50.074174,71.269104,53.658711,37.344907
1,Size,6.658385,1.028208,6.658385,1.028208,0.0,0.0,0.0,0.0
2,Ranges_Sum,3.681785,0.780144,3.680875,0.797784,-0.024693,2.261134,-0.101002,2.700808
3,Coverage,1.459627,1.427302,1.751553,1.686744,20.0,18.177143,22.014936,57.338672
