In [3]:
import numpy as np
from scipy.linalg import solve
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics.pairwise import rbf_kernel
from scipy.sparse.linalg import cg
from sklearn.metrics import classification_report

In [5]:
class REWKLR(BaseEstimator, ClassifierMixin):
    """
    Rare Event Weighted Kernel Logistic Regression (RE-WKLR) implementation.
    
    Parameters
    ----------
    sigma : float, default=1.0
        Kernel bandwidth parameter for RBF kernel.
    lambda_ : float, default=0.1
        Regularization parameter.
    w1 : float, default=None
        Weight for positive class (rare events). If None, will be calculated from data.
    w0 : float, default=None
        Weight for negative class. If None, will be calculated from data.
    max_iter : int, default=30
        Maximum number of IRLS iterations.
    max_cg_iter : int, default=200
        Maximum number of conjugate gradient iterations.
    tol : float, default=1e-4
        Tolerance for stopping criterion.
    """
    def __init__(self, sigma, lambda_, w1=None, w0=None, 
                 max_iter=30, max_cg_iter=200, tol1=2.5, tol2=0.005):
        self.sigma = sigma
        self.lambda_ = lambda_
        self.w1 = w1
        self.w0 = w0
        self.max_iter = max_iter
        self.max_cg_iter = max_cg_iter
        self.tol1 = tol1
        self.tol2 = tol2
    
    # def _rbf_kernel(self, X1, X2):
    #     """Compute RBF kernel matrix between X1 and X2."""
    #     sq_dist = np.sum(X1**2, axis=1).reshape(-1, 1) + \
    #               np.sum(X2**2, axis=1) - 2 * np.dot(X1, X2.T)
    #     return np.exp(-0.5 * sq_dist / self.sigma**2)
    
    def _logistic(x):
        return 1 / (1 + np.exp(-x))
    
    def _loglikelihood(self, K, alpha, y, w):
        eta = K @ alpha
        reg = (self.lambda_ / 2) * alpha.T @ K @ alpha
        ll = 0
        for i in range(len(y)):
            num = np.exp(y[i] * eta[i])
            denom = 1 + np.exp(eta[i])
            ll += w[i] * np.log(num / denom)
        return ll - reg
    
    
    def fit(self, X, y, verbose=False):
        """Fit the RE-WKLR model to the training data."""
        X, y = check_X_y(X, y)
        self.classes_ = np.unique(y)
        n_samples, n_features = X.shape
        
        # Calculate weights if not provided
        if self.w1 is None or self.w0 is None:
            tau = 0.6 # Population proportion 
            y_bar = np.mean(y)  # Sample proportion
            self.w1_ = tau / y_bar if y_bar > 0 else 1.0
            self.w0_ = (1 - tau) / (1 - y_bar) if y_bar < 1 else 1.0
        else:
            self.w1_ = self.w1
            self.w0_ = self.w0
        
        # Compute kernel matrix
        self.X_train_ = X
        K = rbf_kernel(X, X)
        
        # Add small constant to diagonal for numerical stability
        K_hat = K + 1e-8 * np.eye(n_samples)
        
        # Initialize parameters
        alphainit = np.zeros(n_samples)
        p = self.logistic(K @ alphainit)
        delta = 5.0
        i = 0
        LLW = 0.0
        alpha = np.zeros_like(alphainit)
        bias = np.zeros_like(alphainit)
        w = np.where(y == 1, self.w1_, self.w0_)
        
        
        # IRLS iterations
        while delta > self.tol1 and i < self.max_iter:
            # Compute probabilities
            eta = K @ alphainit
            # reg = (self.lambda_ / 2) * alpha.T @ K @ alpha
            # p = 1 / (1 + np.exp(-eta))
            
            # Compute weights and adjusted response
            v = p * (1 - p)
            # w = np.where(y == 1, self.w1_, self.w0_)
            
            z = eta + (y - p) / v
            
            # Compute Q matrix diagonal elements
            Q_diag = 1 / (v * w)
            
            # Compute bias correction terms
            xi = 0.5 * Q_diag * ((1 + self.w1_) * p - self.w1_)
            
            D = np.diag(v * w)
            
            # Solve for alpha using CG
            A = K_hat.T @ D @ K_hat + self.lambda_ * K_hat
            b_alpha = K_hat.T @ D @ z
            b_bias = K_hat.T @ D @ xi
            
            alpha = cg(A, b_alpha, x0=alpha, maxiter=self.max_cg_iter, rtol=self.tol2)[0]
            
            # alpha = self._conjugate_gradient(A, b_alpha, alpha, 
            #                                self.max_cg_iter, self.tol2)
            
            
            
            # # Solve for bias using CG
            
            bias = cg(A, b_bias, x0=bias, maxiter=self.max_cg_iter, rtol=self.tol2)[0]
            # bias = self._conjugate_gradient(A, b_bias, np.zeros_like(alpha), 
            #                               self.max_cg_iter, self.tol2)
            
        
            
            # Given current alpha, compute the new eta
            eta = K @ alpha
            p = self.logistic(eta)
            alphainit = alpha.copy()
            
            LLOld = LLW
            LLW = -2 * self._loglikelihood(K, alpha, y, w)
            delta = np.abs((LLOld - LLW) / LLW + 1e-8)
            i += 1
            
        if verbose:
            if delta < self.tol1:
                print(f"Converged after {i} iterations.")
            else:
                print(f"Max iterations reached: {self.max_iter}.")
                
        
        unbiased_alpha = alpha - bias
        probunbiased = self.logistic(K @ unbiased_alpha)
        self.alpha_ = unbiased_alpha    
        self.prob_ = probunbiased
        
        return self
    
    def predict_prob(self, X):
        """Predict probabilities for the input data."""
        check_is_fitted(self)
        X = check_array(X)
        
        K_test = rbf_kernel(X, self.X_train_)
        prob = self.logistic(K_test @ self.alpha_)
        
        return prob
    
    def predict(self, X):
        """Predict class labels for the input data."""
        prob = self.predict_prob(X)
        return (prob >= 0.5).astype(int)
    
    