In [None]:
import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from sklearn.utils import resample
from sklearn.metrics import roc_curve, auc, confusion_matrix
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from sklearn.neighbors import NearestNeighbors
import time
from sklearn.model_selection import StratifiedKFold, train_test_split
import random
from joblib import Parallel, delayed

In [None]:
# Seed NumPy's global RNG; train_ensemble draws its epoch shuffles from it via np.random.permutation.
np.random.seed(42)

def train_ensemble(X, y, classifiers, max_step, rho, batch_size):
    """Learn the combination weights of a fitted ensemble by mini-batch gradient descent.

    The base classifiers are evaluated once on ``X``; the resulting probability
    matrix is reused for every epoch. Weights start from ``initialize_weights``
    and, after every mini-batch step, are rescaled with ``w_standard`` and
    normalised to sum to one.

    Parameters
    ----------
    X : pandas.DataFrame or array-like
        Training features (one row per sample).
    y : pandas.Series or array-like
        Training labels, row-aligned with ``X``.
    classifiers : sequence
        Fitted base classifiers accepted by ``ensemble_predict``.
    max_step : int
        Maximum number of epochs.
    rho : float
        Weight of the diversity term in the objective and gradient.
    batch_size : int
        Number of samples per mini-batch.

    Returns
    -------
    numpy.ndarray
        Weight vector with one entry per classifier.
    """
    # Callers pass pandas objects; plain arrays are accepted too.
    X = X.values if hasattr(X, "values") else np.asarray(X)
    y = y.values if hasattr(y, "values") else np.asarray(y)
    n = len(X)
    initial_lr = 0.1
    decay_rate = 0.01

    predictions_matrix = ensemble_predict(X, classifiers)
    diversity_matrix = precompute_diversity_parallel(predictions_matrix, y)
    w = initialize_weights(classifiers, predictions_matrix, 0.6, y)
    current_z = 0

    for t in range(max_step):
        # Shuffle into *new* arrays, always starting from the original order.
        # Overwriting X / y here would compound the permutations across epochs
        # while the cached predictions are only permuted once, pairing every
        # prediction with the wrong label from the second epoch onwards.
        indices = np.random.permutation(n)
        X_shuffled = X[indices]
        y_shuffled = y[indices]
        predictions_matrix_shuffled = predictions_matrix[indices]

        # Inverse-time learning-rate decay.
        learning_rate = initial_lr / (1 + decay_rate * t)
        for j in range(0, n, batch_size):
            X_batch = X_shuffled[j:j + batch_size]
            y_batch = y_shuffled[j:j + batch_size]
            predictions_matrix_batch = predictions_matrix_shuffled[j:j + batch_size]
            F_X_batch = np.dot(predictions_matrix_batch, w)

            gradient = compute_gradient(X_batch, y_batch, F_X_batch, predictions_matrix_batch, classifiers, rho, diversity_matrix)
            w = w - learning_rate * gradient
            w = w_standard(w)
            w /= np.sum(w)

        # Evaluate the objective on the unshuffled data so that the labels and
        # the cached predictions refer to the same rows.
        F_X = np.dot(predictions_matrix, w)
        last_z = objective_function(w, X, y, F_X, predictions_matrix, classifiers, rho, diversity_matrix)

        # Stop once the objective changes by less than 1e-5 between epochs.
        if t > 0 and abs(last_z - current_z) < 1e-5:
            break

        current_z = last_z

    return w


In [None]:

def train_classifier(X, y, subset_num):
    """Fit ``subset_num`` base classifiers and return them as a list.

    Each classifier is produced by ``train_single_classifier`` with its
    position ``0 .. subset_num - 1`` as the index argument. The work is
    dispatched through joblib with a single job.
    """
    sample_count = len(X)
    jobs = (
        delayed(train_single_classifier)(X, y, sample_count, member_index)
        for member_index in range(subset_num)
    )
    runner = Parallel(n_jobs=1)
    fitted_models = runner(jobs)
    return fitted_models

