# MCAW-KNN: Multi-Class Adaptive Weighted K-Nearest Neighbors

This notebook implements the MCAW-KNN algorithm as described in the accompanying paper. The algorithm combines:

1. **Smooth Local Region Building** - Uses Bayesian smoothing, Wilson score intervals, and Gaussian kernels to construct representative local regions
2. **Global Weight Matrix Computation** - Calculates class-specific feature weights using methods like LDA, F-score, or centroid-based approaches
3. **Binary Class Weight Correction** - Corrects weight vectors using generalized Rayleigh quotient manifold constraints
4. **Weighted KNN Classification** - Performs distance-weighted voting for final classification

**Author:** Meimei.Huang  
**Date:** January 2026

## 1. Import Required Libraries

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler, MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import mutual_info_classif
from sklearn.cluster import KMeans
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from collections import Counter
from sklearn.base import BaseEstimator
from scipy.linalg import logm, fractional_matrix_power, eigh
from scipy.optimize import minimize
from sklearn.covariance import LedoitWolf
from sklearn.utils.validation import check_X_y
from scipy import stats
import random
import warnings
warnings.filterwarnings('ignore')

## 2. Smooth Local Region Builder

This class constructs local regions around a target point using:
- **Bayesian Smoothing**: Handles small sample sizes with prior information
- **Wilson Score Interval**: Provides conservative confidence estimates
- **Gaussian Kernel**: Adjusts distances based on class representativeness

In [None]:
class SmoothLocalRegionBuilder:
    """
    Local region builder based on Bayesian smoothing, Wilson smoothing, and Gaussian kernel.
    Provides smooth class representativeness evaluation and distance adjustment.
    
    Parameters
    ----------
    k_region : int
        Candidate set size for each class
    region_size : int
        Target local region size
    alpha : float
        Bayesian prior parameter α
    beta : float
        Bayesian prior parameter β
    confidence_level : float
        Wilson interval confidence level
    sigma : float
        Gaussian kernel bandwidth parameter
    prior_strength : int
        Prior strength for Bayesian estimation
    """
    
    def __init__(self, k_region=15, region_size=15, alpha=1.0, beta=1.0, 
                 confidence_level=0.95, sigma=1.0, prior_strength=5):
        self.k_region = k_region
        self.region_size = region_size
        self.alpha = alpha
        self.beta = beta
        self.confidence_level = confidence_level
        self.sigma = sigma
        self.prior_strength = prior_strength
        
        # Storage for intermediate computation results
        self.sample_results = {}  # Detailed results for each sample
        self.class_tightness = {}  # Class tightness values
        self.class_candidates = {}  # Candidate sets per class
    
    def fit(self, X, y, row_ids):
        """Fit the region builder with training data."""
        self.X = X
        self.y = y
        self.row_ids = row_ids
        return self
    
    def compute_class_tightness(self, class_label, indices):
        """Compute class tightness (inverse of average distance to centroid)."""
        class_indices = indices[self.y[indices] == class_label]
        if len(class_indices) == 0:
            return 1.0
        
        class_samples = self.X[class_indices]
        if len(class_samples) <= 1:
            return 1.0
        
        centroid = np.mean(class_samples, axis=0)
        avg_distance = np.mean(np.linalg.norm(class_samples - centroid, axis=1))
        tightness = 1.0 / (avg_distance + 1e-8)
        return tightness
    
    def compute_backward_rank(self, target_point, sample_point, candidate_points):
        """Compute backward rank (rank of target from sample's perspective)."""
        if len(candidate_points) == 0:
            return 1
        
        distances_to_sample = [np.linalg.norm(candidate - sample_point) 
                              for candidate in candidate_points]
        target_to_sample_dist = np.linalg.norm(target_point - sample_point)
        all_distances = distances_to_sample + [target_to_sample_dist]
        sorted_indices = np.argsort(all_distances)
        rank = np.where(sorted_indices == len(all_distances) - 1)[0][0] + 1
        return rank
    
    def bayesian_smoothing(self, k, n, alpha=None, beta=None):
        """
        Compute Bayesian smoothed probability p_bayesian.
        
        Parameters
        ----------
        k : int
            Number of successes (e.g., count of high-ranked samples)
        n : int
            Total number of trials
        
        Returns
        -------
        float
            Bayesian smoothed probability
        """
        if alpha is None:
            alpha = self.alpha
        if beta is None:
            beta = self.beta
        
        if n == 0:
            return 0.5  # Default prior
        
        p_bayesian = (k + alpha) / (n + alpha + beta)
        return p_bayesian
    
    def wilson_score_interval(self, p, n, z_score=None):
        """
        Compute Wilson score interval lower bound p_wilson.
        
        This provides a conservative estimate of the true proportion.
        """
        if n == 0:
            return p
        
        if z_score is None:
            z_score = stats.norm.ppf(1 - (1 - self.confidence_level) / 2)
        
        denominator = 1 + z_score**2 / n
        centre = (p + z_score**2 / (2 * n)) / denominator
        width = (z_score * np.sqrt((p * (1 - p)) / n + z_score**2 / (4 * n**2))) / denominator
        
        # Use confidence lower bound as conservative estimate
        p_wilson = centre - width
        return max(0, min(1, p_wilson))
    
    def gaussian_kernel_penalty(self, penalty, sigma=None):
        """Compute Gaussian kernel adjustment factor."""
        if sigma is None:
            sigma = self.sigma
        adjustment_factor = np.exp(-0.5 * (penalty / sigma) ** 2)
        return adjustment_factor
    
    def calculate_penalty(self, p_wilson, method='linear'):
        """Calculate penalty term based on p_wilson."""
        if method == 'linear':
            penalty = 1.0 - p_wilson  # Linear penalty
        elif method == 'quadratic':
            penalty = (1.0 - p_wilson) ** 2  # Quadratic penalty
        elif method == 'sigmoid':
            penalty = 1.0 / (1.0 + np.exp(-10 * (p_wilson - 0.5)))  # Sigmoid penalty
        else:
            penalty = 1.0 - p_wilson
        return penalty
    
    def compute_representativeness(self, R, r, N, sample_id):
        """
        Compute sample's class representativeness combining Bayesian smoothing,
        Wilson smoothing, and Gaussian kernel.
        
        Returns adjustment factor and all intermediate results.
        """
        # Step 1: Initial representativeness assessment (based on backward rank)
        initial_representativeness = r  # Normalized backward rank
        
        # Step 2: Bayesian smoothing (handles small sample cases)
        k_bayesian = max(1, int(r * N))  # Success count: high-ranked samples
        n_bayesian = N  # Total trials
        p_bayesian = self.bayesian_smoothing(k_bayesian, n_bayesian)
        
        # Step 3: Wilson smoothing (provides conservative confidence interval estimate)
        p_wilson = self.wilson_score_interval(initial_representativeness, N)
        
        # Step 4: Tightness-weighted adjustment
        tightness_weight = np.tanh(R - 1)  # Positive adjustment when R>1, negative when R<1
        weighted_p_wilson = p_wilson * (1 + 0.3 * tightness_weight)
        weighted_p_wilson = max(0, min(1, weighted_p_wilson))
        
        # Step 5: Calculate penalty term
        penalty = self.calculate_penalty(weighted_p_wilson, method='sigmoid')
        
        # Step 6: Gaussian kernel adjustment factor
        adjustment_factor = self.gaussian_kernel_penalty(penalty)
        
        # Store all intermediate results
        results = {
            'sample_id': sample_id,
            'R': R,
            'r': r,
            'N': N,
            'initial_representativeness': initial_representativeness,
            'p_bayesian': p_bayesian,
            'p_wilson': p_wilson,
            'weighted_p_wilson': weighted_p_wilson,
            'penalty': penalty,
            'adjustment_factor': adjustment_factor,
            'tightness_weight': tightness_weight
        }
        return adjustment_factor, results
    
    def build_local_region(self, target_point, verbose=False):
        """
        Build local region based on smooth representativeness evaluation.
        
        Parameters
        ----------
        target_point : array-like
            The query point for which to build the local region
        verbose : bool
            Whether to print detailed progress information
        
        Returns
        -------
        list
            Indices of samples in the local region
        """
        if verbose:
            print("=== Building Smooth Local Region ===")
        
        # Reset result storage
        self.sample_results = {}
        self.class_tightness = {}
        self.class_candidates = {}
        
        # Get all classes
        unique_classes = np.unique(self.y)
        if verbose:
            print(f"Classes in dataset: {unique_classes}")
        
        # Step 1: Build candidate set for each class and compute tightness
        candidate_indices = []
        for cls in unique_classes:
            # Get all samples of this class
            class_indices = np.where(self.y == cls)[0]
            if len(class_indices) == 0:
                continue
            
            # Compute distances to target point
            distances = [np.linalg.norm(self.X[i] - target_point) for i in class_indices]
            
            # Select k_region nearest samples
            sorted_indices = np.argsort(distances)[:self.k_region]
            cls_candidates = [class_indices[i] for i in sorted_indices]
            
            # Compute class tightness
            tightness = self.compute_class_tightness(cls, class_indices)
            
            self.class_candidates[cls] = cls_candidates
            self.class_tightness[cls] = tightness
            candidate_indices.extend(cls_candidates)
            
            if verbose:
                print(f"Class {cls}: candidate size {len(cls_candidates)}, tightness: {tightness:.4f}")
        
        # Step 2: Compute adjusted distances for each candidate sample
        if verbose:
            print("\n=== Computing Sample Adjusted Distances ===")
        
        all_candidates = list(set(candidate_indices))
        adjusted_distances = []
        
        for candidate_idx in all_candidates:
            cls = self.y[candidate_idx]
            row_id = self.row_ids[candidate_idx]
            
            if cls not in self.class_candidates:
                continue
            
            # Get class context
            cls_candidates = self.class_candidates[cls]
            candidate_points = self.X[cls_candidates]
            sample_point = self.X[candidate_idx]
            
            # Compute original distance
            original_distance = np.linalg.norm(sample_point - target_point)
            
            # Compute backward rank
            backward_rank = self.compute_backward_rank(target_point, sample_point, candidate_points)
            
            # Compute normalized backward rank r
            N = len(cls_candidates)
            if N > 1:
                r = 1 - (backward_rank - 1) / (N - 1)
            else:
                r = 1.0
            
            # Get relative tightness R (current class tightness / average tightness)
            avg_tightness = np.mean(list(self.class_tightness.values()))
            R = self.class_tightness[cls] / (avg_tightness + 1e-8)
            
            # Compute comprehensive representativeness adjustment factor
            adjustment_factor, results = self.compute_representativeness(R, r, N, row_id)
            
            # Compute adjusted distance
            adjusted_distance = original_distance * adjustment_factor
            
            # Store complete results
            results.update({
                'row_id': row_id,
                'class': cls,
                'original_distance': original_distance,
                'adjusted_distance': adjusted_distance,
                'backward_rank': backward_rank,
                'R': R,
                'r': r
            })
            self.sample_results[row_id] = results
            adjusted_distances.append((candidate_idx, adjusted_distance, cls, row_id))
        
        # Step 3: Select samples by class proportion to build local region
        if verbose:
            print("\n=== Building Local Region by Class Proportion ===")
        
        local_region = []
        class_distribution = Counter(self.y)
        total_samples = sum(class_distribution.values())
        
        # Calculate target sample count for each class
        class_targets = {}
        for cls, count in class_distribution.items():
            target_count = max(1, int(self.region_size * count / total_samples))
            class_targets[cls] = min(target_count, len(self.class_candidates.get(cls, [])))
        
        if verbose:
            print(f"Target class distribution: {class_targets}")
        
        # Select samples with smallest adjusted distances for each class
        for cls, target_count in class_targets.items():
            if cls not in self.class_candidates or target_count == 0:
                continue
            
            # Get candidate samples of this class
            cls_candidates = [idx for idx, _, cls_val, row_id in adjusted_distances if cls_val == cls]
            if not cls_candidates:
                continue
            
            # Sort by adjusted distance
            cls_candidates_sorted = sorted(
                cls_candidates,
                key=lambda idx: next(adj_dist for i, adj_dist, c, rid in adjusted_distances 
                                   if i == idx and c == cls)
            )[:target_count]
            local_region.extend(cls_candidates_sorted)
            
            if verbose:
                print(f"Class {cls}: selected {len(cls_candidates_sorted)} samples")
        
        # Step 4: If region size is insufficient, add samples with smallest adjusted distances
        if len(local_region) < self.region_size:
            remaining = self.region_size - len(local_region)
            used_set = set(local_region)
            available_candidates = [idx for idx, _, _, _ in adjusted_distances 
                                  if idx not in used_set]
            if available_candidates:
                available_sorted = sorted(
                    available_candidates,
                    key=lambda idx: next(adj_dist for i, adj_dist, c, rid in adjusted_distances 
                                       if i == idx)
                )[:remaining]
                local_region.extend(available_sorted)
                if verbose:
                    print(f"Added {len(available_sorted)} supplementary samples")
        
        # Final size adjustment
        local_region = local_region[:self.region_size]
        
        return local_region
    
    def get_detailed_results(self, sort_by='p_bayesian'):
        """Get detailed results sorted by specified metric."""
        if not self.sample_results:
            return []
        
        sort_keys = {
            'p_bayesian': lambda x: x[1]['p_bayesian'],
            'p_wilson': lambda x: x[1]['p_wilson'],
            'penalty': lambda x: x[1]['penalty'],
            'adjustment_factor': lambda x: x[1]['adjustment_factor'],
            'adjusted_distance': lambda x: x[1]['adjusted_distance']
        }
        
        reverse = sort_by not in ['penalty', 'adjustment_factor', 'adjusted_distance']
        
        if sort_by in sort_keys:
            return sorted(self.sample_results.items(), key=sort_keys[sort_by], reverse=reverse)
        return list(self.sample_results.items())

## 3. Global Weight Matrix

Computes class-specific feature weights using various methods:
- **LDA (Linear Discriminant Analysis)**: Based on generalized Rayleigh quotient
- **F-Score**: Using ANOVA F-statistics
- **Centroid**: Based on point-to-centroid ratios
- **Inter-class Difference**: Using precision matrix-weighted mean differences

In [None]:
class GlobalWeightMatrix:
    """
    Computes global feature weights for each class.
    
    Parameters
    ----------
    method : str
        Weight computation method: 'lda', 'f_score', 'centroid', 'inter_class_difference'
    """
    
    def __init__(self, method='lda'):
        self.method = method
        self.global_weights = None
        self.class_weight_dict = None  # Class weight dictionary
        self.feature_names = None
        self.class_names = None
        self.scaler = StandardScaler()
    
    def fit(self, X, y=None, feature_names=None, class_names=None):
        """
        Fit the weight calculator on training data.
        
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training feature matrix
        y : array-like of shape (n_samples,), optional
            Class labels
        feature_names : list, optional
            Names of features
        class_names : list, optional
            Names of classes
        """
        X = np.array(X)
        if y is not None:
            y = np.array(y)
        
        self.feature_names = feature_names
        self.class_names = class_names
        X_scaled = self.scaler.fit_transform(X)
        
        if y is None:
            # Unsupervised case
            self.global_weights = self._compute_global_weights(X_scaled)
            self.global_weights = self.global_weights.reshape(1, -1)
            self.class_weight_dict = {'global': self.global_weights[0]}
        else:
            # Supervised case: compute weights for each class
            unique_classes = np.unique(y)
            n_classes = len(unique_classes)
            n_features = X.shape[1]
            
            self.global_weights = np.zeros((n_classes, n_features))
            self.class_weight_dict = {}
            
            for i, class_label in enumerate(unique_classes):
                weight_vector = self._compute_global_weights(X_scaled, y, class_label)
                self.global_weights[i] = weight_vector
                self.class_weight_dict[class_label] = weight_vector
        
        return self
    
    def get_weight_matrix(self):
        """Return the weight matrix."""
        return self.global_weights
    
    def get_class_weight_dict(self):
        """Return the class weight dictionary."""
        return self.class_weight_dict
    
    def get_weight_by_class(self, class_identifier):
        """
        Get weight vector by class identifier.
        Supports class name or original label.
        """
        if self.class_weight_dict is None:
            raise ValueError("Please call fit() method first to train the model")
        
        if class_identifier in self.class_weight_dict:
            return self.class_weight_dict[class_identifier]
        else:
            str_identifier = str(class_identifier)
            if str_identifier in self.class_weight_dict:
                return self.class_weight_dict[str_identifier]
            else:
                print(f"Warning: Class '{class_identifier}' weight not found, using uniform weights")
                n_features = self.global_weights.shape[1] if self.global_weights is not None else 1
                return np.ones(n_features) / n_features
    
    def _compute_global_weights(self, X, y=None, current_class=None):
        """Dispatch to appropriate weight computation method."""
        if y is not None and current_class is not None:
            methods = {
                'inter_class_difference': self._inter_class_difference_weights,
                'f_score': self._f_score_weights,
                'centroid': self._centroid_weights,
                'lda': self._rayleigh_quotient
            }
            method_func = methods.get(self.method, self._inverse_covariance_weights)
            return method_func(X, y, current_class)
        else:
            return self._inverse_covariance_weights(X, y, current_class)
    
    def _rayleigh_quotient(self, X, y, current_class):
        """
        LDA weight computation based on generalized Rayleigh quotient.
        Binary mode: current class vs. other classes.
        """
        try:
            # Create binary labels: current class vs. others
            y_binary = (y == current_class).astype(int)
            
            # Get samples from both classes
            class1_mask = (y_binary == 1)  # Current class
            class0_mask = (y_binary == 0)  # Other classes
            X1 = X[class1_mask]  # Current class samples
            X0 = X[class0_mask]  # Other class samples
            
            n1 = len(X1)
            n0 = len(X0)
            n_features = X.shape[1]
            
            # Check if sample count is sufficient
            if n1 < 2 or n0 < 1:
                return np.ones(n_features) / n_features
            
            # Compute mean vectors for both classes
            mu1 = np.mean(X1, axis=0)  # Current class mean
            mu0 = np.mean(X0, axis=0)  # Other class mean
            
            # Compute between-class scatter direction
            mean_diff = mu1 - mu0
            
            # Compute within-class scatter matrix Sw
            S1 = np.cov(X1.T) * (n1 - 1) if n1 > 1 else np.zeros((n_features, n_features))
            S0 = np.cov(X0.T) * (n0 - 1) if n0 > 1 else np.zeros((n_features, n_features))
            Sw = S1 + S0
            
            # Add regularization to prevent singular matrix
            epsilon = 1e-6
            Sw_reg = Sw + epsilon * np.eye(n_features)
            
            # Compute LDA weight vector: w = Sw^{-1} * (mu1 - mu0)
            try:
                Sw_inv = np.linalg.pinv(Sw_reg)
                weights = np.dot(Sw_inv, mean_diff)
            except np.linalg.LinAlgError:
                weights = mean_diff
            
            # Handle NaN or Inf values
            weights = np.nan_to_num(weights, nan=0.0, posinf=1.0, neginf=0.0)
            
            # Take absolute value and normalize
            weights = np.abs(weights)
            if np.sum(weights) > 0:
                weights = weights / np.sum(weights)
            else:
                weights = np.ones(n_features) / n_features
            
            return weights
            
        except Exception as e:
            print(f"Rayleigh quotient weight computation error: {e}")
            n_features = X.shape[1]
            return np.ones(n_features) / n_features
    
    def _centroid_weights(self, X, y, current_class):
        """
        Centroid method: weights based on ratio of each point to global centroid.
        """
        try:
            class_mask = (y == current_class)
            X_class = X[class_mask]
            n_samples_class = len(X_class)
            n_features = X.shape[1]
            
            if n_samples_class == 0:
                return np.ones(n_features) / n_features
            
            # Compute global centroid (mean of all points)
            centroid_all = np.mean(X, axis=0)
            epsilon = 1e-8
            centroid_all_safe = centroid_all + epsilon
            
            # For each point in current class, compute ratio to global centroid
            ratio_vectors = []
            for i in range(n_samples_class):
                x_point = X_class[i]
                with np.errstate(divide='ignore', invalid='ignore'):
                    ratio = np.divide(x_point, centroid_all_safe)
                ratio = np.nan_to_num(ratio, nan=1.0, posinf=1.0, neginf=0.0)
                ratio_abs = np.abs(ratio)
                ratio_vectors.append(ratio_abs)
            
            # Compute mean ratio vector as weights
            mean_ratio = np.mean(ratio_vectors, axis=0) if ratio_vectors else np.ones(n_features)
            
            # Normalize
            if np.sum(mean_ratio) > 0:
                weights = mean_ratio / np.sum(mean_ratio)
            else:
                weights = np.ones(n_features) / n_features
            
            return weights
            
        except Exception as e:
            print(f"Centroid weight computation error: {e}")
            return np.ones(X.shape[1]) / X.shape[1]
    
    def _inter_class_difference_weights(self, X, y, current_class):
        """Compute weights based on inter-class differences using precision matrix."""
        try:
            current_class_mask = (y == current_class)
            other_class_mask = (y != current_class)
            X_current = X[current_class_mask]
            X_other = X[other_class_mask]
            
            if len(X_current) < 2 or len(X_other) < 1:
                n_features = X.shape[1]
                return np.ones(n_features) / n_features
            
            cov_estimator_current = LedoitWolf().fit(X_current)
            sigma_current = cov_estimator_current.covariance_
            precision_current = np.linalg.pinv(sigma_current)
            
            mean_current = np.mean(X_current, axis=0)
            mean_other = np.mean(X_other, axis=0)
            mean_diff = mean_current - mean_other
            
            weights = np.dot(precision_current, mean_diff)
            
            if np.sum(np.abs(weights)) > 0:
                weights = np.abs(weights) / np.sum(np.abs(weights))
            else:
                n_features = X.shape[1]
                weights = np.ones(n_features) / n_features
            
            return weights
            
        except Exception as e:
            print(f"Inter-class difference weight computation error: {e}")
            return np.ones(X.shape[1]) / X.shape[1]
    
    def _f_score_weights(self, X, y, current_class):
        """Compute weights using F-score (ANOVA F-statistics)."""
        try:
            from sklearn.feature_selection import f_classif
            y_binary = (y == current_class).astype(int)
            f_scores, _ = f_classif(X, y_binary)
            f_scores = np.nan_to_num(f_scores, nan=0.0, posinf=1.0, neginf=0.0)
            
            if np.sum(f_scores) > 0:
                weights = f_scores / np.sum(f_scores)
            else:
                weights = np.ones(X.shape[1]) / X.shape[1]
            
            return weights
            
        except Exception as e:
            print(f"F-score weight computation error: {e}")
            return np.ones(X.shape[1]) / X.shape[1]
    
    def _inverse_covariance_weights(self, X, y=None, current_class=None):
        """Compute weights using inverse covariance matrix."""
        try:
            if current_class is not None and y is not None:
                current_class_mask = (y == current_class)
                X_used = X[current_class_mask]
                if len(X_used) < 2:
                    return np.ones(X.shape[1]) / X.shape[1]
            else:
                X_used = X
            
            cov_estimator = LedoitWolf().fit(X_used)
            sigma = cov_estimator.covariance_
            precision_matrix = np.linalg.pinv(sigma)
            
            n_features = X_used.shape[1]
            unit_vector = np.ones(n_features)
            weights = np.dot(precision_matrix, unit_vector)
            
            if np.sum(np.abs(weights)) > 0:
                weights = np.abs(weights) / np.sum(np.abs(weights))
            else:
                weights = np.ones(n_features) / n_features
            
            return weights
            
        except Exception as e:
            print(f"Inverse covariance weight computation error: {e}")
            return np.ones(X.shape[1]) / X.shape[1]

