# Фильтрационный отбор признаков

---



In [None]:
# !pip install ucimlrepo

In [None]:
from tqdm import tqdm
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

from sklearn.linear_model import LogisticRegression

[UCI: Taiwanese Bankruptcy Prediction](https://archive.ics.uci.edu/dataset/572/taiwanese+bankruptcy+prediction)

## Импортируем наш игрушечный набор данных

In [None]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
taiwanese_bankruptcy_prediction = fetch_ucirepo(id=572)

# data (as pandas dataframes)
X = taiwanese_bankruptcy_prediction.data.features
y = taiwanese_bankruptcy_prediction.data.targets

In [None]:
X = X.drop([' Net Income Flag'], axis=1)

In [None]:
features_names = X.columns

In [None]:
df = X.copy()
df['target'] = y.iloc[:, 0]

In [None]:
SEED = 42

In [None]:
df_train, df_test = train_test_split(df, test_size=0.2, random_state=SEED)

In [None]:
df_train.shape, df_test.shape

In [None]:
y_train, y_test = df_train.target, df_test.target

In [None]:
model = LogisticRegression(random_state=SEED)

## Используем несколько фильтрационных методов отбора признаков

### Ранговая корреляция Спирмана

In [None]:
spearman_cormat = df_train.corr(method='spearman')
spearman_corvec = spearman_cormat.iloc[:-1, -1]

In [None]:
spearman_corvec

In [None]:
# absolute_correlations

In [None]:
def FilterFitValidate(correlations, n_select):
    """
    Вспомогательная функция для фильтрации признаков по вектору корреляций
    с таргетом, обучения и валидации модели
    """
    best_filtered = correlations.abs().nlargest(n_select).index
    X_train = df_train[best_filtered]
    X_test = df_test[best_filtered]
    model_filtered = model.fit(X_train, y_train)
    y_pred = model_filtered.predict_proba(X_test)[:, 1]

    roc_auc_value = roc_auc_score(y_test, y_pred)

    return round(roc_auc_value, 2), best_filtered

In [None]:
roc_auc_value, selected_features = FilterFitValidate(spearman_corvec, 10)
print(roc_auc_value, selected_features)

### Взаимная информация

In [None]:
# При желании можно поиграть с матрицей взаимной информации

mi_vectors = []
for i in tqdm(df_train.columns):
  try:
    mi_vectors.append(mutual_info_classif(df_train, y=df_train[i]))
  except:
    mi_vectors.append(mutual_info_regression(df_train, y=df_train[i]))

In [None]:
mi_cormat = pd.DataFrame(mi_vectors, columns=df_train.columns, index=df_train.columns)
mi_corvec = mi_cormat.iloc[:-1, -1]

In [None]:
mi_corvec = pd.Series(mutual_info_classif(df_train, y=y_train)[:-1], index=features_names)

In [None]:
roc_auc_value, selected_features = FilterFitValidate(mi_corvec, 10)
print(roc_auc_value, selected_features)

### ROC AUC

In [None]:
auc_corvec = pd.Series({i: roc_auc_score(y_train, df_train.loc[:, i]) for i in features_names})

In [None]:
roc_auc_value, selected_features = FilterFitValidate(auc_corvec, 10)
print(roc_auc_value, selected_features)

## Немного фантазии на тему избыточности информации

[немного про аггломерацию признаков](https://scikit-learn.org/stable/auto_examples/cluster/plot_feature_agglomeration_vs_univariate_selection.html#sphx-glr-auto-examples-cluster-plot-feature-agglomeration-vs-univariate-selection-py)

In [None]:
from sklearn.cluster import AgglomerativeClustering

In [None]:
n_select = 10

In [None]:
# clustering = AgglomerativeClustering(n_select).fit(spearman_cormat)
clustering = AgglomerativeClustering(n_select).fit(mi_cormat)
clustering

In [None]:
# spearman_cormat_ = spearman_cormat.copy()
spearman_cormat_ = mi_cormat.copy()
spearman_cormat_['label'] = clustering.labels_
spearman_cormat_ = spearman_cormat_.iloc[:-1, ]

In [None]:
plt.figure(figsize=(20, 20))
sns.heatmap(spearman_cormat_.sort_values(by='label').drop('label', axis=1))
plt.show()

In [None]:
filtration_with_clustering = [
    spearman_cormat_.loc[spearman_cormat_.label.eq(i), :].target.abs().idxmax() for i in range(n_select)
    ]

In [None]:
filtration_with_clustering

In [None]:
X_train_clust = df_train[filtration_with_clustering]
X_test_clust = df_test[filtration_with_clustering]
model_filtered = model.fit(X_train_clust, y_train)
y_pred = model_filtered.predict_proba(X_test_clust)[:, 1]

roc_auc_value = roc_auc_score(y_test, y_pred)

In [None]:
round(roc_auc_value, 2)

# Обёрточный отбор признаков

## Жадный прямой отбор

In [None]:
from sklearn.feature_selection import SequentialFeatureSelector, RFECV

In [None]:
n_select = 10

In [None]:
def forward_feature_selection(learner, X_train_, y_train, X_test_, y_test, n_features_to_select=2):
    n=n_features_to_select

    best_features = []
    best_scores = []

    for i in range(1, n+1):
        scores = []
        for feature in X_train_.columns:
            selected_features = best_features.copy()
            selected_features.append(feature)
            model = learner.fit(X_train_[selected_features], y_train)
            score = roc_auc_score(y_test, model.predict_proba(X_test_[selected_features])[:, 1])
            scores.append(score)
        selected_feature = X_train_.columns[np.argmax(scores)]
        best_features.append(selected_feature)
        best_model = learner.fit(X_train_[best_features], y_train)
        best_score_ = roc_auc_score(y_test, best_model.predict_proba(X_test_[best_features])[:, 1])
        best_scores.append(best_score_)

    return best_score_, best_features

In [None]:
scores_trace = []
for i in tqdm(range(1, n_select)):
    best_score, best_features = forward_feature_selection(
        model,
        df_train.iloc[:, :-1],
        y_train,
        df_test.iloc[:, :-1],
        y_test,
        i
    )
    scores_trace.append(best_score)

In [None]:
best_score, best_features

In [None]:
plt.plot(np.arange(9), scores_trace, label='Test ROC AUC')

## Неградиентная оптимизация для отбора признаков

In [None]:
# !pip install optuna

In [None]:
import optuna
from optuna.samplers import TPESampler
import warnings
# from sklearn.model_selection import RepeatedStratifiedKFold

In [None]:
def FeatureSelector(trial, df_train, df_test, random_state):

    features_series = pd.Series(
        {
            i: trial.suggest_categorical(i, [0, 1])
            for i in df_train.iloc[:, :-1].columns
        }
    )
    best_features = features_series[features_series.eq(1)].index.to_list()
    X_train, y_train = df_train[best_features], df_train.target
    X_test, y_test = df_test[best_features], df_test.target

    model = LogisticRegression(random_state=random_state)
    model.fit(X_train, y_train)
    y_pred = model.predict_proba(X_test)[:, 1]

    roc_auc_value = roc_auc_score(y_test, y_pred)

    return round(roc_auc_value, 2)

In [None]:
optuna.logging.set_verbosity(optuna.logging.ERROR)

In [None]:
random_state = SEED

### Для начала в лоб

In [None]:
sampler = TPESampler(seed=SEED)
warnings.filterwarnings("ignore")
study = optuna.create_study(direction='maximize', sampler=sampler)
study.optimize(
    lambda trial: FeatureSelector(
        trial,
        df_train=df_train,
        df_test=df_test,
        random_state=SEED
    ),
    n_trials=200, timeout=600, show_progress_bar=True, n_jobs=-1,
    gc_after_trial=False
)

In [None]:
study.best_trial.value

### Попробуем переписать целевую функцию


In [None]:
def FeatureSelector(trial, df_train, df_test, n_select, random_state):

    best_features = [
        trial.suggest_categorical(
            f"X{i}",
            df_train.iloc[:, :-1].columns
        ) for i in range(1, n_select+1)
    ]
    best_features = list(set(best_features))

    X_train, y_train = df_train[best_features], df_train.target
    X_test, y_test = df_test[best_features], df_test.target

    model = LogisticRegression(random_state=random_state)
    model.fit(X_train, y_train)
    y_pred = model.predict_proba(X_test)[:, 1]

    roc_auc_value = roc_auc_score(y_test, y_pred)

    return round(roc_auc_value, 2)

In [None]:
sampler = TPESampler(seed=SEED)
warnings.filterwarnings("ignore")
study_two = optuna.create_study(direction='maximize', sampler=sampler)
study_two.optimize(
    lambda trial: FeatureSelector(
        trial,
        df_train=df_train,
        df_test=df_test,
        n_select=n_select,
        random_state=SEED
    ),
    n_trials=200, timeout=600, show_progress_bar=True, n_jobs=-1,
    gc_after_trial=False
)

In [None]:
best_features_opt = list(study_two.best_params.values())
best_features_opt

# Настройка Гиперпараметров

### Пример с частичным обучением

In [None]:
from sklearn.linear_model import SGDClassifier

def objective(trial, df_train, df_test, random_state):

    X_train, y_train = df_train.iloc[:, :-1], df_train.target
    X_test, y_test = df_test.iloc[:, :-1], df_test.target
    classes = np.unique(y_train)

    alpha = trial.suggest_float("alpha", 1e-5, 1e-1, log=True)
    clf = SGDClassifier(alpha=alpha, loss='log_loss', random_state=random_state)

    for step in range(100):
        clf.partial_fit(X_train, y_train, classes=classes)

        # Report intermediate objective value.
        intermediate_value = roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])
        trial.report(intermediate_value, step)

        # Handle pruning based on the intermediate value.
        if trial.should_prune():
            raise optuna.TrialPruned()


    return roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])