def train_single_classifier(X, y, n_samples, i):
    """Fit one XGBoost classifier on a bootstrap resample of ``(X, y)``.

    The resample draws ``n_samples`` rows with replacement. Both the resample
    and the classifier are seeded with ``i * 10``.
    """
    seed = i * 10
    X_boot, y_boot = resample(
        X,
        y,
        replace=True,
        n_samples=n_samples,
        random_state=seed,
    )
    model = XGBClassifier(
        objective='binary:logistic',
        n_estimators=100,
        random_state=seed,
        eval_metric='logloss',
    )
    model.fit(X_boot, y_boot, verbose=True)
    return model


In [None]:
def get_param(classifiers):
    """Return, for each model, the number of leaf rows in its booster's tree table.

    A row of ``trees_to_dataframe()`` counts as a leaf when its ``Feature``
    column equals ``'Leaf'``. The count is used elsewhere as the model's
    parameter count.
    """
    def _leaf_count(model):
        tree_table = model.get_booster().trees_to_dataframe()
        is_leaf = tree_table['Feature'] == 'Leaf'
        return is_leaf.sum()

    return [_leaf_count(model) for model in classifiers]


In [None]:

def compute_AIC(params, y_true, y_pred):
    """Return ``2 * params - 2 * S`` where ``S`` is the sum of ``cross_loss(y_true, y_pred)``."""
    summed_terms = np.sum(cross_loss(y_true, y_pred))
    return 2 * params - 2 * summed_terms

def compute_BIC(params, m, y_true, y_pred):
    """Return ``log(m) * params - 2 * S`` where ``S`` is the sum of ``cross_loss(y_true, y_pred)``."""
    summed_terms = np.sum(cross_loss(y_true, y_pred))
    return np.log(m) * params - 2 * summed_terms

def initialize_weights(classifiers, predictions_matrix, lambda_A, y):
    """Build the starting weight vector from per-classifier AIC/BIC scores.

    For every column of ``predictions_matrix`` the AIC and BIC are computed
    from that classifier's parameter count (``get_param``) and its predicted
    probabilities. The two scores are blended as
    ``lambda_A * AIC + (1 - lambda_A) * BIC``; the weights are the reciprocals
    of the blended scores, normalised to sum to one.
    """
    leaf_counts = get_param(classifiers)
    n_obs = len(y)
    model_columns = range(predictions_matrix.shape[1])

    aic_scores = np.array([
        compute_AIC(leaf_counts[col], y, predictions_matrix[:, col])
        for col in model_columns
    ])
    bic_scores = np.array([
        compute_BIC(leaf_counts[col], n_obs, y, predictions_matrix[:, col])
        for col in model_columns
    ])

    blended = lambda_A * aic_scores + (1 - lambda_A) * bic_scores
    reciprocal = 1 / blended
    return reciprocal / np.sum(reciprocal)


In [None]:

def ensemble_predict(X, classifiers):
    """Collect each classifier's second ``predict_proba`` column into one matrix.

    Returns an array of shape ``(len(X), len(classifiers))`` whose column ``k``
    is ``classifiers[k].predict_proba(X)[:, 1]``.
    """
    n_rows, n_models = len(X), len(classifiers)
    scores = np.zeros((n_rows, n_models))  # (n_samples, K)

    for column, model in enumerate(classifiers):
        second_column_proba = model.predict_proba(X)[:, 1]
        scores[:, column] = second_column_proba

    return scores


In [None]:
from joblib import Parallel, delayed