## 4. Binary Class Weight Corrector

Corrects local weight vectors using global manifold constraints based on the generalized Rayleigh quotient. For each class, treats the problem as binary classification (class c vs. not c).

In [None]:
class BinaryClassWeightCorrector(BaseEstimator):
    """
    Weight vector corrector based on binary global manifold constraints.
    
    For each class c, treats it as a binary classification problem (c vs. not c),
    then corrects the local weight vector via generalized Rayleigh quotient manifold constraint.
    
    Parameters
    ----------
    reg_param : float
        Regularization parameter to prevent numerical instability
    alpha : float
        Balance parameter between local weight preservation and global manifold constraint
    method : str
        Correction method: 'projection' or 'optimization'
    """
    
    def __init__(self, reg_param=1e-6, alpha=0.5, method='projection'):
        self.reg_param = reg_param
        self.alpha = alpha
        self.method = method
        self.class_stats_ = {}
    
    def fit(self, X_train, y_train):
        """
        Fit on training data, compute statistics for each class.
        
        Parameters
        ----------
        X_train : array-like of shape (n_samples, n_features)
            Training feature matrix
        y_train : array-like of shape (n_samples,)
            Training labels
        """
        X_train, y_train = check_X_y(X_train, y_train)
        self.X_train_ = X_train
        self.y_train_ = y_train
        self.n_features_ = X_train.shape[1]
        self.classes_ = np.unique(y_train)
        
        self._compute_class_statistics()
        return self
    
    def _compute_class_statistics(self):
        """Compute statistics for each class."""
        for cls in self.classes_:
            # Get data for class c and not-c
            X_c = self.X_train_[self.y_train_ == cls]
            X_not_c = self.X_train_[self.y_train_ != cls]
            n_c = X_c.shape[0]
            n_not_c = X_not_c.shape[0]
            
            # Compute means
            mean_c = np.mean(X_c, axis=0)
            mean_not_c = np.mean(X_not_c, axis=0)
            global_mean = (n_c * mean_c + n_not_c * mean_not_c) / (n_c + n_not_c)
            
            # Compute within-class scatter matrices
            S_w_c = self._compute_within_class_scatter(X_c, mean_c)
            S_w_not_c = self._compute_within_class_scatter(X_not_c, mean_not_c)
            S_w = S_w_c + S_w_not_c
            
            # Compute between-class scatter matrix
            S_b = self._compute_between_class_scatter_binary(
                mean_c, mean_not_c, n_c, n_not_c, global_mean
            )
            
            # Store statistics
            self.class_stats_[cls] = {
                'S_w': S_w,
                'S_b': S_b,
                'mean_c': mean_c,
                'mean_not_c': mean_not_c,
                'global_mean': global_mean,
                'n_c': n_c,
                'n_not_c': n_not_c
            }
    
    def _compute_within_class_scatter(self, X, mean):
        """Compute within-class scatter matrix."""
        n_samples, n_features = X.shape
        S_w = np.zeros((n_features, n_features))
        for i in range(n_samples):
            diff = (X[i] - mean).reshape(-1, 1)
            S_w += diff @ diff.T
        return S_w
    
    def _compute_between_class_scatter_binary(self, mean_c, mean_not_c, n_c, n_not_c, global_mean):
        """Compute between-class scatter matrix for binary classification."""
        diff_c = (mean_c - global_mean).reshape(-1, 1)
        diff_not_c = (mean_not_c - global_mean).reshape(-1, 1)
        S_b = n_c * (diff_c @ diff_c.T) + n_not_c * (diff_not_c @ diff_not_c.T)
        return S_b
    
    def correct_weight_vector(self, w_local, class_label):
        """
        Correct local weight vector.
        
        Parameters
        ----------
        w_local : array-like of shape (n_features,)
            Local weight vector to correct
        class_label : int
            Class label for the binary classification problem
        
        Returns
        -------
        w_corrected : array-like of shape (n_features,)
            Corrected weight vector
        """
        if not hasattr(self, 'X_train_'):
            raise ValueError("Model not fitted. Please call fit() first.")
        
        if class_label not in self.class_stats_:
            raise ValueError(f"Class {class_label} not in training data")
        
        w_local = np.array(w_local).flatten()
        if len(w_local) != self.n_features_:
            raise ValueError(f"Weight vector dimension mismatch: expected {self.n_features_}, got {len(w_local)}")
        
        # Get statistics for this class
        stats = self.class_stats_[class_label]
        S_w = stats['S_w']
        S_b = stats['S_b']
        
        # Add regularization
        S_w_reg = S_w + self.reg_param * np.eye(self.n_features_)
        
        if self.method == 'projection':
            w_corrected = self._project_to_manifold(w_local, S_b, S_w_reg)
        elif self.method == 'optimization':
            w_corrected = self._optimize_with_constraints(w_local, S_b, S_w_reg)
        else:
            raise ValueError(f"Unknown correction method: {self.method}")
        
        # Normalize
        w_corrected = self._normalize_weight_vector(w_corrected)
        return w_corrected
    
    def _project_to_manifold(self, w_local, S_b, S_w):
        """Project onto generalized Rayleigh quotient manifold."""
        try:
            # Solve generalized eigenvalue problem: S_b * w = λ * S_w * w
            eigenvalues, eigenvectors = eigh(S_b, S_w)
            
            # Sort by descending eigenvalues
            idx = np.argsort(eigenvalues)[::-1]
            eigenvectors = eigenvectors[:, idx]
            
            # Select eigenvector with largest eigenvalue
            w_manifold = eigenvectors[:, 0]
            
            # Compute projection coefficient
            coeff = np.dot(w_manifold, w_local) / np.dot(w_manifold, w_manifold)
            
            # Project onto manifold direction
            w_projected = coeff * w_manifold
            
            # Combine with original vector
            w_corrected = self.alpha * w_projected + (1 - self.alpha) * w_local
            
            return w_corrected
        except np.linalg.LinAlgError:
            return w_local
    
    def _optimize_with_constraints(self, w_local, S_b, S_w):
        """Correct weight vector through optimization."""
        def objective_function(w, w_local, S_b, S_w, alpha):
            """Objective: balance local weight preservation and Rayleigh quotient maximization."""
            w = w.reshape(-1, 1)
            w_local = w_local.reshape(-1, 1)
            
            # Term 1: Difference from local weight
            diff_term = np.sum((w - w_local)**2)
            
            # Term 2: Generalized Rayleigh quotient (maximize)
            denominator = w.T @ S_w @ w
            if denominator > 1e-10:
                rayleigh_quotient = (w.T @ S_b @ w) / denominator
            else:
                rayleigh_quotient = 0
            
            # We want to maximize Rayleigh quotient, so negate it in objective
            return alpha * diff_term - (1 - alpha) * rayleigh_quotient
        
        # Use L-BFGS-B optimization
        result = minimize(
            objective_function,
            w_local,
            args=(w_local, S_b, S_w, self.alpha),
            method='L-BFGS-B',
            bounds=[(-1, 1)] * len(w_local),
            options={'maxiter': 1000, 'ftol': 1e-8}
        )
        
        return result.x if result.success else w_local
    
    def _normalize_weight_vector(self, w):
        """Normalize weight vector."""
        w_norm = np.linalg.norm(w)
        return w / w_norm if w_norm > 1e-10 else w
    
    def compute_binary_rayleigh_quotient(self, w, class_label):
        """Compute generalized Rayleigh quotient for a weight vector."""
        if class_label not in self.class_stats_:
            raise ValueError(f"Class {class_label} not in training data")
        
        stats = self.class_stats_[class_label]
        S_w = stats['S_w'] + self.reg_param * np.eye(self.n_features_)
        S_b = stats['S_b']
        
        w = w.reshape(-1, 1)
        numerator = w.T @ S_b @ w
        denominator = w.T @ S_w @ w
        
        return (numerator / denominator).flatten()[0] if denominator > 1e-10 else 0.0

