In [32]:
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from sklearn.neighbors import KDTree
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt


In [33]:
class IMetric(ABC):

    @abstractmethod
    def get_distances(self, data : pd.DataFrame, point : pd.Series) -> np.ndarray:
        pass

    @abstractmethod
    def get_distance(self, data : pd.Series, point : pd.Series) -> float:
        pass

In [34]:
class CosineMetric(IMetric):

    def get_distances(self, data : pd.DataFrame, point : pd.Series) -> np.ndarray:   
        return (np.ones(len(data)) - data.dot(point) / 
                (np.linalg.norm(data, axis=1) * np.linalg.norm(point)))
        
    def get_distance(self, data : pd.Series, point : pd.Series) -> float:
        return (1 - data.dot(point) / 
                    (np.linalg.norm(data) * np.linalg.norm(point)))
    


In [35]:
class ManhattanMetric(IMetric):

    def get_distances(self, data : pd.DataFrame, point : pd.Series) -> np.ndarray:
        return np.sum(np.abs(data - point), axis=1)
    
    def get_distance(self, data : pd.Series, point : pd.Series) -> float:
        return np.sum(np.abs(data - point))

In [36]:
class EuclideanMetric(IMetric):

    def get_distances(self, data : pd.DataFrame, point : pd.Series) -> np.ndarray:
        return np.linalg.norm(data - point, axis=1)
    
    def get_distance(self, data : pd.Series, point : pd.Series) -> float:
        return np.linalg.norm(data - point)

In [37]:
cityblock_metric : IMetric = ManhattanMetric()
cosine_metric : IMetric = CosineMetric()
euclidian_metric : IMetric = EuclideanMetric()

data = pd.DataFrame({"price":np.array([1, 2, 3]),
   "count": np.array([10, 12, 7])})

point = np.array([1, 10])

print(data)
print(point)
#manhattan_metric.get_distances(data, point)
print(cosine_metric.get_distances(data, point))
print(cityblock_metric.get_distances(data, point))
print(euclidian_metric.get_distances(data, point))

print(cosine_metric.get_distance(data.iloc[1], point))
print(cityblock_metric.get_distance(data.iloc[1], point))
print(euclidian_metric.get_distance(data.iloc[1], point))

   price  count
0      1     10
1      2     12
2      3      7
[ 1 10]
0    0.000000
1    0.002143
2    0.046220
dtype: float64
0    0
1    3
2    5
dtype: int64
[0.         2.23606798 3.60555128]
0.0021430509496885852
3
2.23606797749979


In [38]:
class MetricsFactory:

    __exist_metrics : list = ["euclidean", "cityblock", 'cosine']

    @property
    def exist_metrics(self) -> list:
        return self.__exist_metrics
        
    def metrics_exist(self, name : str) -> bool:
        return name in self.__exist_metrics

    def get_metrics(self, name : str) -> IMetric:
        if name == "euclidean":
            return EuclideanMetric()
        elif name == "cityblock":
            return ManhattanMetric()
        elif name == 'cosine':
            return CosineMetric()
            

In [39]:
class IKernel(ABC):
    @abstractmethod
    def kernel_func(self, x : float) -> float:
        raise NotImplementedError()

In [40]:
class RectangularKernel(IKernel):

    def kernel_func(self, r: float) -> float:
        if np.abs(r) <= 1:
            return 0.5
        else:
            return 0

In [41]:
class GaussianKernel(IKernel):

    def kernel_func(self, r: float) -> float:
        return 1/np.sqrt(2 * np.pi) * np.exp(-2 * np.power(r, 2))

In [42]:
class DefaulKernel(IKernel):

    def kernel_func(self, x: float) -> float:
        return x

In [43]:
class KernelFactory():
    
    __exist_kernel : list = ["rectangular", "gaussian"]

    @property
    def exist_kernel(self) -> list:
        return self.__exist_kernel
    
    def kernel_exist(self, name : str) -> bool:
        return name in self.__exist_kernel

    def get_kernel(self, name_kernel : str) -> IKernel:
        if name_kernel == "rectangular":
            return RectangularKernel()
        elif name_kernel == "gaussian":
            return GaussianKernel()


In [44]:
class IMethodOfGetNeighbours(ABC):
    
    @abstractmethod
    def __init__(self, metric : str) -> None:
        raise NotImplementedError()

    @abstractmethod
    def preprocessing(self, data : pd.DataFrame) -> None:
        raise NotImplementedError()
        
    @abstractmethod
    def get_neighbours(self, data : pd.DataFrame, point : pd.Series, knn : int,
                        width : float = -1) -> tuple[np.ndarray, np.ndarray]:
        raise NotImplementedError()