def compute_div_entry(correct_matrix, k, l):
    """Return the share of rows in which columns ``k`` and ``l`` are both 0.

    The four joint counts (1/1, 1/0, 0/1, 0/0) of the two columns are tallied
    and the 0/0 count is divided by their total. In the caller a 1 marks a
    correct prediction, so this is the fraction of samples both classifiers
    got wrong.
    """
    first, second = correct_matrix[:, k], correct_matrix[:, l]
    first_hit, first_miss = first == 1, first == 0
    second_hit, second_miss = second == 1, second == 0

    both_hit = np.sum(first_hit & second_hit)
    only_first_hit = np.sum(first_hit & second_miss)
    only_second_hit = np.sum(first_miss & second_hit)
    both_miss = np.sum(first_miss & second_miss)

    total = both_miss + only_second_hit + only_first_hit + both_hit
    return both_miss / total

def precompute_diversity_parallel(predictions_matrix, y_true):
    """Build the symmetric K x K matrix of pairwise ``compute_div_entry`` values.

    Probabilities are thresholded at 0.5, compared with ``y_true`` to obtain a
    0/1 correctness matrix, and every unordered pair of classifiers is scored
    with ``compute_div_entry``. The diagonal is left at zero. Despite the name,
    the pairs are evaluated sequentially.
    """
    n_models = predictions_matrix.shape[1]
    hard_labels = (predictions_matrix >= 0.5).astype(int)
    truth_column = y_true.reshape(-1, 1)
    hits = (hard_labels == truth_column).astype(int)

    pairwise = np.zeros((n_models, n_models))
    upper_pairs = [(a, b) for a in range(n_models) for b in range(a + 1, n_models)]
    for a, b in upper_pairs:
        score = compute_div_entry(hits, a, b)
        pairwise[a, b] = score
        pairwise[b, a] = score

    return pairwise


In [None]:
def cross_loss(y_true, y_pred):
    """Return the per-sample value ``y*log(p + eps) + (1 - y)*log(1 - p + eps)``.

    ``eps`` is 1e-6 and keeps the logarithms finite at ``p`` equal to 0 or 1.
    Note the sign: no negation is applied here, so callers that want a loss
    negate the result themselves.
    """
    eps = 1e-6
    log_p = np.log(y_pred + eps)
    log_not_p = np.log(1 - y_pred + eps)
    positive_part = y_true * log_p
    negative_part = (1 - y_true) * log_not_p
    return positive_part + negative_part

In [None]:

def objective_function(w, X, y, F_X, predictions_matrix, classifiers, rho, diversity_matrix):
    """Return the ensemble objective: negated mean ``cross_loss`` plus ``rho`` times the diversity term.

    The diversity term is the dot product of the row sums of
    ``diversity_matrix`` with ``w``. ``predictions_matrix`` and ``classifiers``
    are accepted but not used; ``X`` only supplies the sample count.
    """
    mean_loss = -(np.sum(cross_loss(y, F_X))) / len(X)
    row_sums = np.dot(diversity_matrix, np.ones(len(w)))
    diversity_term = np.dot(row_sums, w)
    return mean_loss + rho * diversity_term

def compute_gradient(X, y, F_X, predictions_matrix, classifiers, rho, diversity_matrix):
    """Return the gradient of the ensemble objective with respect to the weights.

    The data term is the derivative of the negated mean of
    ``y*log(F + eps) + (1 - y)*log(1 - F + eps)`` with ``F = predictions_matrix @ w``;
    the diversity term contributes ``rho`` times the row sums of
    ``diversity_matrix``.

    Parameters
    ----------
    X : sized
        Batch features; only ``len(X)`` is used (as the averaging denominator).
    y : array-like
        Batch labels.
    F_X : array-like
        Current ensemble probabilities for the batch.
    predictions_matrix : numpy.ndarray
        Per-classifier probabilities for the batch, one column per classifier.
    classifiers : sized
        Only ``len(classifiers)`` is used.
    rho : float
        Weight of the diversity term.
    diversity_matrix : numpy.ndarray
        K x K pairwise diversity matrix.

    Returns
    -------
    numpy.ndarray
        Gradient vector with one entry per classifier.
    """
    K = len(classifiers)
    # Same smoothing constant as the log terms of the objective: without it the
    # denominators hit zero when F_X saturates at 0 or 1 and the gradient
    # becomes inf/NaN.
    epsilon = 1e-6

    y_col = np.asarray(y, dtype=float).reshape(-1, 1)
    f_col = np.asarray(F_X, dtype=float).reshape(-1, 1)

    # d(loss)/dF per sample: -y/(F + eps) + (1 - y)/(1 - F + eps).
    dloss_dF = (1 - y_col) / (1 - f_col + epsilon) - y_col / (f_col + epsilon)

    # Chain rule: dF/dw_k is classifier k's probability for that sample.
    grad = np.sum(predictions_matrix * dloss_dF, axis=0) / len(X)

    grad += rho * np.dot(diversity_matrix, np.ones(K))
    return grad


