# Confidence-Based Classification Model 

This notebook implements a customized machine learning pipeline to predict a binary target using XGBoost with a custom loss function and a custom scoring metric. It includes:

- Data loading and preprocessing
- Definition of a shifted quadratic loss function tailored for the problem
- A custom scoring function that prioritizes confident true negatives and true positives
- A custom XGBoost wrapper to integrate the custom loss into training
- Splitting of data into train, validation, and test sets
- Hyperparameter tuning via GridSearchCV on the training+validation data
- Model evaluation on the test set with multiple metrics:
    - Accuracy, classification report, confusion matrix
    - AUROC and AUPRC
    - Precision-Recall curve analysis with threshold tuning
    - Analysis of Positive Predictive Value (PPV) and Negative Predictive Value (NPV) across thresholds
- Visualization of prediction performance and threshold effects

## 1. Imports and Setup

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
import xgboost as xgb
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, roc_auc_score, average_precision_score, precision_recall_curve
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn.pipeline")

## 2. Load Data

In [None]:
data = pd.read_csv('your_dataset.csv') # Replace 'your_dataset.csv' with your actual file path
data = pd.DataFrame(data) # Optional if data is already a DataFrame

## 3. Custom Shifted Quadratic Loss Function

In [None]:
def shifted_quadratic_loss(preds, dtrain, m=2.0, v=0.3, b=0.1):
    y = dtrain.get_label()
    p = 1 / (1 + np.exp(-preds))  # Sigmoid
    
    # Shifted quadratic weight
    weight = m * ((p - v) ** 2) + b
    d_weight_dp = 2 * m * (p - v)
    
    grad = weight * (p - y)
    hess = weight * p * (1 - p) + d_weight_dp * (p - y)
    
    return grad, hess

## 4. Custom Scoring Function

In [None]:
# --- Custom Goal Function: prioritize confident true negatives and positives ---
def compute_custom_goal(probas, y_actual):
    confident_tn = (probas <= 0.05) & (y_actual == 0) # Confident True Negatives: at least 95% confidence level
    confident_tp = (probas >= 0.80) & (y_actual == 1) # Confident True Positives: at least 80% confidence level
    return np.sum(confident_tn | confident_tp)

# --- Scoring Function for GridSearchCV ---
def n95_minus_scorer(estimator, X_val, y_val):
    probas = estimator.predict_proba(X_val)[:, 1]
    return compute_custom_goal(probas, y_val)

## 5. Custom XGBoost Wrapper

In [None]:
class XGBCustomWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, m=2.0, v=0.3, b=0.1, learning_rate=0.1, max_depth=3,
                 min_child_weight=1, subsample=1.0, colsample_bytree=1.0,
                 scale_pos_weight=1.0, num_boost_round=100):
        self.m = m
        self.v = v
        self.b = b
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.scale_pos_weight = scale_pos_weight
        self.num_boost_round = num_boost_round
        self.booster = None

    def fit(self, X, y):
        dtrain = xgb.DMatrix(X, label=y)
        self.booster = xgb.train(
            {
                'objective': 'binary:logistic',
                'disable_default_eval_metric': 1,
                'learning_rate': self.learning_rate,
                'max_depth': self.max_depth,
                'min_child_weight': self.min_child_weight,
                'subsample': self.subsample,
                'colsample_bytree': self.colsample_bytree,
                'scale_pos_weight': self.scale_pos_weight,
            },
            dtrain,
            num_boost_round=self.num_boost_round,
            obj=lambda preds, dtrain: shifted_quadratic_loss(preds, dtrain, self.m, self.v, self.b)
        )
        return self

    def predict_proba(self, X):
        dmatrix = xgb.DMatrix(X)
        probas = self.booster.predict(dmatrix)
        return np.vstack([1 - probas, probas]).T

    def predict(self, X):
        return (self.predict_proba(X)[:, 1] >= 0.5).astype(int)

## 6. Data Splitting

- **Training set**: Used to fit the model and learn from the data. The model adjusts its parameters based on this data.
- **Validation set**: Used during model development to tune hyperparameters and select the best model. It acts as an independent check to prevent overfitting on the training data.
- **Test set**: Completely held out until the very end, used for the final unbiased evaluation of the model’s performance on unseen data.


