In [1]:
from Tools import *

In [7]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import shap
from sklearn.base import BaseEstimator
from sklearn.model_selection import BaseCrossValidator
from typing import Callable, Optional, List, Dict, Tuple

class Kraken:
    """
    Based on the original idea of Mr. Patekha and proudly implemented in the author's vision
    """

    def __init__(
        self, 
        estimator: BaseEstimator, 
        cv: BaseCrossValidator, 
        metric: Callable, 
        meta_info_name: str):
        """
        Initialize Kraken class with given estimator, cross-validator and metric.
        
        Args:
            estimator (BaseEstimator): Estimator object.
            cv (BaseCrossValidator): Cross-validator object.
            metric (Callable): Metric function to evaluate model.
            meta_info_name (str): name for meta_info file
        """
        self.estimator = estimator
        self.cv = cv
        self.metric = metric
        self.meta_info_name = meta_info_name
        
        # temporary data
        self.dict_fold_importances = None
        self.fe_dict = None
        self.rank_dict = None

    def get_rank_dict(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        list_of_vars: List[str], 
        group_dt: Optional[np.ndarray] = None):
        """
        Compute SHAP values and create a dictionary with ranked features by their absolute SHAP value.
        
        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            list_of_vars (List[str]): List of feature names.
            group_dt (Optional[np.ndarray]): Group labels for the samples.
        
        Returns:
            None.
        """
        self.dict_fold_importances = {'Feature': list_of_vars, 'abs_shap': np.zeros(len(list_of_vars))}
        for fold, (train_idx, val_idx) in enumerate(self.cv.split(X, y, groups=group_dt), 1):
            X_train, X_test = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[val_idx]
            self.estimator.fit(X_train[list_of_vars], y_train)
            explainer = shap.Explainer(self.estimator, X_train[list_of_vars])
            shap_values = explainer(X_test[list_of_vars])
            self.dict_fold_importances['abs_shap'] += np.abs(shap_values.values).mean(axis=0)

        self.fe_dict = {key: value for key, value in zip(self.dict_fold_importances['Feature'], self.dict_fold_importances['abs_shap'])}
        self.rank_dict = {key: rank for rank, key in enumerate(sorted(self.fe_dict, key=self.fe_dict.get, reverse=True), 1)}

    def get_cross_val_score(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        var: str, 
        old_scores: np.ndarray, 
        selected_vars: Optional[List[str]] = None, 
        group_dt: Optional[np.ndarray] = None, 
        round_num: int = 3) -> Tuple[np.ndarray, int, float]:
        """
        Compute cross-validation scores for a given variable.
        
        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            var (str): Feature to evaluate.
            old_scores (np.ndarray): Old cross-validation scores.
            selected_vars (Optional[List[str]], optional): List of already selected features. Defaults to None.
            group_dt (Optional[np.ndarray], optional): Group labels for the samples. Defaults to None.
            round_num (int, optional): Number of decimal places for the scores. Defaults to 3.
        
        Returns:
            Tuple[np.ndarray, int, float]: Cross-validation scores, sum of the score differences between current and old scores and the mean cross-validation score.
        """
        if selected_vars is None:
            selected_vars = []
        selected_vars.append(var)
        list_scores = []

        for fold, (train_idx, val_idx) in enumerate(self.cv.split(X, y, groups=group_dt), 1):
            X_train, X_test = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[val_idx]
            self.estimator.fit(X_train[selected_vars], y_train)
            preds = self.estimator.predict_proba(X_test[selected_vars])[:, 1]
            error = round(self.metric(y_test, preds), round_num)
            list_scores.append(error)
        fold_scores = np.array(list_scores)
        summa = sum(fold_scores - old_scores < 0) * 1 + sum(fold_scores - old_scores > 0) * -1
        mean_cv_score = round(np.mean(fold_scores), round_num)
        return fold_scores, summa, mean_cv_score

    def get_vars(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        early_stopping_rounds: int = 30, 
        summa_approve: int = 1, 
        best_mean_cv: int = 10**10, 
        vars_in_model: Optional[List] = list(), 
        group_dt: Optional[np.ndarray] = None, 
        round_num: int = 3, 
        old_scores: Optional[np.ndarray] = None) -> List[str]:
        """
        Select variables based on their SHAP values and cross-validation scores.
        
        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            early_stopping_rounds (int, optional): Number of iterations without improvement to stop the selection.
                Defaults to 30.
            summa_approve (int, optional): Threshold for the sum of score differences to approve the variable. Defaults to 1.
            best_mean_cv (int, optional): Threshold for the mean cross-validation score to approve the variable. Defaults to 10**10.
            vars_in_model (List[str], optional): List of initial variables. Defaults to [].
            group_dt (Optional[np.ndarray], optional): Group labels for the samples. Defaults to None.
            round_num (int, optional): Number of decimal places for the scores. Defaults to 3.
        """
        self.round_num = round_num
        if old_scores is None:
            old_scores = np.array([0.5 for _ in range(self.cv.get_n_splits())])
        iteration_step = 0
        the_list_from_which_we_take_vars = [i for i in list(self.rank_dict.keys()) if i not in vars_in_model]
        feature_was_added = True

        while feature_was_added:
            iteration_step = 0
            var_for_add = ''
            if iteration_step > 0:
                print('начинаем след этап', best_mean_cv)
            else:
                print('запуск первого шага')
            best_positive_groups = summa_approve
            for var in the_list_from_which_we_take_vars:
                iteration_step += 1
                if iteration_step > early_stopping_rounds:
                    print(f'early_stopping_rounds {early_stopping_rounds}')
                    break
                fold_scores, summa, mean_cv_score = self.get_cross_val_score(X=X, y=y, var=var, old_scores=old_scores, selected_vars=vars_in_model.copy(), group_dt=group_dt, round_num=self.round_num)
                if (summa > best_positive_groups) or (summa == best_positive_groups and mean_cv_score < best_mean_cv):
                    best_positive_groups = summa
                    best_mean_cv = mean_cv_score
                    old_scores = fold_scores
                    var_for_add = var
                    iteration_step = 0
                    print(f'new var_for_add ! {var_for_add}')

            if var_for_add != '':
                vars_in_model.append(var_for_add)
                the_list_from_which_we_take_vars.remove(var_for_add)
                print('едем дальше')
                print('в итоге получили список', vars_in_model)
                list_meta = ['vars_list'] + [best_positive_groups] + [best_mean_cv] + old_scores.tolist()
                df_meta = pd.DataFrame(list_meta).T
                df_meta.columns = ['vars', 'summa', 'mean_cv_scores'] + ['cv' + str(i) for i in range(1, self.cv.get_n_splits() + 1)]
                df_meta.at[0, 'vars'] = vars_in_model.copy()
                try:
                    df_meta_info = pd.concat([df_meta_info, df_meta])
                except:
                    df_meta_info = df_meta.copy()
                df_meta_info.to_csv(f'df_meta_info_{self.meta_info_name}.csv')
                continue
            else:
                feature_was_added = False

        print('мы сошлись')
        print(vars_in_model)
        print(best_mean_cv)
        return vars_in_model
    
    def reset_temp_data(self):
        """
        Reset temporary data stored in the class attributes.
        """
        self.dict_fold_importances = None
        self.fe_dict = None
        self.rank_dict = None



