# imports

In [1]:
from z3 import *
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

In [14]:
# Ask Z3 to print rational numbers in decimal notation; the explainer
# defined below parses the printed form of numeric values.
set_option(rational_to_decimal=True)

In [15]:
# Scratch example: a real-valued Z3 variable and a comparison built on it.
test = Real('test')
testexp = test >= 5

In [16]:
# Show the parts of the comparison: left operand, right operand, operator.
print(
testexp.arg(0),
testexp.arg(1),
testexp.decl()
)

test 5 >=


# model

In [17]:
from z3 import *
import numpy as np


class XGBoostExplainer:
    """Apenas classificação binária e base_score = None
    data = X. labels = y
    """

    def __init__(self, model, data):
        """_summary_

        Args:
            model (XGBoost): xgboost model fited
            data (DataFrame): dataframe (X or X_train)
            labels (array): y (targets)
        """
        self.model = model
        self.data = data.values
        self.columns = model.feature_names_in_.tolist()
        self.max_categories = 2

    def fit(self):
        """Build the model-dependent state shared by all explanations.

        Detects the categorical columns of ``self.data`` and encodes the
        model's trees as a Z3 formula. The Z3 expressions are created here
        (not in ``__init__``) for pickle compatibility: call ``fit`` after
        exporting/loading the explainer with pickle.
        """
        categoric = self.get_categoric_features(self.data)
        self.categoric_features = categoric

        trees_formula = self.model_trees_expression(self.model)
        self.T_model = trees_formula
        self.T = trees_formula

    def explain(self, instance, reorder="asc"):
        """Return the feature equalities that are enough to fix the prediction.

        Args:
            instance: Feature values of one sample, ordered like
                ``self.columns``.
            reorder: ``"asc"`` / ``"desc"`` to test features in ascending /
                descending importance order; any other value keeps the
                column order.

        Returns:
            list: Z3 equalities left after ``explain_expression`` dropped
            every feature it could.

        Side effects:
            Stores the instance formula in ``self.I`` and the decision
            formula in ``self.D``. ``self.T`` must already exist (see ``fit``).
        """
        instance_formula = self.instance_expression(instance)
        self.I = instance_formula

        # decision_function_expression reads self.I, so it is set beforehand.
        decision_formula = self.decision_function_expression(self.model, [instance])
        self.D = decision_formula

        return self.explain_expression(
            instance_formula, self.T, decision_formula, self.model, reorder
        )

    def get_categoric_features(self, data: np.ndarray):
        """
        Recebe um dataset e retorna uma fórmula no z3 com:
        - Restrições de valor máximo e mínimo para features contínuas.
        - Restrições de igualdade para features categóricas binárias.
        """
        categoric_features = []
        for i in range(data.shape[1]):
            feature_values = data[:, i]
            unique_values = np.unique(feature_values)
            if len(unique_values) <= self.max_categories:
                categoric_features.append(self.columns[i])

        return categoric_features

    def feature_constraints(self, constraints=[]):
        """ TODO
        esperado receber limites das features pelo usuário
        formato previso: matriz/dataframe [feaature, min/max, valor]
        constraaint_expression = "constraaint_df_to_feature()"
        """
        return

    def model_trees_expression(self, model):
        """Encode every tree of the booster as a Z3 formula.

        Tree ``t`` gets a real variable ``o_<t>_0`` for its output. For each
        leaf, the conjunction of the split conditions on the root-to-leaf
        path implies ``o_<t>_0 == leaf value`` (the leaf value is read from
        the ``Gain`` column of the tree table). A tree made of a single leaf
        contributes that equality directly.

        Split thresholds are rounded to 4 decimals before being encoded, and
        the tree table is kept in ``self.booster_df``.

        Args:
            model: Fitted XGBoost model; its trees are read through
                ``get_booster().trees_to_dataframe()``.

        Returns:
            z3.BoolRef: Conjunction of the formulas of all trees.
        """
        nodes = model.get_booster().trees_to_dataframe()
        nodes["Split"] = nodes["Split"].round(4)
        self.booster_df = nodes
        class_index = 0  # if model.n_classes_ == 2:
        tree_formulas = []

        def path_conditions(tree_nodes, node_id):
            """Return the split conditions from the root down to ``node_id``."""
            if tree_nodes[tree_nodes["ID"] == node_id].empty:
                return []

            steps = []
            child_id = node_id
            while True:
                is_parent = (tree_nodes["Yes"] == child_id) | (
                    tree_nodes["No"] == child_id
                )
                parents = tree_nodes[is_parent]
                if parents.empty:
                    # Reached the root: nothing points to this node.
                    break
                parent = parents.iloc[0]
                variable = Real(parent["Feature"])
                threshold = parent["Split"]
                # The "Yes" child is the branch taken when feature < threshold.
                if parent["Yes"] == child_id:
                    steps.append(variable < threshold)
                else:
                    steps.append(variable >= threshold)
                child_id = parent["ID"]

            # Conditions were gathered leaf-to-root; return them root-to-leaf.
            steps.reverse()
            return steps

        for tree_index in nodes["Tree"].unique():
            tree_nodes = nodes[nodes["Tree"] == tree_index]
            output = Real(f"o_{tree_index}_{class_index}")

            first_node = tree_nodes.iloc[0]
            if len(tree_nodes) == 1 and first_node["Feature"] == "Leaf":
                tree_formulas.append(And(output == first_node["Gain"]))
                continue

            leaf_rules = []
            leaves = tree_nodes[tree_nodes["Feature"] == "Leaf"]
            for _, leaf in leaves.iterrows():
                leaf_value = leaf["Gain"]
                path = And(*path_conditions(tree_nodes, leaf["ID"]))
                leaf_rules.append(Implies(path, output == leaf_value))

            tree_formulas.append(And(*leaf_rules))

        return And(*tree_formulas)

    def decision_function_expression(self, model, x):
        """Build the Z3 formula stating that the model keeps its prediction.

        ``self.I`` and ``self.T`` must be set: they are solved together to
        read the output of every encoded tree for this instance. The
        difference between the model's real margin and the sum of those
        outputs becomes the constant term of the score.

        Args:
            model: Fitted XGBoost classifier.
            x: Batch holding the single instance being explained
                (``[instance]``).

        Returns:
            z3.BoolRef: With at most two classes, ``score < 0`` when class 0
            was predicted and ``score > 0`` otherwise. With more classes,
            the predicted class' score exceeds every other class' score.
        """
        n_classes = 1 if model.n_classes_ <= 2 else model.n_classes_
        predicted_class = model.predict(x)[0]
        n_estimators = len(model.get_booster().get_dump())

        # Solve instance + trees to obtain each tree's output value.
        solver = Solver()
        solver.add(self.I)
        solver.add(self.T)
        tree_outputs = [Real(f"o_{i}_0") for i in range(n_estimators)]
        if solver.check() == sat:
            assignment = solver.model()
            total_sum = sum(
                float(assignment.eval(var).as_fraction()) for var in tree_outputs
            )
        else:
            total_sum = 0
            print("estimator error")
        init_value = model.predict(x, output_margin=True)[0] - total_sum

        # One score per class: sum of that class' tree outputs + constant.
        trees_per_class = int(n_estimators / n_classes)
        scores = []
        for class_number in range(n_classes):
            outputs = [
                Real(f"o_{estimator_number}_{class_number}")
                for estimator_number in range(trees_per_class)
            ]
            scores.append(Sum(outputs) + init_value)

        if n_classes <= 2:
            if predicted_class == 0:
                return scores[0] < 0
            return scores[0] > 0

        return And(
            [
                scores[predicted_class] > scores[class_number]
                for class_number in range(n_classes)
                if predicted_class != class_number
            ]
        )

    def instance_expression(self, instance):
        """Turn an instance into Z3 equalities, one per feature value.

        Args:
            instance: Feature values ordered like ``self.columns``.

        Returns:
            list: ``Real(column) == value`` for each position of ``instance``.
        """
        equalities = []
        for position, value in enumerate(instance):
            equalities.append(Real(self.columns[position]) == value)
        return equalities

    def explain_expression(self, I, T, D, model, reorder):
        """Greedily drop the feature equalities the prediction does not need.

        With ``reorder`` set to "asc" or "desc", features whose importance is
        zero are discarded up front and the rest are tested in ascending /
        descending importance order. Each equality is tentatively removed and
        stays removed only if ``(remaining equalities AND T) -> D`` is valid.

        Args:
            I: List of Z3 equalities of the instance (one per column).
            T: Z3 formula of the trees.
            D: Z3 formula of the decision.
            model: Model exposing ``feature_importances_``.
            reorder: "asc", "desc", or any other value to test ``I`` as given.

        Returns:
            list: The equalities that could not be dropped, in tested order.
        """
        importances = model.feature_importances_
        used = np.where(importances != 0)[0]

        if reorder == "asc":
            order = used[np.argsort(importances[used])]
            pending = [I[i] for i in order]
        elif reorder == "desc":
            order = used[np.argsort(-importances[used])]
            pending = [I[i] for i in order]
        else:
            pending = I.copy()

        kept = []
        for position, feature in enumerate(pending):
            # Equalities still to be tested, followed by the ones already
            # found necessary (the current feature is left out).
            candidate = pending[position + 1 :] + kept
            if not self.is_proved(Implies(And(And(candidate), T), D)):
                kept.append(feature)
        return kept

    def is_proved(self, f):
        """Return True when ``f`` is valid, i.e. ``Not(f)`` is unsatisfiable."""
        solver = Solver()
        solver.add(Not(f))
        return solver.check() == unsat

    def delta_expression(self, exp):
        """Relax an explanation into ``value +/- delta`` constraints.

        Args:
            exp: Iterable of strings ``"<feature> == <value>"`` (the printed
                form of the equalities returned by ``explain``).

        Returns:
            list: Z3 constraints. Categorical features keep their equality;
            every other feature is bounded by ``value - delta`` and
            ``value + delta``. The last entry is ``delta >= 0``.

        Side effects:
            Sets ``self.delta_features`` (names of the features in ``exp``)
            and ``self.deltaexp`` (the returned list).
        """
        from fractions import Fraction

        delta = Real("delta")
        expressions = []
        self.delta_features = []

        for name in exp:
            tokens = name.split(" == ")
            z3feature = Real(tokens[0])
            self.delta_features.append(str(z3feature))
            # Printed numerals may be decimals ("5.1"), rationals ("51/10")
            # or truncated decimals with a trailing "?", depending on the
            # global Z3 printing options; Fraction handles the first two.
            value = float(Fraction(tokens[1].strip().rstrip("?")))

            if tokens[0] in self.categoric_features:
                expressions.append(z3feature == value)
            else:
                expressions.append(z3feature >= value - delta)
                expressions.append(z3feature <= value + delta)

        expressions.append(delta >= 0)
        self.deltaexp = expressions
        return expressions

    def get_delta(self, exp):
        """Compute how far the explained features may move safely.

        Minimises ``delta`` subject to the relaxed explanation
        (``delta_expression``), the trees ``self.T`` and the negated decision
        ``Not(self.D)``; that is, it looks for the smallest perturbation that
        admits a counterexample.

        Args:
            exp: Z3 equalities of the explanation (as returned by ``explain``).

        Returns:
            A non-negative number. If the optimum is reported as
            ``v + epsilon`` the result is ``v``; if it is ``epsilon`` the
            result is 0; otherwise the optimum is attained, and a fixed 0.01
            is subtracted from it (never going below 0).

        Side effects:
            Appends the equalities to ``self.cumulative_range_expresson``
            (created on first use) and stores the relaxed constraints in
            ``self.delta_expressions``.
        """
        from fractions import Fraction

        def to_float(text):
            # Accepts "0.2", "1/5" and truncated decimals such as "0.33?".
            return float(Fraction(text.strip().rstrip("?")))

        # explain_range resets this list; create it when get_delta is
        # called on its own.
        if not hasattr(self, "cumulative_range_expresson"):
            self.cumulative_range_expresson = []

        expstr = []
        for expression in exp:
            self.cumulative_range_expresson.append(expression)
            expstr.append(str(expression))

        self.delta_expressions = self.delta_expression(expstr)

        opt = Optimize()
        opt.add(self.delta_expressions)
        opt.add(self.T)
        opt.add(Not(self.D))

        delta = Real("delta")
        expmin = opt.minimize(delta)
        # NOTE(review): the result of check() is not inspected; the case
        # where no counterexample exists for any delta is not handled here.
        opt.check()

        value = str(expmin.value())

        if "+ epsilon" in value:
            # Infimum not attained: the bound itself is still safe.
            delta_value = to_float(value.split(" + ")[0])
        elif "epsilon" == value:
            delta_value = 0
        else:
            # Optimum attained: a counterexample exists at exactly this
            # delta, so step back by a fixed margin.
            delta_value = to_float(value) - 0.01

        # A negative delta would yield inverted ranges (lower > upper).
        return max(delta_value, 0)

    def explain_range(
        self,
        instance,
        reorder="asc",
        dataset_bounds=True,
    ):
        """Explain an instance with value ranges instead of exact values.

        Runs ``explain`` and then ``get_delta``; every non-categorical
        feature of the explanation is reported as
        ``value - delta <= feature <= value + delta`` with bounds printed at
        2 decimals. The bounds are rounded towards the instance value, so the
        reported interval never exceeds the one that was verified.

        Args:
            instance: Feature values of one sample, ordered like
                ``self.columns``.
            reorder: Passed to ``explain``.
            dataset_bounds: When truthy, clip each range to the minimum and
                maximum of that column in ``self.data``.

        Returns:
            list: Strings ``"lo <= name <= hi"`` or ``"name == value"``. If
            the delta is not positive, the printed equalities are returned.
            If the explanation is empty, the empty list is returned.

        Side effects:
            Resets ``self.cumulative_range_expresson`` and sets
            ``self.range_metric`` to the sum of ``hi - lo`` over the reported
            ranges (0 when no range is reported).
        """
        import math

        self.cumulative_range_expresson = []
        self.range_metric = 0
        exp = self.explain(instance, reorder)
        if not exp:
            return exp

        delta_value = self.get_delta(exp)
        if delta_value <= 0:
            return [str(exppart) for exppart in exp]
        # save_deltas for model comparison

        columns = list(self.columns)
        range_exp = []
        for item in exp:
            name = str(item.arg(0))
            if name in self.categoric_features:
                range_exp.append(f"{name} == {item.arg(1)}")
                continue

            itemvalue = float(item.arg(1).as_fraction())
            # Round inwards (lower bound up, upper bound down). The inner
            # round(..., 9) only removes floating-point noise such as
            # 298.99999999999994 before taking ceil/floor.
            lower = math.ceil(round((itemvalue - delta_value) * 100, 9)) / 100
            upper = math.floor(round((itemvalue + delta_value) * 100, 9)) / 100
            # Inward rounding must not push a bound past the instance itself.
            lower = min(lower, itemvalue)
            upper = max(upper, itemvalue)

            if dataset_bounds:
                idx = columns.index(name)
                min_idx = np.min(self.data[:, idx])
                max_idx = np.max(self.data[:, idx])
                if lower < min_idx:
                    lower = min_idx
                if upper > max_idx:
                    upper = max_idx

            self.range_metric += (upper - lower)
            range_exp.append(f"{lower} <= {name} <= {upper}")

        return range_exp

    def explain_range_other():
        pass

# test Iris

In [18]:
class ColumnEncoderDecoder:
    """Rename DataFrame columns to ``x0 .. xn`` and map those names back in text."""

    def __init__(self):
        # Encoded name ("x<i>") -> original column name; filled by encode().
        self.mapping = {}

    def encode(self, df):
        """Return a copy of ``df`` whose columns are renamed to x0, x1, ..., xn.

        The encoded-to-original mapping is stored in ``self.mapping`` for
        later use by ``decode``. Columns are renamed by position, so
        duplicated column names still get distinct encoded names.
        """
        self.mapping = {f"x{i}": col for i, col in enumerate(df.columns)}
        df_encoded = df.copy()
        df_encoded.columns = list(self.mapping)
        return df_encoded

    def decode(self, text):
        """Replace x0, x1, ..., xn in ``text`` by the original column names.

        All names are substituted in a single pass, longest name first, so
        that "x10" is never read as "x1" followed by "0" and text that was
        already substituted is not scanned again.
        """
        import re

        if not self.mapping:
            return text

        longest_first = sorted(self.mapping, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(name) for name in longest_first))
        return pattern.sub(lambda match: str(self.mapping[match.group(0)]), text)

In [19]:
# Load the iris data set as a DataFrame of features plus a target array.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

# y = np.where(y == 0, 0, 1)  # convert to binary
# Relabel class 2 as class 0 (in place on iris.target), leaving labels 0 and 1.
y[y == 2] = 0
# X = X.iloc[:, :2] # cut columns of the df