1. Split the data into train + validation (80%) and test (20%).
2. Further split train + validation into training (60%) and validation (20%).
3. Perform hyperparameter tuning (GridSearchCV) on the training set.
4. Evaluate and select models based on performance on the validation set.
5. After selecting the best model, test its final performance on the test set to get an unbiased estimate.

In [None]:
# --- Data Preprocessing ---
X = data.drop('target_column', axis=1) # Replace 'target_column' with the name of your target variable
y = data['target_column'] # Replace accordingly

# First split: hold out test set (e.g., 20%)
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Second split: split remaining data into train and validation (e.g., 25% of 80% = 20% of total)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.25, random_state=42
)

## 7. Pipeline and Hyperparameter Grid

In [None]:
pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
    ('scaler', StandardScaler()),
    ('xgb', XGBCustomWrapper())
])

param_grid = {
    'xgb__m': [5.0, 6.0, 4.0],
    'xgb__v': [0.35, 0.1, 0.15],
    'xgb__b': [0.05, 0.03],
    'xgb__learning_rate': [0.05, 0.1],
    'xgb__max_depth': [5, 10],
    'xgb__min_child_weight': [1, 3],
    'xgb__subsample': [0.8, 1.0],
    'xgb__colsample_bytree': [0.8, 1.0],
    'xgb__scale_pos_weight': [1, 2],
    'xgb__num_boost_round': [100, 200, 300, 400]
}

## 8. Grid Search for Hyperparameter Tuning

In [None]:
gscv = GridSearchCV(
    pipeline,
    param_grid,
    scoring=n95_minus_scorer,
    cv=3,
    verbose=2,
    n_jobs=-1
)

# Fit on training + validation set
gscv.fit(X_train_val, y_train_val)

## 9. Model Evaluation on Test Set

In [None]:
print("Best Parameters Found:")
print(gscv.best_params_)

best_model = gscv.best_estimator_

# Get predictions
y_test_pred = best_model.predict(X_test)

# Evaluate on testing set
print("Testing Accuracy:", accuracy_score(y_test, y_test_pred))
print("Testing Classification Report:")
print(classification_report(y_test, y_test_pred))

# Confusion matrix
print("Confusion Matrix:\n", confusion_matrix(y_test, y_test_pred))

## 10. Advanced Metrics and Threshold Analysis

In [None]:
# Get predicted probabilities for the positive class
test_probas = best_model.predict_proba(X_test)[:, 1]

# 1. AUROC
auroc = roc_auc_score(y_test, test_probas)
print(f"AUROC: {auroc:.4f}")

# 2. AUPRC
auprc = average_precision_score(y_test, test_probas)
print(f"AUPRC: {auprc:.4f}")

# 3. Precision-Recall Curve for max PPV
precision, recall, thresholds = precision_recall_curve(y_test, test_probas)

# Remove the last precision and recall point
precision = precision[:-1]
recall = recall[:-1]

# Find threshold with maximum precision (PPV)
max_ppv_idx = np.argmax(precision)
max_ppv = precision[max_ppv_idx]
ppv_threshold = thresholds[max_ppv_idx]

# % of predictions made at or above this threshold
high_conf_preds = test_probas >= ppv_threshold
percent_classified_at_max_ppv = 100 * np.mean(high_conf_preds)

print(f"Maximum PPV: {max_ppv:.4f} at threshold {ppv_threshold:.4f}")
print(f"Percentage of test observations classified at max PPV threshold: {percent_classified_at_max_ppv:.2f}%")

# 4. Compute NPV at various thresholds
npv_thresholds = np.arange(0.05, 1.00, 0.05)
npv_values = []
percent_classified_at_npv = []

for thresh in npv_thresholds:
    preds = (test_probas < thresh).astype(int)  # Predict negative if below threshold
    tn = np.sum((preds == 1) & (y_test == 0))
    fn = np.sum((preds == 1) & (y_test == 1))
    if (tn + fn) > 0:
        npv = tn / (tn + fn)
        pct = 100 * (tn + fn) / len(y_test)
    else:
        npv = 0
        pct = 0
    npv_values.append(npv)
    percent_classified_at_npv.append(pct)

# Find threshold with maximum NPV
max_npv_idx = np.argmax(npv_values)
max_npv = npv_values[max_npv_idx]
npv_threshold = npv_thresholds[max_npv_idx]
percent_classified_at_max_npv = percent_classified_at_npv[max_npv_idx]