In [None]:
warnings.filterwarnings("ignore")
study_three = optuna.create_study(
    pruner=optuna.pruners.MedianPruner(),
    direction='maximize',
    sampler=sampler
)
study_three.optimize(
    lambda trial: objective(
        trial,
        df_train=df_train[best_features_opt+['target']],
        df_test=df_test[best_features_opt+['target']],
        random_state=SEED
    ),
    n_trials=100,
    timeout=600, show_progress_bar=True, n_jobs=-1,
    gc_after_trial=False
)

In [None]:
study_three.best_params

# Differential Evolution

In [None]:
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import differential_evolution
from matplotlib.animation import FuncAnimation
from mpl_toolkits.mplot3d import Axes3D
from IPython.display import HTML

# Define the Rastrigin function
def rastrigin(x):
    return 20 + (x[0]**2 - 10 * np.cos(2 * np.pi * x[0])) + (x[1]**2 - 10 * np.cos(2 * np.pi * x[1]))

# Bounds for x and y
bounds = [(-5, 5), (-5, 5)]

# Create a meshgrid for visualization
x = np.linspace(-5, 5, 100)
y = np.linspace(-5, 5, 100)
X, Y = np.meshgrid(x, y)
Z = rastrigin([X, Y])

# Set up the figure
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(X, Y, Z, cmap='viridis', alpha=0.6)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('f(x, y)')
ax.set_title('Differential Evolution Optimization on Rastrigin Function')


# Implement our own simple DE algorithm (better for teaching)


def simple_de(func, bounds, popsize=7, maxiter=10, F=0.7, CR=0.3):
    dim = len(bounds)
    pop = np.random.rand(popsize, dim)
    for i in range(popsize):
        for j in range(dim):
            pop[i,j] = bounds[j][0] + pop[i,j]*(bounds[j][1]-bounds[j][0])

    history = [pop.copy()]
    fitness = np.array([func(ind) for ind in pop])

    for gen in range(maxiter):
        for i in range(popsize):
            # Mutation
            candidates = [c for c in range(popsize) if c != i]
            a, b, c = pop[np.random.choice(candidates, 3, replace=False)]
            mutant = a + F * (b - c)

            # Crossover
            cross_points = np.random.rand(dim) < CR
            trial = np.where(cross_points, mutant, pop[i])

            # Clip to bounds
            trial = np.clip(trial, [b[0] for b in bounds], [b[1] for b in bounds])

            # Selection
            f = func(trial)
            if f < fitness[i]:
                pop[i] = trial
                fitness[i] = f

        history.append(pop.copy())
        print(f"Generation {gen+1}, best fitness: {min(fitness):.4f}")

    best_idx = np.argmin(fitness)
    return pop[best_idx], fitness[best_idx], history

# Run our simple DE
best_solution, best_fitness, population_history = simple_de(rastrigin, bounds)

print(f"Best solution: x={best_solution[0]:.4f}, y={best_solution[1]:.4f}")
print(f"Best fitness: {best_fitness:.4f}")

