In [1]:
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.feature_selection import (
    RFE,
    RFECV,
    SelectKBest,
    SelectPercentile,
    chi2,
    f_classif,
    f_regression,
    mutual_info_classif,
    mutual_info_regression,
    r_regression,
)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.metrics import roc_auc_score

# Create dataset

In [2]:
# Column budget for the synthetic problem: whatever is neither informative
# nor repeated is generated as a redundant column.
n_features = 10
n_informative = 5
n_repeated = 0
n_redundant = n_features - n_informative - n_repeated

# shuffle=False: the label list built below relies on the generated columns
# arriving in the order informative -> redundant -> repeated.
x, y = make_classification(
    n_samples=100,
    n_features=n_features,
    n_informative=n_informative,
    n_redundant=n_redundant,
    n_repeated=n_repeated,
    shuffle=False,
    random_state=0,
)

cols = (
    [f"informative_{i+1}" for i in range(n_informative)]
    + [f"redundant_{i+1}" for i in range(n_redundant)]
    + [f"repeated_{i+1}" for i in range(n_repeated)]
)
df = pd.DataFrame(x, columns=cols)

# Shuffle the column order so that position does not give the feature kind
# away. A seeded generator is used (instead of the global, unseeded
# np.random state) so that "Restart & Run All" reproduces the same order.
np.random.default_rng(0).shuffle(cols)
df = df.loc[:, cols]

df.head()

Unnamed: 0,informative_3,informative_4,redundant_2,redundant_5,redundant_4,redundant_3,redundant_1,informative_1,informative_5,informative_2
0,2.26121,3.783299,-2.255731,-2.075617,-0.129983,-3.091119,-3.291618,0.3122,0.097078,1.625442
1,-2.55234,1.592792,0.456445,0.036939,4.652468,0.011461,0.827081,-2.034687,-0.268015,2.938311
2,-1.768784,1.546135,2.776303,-2.213776,2.459416,-0.569547,0.983283,-3.289471,2.007283,2.412555
3,-1.085284,1.153899,-1.281468,1.061403,2.270966,-0.042542,0.004709,1.241933,-1.462497,0.565236
4,-0.09727,0.843422,-0.416925,-0.019261,0.875982,-0.619473,0.502971,0.056355,0.276243,1.128508


In [3]:
# Standardise every column. The scaler hands back a bare array, so the
# column labels are re-attached from `cols` when rebuilding the frame.
scaler = StandardScaler()
x_scaled = scaler.fit(df).transform(df)
df = pd.DataFrame(data=x_scaled, columns=cols)
df.head()

Unnamed: 0,informative_3,informative_4,redundant_2,redundant_5,redundant_4,redundant_3,redundant_1,informative_1,informative_5,informative_2
0,1.95485,1.840792,-1.40518,-0.611478,-0.491174,-2.352364,-0.807773,0.534391,0.357408,1.033299
1,-1.480864,0.378531,0.075905,0.389441,2.069961,0.155934,0.85231,-1.123114,0.120178,1.828515
2,-0.921593,0.347386,1.342749,-0.676937,0.895521,-0.313785,0.915269,-2.009314,1.59862,1.51006
3,-0.433739,0.08555,-0.873147,0.874827,0.794601,0.112275,0.520845,1.191021,-0.655972,0.391122
4,0.271465,-0.121707,-0.401031,0.362813,0.047548,-0.354148,0.721674,0.353699,0.473826,0.732301


In [4]:
# Sanity check on the standardised frame: every column should show a mean of
# ~0. The reported std is slightly above 1 (1.005 in the stored output),
# presumably because describe() uses the sample (n-1) estimator while the
# scaler divides by the population std -- TODO confirm.
df.describe()

Unnamed: 0,informative_3,informative_4,redundant_2,redundant_5,redundant_4,redundant_3,redundant_1,informative_1,informative_5,informative_2
count,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0
mean,1.1102230000000002e-17,2.420286e-16,-2.2204460000000003e-17,1.776357e-16,1.576517e-16,8.382184000000001e-17,6.661338000000001e-17,1.19349e-16,-3.053113e-17,4.8849810000000005e-17
std,1.005038,1.005038,1.005038,1.005038,1.005038,1.005038,1.005038,1.005038,1.005038,1.005038
min,-2.484115,-2.304834,-1.879214,-2.879501,-2.25696,-2.545571,-2.364498,-2.306163,-2.351453,-2.67253
25%,-0.6313777,-0.6139425,-0.6273937,-0.5983725,-0.7474635,-0.527625,-0.8088819,-0.7036224,-0.7011157,-0.4749471
50%,0.08725528,-0.03779414,-0.2082622,0.122723,-0.03274234,-0.01709938,0.0001567513,-0.03628556,-0.007681122,0.1689089
75%,0.5940521,0.7726914,0.4364138,0.6185508,0.8019425,0.4512292,0.6778138,0.4674671,0.5965928,0.7282716
max,2.600969,2.523463,2.133019,2.222831,2.069961,2.687635,2.603251,2.560535,2.466923,2.03269


