In [1]:
import pandas as pd
from sklearn.metrics import roc_auc_score
import xgboost as xgb
import numpy as np
from collections import defaultdict
import scipy
import matplotlib.pyplot as plt



In [2]:
test_merged = pd.read_csv('test_merged.csv')
train_merged = pd.read_csv('train_merged.csv')

In [3]:
X_train = train_merged.drop(['TARGET'], axis = 1)
X_test = test_merged.copy()
y_train = train_merged.TARGET.values
print('X_train shape:', X_train.shape)

X_train shape: (307511, 128)


### Обучаем модель

In [4]:
xgb_model = xgb.XGBClassifier(objective="binary:logistic", random_state=42, eval_metric="auc", 
                              max_depth=4,learning_rate=0.277, gamma=0.382)
xgb_model.fit(X_train, y_train)

XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
       colsample_bynode=1, colsample_bytree=1, eval_metric='auc',
       gamma=0.382, learning_rate=0.277, max_delta_step=0, max_depth=4,
       min_child_weight=1, missing=None, n_estimators=100, n_jobs=1,
       nthread=None, objective='binary:logistic', random_state=42,
       reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
       silent=None, subsample=1, verbosity=1)

In [5]:
print(roc_auc_score(y_train, xgb_model.predict_proba(X_train)[:,1]))

0.7662371085332839


In [6]:
xgb_array = xgb.XGBClassifier(objective="binary:logistic", random_state=42, eval_metric="auc", 
                              max_depth=4,learning_rate=0.277, gamma=0.382)
xgb_array.fit(X_train.values, y_train)

XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
       colsample_bynode=1, colsample_bytree=1, eval_metric='auc',
       gamma=0.382, learning_rate=0.277, max_delta_step=0, max_depth=4,
       min_child_weight=1, missing=None, n_estimators=100, n_jobs=1,
       nthread=None, objective='binary:logistic', random_state=42,
       reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
       silent=None, subsample=1, verbosity=1)

### Интерпретатор

In [32]:
import shap
import numpy as np
import matplotlib.pyplot as plt
from skater.model import InMemoryModel
from skater.core.explanations import Interpretation
from skater.core.local_interpretation.lime.lime_tabular import LimeTabularExplainer
from skater.util.dataops import show_in_notebook
import types

