In [7]:
from pytorch_tabnet.tab_model import TabNetClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
import numpy as np  
from sklearn.utils._tags import InputTags
import sys
import os
import contextlib

@contextlib.contextmanager
def suppress_stdout():
    with open(os.devnull, 'w') as fnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        try:
            sys.stdout = fnull
            sys.stderr = fnull
            yield
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

class SklearnCompatibleTabNetClassifier(BaseEstimator, ClassifierMixin):
    # _estimator_type = "classifier"  # Tells scikit-learn it's a classifier

    def __init__(self, **kwargs):
        self.tabnet = TabNetClassifier(**kwargs)

    def __sklearn_tags__(self):
        class DummyTag:
            def __init__(self):
                self.estimator_type = 'classifier'
                self.input_tags=InputTags(one_d_array=False, two_d_array=True, three_d_array=False, sparse=True, categorical=False, string=False, dict=False, positive_only=False, allow_nan=False, pairwise=False)
                self.requires_fit=True
        return DummyTag()

    def fit(self, X, y, **fit_params):
        # print("model.fit called")
        # with suppress_stdout():
        self.tabnet.fit(X, y, **fit_params)
        self.classes_ = np.unique(y)  # required by scikit-learn
        return self

    def predict(self, X):
        return self.tabnet.predict(X)

    def predict_proba(self, X):
        return self.tabnet.predict_proba(X)

    # def score(self, X, y):
    #     from sklearn.metrics import accuracy_score
    #     return accuracy_score(y, self.predict(X))

In [8]:
from pytorch_tabnet.tab_model import TabNetClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Make example data (or use your own X_train, y_train)
X, y = make_classification(n_samples=300, n_features=20, random_state=42)
X = X.astype('float32')  # required by TabNet
y = y.astype(int)        # must be int (not float) for classification

model = TabNetClassifier(
    n_d=8, n_a=8, n_steps=3, verbose=0,
)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

model.fit(
    X_train, y_train,
    batch_size=32, 
    max_epochs=5,
    virtual_batch_size=16,    
)




In [9]:
model2 = SklearnCompatibleTabNetClassifier(
    n_d=8, n_a=8, n_steps=3, verbose=0,
)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=43)

model2.fit(
    X_train, y_train,
    batch_size=32, 
    virtual_batch_size=16,    
    max_epochs=5,
)



In [10]:

from sklearn.model_selection import cross_val_score
print("Cross-validation AUC scores:")
scores = cross_val_score(model2, X, y, scoring='roc_auc', cv=3, params={
    'batch_size': 32,
    'virtual_batch_size': 16,
    'max_epochs': 5,
    })
print("Scores:", scores)

Cross-validation AUC scores:




epoch 0  | loss: 0.7256  |  0:00:00s
epoch 1  | loss: 0.66503 |  0:00:00s
epoch 2  | loss: 0.59901 |  0:00:00s
epoch 3  | loss: 0.55613 |  0:00:00s
epoch 4  | loss: 0.46004 |  0:00:01s




epoch 0  | loss: 0.71092 |  0:00:00s
epoch 1  | loss: 0.68784 |  0:00:00s
epoch 2  | loss: 0.66961 |  0:00:00s
epoch 3  | loss: 0.63598 |  0:00:00s
epoch 4  | loss: 0.5897  |  0:00:01s




epoch 0  | loss: 0.71107 |  0:00:00s
epoch 1  | loss: 0.68965 |  0:00:00s
epoch 2  | loss: 0.64823 |  0:00:00s
epoch 3  | loss: 0.58155 |  0:00:00s
epoch 4  | loss: 0.53243 |  0:00:01s
Scores: [0.9168 0.7192 0.8184]


In [None]:
# cross-validation template, see the code blocks below for usage
from sklearn.model_selection import RandomizedSearchCV
import optuna
from optuna import trial as optuna_trial
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.preprocessing import FunctionTransformer

# just for intellisense purposes
suggest_float = 'suggest_float'
suggest_int = 'suggest_int'
suggest_categorical = 'suggest_categorical'

