In [49]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris, load_wine
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from z3 import *
from fractions import Fraction

In [50]:
class XGBoostExplainerMulticlass():
    """Apenas classificação binária e base_score = None
    data = X. labels = y
    """

    def __init__(self, model, data):
        """_summary_

        Args:
            model (XGBoost): xgboost model fited
            data (DataFrame): dataframe (X or X_train)
            labels (array): y (targets)
        """
        self.model = model
        self.data = data.values
        self.columns = data.columns
        self.max_categories = 2
        self.n_classes = 1 if model.n_classes_ <= 2 else model.n_classes_

        set_option(rational_to_decimal=True)
        self.categoric_features = self.get_categoric_features(self.data)
        self.T_model = self.model_trees_expression(self.model, self.n_classes)
        self.T = self.T_model

    def explain(self, instance, reorder="asc", threshold_margin=0):
        self.I = self.instance_expression(instance)

        if self.n_classes > 2:
            self.D, self.D_add = self.decision_function_multiclass(self.model, [instance])
        else:
            self.D, self.D_add = self.decision_function_binary(self.model, [instance])
        return self.explain_expression(self.I, self.T, self.D, self.D_add, self.model, reorder)

    def get_categoric_features(self, data: np.ndarray):
        categoric_features = []
        for i in range(data.shape[1]):
            feature_values = data[:, i]
            unique_values = np.unique(feature_values)
            if len(unique_values) <= self.max_categories:
                categoric_features.append(self.columns[i])

        return categoric_features

    def instance_expression(self, instance):
        instance_exp = [Real(self.columns[i]) == value for i, value in enumerate(instance)]
        self.instance_exp = instance_exp
        return instance_exp

    def model_trees_expression(self, model, n_class_trees):
        df = model.get_booster().trees_to_dataframe()
        if model.get_booster().feature_names == None:
            feature_map = {f"f{i}": col for i, col in enumerate(self.columns)}
            df["Feature"] = df["Feature"].replace(feature_map)

        df["Split"] = df["Split"].round(4)
        self.booster_df = df
        df["Class"] = df["Tree"] % self.n_classes

        all_tree_formulas = []
        for class_index in range(n_class_trees):
          class_tree_formulas = []
          class_tree_df = df[df["Class"] == class_index]
          estimator_number = 0
          for tree_index in class_tree_df["Tree"].unique():
              tree_df = class_tree_df[class_tree_df["Tree"] == tree_index]
              o = Real(f"o_{estimator_number}_{class_index}")
              estimator_number += 1

              if len(tree_df) == 1 and tree_df.iloc[0]["Feature"] == "Leaf":
                  leaf_value = tree_df.iloc[0]["Gain"]
                  class_tree_formulas.append(And(o == leaf_value))
                  continue
              path_formulas = []

              def get_conditions(node_id):
                  conditions = []
                  current_node = tree_df[tree_df["ID"] == node_id]
                  if current_node.empty:
                      return conditions

                  parent_node = tree_df[
                      (tree_df["Yes"] == node_id) | (tree_df["No"] == node_id)
                  ]
                  if not parent_node.empty:
                      parent_data = parent_node.iloc[0]
                      feature = parent_data["Feature"]
                      split_value = parent_data["Split"]
                      x = Real(feature)
                      if parent_data["Yes"] == node_id:
                          conditions.append(x < split_value)
                      else:
                          conditions.append(x >= split_value)
                      conditions = get_conditions(parent_data["ID"]) + conditions

                  return conditions

              for _, node in tree_df[tree_df["Feature"] == "Leaf"].iterrows():
                  leaf_value = node["Gain"]
                  leaf_id = node["ID"]
                  conditions = get_conditions(leaf_id)
                  path_formula = And(*conditions)
                  implication = Implies(path_formula, o == leaf_value)
                  path_formulas.append(implication)

              class_tree_formulas.append(And(*path_formulas))
          all_tree_formulas.append(And(*class_tree_formulas))
        # print(all_tree_formulas)
        return And(all_tree_formulas)

    def get_init_value(self, model, x, estimator_variables):
        estimator_pred = Solver()
        estimator_pred.add(self.I)
        estimator_pred.add(self.T)
        estimator_pred.add(And(estimator_variables))
        if estimator_pred.check() == sat:
            solvermodel = estimator_pred.model()

            total_sum = []
            for j in range(self.n_classes):
              total_sum.append(float(solvermodel.eval(Real(f"sum_class_{j}")).as_fraction()))

        else:
            total_sum = 0
            print("estimator error")
        # print(total_sum)
        self.predicted_margin = model.predict(x, output_margin=True)[0]
        init_value = self.predicted_margin - total_sum
        # print(self.predicted_margin, "init:", init_value)
        self.init_value = init_value
        return init_value

    def decision_function_multiclass(self, model, x):
        predicted_class = model.predict(x)[0]
        self.predicted_class = predicted_class
        n_estimators = int(len(model.get_booster().get_dump()) / self.n_classes)

        estimator_variables = []
        for j in range(self.n_classes):
            class_est_exp = ([Real(f"o_{i}_{j}") for i in range(n_estimators)]) # _0 only for binary classification
            estimator_variables.append(Real(f"sum_class_{j}") == Sum(class_est_exp))
        # print(estimator_variables)
        self.estimator_variables = estimator_variables

        init_value = self.get_init_value(model, x, estimator_variables)
        # print("init:", np.round(init_value, 2))

        equation_list = []
        for j in range(self.n_classes):
          decision = Real(f"decision_class_{j}")
          equation_list.append(decision == Real(f"sum_class_{j}") + init_value[j])

        # print(And(equation_list))
        decision_list = []
        for class_number in range(self.n_classes):
          if class_number != self.predicted_class:
            decision_list.append(Real(f"decision_class_{self.predicted_class}") - Real(f"decision_class_{class_number}") > 0)
        decision_exp = And(decision_list)
        return And(decision_exp), And(And(equation_list), And(estimator_variables))
    
    def decision_function_binary(self, model, x):
        n_classes = 1 if model.n_classes_ <= 2 else model.n_classes_
        predicted_class = model.predict(x)[0]
        n_estimators = len(model.get_booster().get_dump())

        estimator_pred = Solver()
        estimator_pred.add(self.I)
        estimator_pred.add(self.T)
        variables = [Real(f"o_{i}_0") for i in range(n_estimators)]
        if estimator_pred.check() == sat:
            solvermodel = estimator_pred.model()
            total_sum = sum(
                float(solvermodel.eval(var).as_fraction()) for var in variables
            )
        else:
            total_sum = 0
            print("estimator error")
        init_value = model.predict(x, output_margin=True)[0] - total_sum

        equation_list = []
        for class_number in range(n_classes):
            estimator_list = []
            for estimator_number in range(
                int(len(model.get_booster().get_dump()) / n_classes)
            ):
                o = Real(f"o_{estimator_number}_{class_number}")
                estimator_list.append(o)
            equation_o = Sum(estimator_list) + init_value
            equation_list.append(equation_o)

        if predicted_class == 0:
            final_equation = equation_list[0] < 0
        else:
            final_equation = equation_list[0] > 0

        return final_equation, True


    def explain_expression(self, I, T_s, D_s, D_add, model, reorder):
        i_expression = I.copy()

        importances = model.feature_importances_
        non_zero_indices = np.where(importances != 0)[0]

        if reorder == "asc":
            sorted_feature_indices = non_zero_indices[
                np.argsort(importances[non_zero_indices])
            ]
            i_expression = [i_expression[i] for i in sorted_feature_indices]
        elif reorder == "desc":
            sorted_feature_indices = non_zero_indices[
                np.argsort(-importances[non_zero_indices])
            ]
            i_expression = [i_expression[i] for i in sorted_feature_indices]


        for feature in i_expression.copy():
            # print("\n---removed", feature)
            i_expression.remove(feature)

            if self.is_proved_sat(And(And(And(i_expression), T_s), D_add), D_s):
                # print('proved')
                continue
            else:
                # print("----added back");
                # print('not proved')
                i_expression.append(feature)
        # print(self.is_proved(Implies(And(And(i_expression), T_s), D_s)))
        return i_expression

    def is_proved_sat(self, expressions, decision):


      # print(And(decision_list))
      opt = Optimize()
      opt.add(Not(Implies(expressions, decision)))
      if opt.check() == unsat:
        return True
      else:
        return False

    def get_deltas(self, exp):
        if exp and isinstance(exp[0], str):
            expz3 = []
            for token in exp:
                tokens = token.split(" == ")
                expz3.append(Real(tokens[0]) == (tokens[1]))
            exp = expz3
        for expression in exp:
            if str(expression.arg(0)) in self.categoric_features:
                self.caterogic_expressions.append(expression)
                exp = list(filter(lambda expr: not expr.eq(expression), exp))
            else:
                self.cumulative_range_expresson.append(expression)

        delta_list = []
        for expression in exp:

            self.cumulative_range_expresson = list(
                filter(
                    lambda expr: not expr.eq(expression),
                    self.cumulative_range_expresson,
                )
            )
            lower_min, upper_min = self.optimize_delta(expression)

            if lower_min != None:
                delta_value_lower = self.get_delta_value(str(lower_min.value()))
                self.cumulative_range_expresson.append(
                    expression.arg(0) >= expression.arg(1) - delta_value_lower
                )
            else:
                # print("unsat == open range lower")
                delta_value_lower = None

            if upper_min != None:
                delta_value_upper = self.get_delta_value(str(upper_min.value()))
                self.cumulative_range_expresson.append(
                    expression.arg(0) <= expression.arg(1) + delta_value_upper
                )
            else:
                # print("unsat == open range upper")
                delta_value_upper = None

            # print(expression, delta_value_lower, delta_value_upper)
            delta_list.append([expression, delta_value_lower, delta_value_upper])

        self.delta_list = delta_list
        return delta_list

    def get_delta_value(self, value):
        if "+ epsilon" in value:
            delta_value = float(Fraction(value.split(" + ")[0]))
        elif "epsilon" == value:
            delta_value = 0
        elif "0" == value:
            print("ERROR: delta == 0, explanation is incorrect")
            delta_value = 0
        else:
            delta_value = round(float(Fraction(value)) - 0.01, 2)

        return delta_value

    def optimize_delta(self, expression):
        delta_upper = Real("delta_upper")
        delta_lower = Real("delta_lower")

        self.delta_features = []

        delta_expressions = []
        delta_expressions.append(expression.arg(0) >= expression.arg(1) - delta_lower)
        delta_expressions.append(expression.arg(0) <= expression.arg(1) + delta_upper)

        self.delta_expressions = delta_expressions

        expression_list = []
        expression_list.append(And(self.cumulative_range_expresson))
        expression_list.append(And(self.caterogic_expressions))
        expression_list.append(And(self.delta_expressions))
        expression_list.append(self.T)
        expression_list.append(self.D_add)
        expression_list.append(Not(self.D))
        expression_list.append(delta_upper >= 0)
        expression_list.append(delta_lower >= 0)

        opt_lower = Optimize()
        opt_lower.add(And(expression_list))
        opt_lower.add(delta_upper == 0)
        lower_min = opt_lower.minimize(delta_lower)
        if opt_lower.check() != sat:
            # print("lower unsat")
            lower_min = None

        opt_upper = Optimize()
        opt_upper.add(And(expression_list))
        opt_upper.add(delta_lower == 0)
        upper_min = opt_upper.minimize(delta_upper)
        if opt_upper.check() != sat:
            # print("upper unsat")
            upper_min = None

        return lower_min, upper_min

    def explain_range(self, instance, reorder="asc", dataset_bounds=True, exp=None):
        self.cumulative_range_expresson = []
        self.caterogic_expressions = []
        self.range_metric = 0
        if exp == None:
            exp = self.explain(instance, reorder)
        else:
            self.I = self.instance_expression(instance)
            if self.n_classes > 2:
                self.D, self.D_add = self.decision_function_multiclass(self.model, [instance])
            else:
                self.D, self.D_add = self.decision_function_binary(self.model, [instance])
        if exp != []:
            delta_list = self.get_deltas(exp)
            range_exp = []
            for expression, delta_lower, delta_upper in delta_list:
                expname = str(expression.arg(0))

                expvalue = float(expression.arg(1).as_fraction())
                lower = None
                upper = None
                if delta_lower is not None:
                    lower = round(expvalue - delta_lower, 2)
                if delta_upper is not None:
                    upper = round(expvalue + delta_upper, 2)

                if dataset_bounds == True:
                    idx = list(self.columns).index(expname)
                    min_idx = np.min(self.data[:, idx])
                    max_idx = np.max(self.data[:, idx])
                    if lower is not None and lower < min_idx:
                        lower = min_idx
                    if upper is not None and upper > max_idx:
                        upper = max_idx

                    # self.range_metric += (upper - lower)
                if lower == upper:
                    range_exp.append(f"{expression.arg(0)} == {expression.arg(1)}")
                else:
                    if lower is None:
                        range_exp.append(f"{expname} <= {upper}")
                    elif upper is None:
                        range_exp.append(f"{expname} >= {lower}")
                    else:
                        range_exp.append(f"{lower} <= {expname} <= {upper}")

            for expression in self.caterogic_expressions:
                range_exp.append(f"{expression.arg(0)} == {expression.arg(1)}")

            return range_exp
        else:
            return exp