class BaseInterpretator:
    """
    Базовый класс интерпретатора
    """

    def __init__(self, model, objective = 'classification', algorithm = 'boosting'):
        """
        Создаёт объект интерпретатора
        :type algorithm: Алгоритм модели. Допустимые значения: boosting, random_forest
        :type objective: Тип целевой переменной в модели. Допустимые значения: classification, regression
        :param model: Модель для интерпретации
        """
        if objective not in ['classification', 'regression']:
            raise BaseException('Unknown Objective')
        if algorithm not in ['boosting', 'random_forest']:
            raise BaseException('Unknown algorithm')

        self.__model = model
        self.__shap_explainer = None
        self.__skater_explainer = None
        self.__annotated_model = None

        self.__objective = objective
        self.__algo = algorithm
    
    def fit_shap(self):
        self.__shap_explainer = shap.TreeExplainer(self.__model)
        return

    def shap(self, data, type = 'summary_plot', num_features = None):
        """
        Плейсхолдер для метода интепретации
        :param type: Тип графика
        :param data: Данные, на которых построенна модель. Используются для отдельных видоп интепретации
        :return: Возвращает результат интепретации
        """
        # Проверка параметров
        if self.__shap_explainer is None:
            raise BaseException("SHAP explainer is not fitted. Run fit_shap at first")

        if self.__algo == "random_forest":
            shap_values = self.__shap_explainer.shap_values(data)[1]
            expected_value = self.__shap_explainer.expected_value[1]
        else:
            shap_values = self.__shap_explainer.shap_values(data)
            expected_value = self.__shap_explainer.expected_value

        if type == 'summary_plot':
            return shap.summary_plot(shap_values, data, max_display = num_features)
        elif type == 'summary_bar_plot':
            return shap.summary_plot(shap_values, data, plot_type='bar', max_display = num_features)
        elif type == 'individual_plot':
            shap.initjs()
            return shap.force_plot(expected_value, shap_values, data)
        else:
            raise BaseException('Unknown SHAP plot type')
        
    def fit_skater(self, data):
        """
        :param data: Набор данных
        """
        self.__skater_explainer = Interpretation(data, feature_names=data.columns)

        if self.__objective == 'classification':
            self.__annotated_model = InMemoryModel(self.__model.predict_proba, examples=data)
        elif self.__objective == 'regression':
            self.__annotated_model = InMemoryModel(self.__model.predict, examples=data)
        
    def pdp(self, features, grid_resolution = 30, n_samples=10000):
        """
        Возврщает график PDP
        :param features: tuple из 1 или 2 фичей
        :param grid_resolution: Количество ячеек по каждой из осей
        :param n_samples: The number of samples to use from the original dataset
        :return: Возвращает график PDP
        """

        if self.__skater_explainer is None or self.__annotated_model is None:
            raise BaseException("Skater explainer is not fitted. Run fit_skater at first")

        pdp_features = [features]

        return self.__skater_explainer.partial_dependence.plot_partial_dependence(pdp_features,
                                                       self.__annotated_model,
                                                       grid_resolution=grid_resolution,
                                                       n_samples=n_samples,
                                                       n_jobs=-1)
        
    def analyze_voters(self, obj, figsize=[10, 7]):
        """
        Проводит анализ голосвания деревьев в лесу
        :param obj: Анализируемое наблюдение
        :param figsize: Размер выходного графика
        :return: Результаты голосования деревьев
        """
        if self.__algo != 'random_forest':
            raise BaseException("Can be used only for Random Forest")

        def get_voters(obj):
            predicted_pobas = list()

            for est in self.__model.estimators_:
                probas = est.predict_proba(obj)
                predicted_pobas.append([p[1] for p in probas][0])
            return predicted_pobas


        predicted_pobas = get_voters(obj)
        mean_pred = np.mean(predicted_pobas)
        std_pred = np.std(predicted_pobas)

        fig = plt.figure(figsize=figsize)
        plt.hlines(mean_pred, xmin=0, xmax=len(predicted_pobas), label='mean prediction')
        bar_char = plt.bar(x=list(range(len(predicted_pobas))), height=predicted_pobas)
        cum_vote = plt.plot(sorted(predicted_pobas), c='red', label='cum votes')
        plt.legend()

        return predicted_pobas, bar_char, cum_vote
    
    def get_decision_rules(self, X_train, y_train, filename):
        """
        ВАЖНО! Работает только для обучающей выборки
        :X_train: DataFrame, 
        :y_train: Series or numpy array, вектор таргетов
        """
        
        surrogate_explainer = self.__skater_explainer.tree_surrogate(oracle=self.__annotated_model, seed=33)
        f1 = surrogate_explainer.fit(X_train, y_train, use_oracle=True, prune='pre', scorer_type='f1')
        print('F1 score for the surrogate tree: ', f1)

        def plot_tree_new(self, features_names, colors=None, 
                                         enable_node_id=True, random_state=0, 
                                         file_name=filename,
                                          show_img=False, fig_size=(20, 8)):
            """ Visualizes the decision policies of the surrogate tree.
            """
            self.feature_names = features_names
            graph_inst = plot_tree(self.__model, self.__mfodel_type, feature_names=self.feature_names, color_list=colors,
                                       class_names=self.class_names, enable_node_id=enable_node_id, seed=random_state)
            f_name = "interpretable_tree.png" if file_name is None else file_name
            graph_inst.write_png(f_name)

            try:
                import matplotlib
                matplotlib.use('agg')
                import matplotlib.pyplot as plt
            except ImportError:
                raise exceptions.MatplotlibUnavailableError("Matplotlib is required but unavailable on the system.")
            except RuntimeError:
                raise exceptions.MatplotlibDisplayError("Matplotlib unable to open display")

            if show_img:
                plt.rcParams["figure.figsize"] = fig_size
                img = plt.imread(f_name)
                if self.__model_type == 'regressor':
                    cax = plt.imshow(img, cmap=plt.cm.get_cmap(graph_inst.get_colorscheme()))
                    plt.colorbar(cax)
                else:
                    plt.imshow(img)
            return graph_inst
        
        
        surrogate_explainer.plot_tree = types.MethodType(plot_tree_new, surrogate_explainer)
        surrogate_explainer.plot_tree(X_train.columns)
        
        show_in_notebook(filename, width=1200, height=800);
    
    def lime(self, data, index_example, class_names = None):
        """
        Важно! Для LIME модель должна быть обучена на numpy array
        :data: DataFrame, датасет с исходными данными
        :class_names: имена классов 
        :index_example: list, номер индекса объекта, который хотим интерпретировать
        """
        #принимает в качестве данных только numpy array
        exp = LimeTabularExplainer(data.values, feature_names=data.columns, discretize_continuous=True, 
                           class_names=class_names)
        #if type(index_example) 
        for i in index_example:
            predictions = xgb_array.predict_proba(data)
            print('Predicted:', predictions[i])
            exp.explain_instance(X_train.iloc[i].values, self.__model.predict_proba).show_in_notebook()
        

