# In [1]:
import copy

import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier

class ClassPredictor():
    """Wrap a single estimator, optionally tuning it with a grid search.

    Failures while fitting or predicting are reported with ``print`` rather
    than raised: ``fit`` returns ``False`` and the predict methods return
    ``None`` in that case.

    Parameters
    ----------
    model : estimator, optional
        Object exposing ``fit``, ``predict`` and ``predict_proba``. When
        omitted, a fresh ``XGBClassifier()`` is created for this instance.
    param_grid : dict or list of dict, optional
        When truthy, ``fit`` searches this grid with ``GridSearchCV`` and
        keeps the best estimator found.
    """

    def __init__(self, model=None, *, param_grid=None):
        # A default of ``XGBClassifier()`` in the signature is evaluated once
        # and then shared by every instance; build one per instance instead.
        self.model = XGBClassifier() if model is None else model
        self.param_grid = param_grid

    def fit(self, X, y):
        """Fit the wrapped model on ``X``/``y``.

        Returns
        -------
        bool
            ``True`` on success, ``False`` if fitting raised (the exception
            is printed, not propagated).
        """
        if self.param_grid:
            grid_model = GridSearchCV(self.model, self.param_grid, n_jobs=-1)
            try:
                grid_model.fit(X, y)
                # Replace the wrapped model by the best one from the search.
                self.model = grid_model.best_estimator_
                return True
            except Exception as e:
                print(e, "Error in fitting the model, passed param_grid could be the issue, make sure it is in the correct format.")
                return False
        try:
            self.model.fit(X, y)
            return True
        except Exception as e:
            print(e, "Error in fitting the model")
            return False

    def _predict_with(self, method_name, X):
        """Call ``self.model.<method_name>`` on ``X`` coerced to 2-D.

        Returns ``None`` (after printing the error) if anything raises.
        """
        try:
            # Plain sequences have no ``ndim``; arrays and DataFrames are
            # passed through untouched.
            if not hasattr(X, "ndim"):
                X = np.asarray(X)
            # A single sample is promoted to a one-row batch.
            if X.ndim == 1:
                X = np.asarray(X).reshape(1, -1)
            return getattr(self.model, method_name)(X)
        except Exception as e:
            print(e, "Error in predicting the model")
            return None

    def predict(self, X):
        """Return ``self.model.predict(X)``, or ``None`` if it raised."""
        return self._predict_with("predict", X)

    def predict_proba(self, X):
        """Return ``self.model.predict_proba(X)``, or ``None`` if it raised."""
        return self._predict_with("predict_proba", X)


