In [14]:
import numpy as np
from tests import generate_regression_data, test_regression_model, test_knn_model

### 1.1 Необходимо реализовать класс для решения задачи восстановления регрессии методом ближайших соседей

Для класса `KnnKdtreeRegressor` обучение заключается в построении k-d дерева по обучающей выборке. K-d дерево является бинарным деревом, которое соответствует разделению пространства, где находятся элементы обучающей выборки, гиперплоскостями, ортогональными осям координат. Каждой вершине k-d дерева соответствует гиперпрямоугольник в этом пространстве. Корневой вершине соответствует гиперпрямоугольник, содержащий все точки обучающей выборки.

Построение k-d дерева реализовано на основе алгоритма ID3 для построения дерева принятия решений. Ось, по которой будет проводиться разбиение в каждой вершине выбирается случайным образом.

In [15]:
class KnnKdtreeRegressor(object):
    '''Регрессор реализует взвешенное усреднение по ближайшим соседям.
    При подсчете расстояния используется l1-метрика.
    Поиск ближайшего соседа осуществляется поиском по kd-дереву.
    Параметры
    ----------
    n_neighbors : int, optional
        Число ближайших соседей, учитывающихся в усреднении
    weights : str, optional (default = 'uniform')
        веса, используемые в голосовании. Возможные значения:
        - 'uniform' : все веса равны.
        - 'distance' : веса обратно пропорциональны расстоянию до классифицируемого объекта
        -  функция, которая получает на вход массив расстояний и возвращает массив весов
    leaf_size: int, optional
        Максимально допустимый размер листа дерева
    '''
    def __init__(self, n_neighbors=1, weights='uniform', leaf_size=30):
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.leaf_size = leaf_size

    def fit(self, x, y):
        '''Обучение модели - построение kd-дерева
        Параметры
        ----------
        x : двумерный массив признаков размера n_queries x n_features
        y : массив/список правильных меток размера n_queries
        Выход
        -------
        Метод возвращает обученную модель
        '''
        def build_kdtree(indices, parent=None):
            if len(indices) <= self.leaf_size:
                return Leaf(parent, indices)
            axis = np.random.randint(x.shape[1])  # splitting axis
            median = np.median(self.x[indices, axis])
            left_indices = indices[np.argwhere(x[indices, axis] < median).flatten()]
            right_indices = indices[np.argwhere(x[indices, axis] >= median).flatten()]
            if len(left_indices) == 0 or len(right_indices) == 0:
                return Leaf(parent, indices)
            node = Node(parent, axis, median)
            node.left = build_kdtree(left_indices, parent=node)
            node.right = build_kdtree(right_indices, parent=node)
            return node

        self.x = x
        self.y = y
        self.tree = build_kdtree(indices=np.arange(x.shape[0]))
        return self

    def predict(self, x):
        """ Восстановление регрессии для входных объектов - поиск по метрическому дереву ближайших соседей
        и взвешенное усреднение
        Параметры
        ----------
        X : двумерный массив признаков размера n_queries x n_features
        Выход
        -------
        y : Массив размера n_queries
        """
        def get_weights(distances, weights):
            if weights == 'uniform':
                return None
            elif weights == 'distance':
                for i, dist in enumerate(distances):
                    if 0. in dist:
                        distances[i] = dist == 0.
                    else:
                        distances[i] = 1. / dist
                return distances
            elif callable(weights):
                return weights(distances)

        neigh_dist, neigh_indarray = self.kneighbors(x, self.n_neighbors)
        weights = get_weights(neigh_dist, self.weights)
        if weights is None:
            y = np.mean(self.y[neigh_indarray], axis=1)
        else:
            y = np.sum(self.y[neigh_indarray] * weights, axis=1) / np.sum(weights, axis=1)  # weighted arithmetic mean
        return y

    def kneighbors(self, x, n_neighbors):
        """Возвращает n_neighbors ближайших соседей для всех входных объектов при помощи поска по kd-дереву
        и расстояния до них
        Параметры
        ----------
        X : двумерный массив признаков размера n_queries x n_features
        Выход
        -------
        neigh_dist массив размера n_queries х n_neighbors
        расстояния до ближайших элементов
        neigh_indarray, массив размера n_queries x n_neighbors
        индексы ближайших элементов
        """
        def distance(p, q):
            return np.sum(abs(p - q))  # l1 distance, also known as taxicab metric

        def get_leaf(point):
            node = self.tree
            while isinstance(node, Node):
                node = node.left if point[node.axis] < node.median else node.right
            return node

        def get_leaf_neighbors(point, leaf):
            distances = np.array(list(map(distance, self.x[leaf.indices], np.ones((leaf.size, len(point))) * point)))
            sort_by_distances = distances.argsort()
            distances, indices = distances[sort_by_distances], leaf.indices[sort_by_distances]
            if len(distances) > n_neighbors:
                distances, indices = distances[:n_neighbors], indices[:n_neighbors]
            return distances, indices

        neigh_dist, neigh_indarray = np.empty((x.shape[0], n_neighbors)), np.empty((x.shape[0], n_neighbors), dtype=int)
        distances, indices, furthest_distance = None, None, np.inf

        def update_neighbors(point, node):
            nonlocal distances, indices, furthest_distance
            if isinstance(node, Leaf):
                new_distances, new_indices = get_leaf_neighbors(point, node)
                distances, indices = np.append(distances, new_distances), np.append(indices, new_indices)
                sort_by_distances = distances.argsort()
                distances, indices = distances[sort_by_distances], indices[sort_by_distances]
                if len(distances) > n_neighbors:
                    distances, indices = distances[:n_neighbors], indices[:n_neighbors]
                if len(distances) == n_neighbors:
                    furthest_distance = distances[-1]
            elif distance(point[node.axis], node.median) < furthest_distance:
                update_neighbors(point, node.left)
                update_neighbors(point, node.right)
            else:
                next_branch = node.left if point[node.axis] < node.median else node.right
                update_neighbors(point, next_branch)


        for i, point in enumerate(x):
            leaf = get_leaf(point)
            distances, indices = get_leaf_neighbors(point, leaf)
            if len(distances) == n_neighbors:
                furthest_distance = distances[-1]
            node = leaf.parent
            while node is not None:
                if distance(point[node.axis], node.median) < furthest_distance:
                    opposite_branch = node.left if point[node.axis] >= node.median else node.right
                    update_neighbors(point, opposite_branch)
                node = node.parent
            neigh_dist[i], neigh_indarray[i] = distances, indices
            furthest_distance = np.inf
        return neigh_dist, neigh_indarray

    def score(self, y_gt, y_pred):
        """Возвращает среднеквадратичную ошибку восстановления регрессии
        Параметры
        ----------
        y_gt : правильные метки объектов
        y_pred: предсказанные метки объектов
        Выход
        -------
        mse - среднеквадратичная ошибка восстановления регрессии
        """
        y_gt, y_pred = np.array(y_gt), np.array(y_pred)
        return 1 / len(y_gt) * sum((y_gt - y_pred) ** 2)  # mean squared error

