Implementation of the framework

In [None]:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.losses import BinaryCrossentropy
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import RFE
from imblearn.over_sampling import RandomOverSampler, SMOTE
from imblearn.combine import SMOTETomek
from imblearn.under_sampling import OneSidedSelection, EditedNearestNeighbours
from imblearn.pipeline import Pipeline
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, roc_auc_score, confusion_matrix
import numpy as np
from sklearn.exceptions import NotFittedError
from sklearn.ensemble import StackingClassifier

In [None]:
class Model:
    """One binary classifier together with its preprocessing and evaluation.

    The intended call order is ``feature_selecting`` (optional) ->
    ``split_data`` -> ``sample_modifying`` (optional) -> ``fit`` -> metrics.

    ``model_type`` selects the estimator:
    1 = random forest, 2 = Keras neural network, 3 = XGBoost,
    4 = logistic regression, 5 = LightGBM.

    Attributes:
        model_fitted: True once ``fit`` has trained the classifier.
        data: full data set (features plus the target column).
        training_data / testing_data: frames produced by ``split_data``;
            empty until it is called.
        classifier: the underlying estimator instance.
    """

    # Display names returned by info(), keyed by model_type.
    _MODEL_NAMES = {
        1: "RandomForest",
        2: "NeuralNetwork",
        3: "XGBoost",
        4: "LogisticRegression",
        5: "LGBM",
    }

    def __init__(self, model_type, data, target_column):
        """Store the data and build the (unfitted) estimator.

        Raises:
            ValueError: if ``model_type`` is not one of 1-5.
        """
        self.model_fitted = False
        self.model_type = model_type
        self.data = data
        self.target_column = target_column
        self.training_data = pd.DataFrame()
        self.testing_data = pd.DataFrame()

        if self.model_type == 1:
            self.classifier = RandomForestClassifier(random_state=3)
        elif self.model_type == 2:
            # The last layer is linear, so the network emits logits; the
            # loss is told so through from_logits=True.
            self.classifier = Sequential([
                Dense(units=10, activation='sigmoid'),
                Dense(units=6, activation='sigmoid'),
                Dense(units=1, activation='linear')
            ])
            self.classifier.compile(loss=BinaryCrossentropy(from_logits=True))
        elif self.model_type == 3:
            self.classifier = XGBClassifier()
        elif self.model_type == 4:
            self.classifier = LogisticRegression(random_state=3)
        elif self.model_type == 5:
            self.classifier = lgb.LGBMClassifier()
        else:
            raise ValueError("Invalid model_type")

    def feature_selecting(self, method, n_features_to_select=3):
        """Replace ``self.data`` with the selected features plus the target.

        Args:
            method: 1 = L1-penalised LinearSVC with SelectFromModel,
                2 = recursive feature elimination with a random forest.
            n_features_to_select: number of features RFE keeps (method 2
                only).

        Raises:
            ValueError: if ``method`` is not 1 or 2.
        """
        features = self.data.drop(self.target_column, axis=1)
        # Estimators expect a 1-D target; a one-column DataFrame makes
        # scikit-learn emit a DataConversionWarning.
        target = self.data[self.target_column]

        if method == 1:
            lsvc = LinearSVC(C=0.01, penalty="l1", dual=False).fit(features, target)
            selector = SelectFromModel(lsvc, prefit=True)
        elif method == 2:
            selector = RFE(
                estimator=RandomForestClassifier(random_state=3),
                n_features_to_select=n_features_to_select,
            )
            selector.fit(features, target)
        else:
            raise ValueError("Invalid feature selection method")

        # Select columns by mask instead of transforming to an array and
        # rebuilding the frame; both sides get a fresh 0..n-1 index so the
        # concat aligns row by row.
        selected_features = features.loc[:, selector.get_support()].reset_index(drop=True)
        self.data = pd.concat(
            [selected_features, target.reset_index(drop=True)], axis=1
        )

    def split_data(self, test_size=0.3, random_state=42):
        """Split ``self.data`` into ``training_data`` and ``testing_data``.

        Args:
            test_size: fraction of rows held out for testing.
            random_state: seed passed to ``train_test_split``.
        """
        X_train, X_test, y_train, y_test = train_test_split(
            self.data.drop(self.target_column, axis=1),
            self.data[self.target_column],
            test_size=test_size,
            random_state=random_state,
        )
        self.training_data = pd.concat([X_train, y_train], axis=1)
        self.testing_data = pd.concat([X_test, y_test], axis=1)

    def sample_modifying(self, method, random_state=None):
        """Rebalance ``training_data`` in place.

        Args:
            method: 0 = leave untouched, 1 = random oversampling of the
                minority class, 2 = SMOTE, 3 = SMOTETomek,
                4 = SMOTE followed by OneSidedSelection,
                5 = SMOTE followed by EditedNearestNeighbours.
            random_state: optional seed forwarded to the samplers that
                accept one; ``None`` keeps them unseeded.

        Raises:
            ValueError: if ``method`` is not in 0-5.
        """
        if method == 0:
            return

        X = self.training_data.drop(self.target_column, axis=1)
        y = self.training_data[self.target_column]

        if method == 1:
            oversample = RandomOverSampler(
                sampling_strategy='minority', random_state=random_state
            )
            X_resampled, y_resampled = oversample.fit_resample(X, y)
        elif method == 2:
            smote = SMOTE(random_state=random_state)
            X_resampled, y_resampled = smote.fit_resample(X, y)
        elif method == 3:
            smote_tomek = SMOTETomek(random_state=random_state)
            X_resampled, y_resampled = smote_tomek.fit_resample(X, y)
        elif method == 4:
            smote = SMOTE(random_state=random_state)
            oss = OneSidedSelection(random_state=random_state)
            X_resampled, y_resampled = smote.fit_resample(X, y)
            X_resampled, y_resampled = oss.fit_resample(X_resampled, y_resampled)
        elif method == 5:
            smote = SMOTE(random_state=random_state)
            enn = EditedNearestNeighbours()
            resampling_pipeline = Pipeline([('smote', smote), ('enn', enn)])
            X_resampled, y_resampled = resampling_pipeline.fit_resample(X, y)
        else:
            raise ValueError("Invalid sample modifying method")

        self.training_data = pd.concat([X_resampled, y_resampled], axis=1)

    def fit(self):
        """Train the classifier on ``training_data``.

        If the training target holds fewer than two classes nothing is
        trained, a message is printed and ``model_fitted`` stays False.
        """
        X_train = self.training_data.drop(self.target_column, axis=1)
        y_train = self.training_data[self.target_column].to_numpy()

        if len(np.unique(y_train)) < 2:
            print("Not enough classes in the data to fit the model.")
        else:
            self.classifier.fit(X_train, y_train)
            self.model_fitted = True

    def predict(self, X):
        """Return the classifier's raw ``predict`` output for ``X``.

        For the neural network (model_type 2) this is the network output,
        i.e. logits, not class labels.
        """
        return self.classifier.predict(X)

    def predict_proba(self, X):
        """Return ``predict_proba`` of the classifier.

        The neural network has no ``predict_proba``; for model_type 2 the
        raw network output (logits) is returned instead.
        """
        if self.model_type == 2:
            return self.classifier.predict(X)
        return self.classifier.predict_proba(X)

    def _test_split(self):
        """Return ``(X_test, y_test)`` taken from ``testing_data``."""
        X_test = self.testing_data.drop(self.target_column, axis=1)
        y_test = self.testing_data[self.target_column]
        return X_test, y_test

    def _predict_labels(self, X):
        """Return hard class labels for ``X`` for every model type."""
        raw = self.predict(X)
        if self.model_type == 2:
            # The network outputs logits: sigmoid(z) > 0.5 exactly when
            # z > 0, so the decision threshold on the raw output is 0.
            return (np.asarray(raw).ravel() > 0).astype(int)
        return raw

    @staticmethod
    def _binary_confusion(y_true, y_pred):
        """Return ``(tn, fp, fn, tp)`` even when only one class occurs."""
        matrix = confusion_matrix(y_true, y_pred)
        if matrix.shape != (2, 2):
            # A single observed class yields a 1x1 matrix; pin the label
            # set, assuming the usual 0/1 target encoding.
            matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
        return matrix.ravel()

    def get_accuracy(self):
        """Return test-set accuracy, or 0 if the model is not fitted."""
        if not self.model_fitted:
            return 0
        X_test, y_test = self._test_split()
        return accuracy_score(y_test, self._predict_labels(X_test))

    def get_sensitivity(self):
        """Return test-set recall of the positive class (0 if unfitted)."""
        if not self.model_fitted:
            return 0
        X_test, y_test = self._test_split()
        return recall_score(y_test, self._predict_labels(X_test))

    def get_specificity(self):
        """Return tn / (tn + fp) on the test set (1 if unfitted).

        Returns 0.0 when the test set contains no negative samples.
        """
        if not self.model_fitted:
            return 1
        X_test, y_test = self._test_split()
        tn, fp, fn, tp = self._binary_confusion(y_test, self._predict_labels(X_test))
        negatives = tn + fp
        return tn / negatives if negatives else 0.0

    def get_F1_score(self):
        """Return the test-set F1 score.

        Returns 0 when the model is not fitted or the test target has
        fewer than two classes.
        """
        if not self.model_fitted:
            return 0
        try:
            X_test, y_test = self._test_split()
            if len(y_test.unique()) < 2:
                return 0
            return f1_score(y_test, self._predict_labels(X_test))
        except NotFittedError:
            return 0

    def get_roc_auc_score(self):
        """Return the test-set ROC AUC.

        Returns 0 when the model is not fitted or the test target has
        fewer than two classes.
        """
        if not self.model_fitted:
            return 0
        try:
            X_test, y_test = self._test_split()
            if len(y_test.unique()) < 2:
                return 0
            if self.model_type == 2:
                # Logits are a monotone transform of the probabilities, so
                # they rank samples identically for AUC.
                scores = np.asarray(self.predict(X_test)).ravel()
            else:
                scores = self.predict_proba(X_test)[:, 1]
            return roc_auc_score(y_test, scores)
        except NotFittedError:
            return 0

    def info(self):
        """Return the display name of the configured model type."""
        return self._MODEL_NAMES[self.model_type]