## 5. Weighted KNN Classifier

The final classifier that uses class-specific feature weights to compute distances and performs distance-weighted voting.

In [None]:
class WeightedKNNClassifier:
    """
    Weighted KNN classifier based on local region weights.
    
    Parameters
    ----------
    k : int
        Number of neighbors to consider
    distance_weight_method : str
        Method for computing distance weights: 'exponential', 'inverse', 'gaussian'
    inv_cov_matrix : array-like, optional
        Inverse covariance matrix for Mahalanobis distance
    """
    
    def __init__(self, k=7, distance_weight_method='exponential', inv_cov_matrix=None):
        self.k = k
        self.X_train = None
        self.y_train = None
        self.X_row_number = None
        self.feature_names = None
        self.distance_weight_method = distance_weight_method
        self.inv_cov_matrix = inv_cov_matrix
    
    def fit(self, X_train, y_train, X_row_number, feature_names=None):
        """Fit the classifier with training data."""
        self.X_train = X_train
        self.y_train = y_train
        self.X_row_number = X_row_number
        self.feature_names = feature_names or [f"Feature_{i}" for i in range(X_train.shape[1])]
    
    def calculate_improved_distance_weight(self, distances, method='rank_exponential', top_m=5):
        """
        Improved distance weight calculation that emphasizes nearest neighbors.
        
        Parameters
        ----------
        distances : array-like
            Sorted distances to neighbors
        method : str
            Weight method: 'rank_exponential', 'rank_power', 'relative_distance', 'top_m_dominant'
        top_m : int
            Number of dominant neighbors for 'top_m_dominant' method
        """
        if len(distances) == 0:
            return np.array([])
        
        distances = np.array(distances)
        
        if method == 'rank_exponential':
            # Rank-based exponential weight: nearest neighbor has highest weight with rapid decay
            ranks = np.arange(1, len(distances) + 1)
            base = 2.0  # Higher base = faster decay
            weights = base ** (-ranks + 1)  # Rank 1: 2^0=1, Rank 2: 2^-1=0.5, etc.
            weights = [w * (1/(d+1e-8)) for w, d in zip(weights, distances)]
            
        elif method == 'rank_power':
            # Rank-based power weight
            ranks = np.arange(1, len(distances) + 1)
            power = 3.0  # Higher power = faster decay
            weights = 1.0 / (ranks ** power)
            
        elif method == 'relative_distance':
            # Relative distance weight: based on ratio to nearest distance
            min_distance = np.min(distances)
            if min_distance == 0:
                min_distance = 1e-8
            relative_distances = distances / min_distance
            weights = np.exp(2.0 * (1 - relative_distances))
            
        elif method == 'top_m_dominant':
            # Only top m neighbors have significant weight
            weights = np.zeros(len(distances))
            for i in range(min(top_m, len(distances))):
                weights[i] = 1.0 if i == 0 else 0.1
        else:
            # Default: rank-based exponential
            ranks = np.arange(1, len(distances) + 1)
            weights = 2.0 ** (-ranks + 1)
        
        # Normalize
        weights = np.array(weights)
        if np.sum(weights) > 0:
            weights = weights / np.sum(weights)
        
        return weights
    
    def calculate_distance_weight(self, distances, method='exponential'):
        """Calculate distance weights: smaller distance = higher weight."""
        if len(distances) == 0:
            return np.array([])
        
        distances = np.array(distances) + 1e-8  # Avoid division by zero
        
        if method == 'inverse':
            weights = 1.0 / distances
        elif method == 'gaussian':
            sigma = np.median(distances)
            if sigma < 1e-8:
                sigma = 1.0
            weights = np.exp(-distances**2 / (2 * sigma**2))
        elif method == 'exponential':
            weights = np.exp(-distances)
        else:
            weights = 1.0 / distances
        
        # Normalize
        if np.sum(weights) > 0:
            weights = weights / np.sum(weights)
        
        return weights
    
    def calculate_weighted_distance(self, target_point, weights):
        """
        Calculate weighted Euclidean distances from target point to all training samples.
        
        Returns
        -------
        list of tuples
            (row_id, distance, class_label) for each training sample
        """
        weights = weights / np.sum(weights)  # Ensure normalization
        
        weighted_distances = []
        for i in range(len(self.X_train)):
            # Feature-weighted Euclidean distance
            weighted_diff = weights * (self.X_train[i] - target_point)
            distance = np.sqrt(np.sum(weighted_diff ** 2))
            weighted_distances.append((self.X_row_number[i], distance, self.y_train[i]))
        
        return weighted_distances
    
    def predict_with_distance_weighted_voting(self, target_point, comprehensive_weights, 
                                               local_region_classes, verbose=False):
        """
        KNN prediction with distance-weighted voting.
        
        Parameters
        ----------
        target_point : array-like
            Query point to classify
        comprehensive_weights : dict
            Class-specific feature weight vectors
        local_region_classes : list
            Classes present in the local region
        verbose : bool
            Whether to print detailed information
        
        Returns
        -------
        tuple
            (predicted_class, max_consistency, knn_results, weighted_votes)
        """
        if verbose:
            print(f"\n=== Distance-Weighted KNN Prediction ===")
            print(f"Target point: {target_point}")
        
        knn_results = {}
        weighted_votes = {}
        
        for cls, weights in comprehensive_weights.items():
            if cls not in local_region_classes:
                continue
            
            # Calculate weighted distances
            weighted_distances = self.calculate_weighted_distance(target_point, weights)
            
            # Sort by distance and select k neighbors
            weighted_distances.sort(key=lambda x: x[1])
            k_neighbors = weighted_distances[:self.k]
            
            if verbose:
                print(f"\nClass {cls} nearest {self.k} neighbors:")
                print(k_neighbors)
            
            # Extract distances and class information
            distances = [item[1] for item in k_neighbors]
            neighbor_classes = [item[2] for item in k_neighbors]
            
            # Compute distance weights
            distance_weights = self.calculate_improved_distance_weight(distances)
            class_count = sum(1 for _, _, neighbor_class in k_neighbors if neighbor_class == cls)
            consistency = class_count / self.k
            
            # Compute weighted vote (considering distance weight)
            weighted_vote = 0.0
            for rank, neighbor_class in enumerate(neighbor_classes):
                if neighbor_class == cls:
                    weighted_vote = consistency * 1/(distances[rank] + 1e-8)
                    break
            
            weighted_votes[cls] = weighted_vote
            
            # Store detailed results
            knn_results[cls] = {
                'neighbors': k_neighbors,
                'distances': distances,
                'distance_weights': distance_weights,
                'neighbor_classes': neighbor_classes,
                'consistency': weighted_vote,
                'weights_used': weights
            }
        
        # Select prediction using weighted votes
        if weighted_votes:
            predicted_class = max(weighted_votes.items(), key=lambda x: x[1])[0]
            max_weighted_consistency = weighted_votes[predicted_class]
            
            if verbose:
                print(f"\nPrediction result:")
                print(f"  Weighted vote prediction: Class {predicted_class} (consistency: {max_weighted_consistency:.4f})")
            
            return predicted_class, max_weighted_consistency, knn_results, weighted_votes
        else:
            # Fallback to simple KNN
            distances = np.linalg.norm(self.X_train - target_point, axis=1)
            nearest_indices = np.argsort(distances)[:self.k]
            nearest_classes = self.y_train[nearest_indices]
            predicted_class = Counter(nearest_classes).most_common(1)[0][0]
            return predicted_class, 0.0, {}, {}