In [8]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score

# Установка случайного зерна для воспроизводимости
np.random.seed(42)

# Генерация данных
n_samples = 100
feature1 = np.random.rand(n_samples)
feature2 = np.random.rand(n_samples)
feature3 = np.random.rand(n_samples)
feature4 = np.random.rand(n_samples)  # Незначимый признак
feature5 = np.random.rand(n_samples)  # Незначимый признак
noise = np.random.normal(0, 0.1, n_samples)  # Небольшой шум

# Вычисление целевого класса
linear_combination = 0.3 * feature1 + 0.5 * feature2 + 0.2 * feature3 + noise
target = (linear_combination > 0.5).astype(int)

# Создание DataFrame
data = pd.DataFrame({
    'Feature1': feature1,
    'Feature2': feature2,
    'Feature3': feature3,
    'Feature4': feature4,
    'Feature5': feature5,
    'Target': target
})

# Определение признаков и целевой переменной
X = data.drop(columns='Target')
y = data['Target']

# Определение модели, кросс-валидатора и метрики
estimator = RandomForestClassifier(random_state=42)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
def custom()
metric = 

In [36]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import shap
from sklearn.base import BaseEstimator
from sklearn.model_selection import BaseCrossValidator
from typing import Callable, Optional, List, Dict, Tuple