Класс для хранения внутренних вершин k-d дерева. Конструктор принимает родителя `parent` вершины, номер `axis` оси пространства, по которой происходит разделение в данной вершине, значение медианы `median` обучающей выборки по оси `axis`, а также левую и правую дочерние вершины `left` и `right`.

In [16]:
class Node:
    def __init__(self, parent, axis, median, left=None, right=None):
        self.parent = parent
        self.axis = axis
        self.median = median
        self.left = left
        self.right = right

Класс для хранения листьев k-d дерева. Конструктор принимает родителя `parent` листа и индексы `indices` элементов обучающей выборки, которые были распределены в данный лист. Также определено свойство `size`, возврашающее количество элементов обучающей выборки, попавших в данный лист.

In [17]:
class Leaf:
    def __init__(self, parent, indices):
        self.parent = parent
        self.indices = indices

    @property
    def size(self):
        return len(self.indices)

### 1.2 Пример использования

In [18]:
np.random.seed(1)
n_features = 10
n_train = 1000
n_test = 1500
W = np.random.rand(n_features + 1)
trainX = np.append(np.random.rand(n_train, n_features), np.ones((n_train, 1)), axis=1)
trainY = trainX @ W
testX = np.append(np.random.rand(n_test, n_features), np.ones((n_test, 1)), axis=1)
testY = testX @ W
n_neighbors = 15
weights = 'distance'
leaf_size = 100
knn = KnnKdtreeRegressor(n_neighbors, weights, leaf_size)
knn.fit(trainX, trainY)
y_pred = knn.predict(testX)
mse = knn.score(testY, y_pred)
print('Your mse is %s' % str(mse))

