In [2]:
from numpy import mean
from numpy import std
from sklearn.datasets import make_moons
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.ensemble import VotingClassifier
import matplotlib.pyplot as plt
import numpy as np  
import os   
import joblib
from tensorflow.keras.models import load_model
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from tensorflow.keras.models import Sequential

In [3]:
# Build the synthetic dataset used to evaluate the ensemble
def get_dataset(n_samples=300, noise=0.2, random_state=42):
    """Generate a two-moons dataset with scikit-learn's ``make_moons``.

    The defaults reproduce the dataset this notebook was written against.

    Args:
        n_samples: Number of points to generate, forwarded to ``make_moons``.
        noise: Noise level, forwarded to ``make_moons``.
        random_state: Seed forwarded to ``make_moons`` so the data is
            reproducible across runs.

    Returns:
        The ``(X, y)`` pair produced by ``make_moons``.
    """
    X, y = make_moons(n_samples=n_samples, noise=noise, random_state=random_state)
    return X, y

# Load the models from the "models" folder
def get_models():
    """Load every saved model found in ``<current working dir>/models``.

    Files ending in ``.pkl`` are loaded with ``joblib.load`` and files ending
    in ``.keras`` with Keras ``load_model``; every other entry is ignored.
    The dictionary key is the file name without its final extension, so
    ``my.model.pkl`` is stored as ``my.model``. If two files share the same
    stem, the one that sorts later replaces the earlier one.

    Returns:
        dict mapping model name to the loaded model object, filled in sorted
        file-name order.

    Raises:
        FileNotFoundError: If the ``models`` directory does not exist
            (raised by ``os.listdir``).
    """
    models = {}
    models_folder = os.path.join(os.getcwd(), "models")
    # os.listdir returns entries in arbitrary order; sorting keeps the model
    # order (and therefore the greedy pruning result) reproducible.
    for file in sorted(os.listdir(models_folder)):
        # splitext only strips the last extension, so dots inside the model
        # name are preserved and extension-less files are never matched.
        model_name, file_extension = os.path.splitext(file)
        model_path = os.path.join(models_folder, file)
        if file_extension == ".pkl":
            # NOTE(review): joblib.load unpickles the file, which can execute
            # arbitrary code. Only load model files from a trusted source.
            models[model_name] = joblib.load(model_path)
            print(f"Imported sklearn model: {model_name}")
        elif file_extension == ".keras":
            models[model_name] = load_model(model_path)
            print(f"Imported keras model: {model_name}")
    print(models)
    return models

def _to_class_probabilities(raw_output):
    """Coerce a model's probability output to shape (n_samples, n_classes)."""
    proba = np.asarray(raw_output)
    if proba.ndim == 1:
        proba = proba.reshape(-1, 1)
    if proba.shape[1] == 1:
        # Assumes a single-column output is P(class 1) from one sigmoid unit
        # (TODO confirm against the saved Keras models); expand it to
        # [P(class 0), P(class 1)] so it can be averaged with two-column
        # predict_proba output.
        proba = np.hstack([1.0 - proba, proba])
    return proba


def custom_soft_voting_predict(models, X):
    """Predict class labels by averaging the class probabilities of ``models``.

    Models exposing ``predict_proba`` contribute that output. ``Sequential``
    Keras models contribute the output of ``predict``, with a single-column
    output expanded to two columns. Any other model is reported and skipped.

    Args:
        models: Iterable of fitted model objects (not ``(name, model)`` pairs).
        X: Samples to classify, passed unchanged to each model.

    Returns:
        1-D array holding, for each sample, the index of the class with the
        highest mean probability.

    Raises:
        ValueError: If none of the models can produce probabilities.
    """
    probas = []
    for model in models:
        if hasattr(model, "predict_proba"):
            probas.append(model.predict_proba(X))
        elif isinstance(model, Sequential):  # Handle Keras models
            probas.append(_to_class_probabilities(model.predict(X)))
        else:
            print(f"Model {model} does not support predict_proba and will be skipped.")
    if not probas:
        raise ValueError("No models in the ensemble support predict_proba.")
    # Soft voting: average over models (axis 0), then take the likeliest class
    avg_proba = np.mean(probas, axis=0)
    return np.argmax(avg_proba, axis=1)