In [None]:
def w_standard(w):
    """Min-max scale a weight vector to the range [0, 1].

    The smallest entry maps to 0 and the largest to 1. Min-max scaling is
    shift-invariant, so vectors that are entirely non-positive need no special
    handling. The input array is never modified.

    Parameters
    ----------
    w : array-like
        Weight vector (must be non-empty).

    Returns
    -------
    numpy.ndarray
        New float array with the scaled weights. When all entries are equal
        (including the single-classifier case) the range is zero and a vector
        of ones is returned, so a subsequent sum-normalisation yields uniform
        weights instead of NaN.
    """
    w = np.asarray(w, dtype=float)
    min_val = np.min(w)
    span = np.max(w) - min_val

    if span == 0:
        # Degenerate range: (w - min) / span would be 0/0.
        return np.ones_like(w)

    return (w - min_val) / span


In [None]:

def train_part(X_train, y_train, max_step, subset_num, rho, batch_size):
    """Fit the base classifiers, then learn their combination weights.

    Returns the tuple ``(classifiers, weights)`` produced by
    ``train_classifier`` and ``train_ensemble`` respectively.
    """
    base_models = train_classifier(X_train, y_train, subset_num)
    weights = train_ensemble(
        X_train,
        y_train,
        base_models,
        max_step,
        rho,
        batch_size,
    )
    return base_models, weights
    

def classification(X_test, y_test, classifiers, w_optimal):
    """Score the weighted ensemble on a held-out set.

    The ensemble probability is the weighted sum of the base classifiers'
    probabilities; the hard label is that probability thresholded at 0.5.

    Parameters
    ----------
    X_test : array-like or pandas.DataFrame
        Test features.
    y_test : array-like
        True labels, encoded as 0 / 1.
    classifiers : sequence
        Fitted base classifiers.
    w_optimal : numpy.ndarray
        Combination weights, one per classifier.

    Returns
    -------
    dict
        Keys ``'Accuracy'``, ``'Precision'``, ``'Recall'``, ``'Specificity'``,
        ``'G-Mean'``, ``'F1 Score'`` and ``'AUC'``.
    """
    predictions_matrix = ensemble_predict(X_test, classifiers)
    probabilities = np.dot(predictions_matrix, w_optimal)
    y_pred = (probabilities >= 0.5).astype(int)

    fpr, tpr, _ = roc_curve(y_test, probabilities)
    roc_auc = auc(fpr, tpr)

    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='binary')
    recall = recall_score(y_test, y_pred, average='binary')
    f1 = f1_score(y_test, y_pred, average='binary')

    # Pin the label set so the matrix is always 2x2. Without it, a fold whose
    # true and predicted labels contain a single class yields a 1x1 matrix and
    # the four-way unpack below raises.
    TN, FP, _, _ = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

    # Geometric mean of sensitivity and specificity.
    g_mean = (recall * specificity) ** 0.5

    results = {
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'Specificity': specificity,
        'G-Mean': g_mean,
        'F1 Score': f1,
        'AUC': roc_auc
    }

    return results


In [None]:

def dataset(data):
    """Split a frame into ``(features, target)`` where the target is the ``'Label'`` column."""
    target_column = 'Label'
    target = data[target_column]
    features = data.drop(columns=[target_column])
    return features, target