class Kraken:
    """
    Based on the original idea of Mr. Patekha and proudly implemented in the author's vision
    """

    def __init__(
        self, 
        estimator: BaseEstimator, 
        cv: BaseCrossValidator, 
        metric: Callable, 
        meta_info_name: str):
        """
        Initialize Kraken class with given estimator, cross-validator and metric.
        
        Args:
            estimator (BaseEstimator): Estimator object.
            cv (BaseCrossValidator): Cross-validator object.
            metric (Callable): Metric function to evaluate model.
            meta_info_name (str): name for meta_info file
        """
        self.estimator = estimator
        self.cv = cv
        self.metric = metric
        self.meta_info_name = meta_info_name
        
        # temporary data
        self.dict_fold_importances = None
        self.fe_dict = None
        self.rank_dict = None

    def get_rank_dict(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        list_of_vars: List[str], 
        group_dt: Optional[np.ndarray] = None):
        """
        Compute SHAP values and create a dictionary with ranked features by their absolute SHAP value.

        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            list_of_vars (List[str]): List of feature names.
            group_dt (Optional[np.ndarray]): Group labels for the samples.

        Returns:
            None.
        """
        self.dict_fold_importances = {'Feature': list_of_vars, 'abs_shap': np.zeros(len(list_of_vars))}
        for fold, (train_idx, val_idx) in enumerate(self.cv.split(X, y, groups=group_dt), 1):
            X_train, X_test = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[val_idx]
            self.estimator.fit(X_train[list_of_vars], y_train)
            explainer = shap.Explainer(self.estimator, X_train[list_of_vars])
            shap_values = explainer(X_test[list_of_vars])

            # Проверяем форму shap_values.values
            print(f"shap_values.values.shape: {shap_values.values.shape}")  # Добавлено для отладки
            if len(shap_values.values.shape) == 3:
                shap_values_class1 = shap_values.values[:, :, 1]  # Выбираем класс 1
            else:
                shap_values_class1 = shap_values.values

            # Убедимся, что размеры совпадают
            print(f"shap_values_class1.shape: {shap_values_class1.shape}, len(list_of_vars): {len(list_of_vars)}")  # Добавлено для отладки
            if shap_values_class1.shape[1] != len(list_of_vars):
                print(f"shap_values_class1.shape: {shap_values_class1.shape}, len(list_of_vars): {len(list_of_vars)}")
                raise ValueError("Shape of SHAP values does not match the number of features.")

            self.dict_fold_importances['abs_shap'] += np.abs(shap_values_class1).mean(axis=0)

        self.fe_dict = {key: value for key, value in zip(self.dict_fold_importances['Feature'], self.dict_fold_importances['abs_shap'])}
        self.rank_dict = {key: rank for rank, key in enumerate(sorted(self.fe_dict, key=self.fe_dict.get, reverse=True), 1)}



    def get_cross_val_score(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        var: str, 
        old_scores: np.ndarray, 
        selected_vars: Optional[List[str]] = None, 
        group_dt: Optional[np.ndarray] = None, 
        round_num: int = 3) -> Tuple[np.ndarray, int, float]:
        """
        Compute cross-validation scores for a given variable.
        
        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            var (str): Feature to evaluate.
            old_scores (np.ndarray): Old cross-validation scores.
            selected_vars (Optional[List[str]], optional): List of already selected features. Defaults to None.
            group_dt (Optional[np.ndarray], optional): Group labels for the samples. Defaults to None.
            round_num (int, optional): Number of decimal places for the scores. Defaults to 3.
        
        Returns:
            Tuple[np.ndarray, int, float]: Cross-validation scores, sum of the score differences between current and old scores and the mean 
            cross-validation score.
        """
        if selected_vars is None:
            selected_vars = []
        selected_vars.append(var)
        list_scores = []

        for fold, (train_idx, val_idx) in enumerate(self.cv.split(X, y, groups=group_dt), 1):
            X_train, X_test = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[val_idx]
            self.estimator.fit(X_train[selected_vars], y_train)
            preds = self.estimator.predict_proba(X_test[selected_vars])[:, 1]
            error = round(self.metric(y_test, preds), round_num)
            list_scores.append(error)
        fold_scores = np.array(list_scores)
        summa = sum(fold_scores - old_scores < 0) * 1 + sum(fold_scores - old_scores > 0) * -1
        mean_cv_score = round(np.mean(fold_scores), round_num)
        return fold_scores, summa, mean_cv_score

    def get_vars(
        self, 
        X: np.ndarray, 
        y: np.ndarray, 
        early_stopping_rounds: int = 30, 
        summa_approve: int = 1, 
        best_mean_cv: int = 100, 
        vars_in_model: Optional[List] = list(), 
        group_dt: Optional[np.ndarray] = None, 
        round_num: int = 3, 
        old_scores: Optional[np.ndarray] = None) -> List[str]:
        """
        Select variables based on their SHAP values and cross-validation scores.
        
        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
            early_stopping_rounds (int, optional): Number of iterations without improvement to stop the selection.
                Defaults to 30.
            summa_approve (int, optional): Threshold for the sum of score differences to approve the variable. Defaults to 1.
            best_mean_cv (int, optional): Threshold for the mean cross-validation score to approve the variable. Defaults to 10**10.
            vars_in_model (List[str], optional): List of initial variables. Defaults to [].
            group_dt (Optional[np.ndarray], optional): Group labels for the samples. Defaults to None.
            round_num (int, optional): Number of decimal places for the scores. Defaults to 3.
        """
        self.round_num = round_num
        if old_scores is None:
            old_scores = np.array([0.5 for _ in range(self.cv.get_n_splits())])
        iteration_step = 0
        the_list_from_which_we_take_vars = [i for i in list(self.rank_dict.keys()) if i not in vars_in_model]
        feature_was_added = True

        while feature_was_added:
            iteration_step = 0
            var_for_add = ''
            if iteration_step > 0:
                print('начинаем след этап', best_mean_cv)
            else:
                print('запуск первого шага')
            best_positive_groups = summa_approve
            for var in the_list_from_which_we_take_vars:
                iteration_step += 1
                if iteration_step > early_stopping_rounds:
                    print(f'early_stopping_rounds {early_stopping_rounds}')
                    break
                fold_scores, summa, mean_cv_score = self.get_cross_val_score(X=X, y=y, var=var, old_scores=old_scores, selected_vars=vars_in_model.copy(), 
                                                                             group_dt=group_dt, round_num=self.round_num)
                if (summa > best_positive_groups) or (summa == best_positive_groups and mean_cv_score < best_mean_cv):
                    best_positive_groups = summa
                    best_mean_cv = mean_cv_score
                    old_scores = fold_scores
                    var_for_add = var
                    iteration_step = 0
                    print(f'new var_for_add ! {var_for_add}')

            if var_for_add != '':
                vars_in_model.append(var_for_add)
                the_list_from_which_we_take_vars.remove(var_for_add)
                print('едем дальше')
                print('в итоге получили список', vars_in_model)
                list_meta = ['vars_list'] + [best_positive_groups] + [best_mean_cv] + old_scores.tolist()
                df_meta = pd.DataFrame(list_meta).T
                df_meta.columns = ['vars', 'summa', 'mean_cv_scores'] + ['cv' + str(i) for i in range(1, self.cv.get_n_splits() + 1)]
                df_meta.at[0, 'vars'] = vars_in_model.copy()
                try:
                    df_meta_info = pd.concat([df_meta_info, df_meta])
                except:
                    df_meta_info = df_meta.copy()
                df_meta_info.to_csv(f'df_meta_info_{self.meta_info_name}.csv')
                continue
            else:
                feature_was_added = False

        print('мы сошлись')
        print(vars_in_model)
        #print(best_mean_cv)
        return vars_in_model
    
    def reset_temp_data(self):
        """
        Reset temporary data stored in the class attributes.
        """
        self.dict_fold_importances = None
        self.fe_dict = None
        self.rank_dict = None