# Animation function
def update(frame):
    ax.clear()
    ax.plot_surface(X, Y, Z, cmap='viridis', alpha=0.6)
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_zlabel('f(x, y)')
    ax.set_title(f'Generation {frame}')

    # Plot current population
    population = population_history[frame]
    for individual in population:
        ax.scatter(individual[0], individual[1], rastrigin(individual), color='red', s=50)

    # Plot best individual so far
    current_fitness = [rastrigin(ind) for ind in population]
    best_in_gen = population[np.argmin(current_fitness)]
    ax.scatter(best_in_gen[0], best_in_gen[1], rastrigin(best_in_gen),
               color='blue', s=100, marker='*')

# Create animation
ani = FuncAnimation(fig, update, frames=len(population_history), interval=500)
plt.close()

# Display animation
HTML(ani.to_jshtml())

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

# Rastrigin function
def rastrigin(x):
    return 20 + (x[0]**2 - 10*np.cos(2*np.pi*x[0])) + (x[1]**2 - 10*np.cos(2*np.pi*x[1]))

# Setup
bounds = [(-5, 5), (-5, 5)]
x = np.linspace(-5, 5, 100)
y = np.linspace(-5, 5, 100)
X, Y = np.meshgrid(x, y)
Z = rastrigin([X, Y])

# Figure setup
fig, ax = plt.subplots(figsize=(10, 8))
contour = ax.contourf(X, Y, Z, levels=20, cmap='viridis')
plt.colorbar(contour, label='f(x,y)')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title('Differential Evolution with Convergence Path')


def differential_evolution_2d(func, bounds, popsize=15, maxiter=30, F=0.7, CR=0.3):
    global best_history
    dim = len(bounds)
    pop = np.random.rand(popsize, dim)
    for i in range(popsize):
        for j in range(dim):
            pop[i,j] = bounds[j][0] + pop[i,j]*(bounds[j][1]-bounds[j][0])

    history = [pop.copy()]
    fitness = np.array([func(ind) for ind in pop])
    best_history = [pop[np.argmin(fitness)]]

    for gen in range(maxiter):
        new_pop = pop.copy()
        for i in range(popsize):
            candidates = [c for c in range(popsize) if c != i]
            a, b, c = pop[np.random.choice(candidates, 3, replace=False)]
            mutant = a + F * (b - c)
            trial = np.where(np.random.rand(dim) < CR, mutant, pop[i])
            trial = np.clip(trial, [b[0] for b in bounds], [b[1] for b in bounds])

            if func(trial) < fitness[i]:
                new_pop[i] = trial
                fitness[i] = func(trial)

        pop = new_pop
        history.append(pop.copy())
        best_history.append(pop[np.argmin(fitness)])
        print(f"Gen {gen+1}, Best: {min(fitness):.4f} at {best_history[-1]}")

    best_idx = np.argmin(fitness)
    return pop[best_idx], fitness[best_idx], history

# Run optimization
best_solution, best_fitness, population_history = differential_evolution_2d(rastrigin, bounds)

# Animation function
def update(frame):
    ax.clear()
    ax.contourf(X, Y, Z, levels=20, cmap='viridis', alpha=0.7)

    # Current population
    pop = population_history[frame]
    ax.scatter(pop[:,0], pop[:,1], color='red', s=30, alpha=0.7, label='Population')

    # Current best
    current_best = best_history[frame]
    ax.scatter(current_best[0], current_best[1], color='yellow', s=100,
               marker='*', edgecolor='black', label='Current Best')

    # Convergence path (up to current frame)
    if frame > 0:
        path = np.array(best_history[:frame+1])
        ax.plot(path[:,0], path[:,1], 'w-', linewidth=2, alpha=0.7, label='Convergence Path')
        ax.scatter(path[1:,0], path[1:,1], color='white', s=15, alpha=0.5)

    # Global optimum
    ax.scatter(0, 0, color='white', s=100, marker='P', edgecolor='black', label='Global Optimum')

    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_title(f'Generation {frame}')
    ax.legend(loc='upper right')

# Create and display animation
ani = FuncAnimation(fig, update, frames=len(population_history), interval=500)
# Save as GIF (using Pillow writer)
# ani.save('de_optimization.gif', writer='pillow', fps=4, dpi=100)  # Adjust fps for speed

# print("GIF saved as 'de_optimization.gif'")
# plt.close()
plt.close()
HTML(ani.to_jshtml())