In [51]:
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model_iris = XGBClassifier(n_estimators=10, max_depth=3, learning_rate=0.1, objective='multi:softmax')
model_iris.fit(X_train, y_train)

In [52]:
model_iris.predict(X_train)

array([0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 2, 2, 1, 2, 1, 2,
       1, 0, 2, 1, 0, 0, 0, 1, 2, 0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 0, 2, 2,
       1, 1, 2, 1, 0, 1, 2, 0, 0, 1, 1, 0, 2, 0, 0, 2, 1, 2, 2, 2, 2, 1,
       0, 0, 2, 2, 0, 0, 0, 1, 2, 0, 2, 2, 0, 1, 1, 2, 1, 2, 0, 2, 1, 2,
       1, 1, 1, 0, 1, 1, 0, 1, 2, 2, 0, 1, 2, 2, 0, 2, 0, 1, 2, 2, 1, 2,
       1, 1, 2, 2, 0, 1, 1, 0, 1, 2], dtype=int32)

In [53]:
explainer_iris = XGBoostExplainerMulticlass(model_iris, X)

In [54]:
sample = X_test.values[2]
print(sample)

[7.7 2.6 6.9 2.3]


In [55]:
print(explainer_iris.explain(sample, reorder="asc"))

[petal length (cm) == 6.9]