# score an ensemble of named models with repeated stratified cross-validation
def evaluate_ensemble(models, X, y):
    """Return the mean cross-validated accuracy of a soft-voting ensemble.

    Args:
        models: Sequence of ``(name, model)`` pairs; only the model objects
            are used.
        X: Feature array, indexed with the fold index arrays.
        y: Label array, indexed with the fold index arrays.

    Returns:
        ``0.0`` for an empty ensemble, otherwise the mean over all folds of
        the fraction of correctly predicted test samples.

    Note:
        Every model is refit in place (``model.fit``) on each fold's training
        split, so the objects in ``models`` are left fitted on the last fold.
    """
    # an empty ensemble scores zero by convention
    if len(models) == 0:
        return 0.0

    estimators = [estimator for _name, estimator in models]
    # 10 splits repeated 3 times with a fixed seed
    splitter = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

    fold_accuracies = []
    for fit_rows, eval_rows in splitter.split(X, y):
        X_fit, y_fit = X[fit_rows], y[fit_rows]
        X_eval, y_eval = X[eval_rows], y[eval_rows]
        # refit every member on this fold's training split
        for estimator in estimators:
            estimator.fit(X_fit, y_fit)
        # combine the members by averaging their class probabilities
        voted = custom_soft_voting_predict(estimators, X_eval)
        # accuracy on this fold = share of matching labels
        fold_accuracies.append(mean(voted == y_eval))

    return mean(fold_accuracies)

def calculate_Q_statistic(predictions1, predictions2):
    """Calculate the Q-statistic between two classifiers' binary predictions.

    The four counts are taken over the predicted labels themselves (both
    predict 1, both predict 0, or they disagree), not over whether each
    prediction was correct.

    Args:
        predictions1: Array-like of 0/1 predictions from the first classifier.
        predictions2: Array-like of 0/1 predictions from the second
            classifier, same length as ``predictions1``.

    Returns:
        float ``(N11*N00 - N10*N01) / (N11*N00 + N10*N01 + 1e-10)``. The small
        constant avoids division by zero, so the result is ``0.0`` when both
        products are zero.
    """
    # asarray lets plain lists through; ravel stops an (n, 1) column from
    # broadcasting against an (n,) vector into an (n, n) matrix.
    p1 = np.asarray(predictions1).ravel()
    p2 = np.asarray(predictions2).ravel()

    N11 = np.count_nonzero((p1 == 1) & (p2 == 1))
    N00 = np.count_nonzero((p1 == 0) & (p2 == 0))
    N10 = np.count_nonzero((p1 == 1) & (p2 == 0))
    N01 = np.count_nonzero((p1 == 0) & (p2 == 1))

    Q = (N11 * N00 - N10 * N01) / (N11 * N00 + N10 * N01 + 1e-10)
    return Q