In [26]:
from sklearn.model_selection import train_test_split

In [39]:
# Установка случайного зерна для воспроизводимости
np.random.seed(42)

# Генерация данных
n_samples = 10000
feature1 = np.random.rand(n_samples)
feature2 = np.random.rand(n_samples)
feature3 = np.random.rand(n_samples)
feature4 = np.random.rand(n_samples)  # Незначимый признак
feature5 = np.random.rand(n_samples)  # Незначимый признак
noise = np.random.normal(0, 0.1, n_samples)  # Небольшой шум

# Вычисление целевого класса
linear_combination = 0.33*feature1 + 0.5*feature2 + 0.17*feature3 + noise
target = (linear_combination > 0.5).astype(int)

# Создание DataFrame
data = pd.DataFrame({
    'Feature1': feature1,
    'Feature2': feature2,
    'Feature3': feature3,
    'Feature4': feature4,
    'Feature5': feature5,
    'Target': target
})

# Определение признаков и целевой переменной
X = data.drop(columns='Target')
y = data['Target']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42)

# Определение модели, кросс-валидатора и метрики
from lightgbm import LGBMClassifier
model = LGBMClassifier(max_depth=3, verbosity = -1)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
def custom_metric(y_test, preds):
    return 1 - roc_auc_score(y_test, preds)
