# Construction de règles contre-factuelles


In [1]:
import pandas as pd

PATH = "."
elections_data_path = f"{PATH}/data/elections.csv"

# Séparer les caractéristiques et les étiquettes
elections_data = pd.read_csv(elections_data_path)
X = elections_data.drop("Label", axis=1)
y = elections_data["Label"]

elections_data

Unnamed: 0,Adresse,Majeur?,Nationalite,Label
0,Paris,oui,Francais,1
1,Paris,non,Francais,-1
2,Montpellier,oui,Italien,1
3,Paris,oui,Suisse,-1
4,Strasbourg,non,Italien,-1
5,Strasbourg,non,Francais,-1
6,Strasbourg,oui,Francais,1
7,Montpellier,oui,Suisse,-1


In [2]:
from collections import Counter
from typing import Any
import numpy as np


class Criterion:
    def __init__(self, criterion) -> None:
        self.criterion = criterion

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        if self.criterion == "entropy":
            return self.shannon(*args, **kwds)
        elif self.criterion == "gini":
            return self.gini(*args, **kwds)
        elif self.criterion == "ambiguity":
            return self.ambiguity(*args, **kwds)
        else:
            raise ValueError(f"Unknown impurity criterion: {self.criterion}")

    @staticmethod
    def shannon(labels):
        """Permet de calculer l'entropie de Shannon.

        Renvoie : $\sum_{k=1}^{K} p(c_k|v_j)\log p(c_k|v_j)$.

        \[
            H_S(Y|X) = -\sum{j=1}^{m} p(v_j) \cdot \sum_{k=1}^{K} p(c_k|v_j)\log p(c_k|v_j)
        \]
        """
        label_counts = Counter(labels)
        probabilities = [count / len(labels) for count in label_counts.values()]
        return -sum(p * np.log2(p) for p in probabilities if p > 0)

    @staticmethod
    def gini(labels):
        """Permet de calculer l'indice de diversité de Gini.

        Renvoie : $(1 - \sum_{k=1}^{K} p(c_k | v_j)^2)$.

        \[
            H_G(Y | X) = \sum_{j=1}^{m} p(v_j) \cdot (1 - \sum_{k=1}^{K} p(c_k | v_j)^2)
        \]
        """
        label_counts = Counter(labels)
        probabilities = [count / len(labels) for count in label_counts.values()]
        return 1 - sum(p**2 for p in probabilities)

    @staticmethod
    def ambiguity(labels):
        """Permet de calculer la mesure d'ambiguïté [Yuan & Shaw, 1995].

        Renvoie la mesure de non-spécificité.

        \[
            H_Y(Y|X) = \sum_{j=1}^{m} p(v_j) \cdot g(\Pi(Y|v_j))
        \]

        avec $g$ une mesure de non-spécificité :

        \[
            g(\Pi(C|v_j)) = \sum_{i=2}^{K} \pi_i \cdot (\log(i) - \log(i-1))
        \]

        où $\pi$ est obtenue de la façon suivante :
        1. on ordonne les $p(c_k|v_j) dans l'odre décroissant ;
        2. on définit $\pi_i = \frac{p_i}{p_1}$ pour tout $i = 1, ..., K$.
        """
        label_counts = Counter(labels)
        probabilities = [count / len(labels) for count in label_counts.values()]

        # Calculate π
        probabilities.sort(reverse=True)
        pi = [prob / probabilities[0] for prob in probabilities]

        # Calculate the measure of non-specificity g(Π(C|v_j))
        g = sum(pi[i] * (np.log2(i + 1) - np.log2(i)) for i in range(1, len(pi) + 1))

        return g

In [111]:
from copy import deepcopy
from IPython.display import display, HTML