print(f"Maximum NPV: {max_npv:.4f} at threshold {npv_threshold:.2f}")
print(f"Percentage of test observations classified at max NPV threshold: {percent_classified_at_max_npv:.2f}%")

## 11. Visualization - Threshold Analysis

In [None]:
thresholds = np.arange(0.05, 1.0, 0.05)
tp_perc, tn_perc, fp_perc, fn_perc = [], [], [], []
ppv, npv = [], []
perc_pred_1, perc_pred_0 = [], []

total = len(y_test)
probas_positive = best_model.predict_proba(X_test)[:, 1]

for i in thresholds:
    preds = (probas_positive >= i).astype(int)

    TP = np.sum((preds == 1) & (y_test == 1))
    TN = np.sum((preds == 0) & (y_test == 0))
    FP = np.sum((preds == 1) & (y_test == 0))
    FN = np.sum((preds == 0) & (y_test == 1))

    tp_perc.append(TP / total * 100)
    tn_perc.append(TN / total * 100)
    fp_perc.append(FP / total * 100)
    fn_perc.append(FN / total * 100)

    ppv.append(TP / (TP + FP) if (TP + FP) > 0 else np.nan)
    npv.append(TN / (TN + FN) if (TN + FN) > 0 else np.nan)

    perc_pred_1.append((TP + FP) / total * 100)
    perc_pred_0.append((TN + FN) / total * 100)

results_df = pd.DataFrame({
    "Threshold": thresholds,
    "TP %": tp_perc,
    "FP %": fp_perc,
    "TN %": tn_perc,
    "FN %": fn_perc,
    "PPV (Precision)": ppv,
    "NPV": npv,
    "Predicted 1 %": perc_pred_1,
    "Predicted 0 %": perc_pred_0
})

# Stacked bar plot
plt.figure(figsize=(12, 7))
bar_tp = plt.bar(thresholds, tp_perc, width=0.03, label='True Positives')
bar_fp = plt.bar(thresholds, fp_perc, width=0.03, bottom=np.array(tp_perc), label='False Positives')
bar_tn = plt.bar(thresholds, tn_perc, width=0.03, bottom=np.array(tp_perc) + np.array(fp_perc), label='True Negatives')
bar_fn = plt.bar(thresholds, fn_perc, width=0.03, bottom=np.array(tp_perc) + np.array(fp_perc) + np.array(tn_perc), label='False Negatives')

# Annotate bars
for bar, data, offset in zip([bar_tp, bar_fp, bar_tn, bar_fn], 
                             [tp_perc, fp_perc, tn_perc, fn_perc],
                             [np.zeros(len(tp_perc)), 
                              np.array(tp_perc), 
                              np.array(tp_perc) + np.array(fp_perc), 
                              np.array(tp_perc) + np.array(fp_perc) + np.array(tn_perc)]):
    for rect, perc, base in zip(bar, data, offset):
        if perc > 0:
            height = rect.get_height()
            plt.text(rect.get_x() + rect.get_width() / 2, base + height / 2,
                     f'{perc:.1f}%', ha='center', va='center', fontsize=9, color='black')

plt.xlabel('Threshold')
plt.ylabel('Percentage (%)')
plt.title('Threshold Analysis: TP, FP, TN, FN by Confidence Threshold')
plt.legend()
plt.xticks(thresholds, rotation=45)
plt.tight_layout()
plt.show()


In [None]:
pos_conf = []  # PPV
neg_conf = []  # NPV
frac_pos = []
frac_neg = []


for i in thresholds:
    preds = (probas_positive >= i).astype(int)

    TP = np.sum((preds == 1) & (y_test == 1))
    TN = np.sum((preds == 0) & (y_test == 0))
    FP = np.sum((preds == 1) & (y_test == 0))
    FN = np.sum((preds == 0) & (y_test == 1))

    ppv = TP / (TP + FP) if (TP + FP) > 0 else np.nan
    npv = TN / (TN + FN) if (TN + FN) > 0 else np.nan

    frac_pred_pos = np.mean(preds == 1)
    frac_pred_neg = np.mean(preds == 0)

    pos_conf.append(ppv)
    neg_conf.append(npv)
    frac_pos.append(frac_pred_pos)
    frac_neg.append(frac_pred_neg)

# Plotting
plt.figure(figsize=(12, 7))