In [45]:
class KDTreeGetterNeighbours(IMethodOfGetNeighbours):
    
    __kdtree : KDTree
    __metric : str
    def __init__(self, metric : str) -> None:
        self.__metric = metric

    def preprocessing(self, data: pd.DataFrame) -> None:
        self.__kdtree = KDTree(data, metric=self.__metric.get_distance)

    def get_neighbours(self, point: pd.Series, knn : int = -1,
                        width : float = -1) -> tuple[np.ndarray, np.ndarray]:

        if width == -1 and knn != -1:
            distance, nearest_neighbor_index = self.__kdtree.query(point, k=knn, metric=self.__metric)
            return nearest_neighbor_index, distance
        elif width != -1 and knn == -1:
            # Define a condition
            nearest_neighbor_index = self.__kdtree.query_ball_point(point, width, metric=self.__metric)
            return nearest_neighbor_index, self.__kdtree[nearest_neighbor_index]




In [46]:
class ExhaustiveSearchGetterNeighbours(IMethodOfGetNeighbours):

    __data : pd.DataFrame
    __metric : str
    def __init__(self, metric : str) -> None:
        self.__metric = metric

    def preprocessing(self, data: pd.DataFrame) -> None:
        self.__data = data

    def get_neighbours(self, point: pd.Series, knn : int = -1,
                        width : float = -1) -> tuple[np.ndarray, np.ndarray]:

        distances = self.__metric.get_distances(self.__data, point)
        if width == -1 and knn != -1: 
            nearest_neighbor_index = np.argpartition(distances, knn, axis=None)[:knn] 
            return nearest_neighbor_index, distances
        elif width != -1 and knn == -1:
            index_elem_less_width = np.where(distances < width)[0]
            return index_elem_less_width, np.take(distances, index_elem_less_width)


In [47]:
class MethodOfGetNeighboursFactory:

    __exist_methods : list = ["kdtree", "exhaustive"]

    @property
    def exist_metrics(self) -> list:
        return self.__exist_methods
    
    def method_exist(self, name : str) -> bool:
        return name in self.__exist_methods

    def get_method(self, name_method : str,
                    metric : IMetric = EuclideanMetric()) -> IMethodOfGetNeighbours:
        
        if name_method == "kdtree":
            return KDTreeGetterNeighbours(metric)
        elif name_method == "exhaustive":
            return ExhaustiveSearchGetterNeighbours(metric)


In [48]:
class IMetricMethod(ABC):

    __method : IMethodOfGetNeighbours
    __methods_factory : MethodOfGetNeighboursFactory

    __metric : IMetric
    __metric_factory : MetricsFactory

    def __init__(self, metric : str, method : str) -> None:
        
        self.__metric_factory = MetricsFactory()
        self.__metric = self.__metric_factory.get_metrics(metric)

        self.__methods_factory = MethodsFactory()
        self.__method = self.__methods_factory.get_method(name_method=method, metric=self.__metric)

    def fit(self, data : pd.Series) -> None:
        pass

    @abstractmethod
    def __Get_Neighbor(self, train_Y : pd.Series, data_point : pd.Series) -> any:
        raise NotImplementedError()

    @abstractmethod
    def predict(self, X_test : pd.DataFrame, Y_train : pd.Series) -> np.ndarray:
        raise NotImplementedError()

In [49]:
class OneNN(IMetricMethod):

    def __init__(self, metric : str = "euclidean", method : str = "exhaustive") -> None:
        super().__init__(self, metric, method)


    def __Get_Neighbor(self, train_Y : pd.Series, data_point : pd.Series) -> any:
        ''' Эта функция фозвращает класс соседа объекта data_point, который встречается чаще всего'''

        nearest_index, distances = self.__method.get_neighbours(point=data_point, knn=1)
        
        c_neighbor = np.take(train_Y, nearest_index)
        unique, counts = np.unique(c_neighbor, return_counts=True)

        return unique[np.argmax(counts)]  
    
    def fit(self, data : pd.Series) -> None:
        self.__method.preprocessing(data)

    def predict(self, X_test : pd.DataFrame, Y_train : pd.Series) -> np.ndarray:
        
        predict = []
        for row in np.array(X_test):
            predict.append(self.__Get_Neighbor(Y_train, row, 1))

        return np.array(predict) 