In [None]:
import time
from sklearn.model_selection import KFold
import numpy as np
import pandas as pd

def cross_validate_model(X, y, max_step, subset_num, rho, batch_size, n_splits=10):
    """Run shuffled K-fold cross-validation of the weighted ensemble.

    For each fold the ensemble is trained with ``train_part`` on the training
    split and scored with ``classification`` on the test split. The wall-clock
    time of training plus scoring is stored under ``'Time (s)'``.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature table (indexed positionally with ``.iloc``).
    y : pandas.Series
        Labels, row-aligned with ``X``.
    max_step, subset_num, rho, batch_size
        Forwarded unchanged to ``train_part``.
    n_splits : int, default 10
        Number of folds.

    Returns
    -------
    tuple
        ``(classifiers, w_optimal, results_df, mean_results, std_results)``.
        ``classifiers`` and ``w_optimal`` come from the last fold only;
        ``results_df`` has one row per fold, and the two dicts hold the
        per-metric mean and standard deviation across folds.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    all_metrics = []

    for fold, (train_index, test_index) in enumerate(kf.split(X), start=1):
        print(f"Fold {fold}...")

        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        start_time = time.time()
        classifiers, w_optimal = train_part(X_train, y_train, max_step, subset_num, rho, batch_size)
        results = classification(X_test, y_test, classifiers, w_optimal)
        elapsed_time = time.time() - start_time
        results['Time (s)'] = elapsed_time

        print(f"Fold {fold} completed in {elapsed_time:.2f} seconds.")
        all_metrics.append(results)

    results_df = pd.DataFrame(all_metrics)
    mean_results = results_df.mean().to_dict()
    std_results = results_df.std().to_dict()

    # Report the actual fold count rather than a hard-coded 10.
    print(f"\n=== Average Performance ({n_splits}-Fold) ===")
    for key in mean_results:
        print(f"{key}: {mean_results[key]:.4f} ± {std_results[key]:.4f}")

    return classifiers, w_optimal, results_df, mean_results, std_results


In [None]:
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
def preprocess_data(df, scaling_method='minmax'):
    """Encode the ``'Label'`` column and scale every other column.

    Works on a copy of ``df``. Labels are integer-encoded with
    ``LabelEncoder``. The remaining columns are scaled with ``MinMaxScaler``
    when ``scaling_method`` is ``'minmax'`` and with ``StandardScaler`` for any
    other value. The result has a fresh positional index, with the scaled
    features first and ``'Label'`` as the last column.
    """
    frame = df.copy()
    frame['Label'] = LabelEncoder().fit_transform(frame['Label'].values)

    target = frame['Label']
    predictors = frame.drop(columns=['Label'])

    if scaling_method == 'minmax':
        scaler = MinMaxScaler()
    else:
        scaler = StandardScaler()

    scaled_values = scaler.fit_transform(predictors)
    scaled_frame = pd.DataFrame(scaled_values, columns=predictors.columns)

    # Drop the original index so the labels line up with the re-indexed features.
    aligned_target = target.reset_index(drop=True)
    return pd.concat([scaled_frame, aligned_target], axis=1)

In [None]:
# Experiment driver: load the data, preprocess it, and cross-validate the weighted ensemble.

# Re-seed NumPy's global RNG immediately before the run.
np.random.seed(42)

# NOTE(review): the path literal is empty — presumably a placeholder; supply the real CSV path before running.
# The file must contain a 'Label' column (preprocess_data and dataset both read it).
data = pd.read_csv(r'')
data = preprocess_data(data, scaling_method='minmax')

data_X, data_y = dataset(data)
# classifiers / w_optimal returned here are those of the final fold only.
classifiers, w_optimal, results, mean_results, std_results = cross_validate_model(
    data_X,
    data_y,
    max_step=50,
    subset_num=30,
    rho=0.1,
    batch_size=16,
    n_splits=10
)