Your mse is 0.014037991922886342


### 2. Покройте ваш класс тестами

#### Импортированный тест метода ближайших соседей

In [19]:
n_neighbors = 5
weights = 'uniform'
leaf_size = 10
model = KnnKdtreeRegressor(n_neighbors, weights, leaf_size)
test_knn_model(model)

fit_predict test passed
score test passed
[[0]] [[0]]
kneighbors test passed
accuracy test passed
Test passed


#### Тест на корректость работы метода ближайших соседей

Тестирование корректности работы класса `KnnKdtreeRegressor` осуществляется путем сравнения полученных результатов с результатами работы класса `KNeighborsRegressor`, реализованного в библиотеке машинного обучения `sklearn`. В конструктор класса `KNeighborsRegressor` подаются те же параметры, что и в `KnnKdtreeRegressor`, а также параметр `algorithm='kd_tree'`, определяющий алгоритм, используемый для вычисления ближайших соседей и `p=1` ‒ параметр мощности для метрики Минковского, при `p=1`, эта метрика эквивалентна манхэттенской метрике, используемой в `KnnKdtreeRegressor`.

In [20]:
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_squared_error


def create_models(n_neighbors=15, weights='uniform', leaf_size=100):
    model = KnnKdtreeRegressor(n_neighbors, weights, leaf_size)
    sklearn_model = KNeighborsRegressor(n_neighbors=n_neighbors, weights=weights, leaf_size=leaf_size,
                                        algorithm='kd_tree', p=1)
    return model, sklearn_model


def generate_data(n_features=10, n_train=1000, n_test=1500):
    W = np.random.rand(n_features + 1)
    trainX = np.c_[np.random.rand(n_train, n_features), np.ones(n_train)]
    trainY = trainX @ W
    testX = np.c_[np.random.rand(n_test, n_features), np.ones(n_test)]
    testY = testX @ W
    return trainX, trainY, testX, testY


def fit_models(model, sklearn_model, trainX, trainY):
    model.fit(trainX, trainY)
    sklearn_model.fit(trainX, trainY)


epsilon = 1e-10


class Color:
    green = '\033[32m'
    red = '\033[91m'
    end = '\033[0m'


def test_score(model, n=100):
    y_gt, y_pred = np.random.rand(n), np.random.rand(n)
    assert abs(model.score(y_gt, y_pred) - mean_squared_error(y_gt, y_pred)) < epsilon, 'wrong score function'


def test_kneighbors(model, sklearn_model, testX):
    neigh_dist, neigh_indarray = model.kneighbors(testX, model.n_neighbors)
    sklearn_neigh_dist, sklearn_neigh_indarray = sklearn_model.kneighbors(testX, model.n_neighbors)
    assert np.all(neigh_indarray == sklearn_neigh_indarray) and \
           np.all(abs(neigh_dist - sklearn_neigh_dist) < epsilon), 'wrong neighbors'


