In [1]:
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectFromModel
import numpy as np

class FeatureSelector(SelectFromModel):
    """`SelectFromModel` that remembers feature names and never yields zero columns.

    On top of the parent behaviour this class:

    * records the names of the features seen during `fit` in `feature_names`
      (the DataFrame columns, or positional indices for inputs without
      a `columns` attribute);
    * stores the boolean support mask in `feature_importances_`
      (despite the name, this is the selection mask, not a score vector);
    * falls back to the first input column in `transform` when the selection
      is empty, for ndarray and DataFrame inputs.
    """

    def __init__(self, estimator, threshold=None, prefit=False, norm_order=1, max_features=None, importance_getter='auto'):
        super().__init__(estimator, threshold=threshold, prefit=prefit, norm_order=norm_order, max_features=max_features, importance_getter=importance_getter)

    def fit(self, X, y=None, **fit_params):
        """Fit the underlying selector and record feature names and support mask.

        Parameters
        ----------
        X : DataFrame or array-like
            Training data. Column labels are kept when `X` exposes `columns`;
            otherwise positional indices are used as feature names.
        y : array-like, optional
            Targets forwarded to the parent `fit`.
        **fit_params
            Extra keyword arguments forwarded to the parent `fit`.

        Returns
        -------
        self
        """
        super().fit(X=X, y=y, **fit_params)
        support_mask = self.get_support()
        if hasattr(X, 'columns'):
            self.feature_names = X.columns
        else:
            # Array input has no labels; the mask has one entry per input feature.
            self.feature_names = np.arange(support_mask.shape[0])
        self.feature_importances_ = support_mask
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit on `X` (and `y`), then return the reduced `X`."""
        self.fit(X, y, **fit_params)
        return self.transform(X)

    def transform(self, X):
        """Reduce `X` to the selected features.

        When no feature is selected, the first column of `X` is returned
        instead of an empty result (ndarray and DataFrame inputs only; any
        other input type gets the empty parent result unchanged).
        """
        transformed_X = super().transform(X)
        if transformed_X.shape[1] == 0:
            if isinstance(X, np.ndarray):
                return X[:, [0]]
            elif hasattr(X, 'iloc'):  # Handling pandas DataFrame
                return X.iloc[:, [0]]
        return transformed_X

    def get_selected_features(self, feature_names=None):
        """Return the names of the selected features.

        Parameters
        ----------
        feature_names : sequence, optional
            Names to index with the support mask. Defaults to the names
            recorded during `fit`.

        Returns
        -------
        list
            One entry of `feature_names` per selected position.
        """
        names = self.feature_names if feature_names is None else feature_names
        return [names[i] for i in np.flatnonzero(self.get_support())]

class VI_Pipeline(Pipeline):
    """Pipeline that reports variable importances in the original feature space.

    After fitting, `feature_importances_` has one entry per input feature:
    the importance read from step `vi_step` (attribute `vi_attr`) for
    features kept by the selector step, and 0 for every other feature.

    Parameters
    ----------
    steps : list of (name, estimator) tuples
        Forwarded to `Pipeline`.
    prediction_step : bool, default=True
        When true, `selection_steps` excludes the last entry of `steps`.
    memory, verbose
        Forwarded to `Pipeline`.
    vi_step : str, default="prediction"
        Name of the step the importances are read from.
    vi_attr : str, default="feature_importances_"
        Attribute of that step holding the importances.
    """

    def __init__(self, steps, prediction_step=True, memory=None, verbose=False, vi_step="prediction", vi_attr="feature_importances_"):
        super().__init__(steps, memory=memory, verbose=verbose)
        self.selection_steps = steps[:-1] if prediction_step else steps[:]
        self.prediction_step = prediction_step
        self.vi_step = vi_step
        self.vi_attr = vi_attr
        # Placeholder until `fit` computes the real vector.
        self.feature_importances_ = None

    def fit(self, X, y=None, **fit_params):
        """Fit all steps, then compute `feature_importances_` for every input feature.

        Column labels of `X` are stored in `features`; inputs without a
        `columns` attribute get positional indices instead.
        """
        if hasattr(X, "columns"):
            self.features = X.columns
        else:
            self.features = np.arange(np.shape(X)[1])
        super().fit(X, y, **fit_params)

        self.feature_importances_ = self.recover_features(self.features)
        return self

    def fit_transform(self, X, y, **fit_params):
        """Fit the pipeline and return `transform(X)`."""
        # Unpack the keyword arguments: `fit_params=fit_params` would hand
        # `fit` a single parameter literally named "fit_params".
        self.fit(X, y, **fit_params)
        return self.transform(X)

    def recover_features(self, all_features=None, selector_name="feature_trimming"):
        """Expand the model's importances to the full input feature set.

        Parameters
        ----------
        all_features : sequence, optional
            Full list of input features; defaults to the ones seen in `fit`.
        selector_name : str, default="feature_trimming"
            Name of the pipeline step whose `get_support()` mask says which
            features reached the model.

        Returns
        -------
        numpy.ndarray
            Array of length `len(all_features)`; zeros everywhere except at
            the selected positions, which carry the model's importances.
            All zeros when the selector kept nothing.
        """
        all_features = all_features if all_features is not None else self.features

        feature_selector = self.named_steps[selector_name]
        support_mask = feature_selector.get_support()

        full_importances = np.zeros(len(all_features))

        if support_mask.any():
            model_importances = getattr(self.named_steps[self.vi_step], self.vi_attr)
            full_importances[support_mask] = model_importances

        return full_importances

In [2]:
import numpy as np
import pandas as pd
import fastsparsegams
from sklearn.base import BaseEstimator, TransformerMixin

class FastSparseSklearn(BaseEstimator, TransformerMixin):
    """scikit-learn style wrapper around `fastsparsegams.fit`.

    Parameters
    ----------
    max_support_size : int, default=10
        Forwarded to `fastsparsegams.fit` as `max_support_size`.
    tol : float, default=1e-8
        Forwarded to `fastsparsegams.fit` as `rtol`.
    lambda_0 : float, default=0.025
        Regularisation value used when reading coefficients and predicting.
    gamma : float, default=0
        Second regularisation value used when reading coefficients and predicting.
    penalty : str, default="L0"
        Forwarded to `fastsparsegams.fit` as `penalty`.

    Attributes
    ----------
    model : object
        The fitted object returned by `fastsparsegams.fit`.
    coef_ : numpy.ndarray
        Coefficients (intercept excluded) at the configured `lambda_0` / `gamma`.
    """

    def __init__(self, max_support_size=10, tol=1e-8, lambda_0=0.025, gamma=0, penalty="L0"):
        self.max_support_size = max_support_size
        self.tol = tol
        self.lambda_0 = lambda_0
        self.gamma = gamma
        self.penalty = penalty

    def transform(self, data):
        """Return `data` as a float numpy array.

        DataFrames are converted through `.values` (index and headers are
        dropped); anything that is not already an ndarray goes through
        `np.array`.
        """
        if isinstance(data, pd.DataFrame):
            data = data.values
        elif not isinstance(data, np.ndarray):
            data = np.array(data)

        return data.astype(float)

    def fit(self, data, labels):
        """Fit the sparse model and expose its coefficients as `coef_`.

        Returns
        -------
        self
        """
        data = self.transform(data)
        labels = self.transform(labels)
        self.model = fastsparsegams.fit(data, labels, penalty=self.penalty, max_support_size=self.max_support_size, rtol=self.tol, algorithm="CDPSI")

        coefficients = self.model.coeff(lambda_0=self.lambda_0, gamma=self.gamma, include_intercept=False).toarray()
        # `squeeze` alone turns a (1, 1) matrix into a 0-d array; keep at
        # least one dimension so single-feature input still yields a vector.
        self.coef_ = np.atleast_1d(np.squeeze(coefficients))
        return self

    def predict(self, X):
        """Predict with the fitted model at the configured `lambda_0` / `gamma`."""
        X = self.transform(X)
        return self.model.predict(X, lambda_0=self.lambda_0, gamma=self.gamma)

In [3]:
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from xgboost import XGBRegressor
from sklearn.linear_model import Lasso
from warnings import simplefilter
from variable_importance.dgp import DataGenerator
from variable_importance.fastsparsewrap import FastSparseSklearn
from variable_importance.variable_importance_scoring import importance_score, cross_validation_scores
from variable_importance.pipelining import VI_Pipeline, FeatureSelector
from datetime import datetime
import os

# Base estimators: two candidate models for feature trimming, one for prediction.
lasso = Lasso()
fastsparse = FastSparseSklearn()
xgboost = XGBRegressor()

# Each trimming model is wrapped in a selector.
lasso_selector = FeatureSelector(lasso)
fastsparse_selector = FeatureSelector(fastsparse, importance_getter="auto")

# Two-stage pipeline: trim features with the fastsparse selector, then predict with XGBoost.
pipeline = VI_Pipeline(
    steps=[
        ("feature_trimming", fastsparse_selector),
        ("prediction", xgboost),
    ],
    prediction_step=True,
    vi_step="prediction",
)

# Hyper-parameter search space, keyed as `<step>__<param>`.
param_grid = {
    # Selector stage (parameters of the wrapped estimator).
    "feature_trimming__estimator__max_support_size": [5, 10, 15, 20, 25],
    "feature_trimming__estimator__tol": [1e-9, 1e-8, 1e-7, 1e-6],
    "feature_trimming__estimator__lambda_0": [0.001, 0.005, 0.01, 0.05, 0.1],
    # Prediction stage (XGBoost).
    "prediction__learning_rate": [0.05, 0.1, 0.2],
    "prediction__n_estimators": [100, 200, 300],
    "prediction__max_depth": [3, 5, 7],
}

In [20]:
# Generate a synthetic dataset with the project's DataGenerator.
dgp = DataGenerator(
    num_cols=100,
    num_rows=20,
    num_important=9,
    num_interaction_terms=0,
    effects='linear',
    noise_scale=0.1,
)
dataset = dgp.generate_data()

# Separate the response column from the predictors.
y = dataset["target"]
X = dataset.drop(columns=["target"])
dataset

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,91,92,93,94,95,96,97,98,99,target
0,1,0,0,1,1,1,0,1,1,0,...,0,1,0,1,1,1,1,0,1,0.409202
1,1,0,0,1,1,1,0,1,1,0,...,0,1,1,1,1,1,1,0,1,0.325871
2,1,0,0,1,1,1,0,1,1,0,...,0,1,0,1,1,1,1,0,1,0.250509
3,1,1,0,0,0,1,1,1,0,0,...,1,0,1,1,1,1,1,0,1,1.125573
4,0,0,1,1,1,1,0,1,1,0,...,0,1,1,0,1,1,1,0,1,-0.19127
5,0,0,1,1,1,1,1,1,0,0,...,0,1,1,0,0,1,1,0,1,1.013571
6,1,1,0,0,0,1,0,0,1,1,...,1,0,0,1,1,0,0,1,1,-0.238047
7,1,0,0,1,1,1,0,0,1,1,...,0,1,1,1,1,1,1,1,1,0.244733
8,1,1,0,0,0,0,1,1,0,1,...,1,0,1,1,1,0,1,0,1,0.437839
9,1,1,0,0,0,1,0,1,1,0,...,1,0,0,1,1,1,1,1,1,-0.30117


In [21]:
from sklearn.feature_selection._base import _get_feature_importances
from sklearn.feature_selection._from_model import _calculate_threshold

# Fit the standalone selector on the current X / y, then keep only the
# columns it selected. The last line displays the reduced matrix.
fastsparse_selector.fit(X, y)
reduced = fastsparse_selector.transform(X)
reduced

array([[1, 0, 0, 1],
       [1, 0, 0, 1],
       [1, 0, 0, 1],
       [0, 0, 1, 1],
       [1, 0, 0, 0],
       [1, 0, 1, 0],
       [0, 0, 0, 1],
       [1, 0, 0, 1],
       [0, 1, 1, 1],
       [0, 0, 0, 1],
       [1, 0, 1, 1],
       [1, 0, 0, 1],
       [0, 0, 1, 1],
       [1, 0, 1, 1],
       [1, 0, 0, 1],
       [0, 1, 1, 0],
       [0, 0, 1, 1],
       [0, 0, 0, 1],
       [1, 1, 0, 1],
       [1, 0, 1, 1]])

In [5]:
dgp = DataGenerator(num_cols=10, num_rows=20, num_important=3, num_interaction_terms=0, effects='linear', noise_scale=0.5)
dataset = dgp.generate_data()

X = dataset.drop(["target"], axis=1)
y = dataset["target"]

# NOTE: this hold-out split is not used by the rest of this cell; the scoring
# helper below receives the full X / y.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Seed the search so the sampled candidates are the same on every run.
rscv = RandomizedSearchCV(pipeline, param_grid, scoring='r2', verbose=0, cv=3, n_iter=10, n_jobs=2, random_state=42)
# The search object is passed to the project's scoring helper; presumably the
# helper fits it in place, since `best_estimator_` is read right after — confirm.
cross_validation_scores(rscv, X, y, importance_attr='feature_importances_', true_importances=dgp.importances, verbose=True)

best_model = rscv.best_estimator_

# Importances of the best pipeline, one entry per input feature.
print(best_model.feature_importances_)

Scores For <class 'variable_importance.pipelining.VI_Pipeline'>
Training R^2 Score: 0.8968467838636399
Test R^2 Score: 0.7329769384009948
Importance Score: 0.19355044226326587
[0.         0.         0.89676213 0.         0.         0.
 0.05333584 0.         0.04990204 0.        ]


In [6]:
# Coefficients of the estimator fitted inside the standalone selector.
# NOTE(review): the search presumably fits clones of the pipeline, so this is
# not necessarily the selector inside `best_model` — confirm before comparing.
fastsparse_selector.estimator_.coef_

array([0.        , 0.94807854, 0.55401202, 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ])