# In [5]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, RFE
from sklearn.linear_model import LogisticRegression, LassoCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np

class FeatureSelector(BaseEstimator, TransformerMixin):
    """
    Wrapper for sklearn feature selection to integrate into a pipeline.

    Args:
        method: 'kbest' (SelectKBest), 'rfe' (RFE) or 'none' (pass-through).
        k: Number of features to keep; forwarded as ``k`` to SelectKBest or
            as ``n_features_to_select`` to RFE.
        estimator: Estimator used by RFE. When None, a
            ``LogisticRegression(max_iter=1000)`` is used.
        score_func: 'f_classif' selects ``f_classif``; any other value
            selects ``mutual_info_classif``. Only used by 'kbest'.

    Attributes set by ``fit``:
        selector_: The fitted inner selector, or None for method 'none'.
        n_features_in_: Number of columns seen in ``fit`` (None when the
            input has fewer than two dimensions).
    """
    def __init__(self, method='kbest', k=20, estimator=None, score_func='f_classif'):
        self.method = method
        self.k = k
        self.estimator = estimator
        self.score_func = score_func
        # Kept as None before fitting so transform() stays a pass-through.
        self.selector_ = None

    def fit(self, X, y):
        """
        Build and fit the inner selector chosen by ``method``.

        Raises:
            ValueError: If ``method`` is not 'kbest', 'rfe' or 'none'.
        """
        # Remember the input width: get_support() needs it to build a mask
        # when there is no inner selector to ask.
        shape = np.shape(X)
        self.n_features_in_ = shape[1] if len(shape) > 1 else None

        if self.method == 'kbest':
            func = f_classif if self.score_func == 'f_classif' else mutual_info_classif
            self.selector_ = SelectKBest(score_func=func, k=self.k)
        elif self.method == 'rfe':
            base_est = self.estimator if self.estimator is not None else LogisticRegression(max_iter=1000)
            self.selector_ = RFE(estimator=base_est, n_features_to_select=self.k)
        elif self.method == 'none':
            self.selector_ = None
            return self
        else:
            raise ValueError("Unknown feature selection method.")
        self.selector_.fit(X, y)
        return self

    def transform(self, X):
        """Return X reduced by the inner selector, or X unchanged if there is none."""
        if self.selector_ is None:
            return X
        return self.selector_.transform(X)

    def get_support(self):
        """
        Return a boolean mask over the input features marking the kept ones.

        With no inner selector (method 'none') every feature is kept, so the
        mask is all True.

        Raises:
            ValueError: If there is no inner selector and the input width is
                unknown (``fit`` has not been called on 2-D data).
        """
        if self.selector_ is None:
            n_features = getattr(self, 'n_features_in_', None)
            if n_features is None:
                raise ValueError("FeatureSelector must be fitted before calling get_support().")
            # Boolean mask, the same kind of result the wrapped selectors give.
            return np.ones(n_features, dtype=bool)
        return self.selector_.get_support()

def build_model_pipeline(
    feature_selection_method='kbest',
    k_features=20,
    selector_score_func='f_classif',
    classifier='logreg',
    random_state=42
):
    """
    Returns a ready-to-use sklearn Pipeline: scaler -> feature selection -> classifier.

    Args:
        feature_selection_method: Passed to FeatureSelector as ``method``
            ('kbest', 'rfe' or 'none').
        k_features: Number of features the selector keeps.
        selector_score_func: Passed to FeatureSelector as ``score_func``.
        classifier: 'logreg', 'rf', 'svm' or 'lasso'. 'lasso' builds an
            L1-penalised logistic regression.
        random_state: Seed forwarded to the classifier.

    Returns:
        An unfitted Pipeline with steps 'scaler', 'feature_selection' and
        'classifier'.

    Raises:
        ValueError: If ``classifier`` is not one of the supported names.
    """
    if classifier == 'logreg':
        clf = LogisticRegression(max_iter=1000, random_state=random_state)
    elif classifier == 'rf':
        clf = RandomForestClassifier(random_state=random_state)
    elif classifier == 'svm':
        clf = SVC(probability=True, random_state=random_state)
    elif classifier == 'lasso':
        # LassoCV is a regressor: it predicts continuous values and has no
        # predict_proba, so it cannot be the final step of a classification
        # pipeline. L1-penalised logistic regression is the classification
        # counterpart; 'saga' supports the L1 penalty.
        clf = LogisticRegression(
            penalty='l1', solver='saga', max_iter=5000, random_state=random_state
        )
    else:
        raise ValueError("Unknown classifier")

    # RFE ranks features through the estimator's coef_ / feature_importances_.
    # SVC with its default RBF kernel exposes neither, so for 'svm' leave the
    # estimator unset and let FeatureSelector use its own default.
    rfe_estimator = None
    if feature_selection_method == 'rfe' and classifier != 'svm':
        rfe_estimator = clf

    pipeline = Pipeline([
        ('scaler', StandardScaler()),  # Always normalize before selection and modeling
        ('feature_selection', FeatureSelector(
            method=feature_selection_method,
            k=k_features,
            estimator=rfe_estimator,
            score_func=selector_score_func)),
        ('classifier', clf)
    ])
    return pipeline

def run_grid_search(
    X, y,
    pipeline,
    param_grid,
    cv_folds=5,
    n_jobs=-1,
    scoring='accuracy',
    random_state=42
):
    """
    Runs a GridSearchCV with the provided pipeline and param grid.

    Args:
        X: Feature matrix.
        y: Target labels.
        pipeline: Estimator or Pipeline to tune.
        param_grid: Parameter grid passed to GridSearchCV.
        cv_folds: Number of stratified folds.
        n_jobs: Parallel jobs for the search.
        scoring: Scoring passed to GridSearchCV.
        random_state: Seed for the shuffled stratified split (default 42,
            the value that used to be hard-coded).

    Returns:
        The fitted GridSearchCV object (refit on the full data).
    """
    cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
    grid = GridSearchCV(pipeline, param_grid, cv=cv, n_jobs=n_jobs, scoring=scoring, refit=True)
    grid.fit(X, y)
    return grid

# Example usage:
# pipeline = build_model_pipeline(
#     feature_selection_method='kbest',  # or 'rfe', 'none'
#     k_features=30,
#     selector_score_func='f_classif',  # or 'mutual_info_classif'
#     classifier='rf'  # 'logreg', 'rf', 'svm', 'lasso'
# )
#
# param_grid = {
#     'feature_selection__k': [10, 20, 30, 50],
#     'classifier__n_estimators': [100, 200]  # If using 'rf'
# }
#
# best_model = run_grid_search(X, y, pipeline, param_grid)
# y_pred = best_model.predict(X_test)

# In [6]:
import numpy as np
import pandas as pd

def get_feature_importance(model, feature_names=None):
    """
    Extracts feature importance or coefficients from a fitted scikit-learn pipeline.

    Args:
        model: Fitted Pipeline with 'scaler', 'feature_selection' and
            'classifier' steps, or a fitted search object exposing such a
            pipeline as ``best_estimator_``.
        feature_names: List of original feature names (optional). If not
            provided, generic names ``f<i>`` are used, where ``i`` is the
            column index in the original feature matrix.

    Returns:
        importance_df: pandas DataFrame with columns 'feature' and
            'importance', sorted by importance descending. For coefficient
            based models the importance is the absolute coefficient,
            averaged over classes when ``coef_`` has one row per class.

    Raises:
        AttributeError: If the classifier exposes neither
            ``feature_importances_`` nor ``coef_``.
    """
    # A fitted search object wraps the pipeline; unwrap it if present.
    model = getattr(model, 'best_estimator_', model)
    steps = model.named_steps

    selector = steps['feature_selection']
    # A wrapper holding no inner selector keeps every feature; treat it as
    # "no selection" instead of asking it for a mask.
    passthrough = hasattr(selector, 'selector_') and selector.selector_ is None
    support = None
    if hasattr(selector, 'get_support') and not passthrough:
        support = selector.get_support()

    if support is not None:
        support = np.asarray(support)
        # Accept either a boolean mask or an array of integer indices.
        kept = np.flatnonzero(support) if support.dtype == bool else support
        if feature_names is not None:
            selected_names = np.asarray(feature_names)[kept]
        else:
            # Name by original column index so names stay traceable.
            selected_names = [f'f{i}' for i in kept]
    elif feature_names is not None:
        selected_names = feature_names
    else:
        n_features = steps['scaler'].n_features_in_
        selected_names = [f'f{i}' for i in range(n_features)]

    clf = steps['classifier']
    if hasattr(clf, 'feature_importances_'):
        importances = np.asarray(clf.feature_importances_)
    elif hasattr(clf, 'coef_'):
        coef = np.abs(np.asarray(clf.coef_))
        # Multi-class models hold one coefficient row per class; collapse to
        # one value per feature. A single row is returned unchanged in value.
        importances = coef.mean(axis=0) if coef.ndim > 1 else coef
    else:
        raise AttributeError("Classifier does not provide feature importance or coefficients.")

    importance_df = pd.DataFrame({
        "feature": selected_names,
        "importance": importances
    }).sort_values("importance", ascending=False).reset_index(drop=True)
    return importance_df

def batch_predict(model, X, batch_size=256, proba=False):
    """
    Makes predictions or predicts probabilities in batches for large datasets.

    Args:
        model: Trained scikit-learn pipeline (anything with ``predict`` and,
            optionally, ``predict_proba``).
        X: Feature matrix (np.ndarray or pd.DataFrame).
        batch_size: Number of samples per batch; must be at least 1.
        proba: If True, returns class probabilities; else, class predictions.
            When the model has no ``predict_proba`` this falls back to
            ``predict``.

    Returns:
        np.ndarray: Predictions or probabilities for all samples, joined
            along the first axis. For empty input an empty array is returned.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")

    # Decide once which method to call instead of re-checking per batch.
    use_proba = proba and hasattr(model, "predict_proba")
    predict_fn = model.predict_proba if use_proba else model.predict

    # Slice DataFrames by position explicitly; arrays are sliced directly.
    rows = X.iloc if hasattr(X, "iloc") else X
    n_samples = X.shape[0]
    results = [
        predict_fn(rows[start:start + batch_size])
        for start in range(0, n_samples, batch_size)
    ]

    if not results:
        # Nothing to predict: return an empty result of a sensible shape.
        if use_proba:
            return np.empty((0, len(getattr(model, "classes_", ()))))
        return np.empty(0)

    # Joining along axis 0 is right for both 2-D probabilities and 1-D
    # predictions (including the predict() fallback when proba=True).
    return np.concatenate(results, axis=0)

# In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
)

def compute_metrics(y_true, y_pred, labels=None, average='weighted'):
    """
    Evaluate predictions and bundle the results into one dictionary.

    Args:
        y_true: Ground truth labels (array-like).
        y_pred: Predicted labels (array-like).
        labels: Optional label list; forwarded only to the confusion matrix.
        average: Averaging method for precision, recall and F1
            ('weighted', 'macro', etc.).

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1_score',
        'classification_report' (as a dict) and 'confusion_matrix'.
    """
    # Shared keyword arguments for the three averaged scores.
    averaged = {"average": average, "zero_division": 0}
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, **averaged),
        "recall": recall_score(y_true, y_pred, **averaged),
        "f1_score": f1_score(y_true, y_pred, **averaged),
        "classification_report": classification_report(
            y_true, y_pred, zero_division=0, output_dict=True
        ),
        "confusion_matrix": confusion_matrix(y_true, y_pred, labels=labels),
    }

def print_metrics(metrics_dict, label_names=None):
    """
    Write the metrics to stdout in a readable layout.

    Args:
        metrics_dict: Output from compute_metrics.
        label_names: Optional names used as row and column labels of the
            confusion matrix.
    """
    scalar_rows = (
        ("Accuracy:", "accuracy"),
        ("Precision:", "precision"),
        ("Recall:", "recall"),
        ("F1 Score:", "f1_score"),
    )
    for heading, key in scalar_rows:
        print(heading, metrics_dict[key])

    print("\nClassification Report:")
    report = metrics_dict["classification_report"]
    # A dict report is shown as a table with one row per entry.
    print(pd.DataFrame(report).transpose() if isinstance(report, dict) else report)

    print("\nConfusion Matrix:")
    matrix = metrics_dict["confusion_matrix"]
    if label_names is None:
        print(matrix)
        return
    print(pd.DataFrame(matrix, index=label_names, columns=label_names))

def plot_confusion_matrix(metrics_dict, label_names=None, normalize=False, cmap='Blues', title='Confusion Matrix'):
    """
    Plots the confusion matrix using matplotlib.

    Args:
        metrics_dict: Output from compute_metrics (its 'confusion_matrix'
            entry is plotted).
        label_names: List of class labels (for axis ticks).
        normalize: If True, each row is divided by its total so cells show
            the fraction (0-1) of that true class; rows with no samples are
            shown as 0.
        cmap: Colormap for plot.
        title: Plot title.
    """
    cm = np.asarray(metrics_dict["confusion_matrix"])
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Divide only where the row has samples; a class that never occurs
        # in y_true would otherwise give 0/0 = NaN cells.
        cm = np.divide(
            cm, row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )
        fmt = '.2f'
    else:
        # 'd' only formats integers; fall back for non-integer matrices.
        fmt = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    fig, ax = plt.subplots(figsize=(6, 5))
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    if label_names is not None:
        ax.set(xticks=np.arange(cm.shape[1]), yticks=np.arange(cm.shape[0]),
               xticklabels=label_names, yticklabels=label_names, title=title, ylabel='True label', xlabel='Predicted label')
        plt.xticks(rotation=45)
    else:
        ax.set(title=title, ylabel='True label', xlabel='Predicted label')
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2. if cm.size else 0.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt), ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.show()

# Example usage:
# y_true = ... # ground truth labels
# y_pred = ... # predicted labels
# metrics = compute_metrics(y_true, y_pred, labels=[0,1,2])
# print_metrics(metrics, label_names=['No pain', 'Mild pain', 'Severe pain'])
# plot_confusion_matrix(metrics, label_names=['No pain', 'Mild pain', 'Severe pain'], normalize=True)