In [50]:
class KNN(IMetricMethod):

    def __init__(self, metric : str = "euclidean", method : str = "exhaustive") -> None:
        super().__init__(self, metric, method)

    def __Get_Neighbor(self, train_Y : pd.Series, data_point : pd.Series, countNeighbor : int) -> any:
        ''' Эта функция фозвращает класс соседа объекта data_point, который встречается чаще всего'''

        nearest_index, distances = self.__method.get_neighbours(point=data_point, knn=countNeighbor)
       
        c_neighbor = np.take(train_Y, nearest_index)
        unique, counts = np.unique(c_neighbor, return_counts=True)

        return unique[np.argmax(counts)]  
    
    def fit(self, data : pd.Series) -> None:
        self.__method.preprocessing(data)

    def predict(self, X_test : pd.DataFrame,
                Y_train : pd.Series, count_neigbors : int = 10) -> np.ndarray:
        
        predict = []
        for row in np.array(X_test):
            predict.append(self.__Get_Neighbor(Y_train, row, count_neigbors))

        return np.array(predict) 

In [51]:
class ParzenWindowFixedWidth(IMetricMethod):

    __kernel : IKernel
    __kernel_factory = KernelFactory()

    def __init__(self, metric : str = "euclidean", method : str = "exhaustive",
                 kernel : str = "default") -> None:
        super().__init__(metric, method)
        self.__kernel = self.__kernel_factory.get_kernel(name_kernel=kernel)

    def __Get_Neighbor(self, train_Y : pd.Series,
                        data_point : pd.Series, width : float) -> any:
        ''' Эта функция фозвращает класс соседа объекта data_point, который встречается чаще всего'''

        nearest_index, distances = self.__method.get_neighbours(point=data_point, width=width)

        nearest = {cl : 0 for cl in np.unique(train_Y)}
        for i in nearest_index:
            nearest[train_Y.iloc[i]] += self.__kernel.kernel_func(distances[i]/width)

        return max(nearest, key=nearest.get)
    
    def fit(self, data : pd.Series) -> None:
        self.__method.preprocessing(data)

    def predict(self, X_test : pd.DataFrame,
                Y_train : pd.Series, width : float = 10) -> np.ndarray:
        
        predict = []
        for row in np.array(X_test):
            predict.append(self.__Get_Neighbor(Y_train, row, width))

        return np.array(predict) 

In [52]:
class ParzenWindowVariableWidth (IMetricMethod):

    __kernel : IKernel
    __kernel_factory = KernelFactory()

    def __init__(self, metric : str = "euclidean", method : str = "exhaustive",
                 kernel : str = "default") -> None:
                 
        super().__init__(metric, method)
        self.__kernel = self.__kernel_factory.get_kernel(name_kernel=kernel)

    def __Get_Neighbor(self, train_Y : pd.Series,
                        data_point : pd.Series, k : int) -> any:
        ''' Эта функция фозвращает класс соседа объекта data_point, который встречается чаще всего'''

        nearest_index, distances = self.__method.get_neighbours(point=data_point, knn=k+1)
        width = distances[k]

        nearest = {cl : 0 for cl in np.unique(train_Y)}
        for i in nearest_index[:k+1]:
            nearest[train_Y.iloc[i]] += self.__kernel.kernel_func(distances[i]/width)

        return max(nearest, key=nearest.get)
    
    def fit(self, data : pd.Series) -> None:
        self.__method.preprocessing(data)

    def predict(self, X_test : pd.DataFrame,
                Y_train : pd.Series, k : int = 10) -> np.ndarray:
        
        predict = []
        for row in np.array(X_test):
            predict.append(self.__Get_Neighbor(Y_train, row, k))

        return np.array(predict) 

In [53]:
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target,
                                                       test_size=0.2, random_state=42)
y_train

array([0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 2, 2, 1, 2, 1, 2,
       1, 0, 2, 1, 0, 0, 0, 1, 2, 0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 0, 2, 2,
       1, 1, 2, 1, 0, 1, 2, 0, 0, 1, 1, 0, 2, 0, 0, 1, 1, 2, 1, 2, 2, 1,
       0, 0, 2, 2, 0, 0, 0, 1, 2, 0, 2, 2, 0, 1, 1, 2, 1, 2, 0, 2, 1, 2,
       1, 1, 1, 0, 1, 1, 0, 1, 2, 2, 0, 1, 2, 2, 0, 2, 0, 1, 2, 2, 1, 2,
       1, 1, 2, 2, 0, 1, 2, 0, 1, 2])