# Univariate feature selection

In [5]:
class UnivariateFeatureSelection:
    """
    Small convenience wrapper around sklearn's univariate selectors that also
    remembers the *names* of the selected columns.
    """

    def __init__(self, n_features, problem_type, scoring):
        """
        Custom univariate feature selection wrapper on different
        univariate feature selection models from sklearn.
        :param n_features: SelectPercentile if float (fraction in [0, 1]),
            SelectKBest if int; numpy integer/floating scalars are accepted too
        :param problem_type: classification or regression
        :param scoring: scoring function, string
        :raises ValueError: unknown problem type, unknown scoring name, or a
            float n_features outside [0, 1]
        :raises TypeError: n_features is neither an integer nor a float
        """

        # for a given problem type, there are only a few valid scoring methods
        if problem_type == "classification":
            valid_scoring = {
                "chi2": chi2,
                "f_classif": f_classif,
                "mutual_info_classif": mutual_info_classif,
            }
        elif problem_type == "regression":
            valid_scoring = {
                "r_regression": r_regression,
                "f_regression": f_regression,
                "mutual_info_regression": mutual_info_regression,
            }
        else:
            raise ValueError(
                "Invalid problem type. Select regression or classification."
            )

        # raise exception if scoring is not valid
        # search in valid_scoring keys
        if scoring not in valid_scoring:
            raise ValueError("Invalid scoring function.")
        score_func = valid_scoring[scoring]

        # integer (python or numpy) -> keep the k best features
        if isinstance(n_features, (int, np.integer)):
            self.selection = SelectKBest(score_func, k=int(n_features))
        # float (python or numpy) -> keep that fraction of the features
        elif isinstance(n_features, (float, np.floating)):
            fraction = float(n_features)
            # Fail here rather than later inside fit(): anything outside
            # [0, 1] maps to a percentile outside [0, 100].
            if not 0.0 <= fraction <= 1.0:
                raise ValueError(
                    "A float n_features is a fraction and must lie in [0, 1]."
                )
            self.selection = SelectPercentile(
                score_func, percentile=round(fraction * 100)
            )
        else:
            raise TypeError("Invalid type of feature.")

        # filled in by fit(); None means "not fitted yet"
        self.feature_names = None

    def fit(self, X, y):
        """
        Fit the underlying selector and store the selected column names.
        :param X: pandas DataFrame of features (column labels are required)
        :param y: target passed through to the selector
        :return: self
        :raises TypeError: X is not a pandas DataFrame
        """
        # Validate first, so nothing is fitted when names cannot be extracted.
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas DF to extract feature names.")

        # Fit the feature selector
        self.selection.fit(X, y)

        # Save feature names (boolean support mask applied to the columns)
        self.feature_names = X.columns[self.selection.get_support()]

        return self

    def transform(self, X):
        """Reduce X to the selected features using the fitted selector."""
        return self.selection.transform(X)

    def fit_transform(self, X, y):
        """Fit on (X, y), then return X reduced to the selected features."""
        # we have to fit first to be able to obtain feature names
        self.fit(X, y)
        return self.transform(X)

    def get_feature_names(self):
        """
        Return the column labels kept by the last fit().
        :raises RuntimeError: fit has not been called yet
        """
        # if fit is not yet called, raise exception
        if self.feature_names is None:
            raise RuntimeError("Must call fit method first.")
        return self.feature_names

In [6]:
# Run the wrapper once per scoring function and report which columns it keeps.
scorings = ["f_classif", "mutual_info_classif"]
for scoring in scorings:
    f_selection = UnivariateFeatureSelection(
        n_features=n_informative,
        problem_type="classification",
        scoring=scoring,
    )
    f_selection.fit_transform(df, y)
    chosen = sorted(f_selection.get_feature_names())
    print(f"scoring {scoring}")
    print(f"selected features = {chosen}")
    print("=" * 100)

scoring f_classif
selected features = ['informative_1', 'informative_2', 'informative_3', 'redundant_1', 'redundant_4']
scoring mutual_info_classif
selected features = ['informative_1', 'informative_2', 'informative_3', 'redundant_1', 'redundant_4']


# Recursive feature elimination

In [7]:
# Recursive feature elimination down to n_informative columns, one per step.
rfe = RFE(
    estimator=LogisticRegression(random_state=0),
    n_features_to_select=n_informative,
    step=1,
)
rfe.fit(df, y)
mask = rfe.support_  # boolean mask over df's columns
selected = df.columns[mask]

print("RFE")
print(f"selected features = {sorted(selected)}")
print("=" * 100)

RFE
selected features = ['informative_1', 'informative_2', 'informative_5', 'redundant_1', 'redundant_4']


