# A Custom K-NN implementation 
A k-nn algorithm that can:
1. Handle missing values
2. Handle class imbalance 
3. Handle different scales of data 
4. Automatically picks the best value for k 
5. Is close to the same speed as sklearn's 
6. Should run in parralel
7. Regression or Classification 

## Planning 
1. Basic implementation 
2. First get it to run faster with the optimisations
3. Just use simulated data

In [669]:
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification, make_regression
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score, r2_score, recall_score
import collections
from scipy.spatial.distance import pdist, cdist

In [805]:
class Custom_Knn(BaseEstimator, ClassifierMixin):
    """
    A custom k-nearest neighbors (KNN) classifier and regressor that handles missing values.
    
    Parameters:
    - neighbors (int): Number of nearest neighbors to consider.
    - optimise (bool): Whether to optimize hyperparameters.
    - method (str): 'c' for classification, 'r' for regression.
    """
    def __init__(self, neighbors=None, optimise=False, method='c', imbalance=False):
        self.optimise = optimise
        self.neighbors = neighbors
        self.method = method
        self.imbalance = imbalance
        self.label_type_ = None
        self.best_neighbors = None 
        self.neighbor_scores = None
    
    def fit(self, X, y):
        """
        Fit the model with training data.
        
        Parameters:
        - X (array-like): Training feature matrix.
        - y (array-like): Target values.
        """
        self.X_ = np.array(X)
        type_of_values = self.get_first_non_none_type(y)
        if self.method == 'r': 
            if len(y) == 0:
                z = 1
            if issubclass(type_of_values, np.floating):
                self.label_type_ = float
            elif issubclass(type_of_values, np.str_):
                self.label_type_ = str
                #check if fails
            elif issubclass(type_of_values, np.bool_):
                self.method = 'c'
            else:
                self.label_type_ = float
            
            self.y_ = np.array(y, dtype=float)
        else: 
            if issubclass(type_of_values, np.integer):
                print("int")
                self.label_type_ = int
            elif issubclass(type_of_values, np.bool_):
                self.label_type_ = bool
            else:
                self.label_type_ = str
            self.y_ = self._format_labels(y)

            
        self.masks_ = self._compute_masks(X)
    
    def predict(self, X):
        """
        Predict target values for the given input data.
        
        Parameters:
        - X (array-like): Input feature matrix.
        
        Returns:
        - list: Predicted values.
        """

        X = np.array(X)
        self.test_mask_ = self._compute_masks(X)
        if self.method == 'c':
            predictions = [self._predict_single_classification(x, test_mask) for x, test_mask in zip(X, self.test_mask_)]
        else:
            predictions = [self._predict_single_regression(x, test_mask) for x, test_mask in zip(X, self.test_mask_)]
        return [self.label_type_(pred) for pred in predictions]
    
    def predict_best_neighbor(self, X_test, y_test, metric=None, n_range = None):
        if n_range is None:
            n_range = (1, self.X_.shape[0])
        
        point_distances = cdist(X_test, self.X_, metric='euclidean')
        
        # Now I have the distance, do prediction with n = n,
        best_n = 0
        best_score = 0
        n_neighbor_scores = {}
        if self.method == 'c':
            metric = 'accuracy'
            # calculate distance from each test_point to each train_point
            for n in n_range:
                predictions = self._predict_with_variable_n(X_test=X_test, neighbors=n, method='c', distances=point_distances)
                acc = self._get_accuracy(predicted=predictions, actual=y_test)
                if acc > best_score:
                    best_score = acc
                    best_n = n
                n_neighbor_scores[n] = acc
                print(acc)
            # do prediction with n = 1 and store all the way to n = n and return best score

        else:
            metric = 'mean_squared_error'
        return best_n, best_score, n_neighbor_scores
    
    def _predict_single_regression(self, x, test_mask):
        """
        Predict a single regression instance.
        """
        distances = [self._distance(x, x_train, train_mask, test_mask) for x_train, train_mask in zip(self.X_, self.masks_)]
        nearest_neigbor_indices = sorted(range(len(distances)), key=lambda sub: distances[sub])[:self.neighbors]
        pred = self._mean([self.y_[index] for index in nearest_neigbor_indices])
        return pred

    def _predict_single_classification(self, x, test_mask):
        """
        Predict a single classification instance.
        """
        distances = [self._distance(x, x_train, train_mask, test_mask) for x_train, train_mask in zip(self.X_, self.masks_)]
        nearest_neigbor_indices = sorted(range(len(distances)), key=lambda sub: distances[sub])[:self.neighbors]
        if self.imbalance: 
            pred = self._sheppards_method(distances=[distances[index] for index in nearest_neigbor_indices], labels=[self.y_[index] for index in nearest_neigbor_indices])
        else: pred = self._mode([self.y_[index] for index in nearest_neigbor_indices])
        return pred
    
    def _predict_with_variable_n(self, X_test, neighbors, method, distances):
        X_test = np.array(X_test)
        self.test_mask_ = self._compute_masks(X_test)
        # i, j = 1, 3  # Corresponds to second and fourth points
        # index = n * i - (i * (i + 1)) // 2 + (j - i - 1)
        predictions = [] 
        for i in range(X_test.shape[0]) :
            #single_obervation_distances = self._get_distances_from_point(distances, i, n_points= X_test.shape[0])
            single_obervation_distances = distances[i]
            nearest_neigbor_indices = sorted(range(len(single_obervation_distances)), key=lambda sub: single_obervation_distances[sub])[:neighbors]
            if self.imbalance: 
                pred = self._sheppards_method(distances=[distances[index] for index in nearest_neigbor_indices], labels=[self.y_[index] for index in nearest_neigbor_indices])
            else: pred = self._mode([self.y_[index] for index in nearest_neigbor_indices])
            predictions.append(pred)
        print(predictions)
        print(type(predictions))
        print(type(predictions[0]))
        return predictions

        # keep complexigty n^2/2
        # get prediction with n neighbors 
        # You have to get all distances for a specific point from the compact representation. 
        # Do not turn it into a squared matrix


        return True
        
    def _distance(self, a, b, train_mask, test_mask):
        """
        Compute Euclidean distance between two feature vectors, considering masks.
        """
        mask = train_mask & test_mask
        if not np.any(mask): #If there are no values for which both observations have valid values return a big value
            return np.inf
        diff = (a[mask] - b[mask]) ** 2
        return np.sqrt(np.sum(diff))
    
    def _mean(self, ls):
        """
        Compute the mean of a list.
        """
        return sum(ls) / len(ls)
    
    def _mode(self, ls):
        """
        Compute the most common value in a list.
        """
        return collections.Counter(ls).most_common()[0][0]
    
    def _sheppards_method(self, distances, labels):
        """
        Apply Sheppard's method to perform weighted voting based on inverse distances.
        
        Parameters:
        - distances (list of float): Distances of neighbors to the query point.
        - labels (list): Corresponding class labels of the neighbors.
        
        Returns:
        - predicted_class: The class with the highest weighted vote.
        """
        weights = 1 / (np.array(distances) + 1e-5)
        weighted_votes = {}
        for label, weight in zip(labels, weights):
            label = str(label)
            if label in weighted_votes:
                weighted_votes[label] += weight
            else:
                weighted_votes[label] = weight
        
        predicted_class = max(weighted_votes, key=weighted_votes.get)
        return predicted_class
    
    def _compute_masks(self, values):
        """
        Compute boolean masks to identify non-missing values.
        """
        return ~pd.isna(values)
    
    def _format_labels(self, y):
        if isinstance(y, pd.Series):  # Convert Pandas Series to list
            y = y.to_numpy()
        elif isinstance(y, np.ndarray):  # Convert NumPy array to list
            y = y.tolist()
        elif not isinstance(y, list):
            raise ValueError("Input y should be a list, NumPy array, or Pandas Series.")

        return list(map(str, y))  # Convert all labels to string
    
    def get_first_non_none_type(self, lst):
        for item in lst:
            if item is not None:  # Skip None values
                return type(item)
        return False  # If only None values are present
    
    def _get_accuracy(self, predicted, actual):
        correct = sum(1 for true, pred in zip(actual, predicted) if true == pred)
        total = len(actual)
        return correct / total
    
    def _get_distances_from_point(self, condensed_distances, point_index, n_points):
        """
        Extract distances from a specific point in a condensed pdist distance matrix.

        Parameters:
        - condensed_distances (1D array): The condensed pdist distance matrix.
        - point_index (int): The index of the point whose distances we want to extract.
        - n_points (int): Total number of points in the dataset.

        Returns:
        - np.ndarray: 1D array of distances from the given point to all others.
        """
        distances = np.zeros(n_points)  # Initialize distance array

        # Fill distances to other points using condensed matrix indexing formula
        for j in range(n_points):
            if point_index == j:
                distances[j] = 0  # Distance to itself is always zero
            elif point_index < j:
                idx = n_points * point_index - (point_index * (point_index + 1)) // 2 + (j - point_index - 1)
                distances[j] = condensed_distances[idx]
            else:
                idx = n_points * j - (j * (j + 1)) // 2 + (point_index - j - 1)
                distances[j] = condensed_distances[idx]

        return distances



In [806]:
# Generate classification dataset
X, y = make_classification(n_samples=10, n_features=5, n_informative=4, 
                           n_redundant=0, n_classes=2, random_state=42)
# Introduce missing values
missing_fraction = 0  # 5% missing values
num_missing = int(missing_fraction * X.size)

# Randomly select indices to introduce NaNs
np.random.seed(42)  # For reproducibility
missing_indices = np.random.choice(X.size, num_missing, replace=False)

# Flatten X, introduce NaNs, and reshape back
X_flattened = X.flatten()
X_flattened[missing_indices] = np.nan
X = X_flattened.reshape(X.shape)

# Introduce class imbalance by resampling
def introduce_class_imbalance(X, y, majority_class_ratio=0.8):
    # Get indices of both classes
    class_0_indices = np.where(y == 0)[0]
    class_1_indices = np.where(y == 1)[0]

    # Introduce imbalance by keeping only a fraction of one class
    np.random.seed(42)
    if len(class_0_indices) > len(class_1_indices):
        class_0_indices = np.random.choice(class_0_indices, int(len(class_0_indices) * majority_class_ratio), replace=True)
    else:
        class_1_indices = np.random.choice(class_1_indices, int(len(class_1_indices) * majority_class_ratio), replace=True)

    # Combine the new indices
    selected_indices = np.concatenate([class_0_indices, class_1_indices])

    # Subset the data
    X_imbalanced = X[selected_indices]
    y_imbalanced = y[selected_indices]

    return X_imbalanced, y_imbalanced

# Introduce class imbalance (e.g., 80% of majority class remains)
X_imbalanced, y_imbalanced = introduce_class_imbalance(X, y, majority_class_ratio=1)

y = list(map(str, y))
y_imbalanced = list(map(str, y_imbalanced))
# Print class distribution before and after imbalance
print("Original class distribution:", np.bincount(y))
print("Imbalanced class distribution:", np.bincount(y_imbalanced))


Original class distribution: [5 5]
Imbalanced class distribution: [5 5]


In [807]:
X_reg, y_reg = make_regression(n_samples=100, n_features=5, n_informative=5)
missing_fraction = 0.05  # 10% missing values

# Calculate number of missing values
num_missing = int(missing_fraction * X_reg.size)

# Randomly select indices to introduce NaNs
np.random.seed(42)  # For reproducibility
missing_indices = np.random.choice(X_reg.size, num_missing, replace=False)

# Flatten X, introduce NaNs, and reshape back
X_flattened = X_reg.flatten()
X_flattened[missing_indices] = np.nan
X_reg = X_flattened.reshape(X_reg.shape)
y_reg = list(map(str, y_reg))

# Model 
1. sklearn model format 

In [808]:
c_knn_class = Custom_Knn(neighbors=5, method='c')
c_knn_class_imbalance_not_set = Custom_Knn(neighbors=5, method='c')
c_knn_class_imbalance = Custom_Knn(neighbors=5, method='c', imbalance=True)
c_knn_regg = Custom_Knn(neighbors=1, method='r')


In [809]:
c_knn_regg.fit(X_reg, y_reg)
c_knn_class.fit(X, y)
c_knn_class_imbalance.fit(X_imbalanced, y_imbalanced)
c_knn_class_imbalance_not_set.fit(X_imbalanced, y_imbalanced)


In [810]:
pred_regg = c_knn_regg.predict(X=X_reg)

pred_class = c_knn_class.predict(X=X)

pred_class_imbalance = c_knn_class_imbalance.predict(X=X_imbalanced)

pred_class_imbalance_not_set = c_knn_class_imbalance_not_set.predict(X=X_imbalanced)


In [811]:
c_knn_class.label_type_

str

In [812]:
print(accuracy_score(y, pred_class))

print(r2_score(y_reg, pred_regg))

0.6
1.0


In [813]:
y_reg[0]

'7.853102481383311'

In [814]:
type(pred_class[0])

str

In [815]:
print(accuracy_score(y, pred_class))
print(recall_score(y, pred_class, pos_label='0'))


0.6
1.0


In [816]:
print(accuracy_score(y_imbalanced, pred_class_imbalance))
print(recall_score(y_imbalanced, pred_class_imbalance, pos_label='0'))

print(accuracy_score(y_imbalanced, pred_class_imbalance_not_set))
print(recall_score(y_imbalanced, pred_class_imbalance_not_set, pos_label='0'))

1.0
1.0
0.5
0.4


In [817]:
a, b, c = c_knn_class.predict_best_neighbor(X_test=X, y_test=y, metric='acc', n_range=(1, 15, 2, 3, 4, 5,6,7, 7, 7, 7))

['1', '1', '1', '0', '0', '1', '0', '0', '1', '0']
<class 'list'>
<class 'str'>
1.0
['1', '1', '1', '0', '0', '1', '0', '0', '1', '0']
<class 'list'>
<class 'str'>
1.0
['1', '1', '1', '0', '0', '1', '0', '0', '1', '0']
<class 'list'>
<class 'str'>
1.0
['1', '1', '0', '0', '0', '1', '0', '0', '1', '0']
<class 'list'>
<class 'str'>
0.9
['1', '1', '0', '0', '0', '1', '0', '0', '1', '0']
<class 'list'>
<class 'str'>
0.9
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6
['1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
<class 'list'>
<class 'str'>
0.6


In [818]:
y


['1', '1', '1', '0', '0', '1', '0', '0', '1', '0']

In [819]:
a

1

In [820]:
b

1.0

In [821]:
c

{1: 1.0, 15: 1.0, 2: 1.0, 3: 0.9, 4: 0.9, 5: 0.6, 6: 0.6, 7: 0.6}