class SymbolicDecisionTree:
    """Un arbre de décision prenant en compte des données symboliques."""

    def __init__(self, criterion="gini"):
        self.tree = None
        self.criterion = Criterion(criterion)

    class Node:
        """Un noeud de l'arbre de décision symbolique."""

        def __init__(
            self,
            feature=None,
            value=None,
            true_branch=None,
            false_branch=None,
            result=None,
            entropy=None,
        ):
            self.feature = feature
            self.value = value
            self.true_branch = true_branch
            self.false_branch = false_branch
            self.result = result
            self.entropy = entropy

    def fit(self, X, y):
        """Construit l'arbre de décision à partir de l'ensemble d'apprentissage (X, y)."""
        training_data = pd.concat([X, y], axis=1)  # Combine features and labels
        self.tree = self.build_tree(training_data)

    def predict(self, X):
        """Prédit les classes pour des échantillons d'entrée."""
        if isinstance(X, pd.DataFrame):
            predictions = [
                self.predict_single_entry(entry) for _, entry in X.iterrows()
            ]
        elif isinstance(X, pd.Series):
            predictions = self.predict_single_entry(X)
        else:
            raise TypeError(
                f"type {type(X)} not supported in predict, please implement or convert to a pandas object"
            )
        return predictions

    def predict_single_entry(self, entry):
        """Prédit la classe pour une seule entrée."""
        node = self.tree
        while node.result is None:
            if entry[node.feature] == node.value:
                node = node.true_branch
            else:
                node = node.false_branch
        return node.result

    def predict_xai(self, entry):
        """Fournit une explication pour un exemple donné."""
        explanation, _ = self.trace_tree(entry, self.tree, [])
        return explanation

    def find_counterfactuals(self, class_to_exclude):
        """Trouve les règles contre-factuelles de l'arbre pour un exemple donné.

        :param class_to_exclude: La classe de l'exemple à interpréter.
        """
        paths = self.untrace_tree(self.tree, [])
        filtered_paths = [path for path in paths if path[-1] != class_to_exclude]
        return filtered_paths

    def explain_counterfactuals(self, entry, class_to_exclude):
        """Renvoie les règles contre-factuelles, la classe prédite et le nombre de tests
        invalidés pour un exemple donné.

        :param entry: L'exemple à interpréter.
        :param class_to_exclude: La classe de l'exemple à interpréter.
        """
        formatted_path = []
        counterfactuals = self.find_counterfactuals(class_to_exclude)
        unsatisfied_rules = self.count_rules(entry, counterfactuals)

        # Trouve les règles qui minimisent le nombre de tests invalidés par l'exemple
        min_count = min(unsatisfied_rules)
        best_indices = [
            i for i, count in enumerate(unsatisfied_rules) if count == min_count
        ]
        best_rules = [counterfactuals[i] for i in best_indices]

        for path, rule_count in zip(counterfactuals, unsatisfied_rules):
            # formatted_path.append(self.format_path(path[0], path[1]))
            formatted_path.append(
                (
                    [
                        f"{feature} {'!=' if not decision else '=='} {value}"
                        for feature, decision, value in path[0]
                    ],
                    path[1],
                    rule_count,
                )
            )
        return formatted_path

    def explain_notebook(self, entry, class_to_exclude):
        desc = ", ".join(
            [
                f"{feature} = {value}"
                for feature, value in zip(entry.index, entry.values)
            ]
        )
        explanation = f"<p><strong>Description</strong> : {desc}, Classe = {class_to_exclude}</p>"
        # Régle de classification
        prediction = self.predict(entry)
        explanation += f"<p><strong>Règle de classification</strong> : l'exemple est classé {prediction}<br/>"
        classification_rule = self.predict_xai(entry)
        classification_rule = [path.split(" ") for path in classification_rule]
        explanation += self.format_path(classification_rule, class_to_exclude, entry)
        explanation += f"</p><p><strong>Règles contre-exemples</strong> : celles dont la conclusion n'est pas {prediction}<br/>"

        # Règles contre-exemples
        counterfactuals = self.explain_counterfactuals(entry, class_to_exclude)
        for cf in counterfactuals:
            rules = [rule.split(" ") for rule in cf[0]]
            explanation += self.format_path(rules, cf[1], entry)
            explanation += "<br/>"
        explanation += "</p>"

        return display(HTML(explanation))

    @staticmethod
    def format_path(path, result, entry):
        """Formatte un chemin en un human-readable string représentant les règles de
        décision. Les règles qui diffèrent des valeurs d'entrée sont mises en évidence
        en couleur.
        """
        # TODO: en fait c'est faux avec la couleur pcq on a des branches true et false
        # qui prennent pas en compte toutes les valeurs des features
        conditions = []
        for feature, decision, value in path:
            verbe = "est" if decision == "==" else "n'est pas"
            if entry[feature] != value:
                condition = (
                    f"<span style='color: #648FFF;'>{feature} {verbe} {value}</span>"
                )
            else:
                condition = f"{feature} {verbe} {value}"
            conditions.append(condition)

        if len(conditions) > 1:
            conditions_str = ", ".join(conditions[:-1]) + " et " + conditions[-1]
        else:
            conditions_str = conditions[0]

        return f"Si {conditions_str} alors classe {result}."

    def build_tree(self, data):
        # If there's no data, or if all targets are the same,
        # return a leaf node with the result
        if len(data) == 0:
            return self.Node()

        current_uncertainty = self.criterion(data.iloc[:, -1])
        best_gain = 0
        best_criteria = None
        best_sets = None

        feature_count = len(data.columns) - 1  # number of attributes

        for col in range(feature_count):  # for each feature
            feature_values = data.iloc[:, col].unique()  # unique values
            for val in feature_values:  # for each value
                partitioned_data = self.partition(data, data.columns[col], val)

                # Information gain
                p = float(partitioned_data[0].shape[0]) / data.shape[0]
                gain = (
                    current_uncertainty
                    - p * self.criterion(partitioned_data[0].iloc[:, -1])
                    - (1 - p) * self.criterion(partitioned_data[1].iloc[:, -1])
                )

                if (
                    gain > best_gain
                    and len(partitioned_data[0]) > 0
                    and len(partitioned_data[1]) > 0
                ):
                    best_gain = gain
                    best_criteria = (data.columns[col], val)
                    best_sets = partitioned_data

        if best_gain > 0:
            true_branch = self.build_tree(best_sets[0])
            false_branch = self.build_tree(best_sets[1])
            return self.Node(
                feature=best_criteria[0],
                value=best_criteria[1],
                true_branch=true_branch,
                false_branch=false_branch,
                entropy=current_uncertainty,
            )
        else:
            # We're at a leaf, determine the outcome most frequent class
            outcome = data.iloc[:, -1].value_counts().idxmax()
            return self.Node(result=outcome, entropy=current_uncertainty)

    @staticmethod
    def partition(data, feature, value):
        true_data = data[data[feature] == value]
        false_data = data[data[feature] != value]
        return true_data, false_data

    def trace_tree(self, entry, node, explanation=[]):
        """Collecte le chemin mené par l'arbre pour prédire un exemple donné.

        :param entry: L'exemple à interpréter.
        :param node: Le noeud actuel de l'arbre.
        :param explanation: Le chemin mené par l'arbre.
        """
        if node.result is not None:
            return explanation, node.result

        if entry[node.feature] == node.value:
            explanation.append(f"{node.feature} == {node.value}")
            return self.trace_tree(entry, node.true_branch, explanation)
        else:
            explanation.append(f"{node.feature} != {node.value}")
            return self.trace_tree(entry, node.false_branch, explanation)

    def untrace_tree(self, node, path=[]):
        """Traverse récursivement l'arbre de décision pour collecter les chemins menant
        aux noeuds feuilles.

        :param node: Le noeud actuel de l'arbre.
        :param path: Le chemin emprunté pour atteindre le nœud courant.
        :return: Une liste de chemins, où chaque chemin est une liste de tuples
        contenant (feature, decision, value) menant à une feuille.
        """
        if node.result is not None:
            # Return the path with the result at the end
            return [(path, node.result)]

        paths = []
        if node.false_branch is not None:
            # Traverse the false branch
            false_path = deepcopy(path)
            false_path.append((node.feature, False, node.value))
            paths.extend(self.untrace_tree(node.false_branch, false_path))

        if node.true_branch is not None:
            # Traverse the true branch
            true_path = deepcopy(path)
            true_path.append((node.feature, True, node.value))
            paths.extend(self.untrace_tree(node.true_branch, true_path))

        return paths

    def count_rules(self, entry, counterfactuals):
        """Compte le nombre de tests invalidés des règles contre-factuelles pour un
        échantillon donné.

        :param entry: L'exemple à interpréter.
        :param counterfactuals: L'ensemble les règles contre-factuelles.
        """
        unsatisfied_rules_count = []
        for path in counterfactuals:
            count = sum(
                1
                for feature, decision, value in path[0]
                if (decision is False and entry[feature] == value)
                or (decision is True and entry[feature] != value)
            )
            unsatisfied_rules_count.append(count)
        return unsatisfied_rules_count

    def display_tree(self, node=None, indent="", branch=""):
        """Visualise la structure arborescente de l'arbre dans un format clair et
        organisé.

        Format de sortie :
        "├──" indique un noeud qui confirme la condition.
        "└──" indique un noeud qui ne confirme pas la condition.
        Les feuilles affichent le résultat de la classe.
        Les noeuds internes affichent le critère de décision et son entropie.

        Exemple :
        feature_name1 == feature_value1? (Entropy = 0.1234)
        ├── Class : Class_A
        └── feature_name2 == feature_value2? (Entropy = 0.5678)
            ├── Class : Class_B
            └── Class : Class_C

        Dans cet exemple :
        - Si feature_name1 == feature_value1, l'arbre de décision le classe comme
        'Class_A'.
        - Sinon, il vérifie si feature_name2 == feature_value2 ; si c'est vrai, il est
        classé comme 'Class_B', et si c'est faux, il est classé comme 'Class_C'.
        """
        if node is None:
            node = self.tree

        # Base case: if it's a leaf node, print the result and return
        if node.result is not None:
            print(f"{indent}{branch}Class: {node.result}")
            return

        # Print the criterion for the current node
        print(
            f"{indent}{branch}{node.feature} == {node.value}? (Entropy = {node.entropy:.4f})"
        )

        # Recursive case: print the true and false branches
        new_indent = indent + ("│   " if branch == "├── " else "    ")

        # For true branch, use '├──'
        self.display_tree(node.true_branch, new_indent, "├── ")

        # For false branch, use '└──'
        self.display_tree(node.false_branch, new_indent, "└── ")