In [20]:
# Rename the feature columns to x0, x1, ... before training.
encoder_decoder = ColumnEncoderDecoder()
X = encoder_decoder.encode(X)

# Quick check that decode() maps an encoded name back to the original column.
encoded_text = "decode de x1"
decoded_text = encoder_decoder.decode(encoded_text)

print("\nTexto decodificado:")
print(decoded_text)


Texto decodificado:
decode de sepal width (cm)


In [21]:
# Hold out 20% of the samples for testing, with a fixed random seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=101
)

# Boosted-tree classifier: 100 estimators, depth up to 3, learning rate 0.1.
xgbc = XGBClassifier(n_estimators=100, max_depth=3, learning_rate=0.1)
xgbc.fit(X_train, y_train)

# Predicted classes for the held-out samples (shown as the cell output).
preds = xgbc.predict(X_test)
preds

array([0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0,
       1, 1, 1, 1, 1, 0, 0, 0])

In [22]:
# Build the explainer over the full feature matrix and create its Z3 formulas.
explainer = XGBoostExplainer(xgbc, X)
explainer.fit()

In [23]:
# Range explanation for one test sample, testing features in descending importance.
explainer.explain_range(X_test.values[19], reorder="desc")

['5.51 <= x2 <= 5.69', '6.01 <= x0 <= 6.19', '2.51 <= x1 <= 2.69']