class LowLevelHandler():
    """Manage one binary ``ClassPredictor`` per class label (one-vs-rest).

    Parameters
    ----------
    model : estimator
        Template estimator; every class gets its own copy.
    lower_params : dict or list of dict or None
        Parameter grid forwarded to each ``ClassPredictor``; falsy disables
        the grid search.
    class_ratio : float, default 0.20
        Target share of positive samples in each predictor's training set.
    random_state : int, Generator or None, optional
        Seed forwarded to ``numpy.random.default_rng`` when undersampling.
    """

    def __init__(self, model, *, lower_params, class_ratio=0.20, random_state=None):
        self.model = model
        self.lower_params = lower_params
        self.class_ratio = class_ratio
        # Keep the caller's seed; it used to be overwritten with None.
        self.random_state = random_state
        self.classes = []
        self.predictors = {}

    def create_predictors(self, y):
        """Discover the classes in ``y`` and create one predictor per class.

        Returns the ``{label: ClassPredictor}`` mapping.
        """
        self.get_classes(y)
        param_grid = self.lower_params if self.lower_params else None
        for label in self.classes:
            # Each predictor needs its own estimator object; sharing one
            # instance would leave every class with the last-fitted model.
            self.predictors[label] = ClassPredictor(
                copy.deepcopy(self.model), param_grid=param_grid
            )
        return self.predictors

    def get_predictors(self):
        """Return the ``{label: ClassPredictor}`` mapping."""
        return self.predictors

    def get_classes(self, y):
        """Store the distinct labels of ``y``, ordered as ``value_counts`` returns them."""
        self.classes = pd.Series(y).value_counts().index.tolist()

    def split_data(self, X, y, class_ratio, random_state=None):
        """Build a positive/negative sample set for every known class.

        For each label in ``self.classes`` the rows of that class are kept
        as-is, and the remaining rows are randomly undersampled (without
        replacement) when there are more of them than
        ``n_pos * (1 - class_ratio) / class_ratio``.

        Returns
        -------
        dict
            ``data[label]`` holds the positive rows and
            ``data["not_" + str(label)]`` the negative rows.

        Raises
        ------
        ValueError
            If ``class_ratio`` is not in ``(0, 1]``.
        """
        if not 0 < class_ratio <= 1:
            raise ValueError("class_ratio must be in the interval (0, 1].")

        # Accept lists / DataFrames / Series: boolean masks and integer row
        # indexing below need plain arrays.
        X = np.asarray(X)
        y = np.asarray(y)

        data = {}
        rng = np.random.default_rng(random_state)

        for label in self.classes:
            positives = X[y == label]
            negatives = X[y != label]
            max_negatives = int(len(positives) * (1 - class_ratio) / class_ratio)
            if len(negatives) > max_negatives:
                keep = rng.choice(len(negatives), size=max_negatives, replace=False)
                negatives = negatives[keep]
            data[label] = positives
            # NOTE: a class literally named "not_<other label>" would collide
            # with this key.
            data["not_" + str(label)] = negatives

        return data

    def fit(self, X, y):
        """Fit every class predictor on its rebalanced one-vs-rest subset.

        Positives are labelled 1 and negatives 0. Predictors are created
        first if none exist yet.
        """
        if not self.predictors:
            self.create_predictors(y)

        # Forward the stored seed so the undersampling is reproducible.
        data = self.split_data(X, y, self.class_ratio, self.random_state)

        for label in self.classes:
            positives = data[label]
            negatives = data["not_" + str(label)]

            combined_X = np.vstack((positives, negatives))
            combined_y = np.hstack((np.ones(len(positives)), np.zeros(len(negatives))))

            self.predictors[label].fit(combined_X, combined_y)

    def fit_predictor(self, X, y):
        """Fit every class predictor on the full data with a boolean target."""
        if not self.predictors:
            self.create_predictors(y)

        # ``list == label`` would be a single bool; compare element-wise.
        labels = np.asarray(y)
        for label in self.classes:
            self.predictors[label].fit(X, labels == label)

    def get_predictor_results(self, class_name, X):
        """Return the predictions of the predictor for ``class_name`` on ``X``."""
        return self.predictors[class_name].predict(X)

    def get_predictor_proba(self, class_name, X):
        """Return the probabilities of the predictor for ``class_name`` on ``X``."""
        return self.predictors[class_name].predict_proba(X)