In [112]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


# Diviser les données en ensembles d'entraînement et de test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# Initialiser et entraîner l'arbre de décision symbolique
tree_xai = SymbolicDecisionTree()
tree_xai.fit(X_train, y_train)

# Évaluer la précision de l'arbre de décision
predictions = tree_xai.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy * 100:.4f}%")

Accuracy: 100.0000%


In [113]:
tree_xai.display_tree()

Nationalite == Suisse? (Entropy = 0.4800)
    ├── Class: -1
    └── Majeur? == oui? (Entropy = 0.4444)
        ├── Class: 1
        └── Class: -1


In [114]:
sample, sample_label = X.iloc[0], y.iloc[0]

tree_xai.predict_xai(sample)

['Nationalite != Suisse', 'Majeur? == oui']

In [115]:
tree_xai.explain_counterfactuals(sample, sample_label)

[(['Nationalite != Suisse', 'Majeur? != oui'], -1, 1),
 (['Nationalite == Suisse'], -1, 1)]

In [116]:
tree_xai.explain_notebook(sample, sample_label)

# Génération de bases d’apprentissage et d’explications


### Méthode LIME


In [7]:
# from copy import copy
# from sklearn.linear_model import Ridge


# # Nom des fonctions issus de la notation du cours
# class CustomExplainer:
#     def __init__(self, class_names, card_Z=10000):
#         self.class_names = class_names
#         self.card_Z = card_Z

#     def generate_z(self, x_ref):
#         Z = []  # list of all new exemple
#         weights = []
#         card_x_ref = len(x_ref)
#         sigma = int(
#             0.75 * np.sqrt(card_x_ref)
#         )  # on change maximum sigma nombre de mots dans la phrase
#         for _ in range(self.card_Z):
#             z_i = copy(x_ref)
#             nb_word_to_change = np.random.randint(0, sigma)
#             index_to_remove = np.random.randint(0, card_x_ref, size=nb_word_to_change)
#             for i in index_to_remove:
#                 splited_text_tmp[i] = ""
#             Z.append(" ".join(splited_text_tmp))
#             weights.append(1 - nb_word_to_change / card_x_ref)
#         return Z, weights

#     def explain_instance(self, text_instance, classifier_fn, num_features=None):
#         modified_text_instances, weights = self.generate_z(text_instance)
#         prob = classifier_fn(modified_text_instances).max(1)
#         clf = Ridge(fit_intercept=True)
#         clf.fit(
#             self.vectorizer.transform(modified_text_instances),
#             prob,
#             sample_weight=weights,
#         )
#         return pd.Series(clf.coef_, index=vectorizer.get_feature_names_out())


# explainer = CustomLimeTextExplainer(
#     class_names=newsgroups_test.target_names, vectorizer=vectorizer
# )
# coef = explainer.explain_instance(
#     newsgroups_test.data[idx], c.predict_proba, num_features=6
# )