In [24]:
# Feature importances of the fitted model (these drive the reorder option).
xgbc.feature_importances_

array([0.03846328, 0.02926282, 0.36015186, 0.57212204], dtype=float32)

In [27]:
# Size of the held-out set: (samples, features).
X_test.shape

(30, 4)

In [25]:
# Explain every held-out sample (features tested in descending importance)
# and record the total range width reported for each explanation.
range_metric_list = []
for row in X_test.values:
    explanation = explainer.explain_range(row, reorder="desc")
    print(explanation)
    range_metric_list.append(explainer.range_metric)

['1.0 <= x2 <= 2.99']
['1.0 <= x2 <= 2.99']
['1.0 <= x2 <= 2.99']
['x3 == 1.6', 'x2 == 5.8', 'x0 == 7.2', 'x1 == 3']
['1.21 <= x3 <= 1.59', '4.51 <= x2 <= 4.89']
['x3 == 1.8']
['1.31 <= x3 <= 1.69', '4.31 <= x2 <= 4.69']
['0.91 <= x3 <= 1.69', '3.61 <= x2 <= 4.39']
['x2 == 5', 'x0 == 6.3', 'x1 == 2.5']
['1.0 <= x2 <= 2.99']
['1.8 <= x3 <= 2.2']
['1.0 <= x2 <= 2.99']
['1.0 <= x2 <= 2.99']
['x2 == 6.7', 'x0 == 7.7', 'x1 == 2.8']
['x3 == 1.8']
['0.91 <= x3 <= 1.69', '3.91 <= x2 <= 4.69']
['0.91 <= x3 <= 1.69', '3.71 <= x2 <= 4.49']
['0.91 <= x3 <= 1.69', '3.81 <= x2 <= 4.59']
['1.0 <= x2 <= 2.99']
['5.51 <= x2 <= 5.69', '6.01 <= x0 <= 6.19', '2.51 <= x1 <= 2.69']
['0.51 <= x3 <= 1.69', '3.21 <= x2 <= 4.39']
['1.0 <= x2 <= 2.99']
['0.71 <= x3 <= 1.69', '3.91 <= x2 <= 4.89']
['1.11 <= x3 <= 1.69', '4.11 <= x2 <= 4.69']
['0.91 <= x3 <= 1.69', '3.91 <= x2 <= 4.69']
['0.91 <= x3 <= 1.69', '3.21 <= x2 <= 3.99']
['0.7 <= x3 <= 1.3', '3.0 <= x2 <= 3.6']
['1.8 <= x3 <= 2.5']
['1.0 <= x2 <= 2.99']


