In [87]:
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import numpy as np

# 获取MNIST数据集,并抽样一部分数据以便后续的计算
idx = np.random.choice(70000,5000,replace=False)
mnist = fetch_openml("mnist_784")
X, y = mnist.data.to_numpy(), mnist.target.to_numpy().astype('int')
X = X[idx]
y = y[idx]

# 划分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


  warn(


In [88]:
print("训练集大小：",X_train.shape)
print("测试集大小：",X_test.shape)

训练集大小： (4000, 784)
测试集大小： (1000, 784)


# 基于sklearn的kd树

In [89]:
# 创建KNeighborsClassifier模型，使用kd树作为搜索算法
knn = KNeighborsClassifier(n_neighbors=3, algorithm='kd_tree')

# 在训练集上训练模型
knn.fit(X_train, y_train)

# 在测试集上进行预测
y_pred = knn.predict(X_test)

# 评估模型性能
accuracy = accuracy_score(y_test, y_pred)
print(f"KNN Accuracy: {accuracy * 100:.2f}%")

KNN Accuracy: 92.80%


# 自定义kd树

In [90]:
from tqdm import tqdm

# 定义KD树节点类
class Node:
    def __init__(self, data, left=None, right=None):
        self.data = data #节点本身的数据
        self.left = left #节点的左子树
        self.right = right #节点右子树

# 递归方法构建KD树

def build_kd_tree(X, depth=0):
    if len(X) == 0:
        return None
    k = X.shape[1]
    axis = depth % k #根据当前深度，选择划分的维度
    X = X[X[:, axis].argsort()]
    median = X.shape[0] // 2 #将当前结点数据一分为二
    return Node(data=X[median], left=build_kd_tree(X[:median], depth + 1), right=build_kd_tree(X[median + 1:], depth + 1))

# 计算点之间的距离，这里使用欧几里得距离
def euclidean_distance(x1, x2):
    return np.sqrt(np.sum((x1 - x2) ** 2))

# 搜索KD树
def search_kd_tree(tree, target, k=3):
    if tree is None:
        return []
    k_nearest = [] # list用于储存target当前遍历到的k个k近邻
    stack = [(tree, 0)] # 用于储存待遍历节点的stack
    while stack:
        node, depth = stack.pop() # 节点出栈
        if node is None:
            continue
        distance = euclidean_distance(target, node.data) # 计算需要分类的目标点与节点的距离
        if len(k_nearest) < k: # 当k_nearest未装满时，直接将节点放入
            k_nearest.append((node.data, distance))
        else: # 当k_nearest装满时，对比该节点与k_nearest中与目标点距离最远的节点的距离，如果小于则替换，如果大于则不替换
            max_index = max(range(k), key=lambda i: k_nearest[i][1])
            if k_nearest[max_index][1] > distance:
                k_nearest[max_index] = (node.data, distance)
        axis = depth % target.shape[0] # 计算当前深度对应的划分维度
        axis_diff = target[axis] - node.data[axis] # 计算该维度下目标点与当前节点的差
        if axis_diff <= 0: 
            # 当差小于0时，则该节点的左子树入栈 
            # 如果k_nearest未装满或k_nearest中相距目标点最远的点与目标点的距离大于axis_diff的绝对值时，则右子树也入栈
            stack.append((node.left, depth + 1))
            if len(k_nearest) < k:
                stack.append((node.right, depth + 1))
            else:
                max_index = max(range(k), key=lambda i: k_nearest[i][1])
                if k_nearest[max_index][1] > abs(axis_diff):
                    stack.append((node.right, depth + 1))
        else:
            # 当差大于0时，则该节点的右子树入栈
            # 如果k_nearest未装满或k_nearest中相距目标点最远的点与目标点的距离大于axis_diff的绝对值时，则左子树也入栈
            stack.append((node.right, depth + 1))
            if len(k_nearest) < k:
                stack.append((node.left, depth + 1))
            else:
                max_index = max(range(k), key=lambda i: k_nearest[i][1])
                if k_nearest[max_index][1] > abs(axis_diff):
                    stack.append((node.left, depth + 1))
    return [data for data, _ in k_nearest] # 返回遍历完的kd树后的k_nearest


# 使用KNN算法分类
def knn_classifier(X_train, y_train, X_test, k=3):
    y_pred = []
    for test_point in tqdm(X_test):
        k_nearest = search_kd_tree(kd_tree, test_point, k)
        labels = [y_train[np.where((X_train == point).all(axis=1))[0][0]] for point in k_nearest]
        counts = np.bincount(labels)#计算k_nearest中样本最多的标签，预测目标样本为该标签
        y_pred.append(np.argmax(counts))
    return y_pred

# 构建KD树
kd_tree = build_kd_tree(X_train)

# 使用KNN算法进行分类
k_neighbors = 3
print("KNN分类中...")
y_pred = knn_classifier(X_train, y_train, X_test, k_neighbors)

# 评估分类性能
accuracy = accuracy_score(y_test, y_pred)
print(f"KNN Accuracy: {accuracy * 100:.2f}%")

KNN分类中...


100%|██████████| 1000/1000 [00:22<00:00, 45.43it/s]

KNN Accuracy: 92.80%





In [91]:
# # 超参数搜索，搜索k从1到10，输出使得模型性能最好的k

best_acc = -np.inf

for k in range(1, 11):
    print(f"Searching for k = {k}")
    y_pred_k = knn_classifier(X_train, y_train, X_test, k)
    accuracy = accuracy_score(y_test, y_pred_k)
    print(f"KNN Accuracy when k = {k}: {accuracy * 100:.2f}%")

    if accuracy > best_acc:
        best_acc = accuracy
        best_k = k
    
print(f"The best k is {best_k} with accuracy {best_acc * 100:.2f}%")
    


Searching for k = 1


100%|██████████| 1000/1000 [00:17<00:00, 55.92it/s]


KNN Accuracy when k = 1: 91.70%
Searching for k = 2


100%|██████████| 1000/1000 [00:20<00:00, 49.73it/s]


KNN Accuracy when k = 2: 91.30%
Searching for k = 3


100%|██████████| 1000/1000 [00:21<00:00, 45.97it/s]


KNN Accuracy when k = 3: 92.80%
Searching for k = 4


100%|██████████| 1000/1000 [00:23<00:00, 42.70it/s]


KNN Accuracy when k = 4: 91.90%
Searching for k = 5


100%|██████████| 1000/1000 [00:25<00:00, 39.90it/s]


KNN Accuracy when k = 5: 92.60%
Searching for k = 6


100%|██████████| 1000/1000 [00:26<00:00, 37.21it/s]


KNN Accuracy when k = 6: 92.20%
Searching for k = 7


100%|██████████| 1000/1000 [00:28<00:00, 34.95it/s]


KNN Accuracy when k = 7: 92.40%
Searching for k = 8


100%|██████████| 1000/1000 [00:30<00:00, 33.20it/s]


KNN Accuracy when k = 8: 91.70%
Searching for k = 9


100%|██████████| 1000/1000 [00:31<00:00, 31.41it/s]


KNN Accuracy when k = 9: 91.90%
Searching for k = 10


100%|██████████| 1000/1000 [00:33<00:00, 29.72it/s]

KNN Accuracy when k = 10: 91.30%
The best k is 3 with accuracy 92.80%





In [92]:
from heapq import heappush, heappop

# 搜索KD的最大堆实现
def search_kd_tree_heap(tree, target, k=3):
    best_nodes = []  # 使用最大堆来存储最近邻
    def visit_node(node, target, depth):
        if node is None:
            return
        node_distance = euclidean_distance(target, node.data)

        # 使用负距离以实现最大堆
        if len(best_nodes) < k:
            heappush(best_nodes, (-node_distance, tuple(node.data)))
        elif -node_distance > best_nodes[0][0]:
            heappop(best_nodes)
            heappush(best_nodes, (-node_distance, tuple(node.data)))
        
        axis = depth % len(target)
        next_node = node.left if target[axis] < node.data[axis] else node.right
        other_node = node.right if next_node == node.left else node.left
        
        visit_node(next_node, target, depth + 1)

        # 检查另一侧的子树是否有可能包含更近的点
        if (len(best_nodes) < k or abs(target[axis] - node.data[axis]) < -best_nodes[0][0]):
            visit_node(other_node, target, depth + 1)

    visit_node(tree, target, 0)

    return [data for _, data in best_nodes]

# 使用KNN算法分类(堆实现)
def knn_classifier_heap(X_train, y_train, X_test, k=3):
    y_pred = []
    for test_point in tqdm(X_test):
        k_nearest = search_kd_tree_heap(kd_tree, test_point, k)
        labels = [y_train[np.where((X_train == point).all(axis=1))[0][0]] for point in k_nearest]
        counts = np.bincount(labels)#计算k_nearest中样本最多的标签，预测目标样本为该标签
        y_pred.append(np.argmax(counts))
    return y_pred


print("KNN(堆实现)分类中...")
k_neighbors = 2
y_pred_heap = knn_classifier_heap(X_train, y_train, X_test, k_neighbors)

accuracy_heap = accuracy_score(y_test, y_pred_heap)
print(f"KNN Accuracy (堆实现): {accuracy * 100:.2f}%")

KNN(堆实现)分类中...


100%|██████████| 1000/1000 [00:17<00:00, 56.58it/s]

KNN Accuracy (堆实现): 91.30%