In [33]:
interp = BaseInterpretator(xgb_model)
interp.fit_skater(X_train)
interp.get_decision_rules(X_train, y_train, 'test_tree_sur.png')

KeyboardInterrupt: 

In [45]:
interpreter = Interpretation(training_data=X_train, training_labels=y_train, feature_names=X_train.columns)
im_model = InMemoryModel(xgb_model.predict_proba, examples=X_train)

In [46]:
surrogate_explainer = interpreter.tree_surrogate(oracle=im_model, seed=33)
f1 = surrogate_explainer.fit(X_train, y_train, use_oracle=True, prune='pre', scorer_type='f1')
print('F1 score for the surrogate tree: ', f1)

2021-05-11 23:50:33,670 - skater.core.global_interpretation.tree_surrogate - INFO - pre pruning applied ...
2021-05-11 23:50:33,671 - skater.core.global_interpretation.tree_surrogate - INFO - Scorer used f1-score
2021-05-11 23:54:43,358 - skater.core.global_interpretation.tree_surrogate - INFO - Done generating prediction using the surrogate, shape (307511, 2)
2021-05-11 23:54:43,472 - skater.core.global_interpretation.tree_surrogate - INFO - Done scoring, surrogate score 0.931; oracle score 0.883


F1 score for the surrogate tree:  -0.048


In [49]:
class A:
    def __init__(self):
        self.num = 11
        pass

a = A()

def my_num(self):
    print(self.num)
    
a.my_method = types.MethodType( my_num, a )
a.my_method()

11


In [48]:
def plot_tree_upgrade(self, features_names, file_name='', colors=None, 
                                         enable_node_id=True, random_state=0, 
                                          show_img=False, fig_size=(20, 8)):
            """ Visualizes the decision policies of the surrogate tree.
            """
            self.feature_names = features_names
            graph_inst = plot_tree(self.__model, self.__mfodel_type, feature_names=self.feature_names, color_list=colors,
                                       class_names=self.class_names, enable_node_id=enable_node_id, seed=random_state)
            f_name = "interpretable_tree.png" if file_name is None else file_name
            graph_inst.write_png(f_name)

            try:
                import matplotlib
                matplotlib.use('agg')
                import matplotlib.pyplot as plt
            except ImportError:
                raise exceptions.MatplotlibUnavailableError("Matplotlib is required but unavailable on the system.")
            except RuntimeError:
                raise exceptions.MatplotlibDisplayError("Matplotlib unable to open display")

            if show_img:
                plt.rcParams["figure.figsize"] = fig_size
                img = plt.imread(f_name)
                if self.__model_type == 'regressor':
                    cax = plt.imshow(img, cmap=plt.cm.get_cmap(graph_inst.get_colorscheme()))
                    plt.colorbar(cax)
                else:
                    plt.imshow(img)
            return graph_inst
        
        
surrogate_explainer.plot_tree = types.MethodType(plot_tree_upgrade, surrogate_explainer)
surrogate_explainer.plot_tree(X_train.columns, file_name='test_tree_sur.png')
        
show_in_notebook('test_tree_sur.png', width=1200, height=800);

AttributeError: 'TreeSurrogate' object has no attribute '__model'

In [None]:
from skater.util.dataops import show_in_notebook

# 'Low Quality' (score <= 5) class in pink and 'High Quality' class (score > 5) in red
surrogate_explainer.plot_global_decisions(X_train.columns, colors=['green', 'red'], file_name='test_tree_sur.png', fig_size=(8,8))

show_in_notebook('test_tree_sur.png', width=1200, height=800);