In [None]:
class HeterogeneousEnsemble:
    """Stacking ensemble of XGBoost, logistic regression and random forest.

    Three ``Model`` wrappers supply the base estimators; a
    ``StackingClassifier`` with a logistic-regression meta-learner combines
    them. Training and evaluation use the frames held by ``xgb_model``.

    Attributes:
        model_fitted: True once ``fit`` has trained the stacking model.
        xgb_model / lr_model / rf_model: the wrapped base models.
        base_models: the three wrappers as a list.
        stacking_model: the ``StackingClassifier`` instance.
    """

    def __init__(self, data, target_column, feature_selection_method, sample_modifying_method):
        """Preprocess ``data`` and assemble the (unfitted) stacking model.

        Args:
            data: frame holding the features and the target column.
            target_column: name of the target column in ``data``.
            feature_selection_method: forwarded to ``Model.feature_selecting``.
            sample_modifying_method: forwarded to ``Model.sample_modifying``.
        """
        self.model_fitted = False
        self.feature_selection_method = feature_selection_method
        self.sample_modifying_method = sample_modifying_method
        self.target_column = target_column

        self.xgb_model = Model(3, data, target_column)
        self.lr_model = Model(4, data, target_column)
        self.rf_model = Model(1, data, target_column)

        # The preprocessing does not depend on the estimator, so run it
        # once and share the result. Repeating it per model tripled the
        # cost and, with unseeded resamplers, gave each wrapper a different
        # training set although only xgb_model's frames are used here.
        self.xgb_model.feature_selecting(feature_selection_method)
        self.xgb_model.split_data()
        self.xgb_model.sample_modifying(sample_modifying_method)
        for sibling in (self.lr_model, self.rf_model):
            sibling.data = self.xgb_model.data.copy()
            sibling.training_data = self.xgb_model.training_data.copy()
            sibling.testing_data = self.xgb_model.testing_data.copy()

        self.base_models = [self.xgb_model, self.lr_model, self.rf_model]
        self.stacking_model = StackingClassifier(
            estimators=[
                ('xgb', self.xgb_model.classifier),
                ('lr', self.lr_model.classifier),
                ('rf', self.rf_model.classifier),
            ],
            final_estimator=LogisticRegression(),
        )

    def _test_split(self):
        """Return ``(X_test, y_test)`` taken from ``xgb_model.testing_data``."""
        testing_data = self.xgb_model.testing_data
        X_test = testing_data.drop(self.target_column, axis=1)
        y_test = testing_data[self.target_column]
        return X_test, y_test

    @staticmethod
    def _binary_confusion(y_true, y_pred):
        """Return ``(tn, fp, fn, tp)`` even when only one class occurs."""
        matrix = confusion_matrix(y_true, y_pred)
        if matrix.shape != (2, 2):
            # A single observed class yields a 1x1 matrix; pin the label
            # set, assuming the usual 0/1 target encoding.
            matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
        return matrix.ravel()

    def fit(self):
        """Train the stacking model on ``xgb_model.training_data``.

        Best effort: errors raised while fitting are printed rather than
        propagated, and ``model_fitted`` stays False in that case. The same
        applies when the training target has fewer than two classes.
        """
        try:
            X = self.xgb_model.training_data.drop(self.target_column, axis=1)
            y = self.xgb_model.training_data[self.target_column]
            if len(y.unique()) < 2:
                print("Not enough classes in the data to fit the model.")
            else:
                self.stacking_model.fit(X, y)
                self.model_fitted = True
        except ValueError as ve:
            print(f"Value Error occurred during fit: {ve}")
        except Exception as e:
            print(f"An error occurred during fit: {e}")

    def predict(self, X):
        """Return class predictions of the stacking model for ``X``."""
        return self.stacking_model.predict(X)

    def predict_proba(self, X):
        """Return class probabilities of the stacking model for ``X``."""
        return self.stacking_model.predict_proba(X)

    def get_accuracy(self):
        """Return test-set accuracy, or 0 if the ensemble is not fitted."""
        if not self.model_fitted:
            return 0
        X_test, y_test = self._test_split()
        return accuracy_score(y_test, self.predict(X_test))

    def get_sensitivity(self):
        """Return tp / (tp + fn) on the test set (0 if unfitted).

        Returns 0.0 when the test set contains no positive samples.
        """
        if not self.model_fitted:
            return 0
        X_test, y_test = self._test_split()
        tn, fp, fn, tp = self._binary_confusion(y_test, self.predict(X_test))
        positives = tp + fn
        return tp / positives if positives else 0.0

    def get_specificity(self):
        """Return tn / (tn + fp) on the test set (1 if unfitted).

        Returns 0.0 when the test set contains no negative samples.
        """
        if not self.model_fitted:
            return 1
        X_test, y_test = self._test_split()
        tn, fp, fn, tp = self._binary_confusion(y_test, self.predict(X_test))
        negatives = tn + fp
        return tn / negatives if negatives else 0.0

    def get_F1_score(self):
        """Return the test-set F1 score.

        Returns 0 when the ensemble is not fitted or the test target has
        fewer than two classes.
        """
        if not self.model_fitted:
            return 0
        try:
            X_test, y_test = self._test_split()
            if len(y_test.unique()) < 2:
                return 0
            return f1_score(y_test, self.predict(X_test))
        except NotFittedError:
            return 0

    def get_roc_auc_score(self):
        """Return the test-set ROC AUC from positive-class probabilities.

        Returns 0 when the ensemble is not fitted or the test target has
        fewer than two classes.
        """
        if not self.model_fitted:
            return 0
        try:
            X_test, y_test = self._test_split()
            if len(y_test.unique()) < 2:
                return 0
            y_pred_proba = self.predict_proba(X_test)[:, 1]
            return roc_auc_score(y_test, y_pred_proba)
        except NotFittedError:
            return 0