class HighLevelHandler():
    """Stack an ``MLPClassifier`` on top of per-class low-level predictors.

    The low-level predictors (managed by a ``LowLevelHandler``) each emit one
    prediction per sample; those predictions, one column per class, are the
    features of the high-level ``MLPClassifier``.

    Parameters
    ----------
    margin : float, default 0.05
        Largest gap in high-level probability at which an alternative class
        may replace an unconfirmed top class in ``predict``.
    include_status : bool, default False
        When true, ``predict`` returns ``(label, is_dirty)`` tuples.
    model : estimator, optional
        Template for the low-level predictors; a fresh ``XGBClassifier()``
        when omitted.
    lower_params, class_ratio
        Forwarded to ``LowLevelHandler``.
    random_state
        Forwarded to both ``LowLevelHandler`` and ``MLPClassifier``.
    hidden_layer_sizes, activation, solver, ... , max_fun
        Forwarded unchanged to ``MLPClassifier``.
    """

    def __init__(self, margin=0.05, include_status=False, model=None, lower_params=None, class_ratio=0.20, hidden_layer_sizes=(100,),
    activation='relu', *, solver='adam', alpha=0.0001, batch_size='auto', learning_rate='constant', learning_rate_init=0.001, power_t=0.5, max_iter=200,
    shuffle=True, random_state=None, tol=0.0001, verbose=False, warm_start=False, momentum=0.9, nesterovs_momentum=True, early_stopping=False, validation_fraction=0.1,
    beta_1=0.9, beta_2=0.999, epsilon=1e-08, n_iter_no_change=10, max_fun=15000):

        # Avoid a definition-time default estimator shared between handlers.
        if model is None:
            model = XGBClassifier()

        self.low_level_handler = LowLevelHandler(model=model, lower_params=lower_params, class_ratio=class_ratio, random_state=random_state)
        self.margin = margin
        self.include_status = include_status
        self.high_level_model = MLPClassifier(
            hidden_layer_sizes=hidden_layer_sizes,
            activation=activation,
            solver=solver,
            alpha=alpha,
            batch_size=batch_size,
            learning_rate=learning_rate,
            learning_rate_init=learning_rate_init,
            power_t=power_t,
            max_iter=max_iter,
            shuffle=shuffle,
            random_state=random_state,
            tol=tol,
            verbose=verbose,
            warm_start=warm_start,
            momentum=momentum,
            nesterovs_momentum=nesterovs_momentum,
            early_stopping=early_stopping,
            validation_fraction=validation_fraction,
            beta_1=beta_1,
            beta_2=beta_2,
            epsilon=epsilon,
            n_iter_no_change=n_iter_no_change,
            max_fun=max_fun
        )

    def _low_level_features(self, X):
        """Return one column of low-level predictions per class, in the
        order of ``self.low_level_handler.classes``."""
        handler = self.low_level_handler
        return np.column_stack(
            [handler.get_predictor_results(label, X) for label in handler.classes]
        )

    def fit(self, X, y):
        """Fit the low-level predictors, then the high-level model on their
        predictions for ``X``. Returns ``self``."""
        self.low_level_handler.create_predictors(y)
        self.low_level_handler.fit(X, y)
        self.high_level_model.fit(self._low_level_features(X), y)
        return self

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        The high-level model's most probable class is accepted when that
        class's own low-level predictor fired for the row. Otherwise the
        remaining classes are tried in order of decreasing probability, and
        the first one that both lies within ``margin`` of the top probability
        and is confirmed by its low-level predictor is used. If none
        qualifies, the top class is returned and flagged as dirty.

        Returns
        -------
        list
            Class labels, or ``(label, is_dirty)`` tuples when
            ``include_status`` is true.
        """
        features = self._low_level_features(X)
        probabilities = self.high_level_model.predict_proba(features)

        # Probability columns follow ``classes_``; feature columns follow the
        # low-level handler's class order. Map both through the label so the
        # result is a real label, not a column index.
        labels = np.asarray(self.high_level_model.classes_).tolist()
        column_of = {
            label: col for col, label in enumerate(self.low_level_handler.classes)
        }

        final_predictions = []
        for row, distribution in enumerate(probabilities):
            ranked = sorted(
                range(len(distribution)), key=lambda k: distribution[k], reverse=True
            )
            primary = ranked[0]
            chosen = labels[primary]
            is_dirty = True

            # The per-class predictions for this row were already computed in
            # ``features``; reuse them instead of predicting row by row.
            if features[row, column_of[chosen]]:
                is_dirty = False
            else:
                for alt in ranked[1:]:
                    # ``ranked`` is descending, so once the gap exceeds the
                    # margin no later candidate can qualify.
                    if distribution[primary] - distribution[alt] > self.margin:
                        break
                    if features[row, column_of[labels[alt]]]:
                        chosen = labels[alt]
                        is_dirty = False
                        break

            final_predictions.append((chosen, is_dirty) if self.include_status else chosen)

        return final_predictions

    def predict_proba(self, X):
        """Return the high-level model's class probabilities for ``X``.

        The features are the low-level hard predictions, the same
        representation the high-level model was fitted on.
        """
        return self.high_level_model.predict_proba(self._low_level_features(X))

    def fit_predictor(self, X, y):
        """Fit only the low-level predictors, on the full data."""
        self.low_level_handler.fit_predictor(X, y)

class AydelotteClassifier():
    """Estimator-style facade over ``HighLevelHandler``.

    Stores every constructor argument as an attribute of the same name,
    exposes them through ``get_params``/``set_params``, and delegates
    fitting and prediction to ``self.high_level_handler``.

    Parameters
    ----------
    margin, include_status, model, lower_params, class_ratio
        Forwarded to ``HighLevelHandler``. ``model`` defaults to a fresh
        ``XGBClassifier()`` per instance.
    hidden_layer_sizes, activation, solver, ... , max_fun
        Forwarded to ``HighLevelHandler``, which hands them to its
        ``MLPClassifier``.
    """

    # Constructor parameters, in the order ``get_params`` reports them.
    _PARAM_NAMES = (
        'margin', 'include_status', 'model', 'lower_params', 'class_ratio',
        'hidden_layer_sizes', 'activation', 'solver', 'alpha', 'batch_size',
        'learning_rate', 'learning_rate_init', 'power_t', 'max_iter',
        'shuffle', 'random_state', 'tol', 'verbose', 'warm_start', 'momentum',
        'nesterovs_momentum', 'early_stopping', 'validation_fraction',
        'beta_1', 'beta_2', 'epsilon', 'n_iter_no_change', 'max_fun',
    )
    # Parameters only read while predicting; changing them does not require
    # throwing away fitted models.
    _PREDICT_TIME_PARAMS = frozenset({'margin', 'include_status'})

    def __init__(self, margin=0.05, include_status=False, model=None, lower_params=None, class_ratio=0.20, hidden_layer_sizes=(100,), activation='relu', *,
                 solver='adam', alpha=0.0001, batch_size='auto', learning_rate='constant', learning_rate_init=0.001, power_t=0.5, max_iter=200, shuffle=True,
                 random_state=None, tol=0.0001, verbose=False, warm_start=False, momentum=0.9, nesterovs_momentum=True, early_stopping=False, validation_fraction=0.1,
                 beta_1=0.9, beta_2=0.999, epsilon=1e-08, n_iter_no_change=10, max_fun=15000):

        self.margin = margin
        self.include_status = include_status
        # Build the default estimator per instance rather than sharing one
        # created when the class was defined.
        self.model = XGBClassifier() if model is None else model
        self.lower_params = lower_params
        self.class_ratio = class_ratio
        self.hidden_layer_sizes = hidden_layer_sizes
        self.activation = activation
        self.solver = solver
        self.alpha = alpha
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.learning_rate_init = learning_rate_init
        self.power_t = power_t
        self.max_iter = max_iter
        self.shuffle = shuffle
        self.random_state = random_state
        self.tol = tol
        self.verbose = verbose
        self.warm_start = warm_start
        self.momentum = momentum
        self.nesterovs_momentum = nesterovs_momentum
        self.early_stopping = early_stopping
        self.validation_fraction = validation_fraction
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon
        self.n_iter_no_change = n_iter_no_change
        self.max_fun = max_fun

        self.high_level_handler = self._build_handler()

    def _build_handler(self):
        """Create a fresh, unfitted ``HighLevelHandler`` from the current parameters."""
        params = self.get_params()
        if params['model'] is None:
            params['model'] = XGBClassifier()
        # Every parameter, including ``class_ratio``, is forwarded.
        return HighLevelHandler(**params)

    def fit(self, X, y):
        """Fit the low-level predictors and the high-level model. Returns ``self``."""
        # ``HighLevelHandler.fit`` already fits the low-level predictors before
        # training the high-level model on their outputs; fitting them again
        # afterwards would change the models the high-level model was
        # trained against.
        self.high_level_handler.fit(X, y)
        return self

    def predict(self, X):
        """Return the handler's predictions for ``X`` (labels, or
        ``(label, is_dirty)`` tuples when ``include_status`` is set)."""
        return self.high_level_handler.predict(X)

    def predict_proba(self, X):
        """Return the high-level model's class probabilities for ``X``,
        computed from the low-level hard predictions."""
        low_level = self.high_level_handler.low_level_handler
        low_level_predictions = np.column_stack(
            [low_level.get_predictor_results(label, X) for label in low_level.classes]
        )
        return self.high_level_handler.high_level_model.predict_proba(low_level_predictions)

    def score(self, X, y):
        """Return the fraction of rows of ``X`` whose prediction equals ``y``."""
        y_pred = self.predict(X)
        if self.high_level_handler.include_status:
            # Drop the dirty flag; only the label takes part in the accuracy.
            y_pred = [label for label, _ in y_pred]
        # Compare as arrays: ``list == list`` would be a single bool.
        return np.mean(np.asarray(y_pred) == np.asarray(y))

    def get_params(self, deep=True):
        """Return the constructor parameters as a ``{name: value}`` dict.

        ``deep`` is accepted for API compatibility and ignored.
        """
        return {name: getattr(self, name) for name in self._PARAM_NAMES}

    def set_params(self, **params):
        """Set parameters and propagate them to the handler. Returns ``self``.

        ``margin`` and ``include_status`` are updated in place on the
        existing handler, so a fitted model stays usable. Any other
        parameter replaces the handler by a fresh one, and ``fit`` must be
        called again.
        """
        for key, value in params.items():
            setattr(self, key, value)

        if params.keys() <= self._PREDICT_TIME_PARAMS:
            self.high_level_handler.margin = self.margin
            self.high_level_handler.include_status = self.include_status
        else:
            self.high_level_handler = self._build_handler()
        return self