## Méthode LORE


In [8]:
from icecream import ic
from tqdm import tqdm
from distances import simple_match_distance


class CustomExplainer:
    def __init__(
        self,
        test_set: pd.DataFrame,
        model,
        distance,
        pc=0.5,
        pm=0.5,
        nb_genetics=100,
        N=1000,
    ):
        self.N = N
        self.pc = pc
        self.pm = pm
        self.nb_genetics = nb_genetics
        self.model = model
        self.distance = distance

        self.value_distribution = {}
        for col in test_set.columns:
            vc = test_set[col].value_counts(normalize=True)
            self.value_distribution[col] = (vc.index, vc.values)

    def geneticNeigh(self, x: pd.Series, same: bool):
        if same:
            fitness = self.fitness_same
        else:
            fitness = self.fitness_dif

        P = pd.DataFrame([x] * self.N)  # Population init
        M = self.evaluate(x, P, fitness)
        top_M = np.flip(M.argsort())
        for _ in tqdm(range(self.nb_genetics)):
            P = P.iloc[top_M].head(self.N)
            P = self.crossover(P)
            P = self.mutate(P)

            M = self.evaluate(x, P, fitness)
            top_M = np.flip(M.argsort())
        return P

    def evaluate(self, x, P: pd.DataFrame, fitness):
        x_hat = self.model.predict(x)
        M_list = []
        for _, z_i in P.iterrows():
            M_list.append(fitness(x, x_hat, z_i))
        return np.array(M_list)

    def fitness_same(self, x, x_hat, z_i):
        z_i_hat = self.model.predict(z_i)
        return (z_i_hat == x_hat) + (1 - self.distance(x, z_i)) - ((x == z_i).all())

    def fitness_dif(self, x, x_hat, z_i):
        z_i_hat = self.model.predict(z_i)
        return (z_i_hat != x_hat) + (1 - self.distance(x, z_i)) - ((x == z_i).all())

    def mutate(
        self,
        P: pd.DataFrame,
    ):
        idx_to_mutate = np.random.randint(0, len(P), size=int(len(P) * self.pm))
        mutations = P.iloc[idx_to_mutate].apply(self.mutate_one, axis=1)
        P_mutated = pd.concat((mutations, P))
        return P_mutated

    def mutate_one(self, parent):
        """
        Ici je fais deux mutation, il est possible d'en faire qu'une
        Le text du diapo en fait une, mais l'image en fait deux
        """
        attribute_list = parent.index
        att_1 = np.random.choice(attribute_list)
        att_2 = np.random.choice(attribute_list.drop(att_1))
        children = parent.copy()
        children[att_1] = np.random.choice(
            self.value_distribution[att_1][0], p=self.value_distribution[att_1][1]
        )
        children[att_2] = np.random.choice(
            self.value_distribution[att_2][0], p=self.value_distribution[att_2][1]
        )
        return children

    def crossover(self, P: pd.DataFrame):
        nb_parents = int(len(P) * self.pm)
        idx_parents = np.random.randint(0, len(P), size=(2, nb_parents // 2))
        crossovers = pd.concat(
            (
                self.crossover_one(P.iloc[idx_parents[0, i]], P.iloc[idx_parents[1, i]])
                for i in range(nb_parents // 2)
            )
        )
        P_mutated = pd.concat((crossovers, P))
        return P_mutated

    @staticmethod
    def crossover_one(parent_1, parent_2):
        attribute_list = parent_1.index
        att_1 = np.random.choice(attribute_list)
        att_2 = np.random.choice(
            attribute_list.drop(att_1)
        )  # on veut deux attribut différent
        att_list = [att_1, att_2]

        children_1 = parent_1.copy()
        children_1[att_list] = parent_2[att_list]

        children_2 = parent_2.copy()
        children_2[att_list] = parent_1[att_list]

        return pd.DataFrame((children_1, children_2))


custom_explainer = CustomExplainer(X, tree_xai, simple_match_distance)

In [9]:
Z_true = custom_explainer.geneticNeigh(X.iloc[0], True)

  0%|          | 0/100 [00:00<?, ?it/s]

100%|██████████| 100/100 [01:26<00:00,  1.15it/s]


In [10]:
Z_false = custom_explainer.geneticNeigh(X.iloc[0], False)

100%|██████████| 100/100 [01:25<00:00,  1.17it/s]


In [11]:
Z = pd.concat((Z_true, Z_false)).reset_index(drop=True)
Z_hat = pd.Series(tree_xai.predict(Z), name="Label")
pd.concat((Z, Z_hat), axis=1).iloc[:, -1].value_counts().idxmax()

1

Bon c'est super lent, aussi j'ai codé comme ça me venais un peu, au pire on utilisera l'[implémentation](https://github.com/riccotti/LORE/blob/master/neighbor_generator.py) mais c'est codé differement j'crois

Je sais pas c'est quoi les ordre de grandeur des hyprparamètre de l'algo génétique aussi


In [12]:
X_train

Unnamed: 0,Adresse,Majeur?,Nationalite
7,Montpellier,oui,Suisse
2,Montpellier,oui,Italien
4,Strasbourg,non,Italien
3,Paris,oui,Suisse
6,Strasbourg,oui,Francais


In [13]:
Z = pd.concat((Z_true, Z_false)).reset_index(drop=True)
Z_hat = pd.Series(tree_xai.predict(Z), name="Label")
# Z, _, Z_hat, _ = train_test_split(
#     X, y, test_size=0.01, random_state=42
# ) # gros bricolage ici, j'arrive pas à faire passer
# Initialiser et entraîner l'arbre de décision symbolique
substitution_tree = SymbolicDecisionTree()
substitution_tree.fit(Z, Z_hat)

sample = X.iloc[0]
class_to_not_keep = y.iloc[0]
print(sample)
print(class_to_not_keep)

Adresse           Paris
Majeur?             oui
Nationalite    Francais
Name: 0, dtype: object
1


In [14]:
counterfactuals = substitution_tree.find_counterfactuals(class_to_not_keep)
n_r_list = substitution_tree.count_rules(sample, counterfactuals)
np.argwhere(n_r_list == np.amax(n_r_list))

array([[0],
       [1]])

In [15]:
substitution_tree.explain_counterfactuals(sample, class_to_not_keep)

[(['Majeur? != non', 'Nationalite == Suisse'], 1), (['Majeur? == non'], 1)]

In [16]:
substitution_tree.display_tree()

Majeur? == non? (Entropy = 0.5000)
    ├── Class: -1
    └── Nationalite == Suisse? (Entropy = 0.1839)
        ├── Class: -1
        └── Class: 1