In [8]:
# Same elimination, but the number of features to keep is chosen by
# cross-validated accuracy instead of being fixed up front.
rfecv = RFECV(
    estimator=LogisticRegression(random_state=0),
    step=1,
    cv=StratifiedKFold(),
    scoring="accuracy",
)
rfecv.fit(df, y)
mask = rfecv.support_  # boolean mask over df's columns
selected = df.columns[mask]

print("RFECV")
print(f"selected features = {sorted(selected)}")
print("=" * 100)

RFECV
selected features = ['informative_1', 'redundant_1']


# Greedy feature selection

In [9]:
class GreedyFeatureSelection:
    """
    A simple custom class for greedy (forward) feature selection.

    Features are added one at a time; each round keeps the candidate that
    gives the highest score together with the features already chosen. The
    search stops when the newest feature makes the score worse, or when
    there is nothing left to add.
    """

    def __init__(self):
        # Every candidate subset that was evaluated, in order (for debugging).
        # Not cleared between calls.
        self.selected_feature_list = []

    def evaluate_score(self, X, y):
        """
        Fits a logistic regression model to the given data
        and calculates the AUC score.

        Note: the model is scored on the same data it was fitted on,
        so this is a training-set AUC.

        Args:
            X (pd.DataFrame): the feature matrix.
            y (pd.Series): the target vector.

        Returns:
            float: the AUC score.
        """
        model = LogisticRegression(random_state=0)
        model.fit(X, y)
        y_pred_proba = model.predict_proba(X)[:, 1]
        auc = roc_auc_score(y, y_pred_proba)
        return auc

    def _feature_selection(self, X, y):
        """
        Private method that performs the feature selection process.

        Args:
            X (pd.DataFrame): the feature matrix.
            y (pd.Series): the target vector.

        Returns:
            tuple: a tuple containing the best scores
            and the list of selected features. A feature whose addition
            lowered the score is not included.
        """
        good_features = []
        best_scores = []

        # get list of features
        features = X.columns

        while True:
            this_feature = None
            best_score = 0

            # loop over all features
            for feature in features:
                # If feature is already in the good features list, skip it
                if feature in good_features:
                    continue

                # Add this feature to the list of selected features for test
                selected_features = good_features + [feature]
                X_train = X.loc[:, selected_features]
                self.selected_feature_list.append(selected_features)  # for debugging

                # Calculate the score using the evaluate_score method
                auc = self.evaluate_score(X_train, y)

                # If score is greater than the best score of this for loop,
                # update best feature and score
                if auc > best_score:
                    this_feature = feature
                    best_score = auc

            # No candidate was picked: every feature is already selected (or
            # none scored above 0). Nothing was appended this round, so the
            # result is returned untrimmed. Without this exit the loop would
            # spin forever whenever the score never decreases.
            if this_feature is None:
                return best_scores, good_features

            good_features.append(this_feature)
            best_scores.append(best_score)

            # The newest feature made things worse than the previous round:
            # stop and drop it. Two scores are enough for this comparison.
            if len(best_scores) >= 2 and best_scores[-1] < best_scores[-2]:
                return best_scores[:-1], good_features[:-1]

    def __call__(self, X, y):
        """
        Select the best features based on the greedy feature selection method.

        Args:
            X (pd.DataFrame): the feature matrix.
            y (pd.Series): the target vector.

        Returns:
            tuple: a tuple containing the transformed feature matrix
            with selected features (in selection order) and the best scores.
        """
        scores, features = self._feature_selection(X, y)
        X_transformed = X.loc[:, features]
        return X_transformed, scores

In [10]:
# Run the greedy search and keep (at most) the first n_informative columns
# of the returned frame.
greedy = GreedyFeatureSelection()
greedy_frame, greedy_scores = greedy(df, y)
selected = greedy_frame.columns[:n_informative]

print("Greedy")
print(f"selected features = {sorted(selected)}")
print("=" * 100)

Greedy
selected features = ['informative_1', 'informative_2', 'informative_4', 'redundant_1', 'redundant_2']


# More robust greedy feature selection with `sklearn.feature_selection.SequentialFeatureSelector`

In [11]:
# Same estimator and feature budget in both runs; only the search direction
# differs (forward = grow from nothing, backward = prune from everything).
# After the loop, `sfs` and `selected` hold the backward run's results.
for direction in ("forward", "backward"):
    sfs = SequentialFeatureSelector(
        estimator=LogisticRegression(random_state=0),
        n_features_to_select=n_informative,
        direction=direction,
    )
    sfs.fit(df, y)
    selected = sorted(df.columns[sfs.get_support()])

    print(f"SequentialFeatureSelector {direction.capitalize()}")
    print(f"selected features = {selected}")
    print("=" * 100)

SequentialFeatureSelector Forward
selected features = ['informative_1', 'informative_2', 'informative_4', 'redundant_1', 'redundant_2']
SequentialFeatureSelector Backward
selected features = ['informative_1', 'informative_5', 'redundant_2', 'redundant_3', 'redundant_5']