## 6. Main Classification Pipeline

This section demonstrates the complete MCAW-KNN classification pipeline.

In [None]:
def run_mcaw_knn_classification(file_path, target_column='quality', test_size=0.2, 
                                 k_neighbors=7, region_size=20, verbose=False):
    """
    Run the complete MCAW-KNN classification pipeline.
    
    Parameters
    ----------
    file_path : str
        Path to the CSV data file
    target_column : str
        Name of the target column
    test_size : float
        Proportion of data to use for testing
    k_neighbors : int
        Number of neighbors for KNN
    region_size : int
        Size of local regions
    verbose : bool
        Whether to print detailed progress
    
    Returns
    -------
    dict
        Results including accuracy and detailed test information
    """
    
    # ========== Step 1: Load and preprocess data ==========
    print("=" * 60)
    print("Step 1: Data Loading and Preprocessing")
    print("=" * 60)
    
    df = pd.read_csv(file_path)
    
    # Add row number column (actual row number in CSV, starting from 2 since row 1 is header)
    df['csv_row_number'] = range(2, len(df) + 2)
    df = df.dropna()
    
    # Handle categorical columns if present
    if 'type' in df.columns:
        df['type'] = df['type'].map({'red': 1, 'white': 2, 'Red': 1, 'White': 2}).fillna(0)
    
    # Separate features, labels, and row indices
    row_indices = df['csv_row_number'].values
    X = df.drop([target_column, 'csv_row_number'], axis=1).values
    y = df[target_column].values
    feature_names = df.drop([target_column, 'csv_row_number'], axis=1).columns.tolist()
    
    print(f"Data shape: X{X.shape}, y{y.shape}")
    print(f"Class distribution: {dict(Counter(y))}")
    print(f"Features: {feature_names}")
    
    # ========== Step 2: Train-test split ==========
    print("\n" + "=" * 60)
    print("Step 2: Train-Test Split")
    print("=" * 60)
    
    X_train, X_test, y_train, y_test, row_indices_train, row_indices_test = train_test_split(
        X, y, row_indices,
        test_size=test_size,
        random_state=42,
        stratify=y
    )
    
    # Standardize features
    scaler = RobustScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    
    minmax_scaler = MinMaxScaler()
    X_train_normalized = minmax_scaler.fit_transform(X_train_scaled)
    
    X_test_scaled = scaler.transform(X_test)
    X_test_normalized = minmax_scaler.transform(X_test_scaled)
    
    print(f"Training set: {X_train.shape}, Test set: {X_test.shape}")
    print(f"Training class distribution: {dict(Counter(y_train))}")
    
    # ========== Step 3: Initialize components ==========
    print("\n" + "=" * 60)
    print("Step 3: Initialize MCAW-KNN Components")
    print("=" * 60)
    
    # Compute inverse covariance matrix for optional Mahalanobis distance
    cov_matrix = np.cov(X_train_normalized.T)
    try:
        inv_cov_matrix = np.linalg.inv(cov_matrix)
    except np.linalg.LinAlgError:
        inv_cov_matrix = np.linalg.pinv(cov_matrix)
    
    # Initialize weight corrector
    corrector = BinaryClassWeightCorrector()
    corrector.fit(X_train, y_train)
    print("✓ Weight corrector initialized")
    
    # Initialize weight calculator
    weight_calculator = GlobalWeightMatrix(method='lda')
    print("✓ Weight calculator initialized")
    
    # Initialize region builder
    region_builder = SmoothLocalRegionBuilder(
        k_region=15,
        region_size=region_size,
        alpha=1.0,
        beta=1.0,
        confidence_level=0.95,
        sigma=0.5
    )
    region_builder.fit(X_train, y_train, row_indices_train)
    print("✓ Region builder initialized")
    
    # Initialize KNN classifier
    knn_classifier = WeightedKNNClassifier(k=k_neighbors, inv_cov_matrix=inv_cov_matrix)
    knn_classifier.fit(X_train, y_train, row_indices_train, feature_names)
    print("✓ KNN classifier initialized")
    
    # ========== Step 4: Classification ==========
    print("\n" + "=" * 60)
    print("Step 4: Running Classification")
    print("=" * 60)
    
    correct_count = 0
    total_count = len(X_test)
    results = {'details': {}}
    
    for target_idx in range(len(X_test)):
        target_point = X_test[target_idx]
        target_class = y_test[target_idx]
        target_row = row_indices_test[target_idx]
        
        # Build local region
        best_region = region_builder.build_local_region(target_point, verbose=False)
        
        if best_region is not None and len(best_region) > 0:
            # Extract local region data
            X_region = X_train[best_region]
            y_region = y_train[best_region]
            
            # Compute weights for local region
            weight_calculator.fit(X_region, y_region)
            comprehensive_weights = weight_calculator.get_class_weight_dict()
            
            # Apply weight correction
            for cls in comprehensive_weights.keys():
                class_weight = comprehensive_weights[cls]
                corrected_weight = corrector.correct_weight_vector(class_weight, cls)
                comprehensive_weights[cls] = corrected_weight
            
            # Get classes in local region
            local_region_classes = list(set(y_region))
            
            # Make prediction
            predicted_class, consistency, knn_results, weighted_votes = \
                knn_classifier.predict_with_distance_weighted_voting(
                    target_point, comprehensive_weights, local_region_classes, verbose=verbose
                )
            
            # Check if correct
            is_correct = (predicted_class == target_class)
            if is_correct:
                correct_count += 1
            
            # Store result
            results['details'][target_idx] = {
                'target_row': target_row,
                'target_class': target_class,
                'predicted_class': predicted_class,
                'correct': is_correct,
                'consistency': consistency
            }
            
            if verbose or target_idx % 10 == 0:
                print(f"Sample {target_idx+1}/{total_count}: True={target_class}, Pred={predicted_class}, {'✓' if is_correct else '✗'}")
    
    # ========== Step 5: Results summary ==========
    accuracy = correct_count / total_count
    results['accuracy'] = accuracy
    results['correct_count'] = correct_count
    results['total_count'] = total_count
    
    print("\n" + "=" * 60)
    print("Classification Results")
    print("=" * 60)
    print(f"Total test samples: {total_count}")
    print(f"Correctly classified: {correct_count}")
    print(f"Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
    
    return results

## 7. Example Usage

Demonstrate the algorithm on the Wine Quality dataset.

In [None]:
# Example: Run on Wine Quality dataset
# Uncomment and modify the path to run on your data

# file_path = 'path/to/your/winequalityN.csv'
# results = run_mcaw_knn_classification(
#     file_path=file_path,
#     target_column='quality',
#     test_size=0.2,
#     k_neighbors=7,
#     region_size=20,
#     verbose=False
# )
# 
# print(f"\nFinal Accuracy: {results['accuracy']:.4f}")

## 8. Algorithm Summary

### Key Components:

1. **SmoothLocalRegionBuilder**: Constructs adaptive local regions using:
   - Bayesian smoothing for small sample handling
   - Wilson score intervals for conservative estimates
   - Gaussian kernel for distance adjustment

2. **GlobalWeightMatrix**: Computes class-specific feature weights using:
   - LDA-based Rayleigh quotient
   - F-score statistics
   - Centroid-based ratios

3. **BinaryClassWeightCorrector**: Corrects weights using:
   - Manifold projection
   - Generalized Rayleigh quotient optimization

4. **WeightedKNNClassifier**: Final classification using:
   - Feature-weighted distances
   - Distance-weighted voting

### Algorithm Flow:
```
Input: Query point x
  ↓
Build local region around x
  ↓
Compute class-specific feature weights on local region
  ↓
Apply global manifold correction to weights
  ↓
Weighted KNN classification with distance voting
  ↓
Output: Predicted class
```