In [26]:
# Total and average range width over the explained test samples.
print("sum:", np.sum(range_metric_list), "mean:", np.mean(range_metric_list))

sum: 39.10000000000001 mean: 1.3033333333333337


In [24]:
# Same walk over the held-out samples, testing features in ascending importance.
for i, row in enumerate(X_test.values):
    print(i, explainer.explain_range(row, reorder="asc"))

0 ['1.0 <= x2 <= 2.99']
1 ['1.0 <= x2 <= 2.99']
2 ['1.0 <= x2 <= 2.99']
3 ['x1 == 3', 'x0 == 7.2', 'x2 == 5.8', 'x3 == 1.6']
4 ['4.51 <= x2 <= 4.89', '1.21 <= x3 <= 1.59']
5 ['x3 == 1.8']
6 ['4.31 <= x2 <= 4.69', '1.31 <= x3 <= 1.69']
7 ['3.61 <= x2 <= 4.39', '0.91 <= x3 <= 1.69']
8 ['1.8 <= x3 <= 2.0']
9 ['1.0 <= x2 <= 2.99']
10 ['1.8 <= x3 <= 2.2']
11 ['1.0 <= x2 <= 2.99']
12 ['1.0 <= x2 <= 2.99']
13 ['1.8 <= x3 <= 2.2']
14 ['x3 == 1.8']
15 ['3.91 <= x2 <= 4.69', '0.91 <= x3 <= 1.69']
16 ['3.71 <= x2 <= 4.49', '0.91 <= x3 <= 1.69']
17 ['3.81 <= x2 <= 4.59', '0.91 <= x3 <= 1.69']
18 ['1.0 <= x2 <= 2.99']
19 ['2.51 <= x1 <= 2.69', '6.01 <= x0 <= 6.19', '5.51 <= x2 <= 5.69']
20 ['3.21 <= x2 <= 4.39', '0.51 <= x3 <= 1.69']
21 ['1.0 <= x2 <= 2.99']
22 ['3.91 <= x2 <= 4.89', '0.71 <= x3 <= 1.69']
23 ['4.11 <= x2 <= 4.69', '1.11 <= x3 <= 1.69']
24 ['3.91 <= x2 <= 4.69', '0.91 <= x3 <= 1.69']
25 ['3.21 <= x2 <= 3.99', '0.91 <= x3 <= 1.69']
26 ['3.0 <= x2 <= 3.6', '0.7 <= x3 <= 1.3']
27 ['1.8

# check explanation correctness

In [21]:
def check_correct_explanation(exp, explainer):
    """Search for a counterexample to a range explanation.

    The features named in ``exp`` are constrained to their explained range
    (or value). The remaining features of the last explained instance
    (``explainer.I``) may move by ``delta >= 0`` around their value, except
    categorical ones, which stay fixed. The solver then looks for an
    assignment where the trees (``explainer.T_model``) contradict the
    decision (``Not(explainer.D)``).

    Args:
        exp: Explanation items, each printing as ``"lo <= name <= hi"`` or
            ``"name == value"`` (items are converted with ``str``).
        explainer: Explainer whose ``I``, ``D``, ``T``, ``T_model``,
            ``columns`` and ``categoric_features`` are already set.

    Returns:
        tuple: ``(report, range_constraints, free_constraints, explainer.T,
        Not(explainer.D))``. ``report`` is ``["unsat == correct"]`` when no
        counterexample exists; otherwise it lists the counterexample's
        feature values followed by its ``delta``.
    """
    opt = Optimize()
    delta = Real("delta")

    # Constraints taken from the explanation itself. Numeric bounds are
    # passed as strings and converted to Z3 numerals by the comparison.
    exprange_z3 = []
    explained = set()
    for item in exp:
        item = str(item)
        if "<=" in item:
            tokens = item.split(" <= ")
            exprange_z3.append((tokens[0]) <= Real(tokens[1]))
            exprange_z3.append(Real(tokens[1]) <= (tokens[2]))
            explained.add(tokens[1])
        else:
            tokens = item.split(" == ")
            exprange_z3.append(Real(tokens[0]) == (tokens[1]))
            explained.add(tokens[0])
    opt.add(exprange_z3)

    # Features outside the explanation: free up to +/- delta, unless categorical.
    deltaexp = []
    for item in explainer.I:
        tokens = str(item).split(" == ")
        if tokens[0] in explained:
            continue
        if tokens[0] in explainer.categoric_features:
            deltaexp.append(Real(tokens[0]) == (tokens[1]))
        else:
            deltaexp.append(Real(tokens[0]) >= (tokens[1]) - delta)
            deltaexp.append(Real(tokens[0]) <= (tokens[1]) + delta)
    opt.add(deltaexp)

    opt.add(explainer.T_model)
    opt.add(Not(explainer.D))
    opt.add(delta >= 0)
    opt.minimize(delta)

    printlist = []
    if opt.check() == sat:
        model = opt.model()
        for var in model:
            if str(var) in explainer.columns:
                printlist.append(f"{var} = {model[var]}")
        printlist.append(f"delta = {model.eval(delta)}")
    else:
        printlist.append("unsat == correct")

    return printlist, exprange_z3, deltaexp, explainer.T, Not(explainer.D)

In [22]:
# Explain every sample of X and count the explanations for which the checker
# finds no counterexample; print the ones that fail.
count = 0
explanationstest = []
for i, row in enumerate(X.values):
    exprange = explainer.explain_range(row)
    explanationstest.append(exprange)
    ans, ansrange, ansdelta, anst, ansnotd = check_correct_explanation(
        exprange, explainer
    )
    if ans[0] != "unsat == correct":
        print(i, ans)
    else:
        count += 1
count, len(X)

(150, 150)