metric = 'roc_auc'
optuna.logging.set_verbosity(optuna.logging.ERROR)
def cross_validate_model(model_class, model_parameters, fit_parameters, n_trials = 100): # [hyperparameter_name, functionname, (function args)
    def objective(trial: optuna_trial.Trial):
        # print(f"Running trial {trial.number}")
        try:
            # pipeline steps
            steps = []            
            model_args = {}
            for model_parameter in model_parameters:
                parameter_name, function_name, function_args = model_parameter
                func = getattr(trial, function_name)
                function_kwargs = {'log': True} if function_name == suggest_float else {}
                if function_args[0] == "hidden_layer_sizes":
                    # transform a scalar to a length 1 tuple for the hidden_layer_sizes parameter for MLPClassifier
                    model_args.update({parameter_name: (func(*function_args, **function_kwargs),)})
                else:
                    model_args.update({parameter_name: func(*function_args, **function_kwargs)})
            fit_args = {}
            for fit_parameter in fit_parameters:
                parameter_name, function_name, function_args = fit_parameter
                func = getattr(trial, function_name)
                function_kwargs = {'log': True} if function_name == suggest_float else {}
                fit_args.update({'classifier__' + parameter_name: func(*function_args, **function_kwargs)})
            # print('model args: ', model_args)
            # print('fit args: ', fit_args)
            model = model_class(**model_args)   
            # model._estimator_type = "classifier"
            # print(model._estimator_type)
            
            to_numpy = FunctionTransformer(lambda x: x.values if hasattr(x, "values") else x, validate=False)
            steps.append(('to_numpy', to_numpy))

            steps.append(('classifier', model))
            
            pipeline = ImbPipeline(steps=steps)
            score = cross_val_score(pipeline, X_train, y_train, cv=10, scoring=metric, params=fit_args, n_jobs=-1)
            mean_score = score.mean()
            return mean_score
        except Exception as e:
            print(f"An exception occurred: {e}")
            raise e
    
    study = optuna.create_study(direction='maximize', sampler=optuna.samplers.RandomSampler(seed=42))
    # print(f"Total trials: {len(study.trials)}")
    # print(f"Completed trials: {len([t for t in study.trials if t.state.name == 'COMPLETE'])}")
    # print(f"Failed trials: {len([t for t in study.trials if t.state.name == 'FAIL'])}")

    # study.optimize(objective, n_trials=n_trials, show_progress_bar=True, n_jobs=-1)
    # study.optimize(objective, n_trials=n_trials, show_progress_bar=True, catch=(Exception,))
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)

    trial = study.best_trial
    print(f'{model_class.__name__} Cross-Validation Performance:')
    print("Best trial:")
    print(f"  Value: {trial.value}")
    print("  Best hyperparameters:")
    for key, value in trial.params.items():
        print(f"    {key}: {value}")


In [None]:
# Example hyperparameter suggestions for TabNetClassifier
model_parameters = [
    # ('n_d', suggest_categorical, ['n_d', [8, 12, 16]]),
    # ('n_a', suggest_categorical, ['n_a', [8, 12, 16]]),
    ('n_d', suggest_categorical, ['n_d', [4, 8, 12]]),
    ('n_a', suggest_categorical, ['n_a', [4, 8, 12]]),
    ('n_steps', suggest_int, ['n_steps', 3, 5]),
    ('gamma', suggest_float, ['gamma', 1.0, 1.5]),
    ('lambda_sparse', suggest_float, ['lambda_sparse', 1e-4, 1e-3]),
    ('momentum', suggest_float, ['momentum', 0.02, 0.2]),
    ('clip_value', suggest_float, ['clip_value', 1.0, 2.0]),
    ('scheduler_params', suggest_categorical, ['scheduler_params', [None]]),
    ('verbose', suggest_categorical, ['verbose', [0]]),
    ('device_name', suggest_categorical, ['device_name', ['cuda']]),  # use 'cuda' if GPU is available,
]
fit_parameters = [
    ('batch_size', suggest_int, ['batch_size', 16, 32]),
    
    ('virtual_batch_size', suggest_int, ['virtual_batch_size', 8, 16]),
    ('max_epochs', suggest_int, ['max_epochs', 5, 10]),
]

cross_validate_model(SklearnCompatibleTabNetClassifier, model_parameters, fit_parameters, 5)
play_finished_hint()