plt.plot(thresholds, pos_conf, marker='x', linestyle='-', color='blue', label='Positive Precision (PPV)')
plt.plot(thresholds, neg_conf, marker='x', linestyle='-', color='green', label='Negative Precision (NPV)')
plt.plot(thresholds, frac_pos, linestyle='--', marker='^', color='blue', label='Fraction Predicted Positive')
plt.plot(thresholds, frac_neg, linestyle='--', marker='v', color='green', label='Fraction Predicted Negative')

plt.axhline(0.80, color='black', linestyle='--')
plt.axhline(0.90, color='black', linestyle='--')

for i, t in enumerate(thresholds):
    if not np.isnan(pos_conf[i]) and pos_conf[i] >= 0.80:
        plt.annotate(f'{pos_conf[i]:.2f}', (t, pos_conf[i]), textcoords="offset points", xytext=(0,10), ha='center', color='blue')
    if not np.isnan(neg_conf[i]) and neg_conf[i] >= 0.80:
        plt.annotate(f'{neg_conf[i]:.2f}', (t, neg_conf[i]), textcoords="offset points", xytext=(0,-15), ha='center', color='green')

plt.xlabel('Threshold')
plt.ylabel('Metric Value')
plt.title('Confidence vs Threshold (PPV, NPV)')
plt.legend()
plt.grid(True)
plt.xticks(thresholds, rotation=45)
plt.tight_layout()
plt.show()


In [None]:
pos_conf = []  # PPV
neg_conf = []  # NPV
frac_pos = []
frac_neg = []
actual_pos_frac = []

for i in thresholds:
    preds = (probas_positive >= i).astype(int)

    TP = np.sum((preds == 1) & (y_test == 1))
    TN = np.sum((preds == 0) & (y_test == 0))
    FP = np.sum((preds == 1) & (y_test == 0))
    FN = np.sum((preds == 0) & (y_test == 1))

    ppv = TP / (TP + FP) if (TP + FP) > 0 else np.nan
    npv = TN / (TN + FN) if (TN + FN) > 0 else np.nan

    frac_pred_pos = np.mean(preds == 1)
    frac_pred_neg = np.mean(preds == 0)

    pos_conf.append(ppv)
    neg_conf.append(npv)
    frac_pos.append(frac_pred_pos)
    frac_neg.append(frac_pred_neg)
    act_pos_frac = TP / np.sum(preds == 1) if np.sum(preds == 1) > 0 else np.nan
    actual_pos_frac.append(act_pos_frac)

# ------------------ POSITIVE METRICS PLOT ------------------
plt.figure(figsize=(10, 6))
plt.plot(thresholds, pos_conf, marker='x', linestyle='-', color='blue', label='Positive Precision (PPV)')
plt.plot(thresholds, frac_pos, linestyle='--', color='blue', label='Fraction Predicted Positive')
plt.axhline(0.80, color='black', linestyle='--', label='Target Confidence: 0.80')

for i, t in enumerate(thresholds):
    if not np.isnan(pos_conf[i]) and pos_conf[i] >= 0.80:
        plt.annotate(f'{pos_conf[i]:.2f}', (t, pos_conf[i]), textcoords="offset points", xytext=(0,10), ha='center', color='blue')

plt.xlabel('Threshold')
plt.ylabel('Metric Value')
plt.title('Positive Prediction Confidence vs Threshold')
plt.legend()
plt.grid(True)
plt.xticks(thresholds, rotation=45)
plt.tight_layout()
plt.show()

# ------------------ NEGATIVE METRICS PLOT ------------------
plt.figure(figsize=(10, 6))
plt.plot(thresholds, neg_conf, marker='x', linestyle='-', color='green', label='Negative Precision (NPV)')
plt.plot(thresholds, frac_neg, linestyle='--', color='green', label='Fraction Predicted Negative')
plt.axhline(0.90, color='black', linestyle='--', label='Target Confidence: 0.90')

for i, t in enumerate(thresholds):
    if not np.isnan(neg_conf[i]) and neg_conf[i] >= 0.90:
        plt.annotate(f'{neg_conf[i]:.2f}', (t, neg_conf[i]), textcoords="offset points", xytext=(0,10), ha='center', color='green')

plt.xlabel('Threshold')
plt.ylabel('Metric Value')
plt.title('Negative Prediction Confidence vs Threshold')
plt.legend()
plt.grid(True)
plt.xticks(thresholds, rotation=45)
plt.tight_layout()
plt.show()