metric = custom_metric


# Создание экземпляра класса Kraken
kraken = Kraken(estimator=estimator, cv=cv, metric=metric, meta_info_name="meta_info")

# Получение ранжированного словаря признаков
kraken.get_rank_dict(X_train, y_train, list(X.columns))

# Получение выбранных переменных
selected_vars = kraken.get_vars(X, y)

print("Выбранные переменные:", selected_vars)




shap_values.values.shape: (1340, 5, 2)
shap_values_class1.shape: (1340, 5), len(list_of_vars): 5




shap_values.values.shape: (1340, 5, 2)
shap_values_class1.shape: (1340, 5), len(list_of_vars): 5




shap_values.values.shape: (1340, 5, 2)
shap_values_class1.shape: (1340, 5), len(list_of_vars): 5




shap_values.values.shape: (1340, 5, 2)
shap_values_class1.shape: (1340, 5), len(list_of_vars): 5




shap_values.values.shape: (1340, 5, 2)
shap_values_class1.shape: (1340, 5), len(list_of_vars): 5
запуск первого шага
мы сошлись
['Feature4', 'Feature5', 'Feature2', 'Feature1', 'Feature3']
Выбранные переменные: ['Feature4', 'Feature5', 'Feature2', 'Feature1', 'Feature3']


In [28]:
X_train

Unnamed: 0,Feature1,Feature2,Feature3,Feature4,Feature5
2761,0.007842,0.936177,0.671368,0.111800,0.884194
123,0.110052,0.458941,0.999461,0.895067,0.973941
1808,0.174109,0.246961,0.256952,0.776391,0.291659
2286,0.971533,0.983712,0.441274,0.947284,0.069708
2147,0.357798,0.899249,0.921069,0.346760,0.288958
...,...,...,...,...,...
1638,0.992484,0.751825,0.056333,0.750811,0.612855
1095,0.176528,0.465337,0.592483,0.382824,0.997650
1130,0.723420,0.271955,0.282667,0.993502,0.668259
1294,0.800587,0.396313,0.160713,0.484039,0.323850