def get_predictions(model, X, y):
    """Get binary (0/1) predictions from a model, whatever interface it exposes.

    The first available interface is used, in this order:

    1. ``predict_proba``: column 1 thresholded at 0.5.
    2. ``decision_function``: thresholded at 0.
    3. ``predict``: a multi-column output is treated like ``predict_proba``;
       a single-column output is flattened; floating-point output is
       thresholded at 0.5; integer output is used as-is.

    Args:
        model: Fitted model object.
        X: Samples to predict on.
        y: Reference labels; only used as the template for the all-zeros
            fallback.

    Returns:
        Integer array of predictions, or ``np.zeros_like(y)`` if the model
        raised while predicting (the error is printed, not re-raised).
    """
    try:
        if hasattr(model, 'predict_proba'):
            # Models with predict_proba method
            y_pred = model.predict_proba(X)[:, 1] > 0.5
        elif hasattr(model, 'decision_function'):
            # Models with decision_function method
            y_pred = model.decision_function(X) > 0
        else:
            # Fallback to predict method
            y_pred = np.asarray(model.predict(X))
            if y_pred.ndim > 1 and y_pred.shape[1] > 1:
                y_pred = y_pred[:, 1] > 0.5
            else:
                if y_pred.ndim == 2 and y_pred.shape[1] == 1:
                    # Flatten an (n, 1) column so later element-wise
                    # comparisons line up with 1-D predictions.
                    y_pred = y_pred[:, 0]
                if np.issubdtype(y_pred.dtype, np.floating):
                    # Presumably probabilities (e.g. one sigmoid unit):
                    # threshold them instead of letting astype(int)
                    # truncate every value below 1.0 to 0.
                    y_pred = y_pred > 0.5
        return y_pred.astype(int)
    except Exception as e:
        print(f"Error predicting with model: {e}")
        # Deliberate best-effort: fall back to zeros if prediction fails
        return np.zeros_like(y)

def mean_Q_statistic(models, X, y):
    """Calculate the mean Q-statistic across all pairs of models.

    Args:
        models: Sequence whose items are either ``(name, model)`` pairs or
            bare model objects.
        X: Samples every model predicts on.
        y: Reference labels, forwarded to ``get_predictions`` and used as the
            template for the all-zeros fallback.

    Returns:
        ``0.0`` when fewer than two models are given, otherwise the mean of
        ``calculate_Q_statistic`` over every unordered pair of models.
    """
    n_models = len(models)
    if n_models < 2:
        return 0.0

    predictions = []
    for entry in models:
        # Accept both (name, model) pairs and bare models. Indexing a bare
        # model with [1] would raise, and so would the error message that
        # tried to report it with [0].
        if isinstance(entry, (tuple, list)) and len(entry) == 2:
            name, estimator = entry
        else:
            name, estimator = type(entry).__name__, entry
        try:
            predictions.append(get_predictions(estimator, X, y))
        except Exception as e:
            print(f"Error getting predictions for model {name}: {e}")
            predictions.append(np.zeros_like(y))  # Fallback to zeros if prediction fails

    Q_values = []
    for i in range(n_models):
        for j in range(i + 1, n_models):
            Q_values.append(calculate_Q_statistic(predictions[i], predictions[j]))

    return np.mean(Q_values)

def prune_round(models_in, X, y, min_acc_ratio=0.99):
    """Perform a single round of pruning based on Q-statistic diversity.

    Each model is tentatively removed in turn. A removal is accepted when the
    reduced ensemble's accuracy is at least ``min_acc_ratio`` times the current
    reference accuracy and its mean Q-statistic is lower than the current
    reference Q. Both references start at the full ensemble's values and are
    replaced by the candidate's values whenever a removal is accepted, so
    later candidates are compared against the last accepted one rather than
    against the full ensemble.

    Args:
        models_in: List of ``(name, model)`` pairs forming the ensemble.
        X: Feature array passed to the evaluation helpers.
        y: Label array passed to the evaluation helpers.
        min_acc_ratio: Fraction of the reference accuracy a candidate must
            reach to be accepted. The default ``0.99`` tolerates a 1% relative
            drop.

    Returns:
        Tuple ``(best_score, removed, Q)``. ``removed`` is the last accepted
        entry of ``models_in``, or ``None`` if nothing was accepted, in which
        case ``best_score`` and ``Q`` are the full ensemble's values.

    Note:
        ``mean_Q_statistic`` returns 0.0 for fewer than two models, so reducing
        a two-model ensemble to one model counts as a diversity improvement
        whenever the pair's Q is positive. NOTE(review): confirm this is
        intended.
    """
    baseline_acc = evaluate_ensemble(models_in, X, y)
    baseline_Q = mean_Q_statistic(models_in, X, y)
    best_score = baseline_acc
    removed = None
    # Try removing each model and evaluate both accuracy and diversity
    for m in models_in:
        dup = [model for model in models_in if model != m]

        # Calculate new accuracy and Q-statistic
        new_acc = evaluate_ensemble(dup, X, y)
        new_Q = mean_Q_statistic(dup, X, y)

        # Accept removal if accuracy stays within the allowed ratio
        # and diversity improves (lower Q-statistic)
        if new_acc >= best_score * min_acc_ratio and new_Q < baseline_Q:
            best_score = new_acc
            removed = m
            baseline_Q = new_Q

    return best_score, removed, baseline_Q