def test_predict(model, sklearn_model, testX, testY):
    y_pred = model.predict(testX)
    sklearn_y_pred = sklearn_model.predict(testX)
    assert abs(model.score(testY, y_pred) - mean_squared_error(testY, sklearn_y_pred)) < epsilon, \
        'unacceptably low prediction accuracy'


def test(n_neighbors=1, weights='uniform', leaf_size=30, n_features=1, n_train=150, n_test=150, seed=0):
    np.random.seed(seed)
    model, sklearn_model = create_models(n_neighbors, weights, leaf_size)
    trainX, trainY, testX, testY = generate_data(n_features, n_train, n_test)
    fit_models(model, sklearn_model, trainX, trainY)
    parameters = {
        'seed': seed,
        'n_neighbors': n_neighbors,
        'weights': weights,
        'leaf_size': leaf_size,
        'n_features': n_features,
        'n_train': n_train,
        'n_test': n_test
    }
    try:
        test_score(model)
        test_kneighbors(model, sklearn_model, testX)
        test_predict(model, sklearn_model, testX, testY)
    except AssertionError as error:
        print(f'{Color.red}{error}{Color.end}')
        test_passed = False
    else:
        test_passed = True
    return test_passed, parameters


test_passed, parameters = test(n_neighbors=15, weights='distance', leaf_size=50, n_features=10, n_train=500, n_test=500)
if test_passed:
    print(f'{Color.green}test passed{Color.end}')
else:
    for parameter, value in parameters.items():
        print(parameter, value)

[32mtest passed[0m


### 3. Примените ваш класс к самостоятельно подготовленной задаче восстановления регрессии и подберите оптимальные параметры для вашей модели

Выбрана задача восстановления регрессии для данных с параметрами `n_features=10, n_train=100, n_test=100`, сгенерированных случайно при `np.random.seed(0)`. Оптимальные параметры `leaf_size, n_neighbors, weights` для данной задачи подбираются путем перебора всех допустимых значений этих параметров и выбора того набора параметров, при котором значение `mse` среднеквадратичной ошибки восстановления регрессии минимально. Для `leaf_size` и `n_neighbors` допустимы целые положительные значения от `1` до `n_train`. Параметр `weights` выбирается из двух вариантов: `uniform` и `distance`.

Подобранные для данной задачи оптимальные параметры: `leaf_size=1, n_neighbors=5, weights='distance'`.

In [22]:
np.random.seed(0)
n_features = 10
n_train = 100
n_test = 100
W = np.random.rand(n_features + 1)
trainX = np.c_[np.random.rand(n_train, n_features), np.ones(n_train)]
trainY = trainX @ W
testX = np.c_[np.random.rand(n_test, n_features), np.ones(n_test)]
testY = testX @ W


def get_optimal_parameters(weights):
    parameters_mse = list()
    for leaf_size in range(1, n_train + 1):
        for n_neighbors in range(1, n_train + 1):
            knn = KnnKdtreeRegressor(n_neighbors, weights, leaf_size)
            knn.fit(trainX, trainY)
            y_pred = knn.predict(testX)
            mse = knn.score(testY, y_pred)
            parameters_mse.append([leaf_size, n_neighbors, mse])
    parameters_mse = np.array(parameters_mse)
    return parameters_mse[np.argmin(parameters_mse[:, 2])]


parameters_uniform = get_optimal_parameters('uniform')
parameters_distance = get_optimal_parameters('distance')
if parameters_uniform[2] < parameters_distance[2]:
    optimal_parameters = parameters_uniform
    weights = 'uniform'
else:
    optimal_parameters = parameters_distance
    weights = 'distance'


print('optimal leaf_size', int(optimal_parameters[0]))
print('optimal n_neighbors', int(optimal_parameters[1]))
print('optimal weights', weights)
print('minimal mse', optimal_parameters[2])

optimal leaf_size 1
optimal n_neighbors 5
optimal weights distance
minimal mse 0.10439044483706239