In [56]:
sample_exp = ["petal length (cm) == 6.9", 
            #   "petal length (cm) == 6.9",
              ]

In [57]:
print(explainer_iris.explain_range(sample, reorder="asc", exp=sample_exp))

['petal length (cm) >= 5.1']


# check binary compatibility

In [58]:
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target
y[y == 2] = 0

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model_iris_bin = XGBClassifier(n_estimators=100, max_depth=3, learning_rate=0.1)
model_iris_bin.fit(X_train, y_train)
model_iris_bin.predict(X_train)

array([0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 1, 0,
       1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0,
       1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1,
       0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0,
       1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0,
       1, 1, 0, 0, 0, 1, 0, 0, 1, 0])

In [59]:
explainer_iris_bin = XGBoostExplainerMulticlass(model_iris_bin, X)

In [60]:
sample = X_test.values[2]
print(sample)

[7.7 2.6 6.9 2.3]


In [61]:
print(explainer_iris_bin.explain(sample, reorder="asc"))

[petal width (cm) == 2.3]


In [62]:
sample_exp = ["petal width (cm) == 2.3", 
            #   "petal length (cm) == 6.9",
              ]

In [63]:
print(explainer_iris_bin.explain_range(sample, reorder="asc", exp=sample_exp))

['petal width (cm) >= 1.8']