# prune an ensemble from scratch
def prune_ensemble(models, X, y):
    """Repeatedly drop one model per round until a round removes nothing.

    At most ``len(models) - 1`` rounds are run, so at least one model always
    remains. The caller's list is not modified; a new list is built after
    every removal.

    Args:
        models: List of ``(name, model)`` pairs forming the initial ensemble.
        X: Feature array passed to ``prune_round``.
        y: Label array passed to ``prune_round``.

    Returns:
        Tuple ``(best_score, models, scores, Q_stats)``: the score reported by
        the last round, the remaining ``(name, model)`` pairs, and the
        per-round scores and Q-statistics. With fewer than two models no round
        runs and the result is ``(0.0, models, [], [])``.
    """
    scores = []
    Q_stats = []
    best_score = 0.0
    max_rounds = len(models) - 1
    iterations = 0
    # prune ensemble until no further improvement or max iterations reached
    while iterations < max_rounds:
        # try to remove one model from the ensemble
        score, removed, stat_Q = prune_round(models, X, y)
        scores.append(score)
        Q_stats.append(stat_Q)
        # Record the score before the early exit: when nothing is removed,
        # prune_round returns the current ensemble's own accuracy, which would
        # otherwise be lost and reported as 0.0.
        best_score = score
        # check for no improvement
        if removed is None:
            print('>no further improvement')
            break
        # Rebuild the list without the removed entry. A following
        # models.remove(removed) would raise ValueError, because the entry is
        # already gone.
        models = [model for model in models if model != removed]
        print('>%.3f (removed: %s)' % (score, removed[0]))
        iterations += 1
    return best_score, models, scores, Q_stats



In [None]:
# Main execution code: load data and models, prune the ensemble, plot and report
if __name__ == "__main__":

    # define dataset
    X, y = get_dataset()
    print("Dataset loaded successfully")

    # load the saved models as a {name: model} dict
    loaded_models = get_models()
    if not loaded_models:
        raise ValueError("No models found in the predefined models list")
    # Convert the dict to a list of (name, model) tuples. Iterating the dict
    # directly yields only the keys, so .items() is required here.
    models = list(loaded_models.items())
    print('done')
    # run pruning
    score, model_list, scores, Q_stats = prune_ensemble(models, X, y)

    # Plot the per-round accuracy and Q-statistic on a shared axis
    plt.figure(figsize=(12, 7))
    plt.plot(scores, label='Accuracy Scores', linestyle='-', linewidth=2, color='red', marker='o')
    plt.plot(Q_stats, label='Q-Statistics', linestyle='-', linewidth=2, color='blue', marker='s')
    plt.xlabel('Iteration', fontsize=12)
    plt.ylabel('Value', fontsize=12)
    plt.title('Accuracy Scores and Q-Statistics Over Iterations', fontsize=14, pad=20)
    plt.legend(loc='center right', bbox_to_anchor=(1.15, 0.5))
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()
    # Calculate remaining model diversity. mean_Q_statistic indexes each entry
    # as a (name, model) pair, so the pruned list is passed as-is.
    final_Q = mean_Q_statistic(model_list, X, y)
    print('Final Q-statistic: %.3f' % final_Q)
    # Print results
    names = ','.join([n for n, _ in model_list])
    print('Remaining Models: %s' % names)
    print('Final Mean Accuracy: %.3f' % score)
    