In [4]:
!pip install pyswarm

Collecting pyswarm
  Downloading pyswarm-0.6.tar.gz (4.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyswarm
  Building wheel for pyswarm (setup.py) ... [?25l[?25hdone
  Created wheel for pyswarm: filename=pyswarm-0.6-py3-none-any.whl size=4464 sha256=3de26deb1cd529dab4c2845b48e92bd775e770f547d8e802718f5e8609be0326
  Stored in directory: /root/.cache/pip/wheels/71/67/40/62fa158f497f942277cbab8199b05cb61c571ab324e67ad0d6
Successfully built pyswarm
Installing collected packages: pyswarm
Successfully installed pyswarm-0.6


In [None]:
from sklearn.datasets import make_classification
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.isotonic import IsotonicRegression
from sklearn.utils import resample

import scipy.stats as stats

In [2]:


# Configure plotting defaults.
# Matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8-*' and deprecated the
# old spellings, so use whichever name this installation actually provides.
for _style_name in ('seaborn-v0_8-darkgrid', 'seaborn-darkgrid'):
    if _style_name in plt.style.available:
        plt.style.use(_style_name)
        break
else:
    # Neither bundled style exists; fall back to seaborn's own dark-grid theme.
    sns.set_style('darkgrid')
sns.set_palette('colorblind')


# Build a synthetic binary classification dataset (fixed seed for reproducibility).
X, y = make_classification(n_samples=1000, n_features=20, n_informative=2, n_redundant=10,
                           random_state=42, flip_y=0.03, class_sep=0.7)

# Split into training (50%), calibration (25%) and test (25%) sets, stratified by class.
x_train, x_temp, y_train, y_temp = train_test_split(X, y, test_size=0.5, random_state=42, stratify=y)
x_calibration, x_test, y_calibration, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

  plt.style.use('seaborn-darkgrid')


In [6]:
# Baseline classifier: a random forest with a fixed seed, fitted on the training split only.
model = RandomForestClassifier(random_state=42)
model.fit(X=x_train, y=y_train)

In [None]:
import numpy as np
import concurrent.futures
from sklearn.metrics import (
    log_loss,
    hinge_loss,
    accuracy_score,
    recall_score,
    precision_score,
    f1_score,
    roc_auc_score,
    confusion_matrix,
)
from pyswarm import pso
from typing import List, Tuple, Callable, Dict, Optional, Union, Any

class ConformalPredictionOptimizer:
    """
    Tunes an affine rescaling of a binary classifier's positive-class probability and
    derives rank-based intervals for test samples from calibration-set loss scores.

    For weights ``(w0, w1)`` the rescaled probability is ``w0 * p + w1`` and the decision
    value handed to the loss functions is ``2 * (w0 * p + w1) - 1``. Labels are treated as
    binary: any value greater than 0 is the positive class (so both 0/1 and -1/+1 work).
    """

    def __init__(self, model: Any):
        """
        Initializes the optimizer object with a machine learning model.

        :param model: A binary classification estimator exposing ``fit`` and ``predict_proba``.
        """
        self.model = model

    @staticmethod
    def _signed_labels(y_true) -> np.ndarray:
        """Maps binary labels to -1.0 / +1.0 (values > 0 become +1.0)."""
        return np.where(np.asarray(y_true) > 0, 1.0, -1.0)

    @staticmethod
    def _hinge_scores(y_true, decision) -> np.ndarray:
        """Per-sample hinge loss ``max(0, 1 - y * d)`` with labels mapped to -1/+1."""
        margins = ConformalPredictionOptimizer._signed_labels(y_true) * np.asarray(decision, dtype=float)
        return np.maximum(0.0, 1.0 - margins)

    @staticmethod
    def _squared_hinge_scores(y_true, decision) -> np.ndarray:
        """Per-sample squared hinge loss."""
        return ConformalPredictionOptimizer._hinge_scores(y_true, decision) ** 2

    @staticmethod
    def _log_loss_scores(y_true, decision) -> np.ndarray:
        """
        Per-sample binary cross-entropy.

        The decision value is first mapped back to a probability, ``(d + 1) / 2``, and
        clipped away from 0 and 1 so the logarithms stay finite.
        """
        eps = np.finfo(float).eps
        proba = np.clip((np.asarray(decision, dtype=float) + 1.0) / 2.0, eps, 1.0 - eps)
        is_positive = np.asarray(y_true) > 0
        return -np.where(is_positive, np.log(proba), np.log(1.0 - proba))

    @staticmethod
    def _decision_values(weights, proba) -> np.ndarray:
        """Applies the affine weights to probabilities and rescales the result as ``2 * p - 1``."""
        if len(weights) != 2:
            raise ValueError("Weights array must be of length 2.")
        weighted_proba = weights[0] * np.asarray(proba, dtype=float) + weights[1]
        return 2 * weighted_proba - 1

    @staticmethod
    def _objective_from_proba(weights, loss_function: Callable, proba: np.ndarray, y_true: np.ndarray) -> float:
        """
        Mean loss for the given weights, computed from already-predicted probabilities.

        Kept static so that submitting it to a worker process does not pickle the model,
        and so the model is not re-evaluated on every optimizer step.
        """
        decision = ConformalPredictionOptimizer._decision_values(weights, proba)
        return float(np.mean(loss_function(y_true, decision)))

    @staticmethod
    def _rank_intervals(calib_scores: np.ndarray, test_scores: np.ndarray) -> List[Tuple[float, float]]:
        """
        For each test score, returns the fraction of calibration scores that are <= it,
        and the fraction that are < it plus ``1 / n_calib``.
        """
        calib_scores = np.asarray(calib_scores, dtype=float)
        n_calib = len(calib_scores)
        return [
            (float(np.mean(calib_scores <= s)), float(np.mean(calib_scores < s) + (1 / n_calib)))
            for s in np.asarray(test_scores, dtype=float)
        ]

    @staticmethod
    def squared_hinge_loss(y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """
        Computes the squared hinge loss.

        Labels are mapped to -1/+1 before the margin is computed; multiplying by raw 0/1
        labels would give every negative sample a constant loss of 1 regardless of the
        prediction.

        :param y_true: Array of true binary labels (0/1 or -1/+1).
        :param y_pred: Array of decision values (the class rescales probabilities as ``2 * p - 1``).
        :return: The mean squared hinge loss.
        """
        return float(np.mean(ConformalPredictionOptimizer._squared_hinge_scores(y_true, y_pred)))

    def objective_function(self, weights: np.ndarray, loss_function: Callable, x_calib: np.ndarray, y_calib: np.ndarray) -> float:
        """
        Computes the objective function value for given weights and a loss function.

        :param weights: Array of two weights ``(scale, offset)`` applied to the model's
            positive-class probability.
        :param loss_function: Callable invoked as ``loss_function(y_calib, decision_values)``;
            the mean of whatever it returns is the objective value.
        :param x_calib: Calibration feature data.
        :param y_calib: Calibration target data.
        :return: The mean loss computed with the given loss function.
        :raises ValueError: If the inputs fail validation or ``weights`` is not of length 2.
        """
        self.validate_inputs(loss_function, x_calib, y_calib)

        if len(weights) != 2:
            raise ValueError("Weights array must be of length 2.")

        calib_proba = self.model.predict_proba(x_calib)[:, 1]
        return self._objective_from_proba(weights, loss_function, calib_proba, y_calib)

    def validate_inputs(self, loss_function: Callable, x_calib: np.ndarray, y_calib: np.ndarray) -> None:
        """
        Checks that the loss is callable and the calibration data are equally long numpy arrays.

        :param loss_function: Candidate loss function.
        :param x_calib: Calibration feature data.
        :param y_calib: Calibration target data.
        :raises ValueError: If any of the checks fails.
        """
        if not callable(loss_function):
            raise ValueError("loss_function must be a callable function.")

        if not isinstance(x_calib, np.ndarray) or not isinstance(y_calib, np.ndarray):
            raise ValueError("x_calib and y_calib must be numpy arrays.")

        if len(x_calib) != len(y_calib):
            raise ValueError(f"x_calib and y_calib must be of the same length. Got lengths {len(x_calib)} and {len(y_calib)}.")

    def optimize(self, x_train: np.ndarray, y_train: np.ndarray, x_calib: np.ndarray, y_calib: np.ndarray, x_test: np.ndarray, lower_bound: Tuple[float, float] = (0, 0), upper_bound: Tuple[float, float] = (1, 1)) -> Tuple[List[Tuple[float, float]], str]:
        """
        Optimizes the conformal prediction model and returns the best intervals.

        The model is (re)fitted on the training data. For each candidate loss, particle
        swarm optimization searches for the weights minimizing the mean calibration loss;
        the loss with the lowest optimized value wins. Note that this compares raw values
        of different loss functions, exactly as the selection rule is defined.

        True test labels are not available here, so each test sample is scored against its
        own predicted label (positive when the rescaled probability is at least 0.5) and
        that score is ranked among the calibration scores.

        :param x_train: Training feature data.
        :param y_train: Training target data.
        :param x_calib: Calibration feature data.
        :param y_calib: Calibration target data.
        :param x_test: Test feature data.
        :param lower_bound: Lower bounds for weights.
        :param upper_bound: Upper bounds for weights.
        :return: The optimized intervals and the corresponding best loss function name.
        :raises RuntimeError: If the calibration inputs are invalid or an optimization run fails.
        """
        # Per-sample losses: their mean is the PSO objective, and the individual values
        # double as the scores used for ranking.
        loss_functions = {
            'log_loss': self._log_loss_scores,
            'hinge_loss': self._hinge_scores,
            'squared_hinge_loss': self._squared_hinge_scores
        }

        # Fail fast on bad calibration data, reported with the same exception type and
        # message format as a failed optimization run.
        first_loss = next(iter(loss_functions))
        try:
            self.validate_inputs(loss_functions[first_loss], x_calib, y_calib)
            if len(y_calib) == 0:
                raise ValueError("x_calib and y_calib must not be empty.")
        except ValueError as e:
            raise RuntimeError(f"Optimization failed with loss function {first_loss}. Reason: {str(e)}") from e

        self.model.fit(x_train, y_train)

        # The probabilities do not depend on the weights, so predict once instead of on
        # every objective evaluation.
        calib_proba = self.model.predict_proba(x_calib)[:, 1]
        test_proba = self.model.predict_proba(x_test)[:, 1]

        best_score = float('inf')
        best_intervals = None
        best_loss = None

        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Key each future by its loss name: completion order is arbitrary, so pairing
            # results with names positionally would mislabel them.
            futures = {
                loss_name: executor.submit(
                    pso, self._objective_from_proba, lower_bound, upper_bound,
                    args=(loss_function, calib_proba, y_calib)
                )
                for loss_name, loss_function in loss_functions.items()
            }

            # Collect in submission order so tie-breaking between losses is deterministic.
            for loss_name, future in futures.items():
                try:
                    weights, score = future.result()
                except Exception as e:
                    raise RuntimeError(f"Optimization failed with loss function {loss_name}. Reason: {str(e)}") from e

                if score < best_score:
                    loss_function = loss_functions[loss_name]

                    calib_decision = self._decision_values(weights, calib_proba)
                    calib_scores = loss_function(y_calib, calib_decision)

                    test_decision = self._decision_values(weights, test_proba)
                    # decision >= 0 is equivalent to rescaled probability >= 0.5
                    predicted_labels = (test_decision >= 0).astype(int)
                    test_scores = loss_function(predicted_labels, test_decision)

                    best_score = score
                    best_intervals = self._rank_intervals(calib_scores, test_scores)
                    best_loss = loss_name

        return best_intervals, best_loss


def evaluate_intervals(intervals: List[Tuple[float, float]], y_test: np.ndarray, model_predictions: np.ndarray) -> Tuple[Dict[str, Union[float, np.ndarray]], Optional[Dict[str, Union[float, np.ndarray]]]]:
    """
    Splits the test samples by interval lower bound and scores each group separately.

    A sample is "reliable" when the first element of its interval is at most 0.5 and
    "non-reliable" when it is greater than 0.5.

    :param intervals: The predicted intervals for the test data.
    :param y_test: The true labels for the test data.
    :param model_predictions: The model's predictions on the test data.
    :return: A pair (reliable, non-reliable). Each entry is a dict of metrics, or an
        explanatory string when that group cannot be scored.
    """
    def summarize(labels: np.ndarray, predictions: np.ndarray) -> Union[Dict[str, Union[float, np.ndarray]], str]:
        # The metrics below need at least two samples covering both classes.
        if len(labels) < 2 or len(np.unique(labels)) < 2:
            return "Cannot calculate metrics with less than two classes"

        scorers = (
            ('accuracy', accuracy_score),
            ('recall', recall_score),
            ('precision', precision_score),
            ('f1', f1_score),
            ('roc_auc', roc_auc_score),
            ('conf_matrix', confusion_matrix),
        )
        return {metric_name: scorer(labels, predictions) for metric_name, scorer in scorers}

    # Partition sample positions by the lower bound of their interval.
    trusted_rows: List[int] = []
    doubtful_rows: List[int] = []
    for row, bounds in enumerate(intervals):
        lower = bounds[0]
        if lower <= 0.5:
            trusted_rows.append(row)
        elif lower > 0.5:
            doubtful_rows.append(row)

    trusted_labels = np.take(y_test, trusted_rows)
    trusted_predictions = np.take(model_predictions, trusted_rows)
    doubtful_labels = np.take(y_test, doubtful_rows)
    doubtful_predictions = np.take(model_predictions, doubtful_rows)

    reliable_metrics = summarize(trusted_labels, trusted_predictions)

    if doubtful_labels.size:
        non_reliable_metrics = summarize(doubtful_labels, doubtful_predictions)
    else:
        non_reliable_metrics = "No non-reliable predictions to evaluate."

    return reliable_metrics, non_reliable_metrics


# Usage:
# Relies on model, x_train, y_train, x_calibration, y_calibration, x_test and y_test
# having been created by the earlier cells.
optimizer = ConformalPredictionOptimizer(model)
best_intervals, best_loss = optimizer.optimize(
    x_train,
    y_train,
    x_calibration,
    y_calibration,
    x_test,
    lower_bound=(0, 0),
    upper_bound=(1, 1),
)

# Score the reliable / non-reliable groups against the model's hard predictions on the test split
reliable_metrics, non_reliable_metrics = evaluate_intervals(
    best_intervals,
    y_test,
    model.predict(x_test),
)

# Report the winning loss and both metric summaries
print(f"Best Loss Function: {best_loss}")
print("Performance Metrics for Reliable Predictions:", reliable_metrics, sep="\n")
print("\nPerformance Metrics for Non-Reliable Predictions:", non_reliable_metrics, sep="\n")
