# In [4]:
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, cross_val_score, train_test_split
from skopt import BayesSearchCV
from sklearn.metrics import accuracy_score, mean_absolute_error, precision_score, roc_auc_score
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
import numpy as np

class ModelOptimization(BaseEstimator):
    """Rank candidate models on a hold-out split, then tune the best ones.

    ``fit`` splits the data 80/20, fits every model in ``models`` on the
    training part, scores it on the held-out part, keeps the ``top_n``
    highest-scoring models and runs a hyperparameter search (grid, random
    or Bayesian) on each of them using the training part only.

    Parameters
    ----------
    models : list
        Estimator instances. They are fitted in place by ``fit``.
    param_grids : list
        Search spaces; ``param_grids[i]`` belongs to ``models[i]``.
    scoring : str
        One of ``'accuracy'``, ``'roc_auc'``, ``'precision'`` or ``'mae'``.
    cv : int, default=5
        Cross-validation setting forwarded to the search and to
        ``cross_val_score``.
    top_n : int, default=3
        Number of best initial models that get tuned.
    n_iter : int, default=10
        Iterations for the ``'random'`` and ``'bayesian'`` searches.
    search_method : str, default='grid'
        ``'grid'``, ``'random'`` or ``'bayesian'``.

    Notes
    -----
    Results are keyed by the model's class name, so two entries of the
    same class in ``models`` overwrite each other's scores.
    """

    # Metric names understood by ``_calculate_score`` that scikit-learn's
    # scorer registry only knows under a different name.
    _SCORER_ALIASES = {'mae': 'neg_mean_absolute_error'}

    def __init__(self, models, param_grids, scoring, cv=5, top_n=3, n_iter=10, search_method='grid'):
        self.models = models
        self.param_grids = param_grids
        self.scoring = scoring
        self.cv = cv
        self.top_n = top_n
        self.search_method = search_method
        self.n_iter = n_iter
        self._reset_fit_state()
        # Entries are dicts with 'model_name', 'score' and 'model' keys.
        # Nothing in this class fills the list; callers populate it.
        self.saved_models = []

    def _reset_fit_state(self):
        """Clear everything ``fit`` produces so repeated fits start clean."""
        self.best_models = []
        self.best_model_names = []
        self.best_overall_score = float('-inf')
        self.best_overall_model = None
        self.initial_scores = {}
        self.best_hyperparameters = {}

    def _sklearn_scoring(self):
        """Return ``self.scoring`` translated to a scikit-learn scorer name."""
        if isinstance(self.scoring, str):
            return self._SCORER_ALIASES.get(self.scoring, self.scoring)
        return self.scoring

    def _predict_for_scoring(self, model, X):
        """Return the model output appropriate for the configured metric.

        ROC AUC needs continuous scores rather than hard labels, so
        probabilities (or 1-D decision values) are used when the model
        offers them; every other metric gets ``model.predict(X)``.
        """
        if self.scoring == 'roc_auc':
            if hasattr(model, 'predict_proba'):
                proba = model.predict_proba(X)
                # Binary case: roc_auc_score wants the positive-class column.
                return proba[:, 1] if proba.shape[1] == 2 else proba
            if hasattr(model, 'decision_function'):
                decision = model.decision_function(X)
                if np.ndim(decision) == 1:
                    return decision
        return model.predict(X)

    def _fit_initial_models(self, X_train, y_train, X_test, y_test):
        """Fit every model on the training split and score it on the test split.

        Fills ``initial_scores`` and tracks the best model seen so far in
        ``best_overall_model`` / ``best_overall_score``.
        """
        total = len(self.models)
        for position, model in enumerate(self.models, start=1):
            model_name = type(model).__name__
            print(f"Fitting model {position}/{total}: {model_name}")
            model.fit(X_train, y_train)
            y_pred = self._predict_for_scoring(model, X_test)

            # Calculate and store the initial score
            score = self._calculate_score(self.scoring, y_test, y_pred)
            self.initial_scores[model_name] = score

            if score > self.best_overall_score:
                self.best_overall_score = score
                self.best_overall_model = model

    def _select_top_models(self):
        """Store the names of the ``top_n`` best initial models and print them."""
        top_models = sorted(self.initial_scores.items(), key=lambda x: x[1], reverse=True)[:self.top_n]
        self.best_model_names = [name for name, _ in top_models]

        print("\nModel ranking based on scores:")
        for rank, (model_name, score) in enumerate(top_models, 1):
            print(f"Rank {rank}: {model_name} - Score: {score:.4f}")
        print('\n\n')

    def _optimize_and_cv_top_models(self, X_train, y_train):
        """Tune each selected model and compare it with its untuned CV score.

        The tuned estimator is appended to ``best_models``. Its parameters
        are stored in ``best_hyperparameters`` unless the untuned model
        cross-validates better, in which case ``None`` is stored.
        """
        scoring = self._sklearn_scoring()
        for model_name in self.best_model_names:
            model_index = next(
                i for i, candidate in enumerate(self.models)
                if type(candidate).__name__ == model_name
            )
            model = self.models[model_index]
            print(f"Optimizing hyperparameters and cross-validating for model {model_name}")

            search = self._create_search(model, self.param_grids[model_index])
            search.fit(X_train, y_train)

            best_model = search.best_estimator_
            best_score = search.best_score_
            best_params = search.best_params_

            print(f"Best score for {model_name}: {best_score:.4f}")
            print(f"Best hyperparameters for {model_name}: {best_params}\n\n")

            self.best_models.append(best_model)
            self.best_hyperparameters[model_name] = best_params

            # Compare models without hyperparameters using cross-validation on training data
            print(f"Cross-validating model without hyperparameters: {model_name}")
            scores_cv = cross_val_score(model, X_train, y_train, scoring=scoring, cv=self.cv, n_jobs=-1)
            avg_score = np.mean(scores_cv)
            print(f"Avg CV score for {model_name} without hyperparameters: {avg_score:.4f}")

            # Both numbers are CV scores with the same scorer on the same
            # data, so they are directly comparable (the hold-out score in
            # ``initial_scores`` is not).
            if avg_score > best_score:
                self.best_hyperparameters[model_name] = None
                if best_score != 0:
                    # abs() keeps the sign meaningful for negated metrics.
                    relative_increase = ((avg_score - best_score) / abs(best_score)) * 100
                    print(f"Relative increase in score: {relative_increase:.2f}%")

    def _calculate_score(self, scoring, y_true, y_pred):
        """Compute ``scoring`` for the given targets and model output.

        Higher is always better: ``'mae'`` is returned negated.

        Raises
        ------
        ValueError
            If ``scoring`` is not one of the supported metric names.
        """
        if scoring == 'accuracy':
            return accuracy_score(y_true, y_pred)
        elif scoring == 'roc_auc':
            if len(set(y_true)) > 2:
                return roc_auc_score(y_true, y_pred, average='macro', multi_class='ovr')
            else:
                return roc_auc_score(y_true, y_pred)
        elif scoring == 'precision':
            return precision_score(y_true, y_pred, average='macro')
        elif scoring == 'mae':
            return -mean_absolute_error(y_true, y_pred)
        else:
            raise ValueError(f"Invalid scoring metric: {scoring}")

    def _create_search(self, model, param_grid):
        """Build the search object selected by ``search_method``.

        Raises
        ------
        ValueError
            If ``search_method`` is not 'grid', 'random' or 'bayesian'.
        """
        scoring = self._sklearn_scoring()
        if self.search_method == 'grid':
            return GridSearchCV(model, param_grid, scoring=scoring, cv=self.cv, n_jobs=-1)
        elif self.search_method == 'random':
            return RandomizedSearchCV(model, param_grid, scoring=scoring, cv=self.cv, n_jobs=-1, n_iter=self.n_iter)
        elif self.search_method == 'bayesian':
            return BayesSearchCV(model, param_grid, scoring=scoring, cv=self.cv, n_iter=self.n_iter, n_jobs=-1)
        else:
            raise ValueError("Invalid search method. Supported options: 'grid', 'random', 'bayesian'")

    def _list_saved_models(self):
        """Print a 1-based listing of ``saved_models``."""
        print("Saved models:")
        for i, saved_model in enumerate(self.saved_models, start=1):
            print(f"{i}: {saved_model['model_name']} - Score: {saved_model['score']:.4f}")

    def fit(self, X, y):
        """Rank all models on a hold-out split and tune the top ones.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``models`` is empty.
        """
        if len(self.models) == 0:
            raise ValueError("At least one model is required.")

        # Start from a clean slate so a second fit does not accumulate
        # results from the first one.
        self._reset_fit_state()

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        self._fit_initial_models(X_train, y_train, X_test, y_test)
        self._select_top_models()
        self._optimize_and_cv_top_models(X_train, y_train)

        best_name = type(self.best_overall_model).__name__
        # .get: the best model has no entry when nothing was tuned (top_n == 0).
        best_params = self.best_hyperparameters.get(best_name)
        print(f"Best overall score: {self.best_overall_score} for model {best_name} with hyperparameters:{best_params}")
        return self

    def predict(self, X):
        """Return ``{class name: predictions}`` for every tuned model."""
        predictions = {}
        for model in self.best_models:
            model_name = type(model).__name__
            predictions[model_name] = model.predict(X)

        return predictions

    def get_best_model(self):
        """Return ``(best initial model, its stored hyperparameters or None)``."""
        best_hyperparameters = self.best_hyperparameters.get(type(self.best_overall_model).__name__, None)
        return self.best_overall_model, best_hyperparameters

    def choose_model_to_load(self):
        """Prompt for a 1-based index and return that saved model.

        Returns ``None`` (after printing a message) when the input is not
        a number or is out of range.
        """
        self._list_saved_models()
        try:
            choice = int(input("Enter the index of the model you want to load: ")) - 1
        except ValueError:
            print("Invalid choice.")
            return None
        if 0 <= choice < len(self.saved_models):
            return self.saved_models[choice]['model']
        else:
            print("Invalid choice.")
            return None

    def save_best_models(self, num_models_to_save):
        """Keep only the ``num_models_to_save`` highest-scoring saved models.

        A non-positive count leaves ``saved_models`` unchanged.
        """
        if num_models_to_save > 0:
            top_saved_models = sorted(self.saved_models, key=lambda x: x['score'], reverse=True)[:num_models_to_save]
            self.saved_models = top_saved_models
