In [50]:
import numpy as np
from collections import Counter
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score, KFold, train_test_split

KD树构建

In [51]:
#kd树节点类
class kd_node:
    """
    定义kd树节点类
    self: 节点类，用于表示树中的每个点
    point: 节点对应的点
    label: 点对应的标签
    left: 左子树
    right: 右子树
    split_dim: 表示该节点分割数据的维度; 指示了节点分割数据的维度, 从0开始计算
    """
    def __init__(self, point=None, label=None, left=None, right=None, split_dim=0, depth=0):
        self.point = point
        self.label = label
        self.left = left
        self.right = right
        self.split_dim = split_dim
        self.depth = depth

#递归构造kd树，返回根节点     
def tree(data, label, depth = 0):
    if len(data) == 0:                                                              #检查，用于避免对空数据结构执行排序操作
        return None

    Combined = list(zip(data, label))                                               #将数据点和标签配对
    window_size = data.shape[0]                                                     #每个维度拥有的数据集的数量
    channel = data.shape[1]                                                         #k是数据的维度数
    split_dim = depth % channel                                                     #split_dim通过depth和k取模，一直在0到k-1之间循环
    
    sorted_data = sorted(Combined, key = lambda x:x[0][split_dim])                  #按照数据点的维度进行排序，x[0]是数据点，x[1]是标签
    medianNumber = window_size // 2                                                 #确定数据的中心点
    medianData, medianLabel = sorted_data[medianNumber]

    #建立当前节点，并递归构建左右子树
    node = kd_node(point=medianData, label=medianLabel,depth=depth)                 #节点为数据中心点
    left = np.array([item[0] for item in sorted_data[:medianNumber]])
    leftLabel = np.array([item[1] for item in sorted_data[:medianNumber]])
    right = np.array([item[0] for item in sorted_data[medianNumber+1:]])
    rightLabel = [item[1] for item in sorted_data[medianNumber+1:]]
    
    node.left=tree(left, leftLabel,depth=depth+1)                #数据中心点向左包括中心点归于左子树；depth加一进行下一个分类
    node.right=tree(right, rightLabel ,depth=depth+1)             #数据中心点向右的点归于右子树；depth加一进行下一个分类
    #返回当前节点
    return node                                                                     


In [52]:
#距离计算函数，欧氏距离；当前节点到目标点的距离
def Calculate_d(point1, point2):
    distance = np.sqrt(np.sum((point1-point2)**2))
    return distance

KD树搜索

In [53]:
#创建kd树类
class kdtree:
    def __init__(self,dataset,label):
        self.root = tree(data=dataset, label=label)

    #kd树搜索
    def searchKDtree(self, point, k = 20):
        """
        1.  从根节点出发进行查找，根据当前深度计算比较的特征维度
            若目标节点的特征值小于当前节点的特征值，则遍历左子树，否则遍历右子树
        2.  找到叶子节点后，将其暂时标记为当前最邻近的点
        3.  递归地向上回退，在回退时需要：
                如果当前节点与目标结点的距离更近，则更新最邻近节点为当前节点
                如果当前节点对应特征与目标节点对应特征的值距离小于当前最小值时，
                    进入当前节点的另一个子节点进行查找，有可能存在更近的节点，
                    否则的话继续向上回退
        4.  回退到根节点结束，得到最邻近的点

        1.point: 需要进行判断的数
        2.k    : 附近的点
        """
        #检查kd树是否存在，否则不执行
        if self is None:                              
            return None
        
        #用于记录搜索路径上的节点信息，以便在需要时回溯到之前的节点
        stack = []
        #储存离目标点最近的几个点/标签以及距离
        nearest_nodes = []   
        correspond_label = []               
        nearest_distances = []

        #在kd树中搜索，并确定point会落在哪个叶子节点上
        data_node = self.root                                               #储存根节点；kd树结构为根节点下左右子节点树，下一个循环后目标点更新成左节点或右节点，一直循环，直到目标点为none
        data_point = data_node.point                                        #从当前节点中提取数据点
        label = data_node.label                                             #从当前节点中提取标签
        while data_node:                                                    #当前节点根据循环不停向下递归查找最邻近的点，直到提取数据点为None结束循环
            stack.append(data_node)                                         #存储数据点和标签，将当前节点信息存储于堆栈中
            distance = Calculate_d(data_point,point)                        #测量当前目标节点和测试点之间的距离
            if len(nearest_nodes) < k:                                      #判断附近点的数量是否达到目标值
                nearest_nodes.append(data_point)
                correspond_label.append(label)
                nearest_distances.append(distance)
            else:                                       
                max_distance = max(nearest_distances)                       #提取存储距离中的最大值                        
                if distance < max_distance:                                 #两者做对比，如果新距离更小，用新数据替换旧数据
                    max_index = nearest_distances.index(max_distance)
                    nearest_distances[max_index] = distance
                    nearest_nodes[max_index] = data_point
                    correspond_label[max_index] = label
            #得到当前节点的维度
            axis = data_node.split_dim
            if point[axis] < data_point[axis]:                              #比较当前维度下目标点和数据点的大小，判断下一个是左子树还是右子树
                data_node = data_node.left
            else:
                data_node = data_node.right

        #向上回溯查找是否有更近的点
        while stack:
            current_node = stack.pop()
            data_point = current_node.point                                     #从当前节点中提取数据点
            label = current_node.label                                          #从当前节点中提取标签
            #current_distance = Calculate_d(data_point,point)                   #计算当前节点和目标点的距离
            max_distance = max(nearest_distances)                               #提取存储距离中的最大值
            axis = current_node.split_dim                                       #得到当前节点的维度
            #判断邻近点是否达到需要的最近节点数，判断是否需要转到另一区域搜索
            if len(nearest_nodes) < k or abs(point[axis] - data_point[axis]) < max_distance:
                #搜索与之前相反的区域
                if point[axis] < data_point[axis]:                              #比较当前维度下目标点和数据点的大小，判断下一个是左子树还是右子树
                    current_node = current_node.right
                else:
                    current_node = current_node.left
                if current_node is None:
                    continue
                #重复之前的搜索

                data_point = current_node.point                                        #从当前节点中提取数据点
                label = current_node.label                                             #从当前节点中提取标签
                if current_node:                                                       #当前节点根据循环不停向下递归查找最邻近的点，直到提取数据点为None结束循环
                    stack.append(current_node)                                         #存储数据点和标签，将当前节点信息存储于堆栈中
                    distance = Calculate_d(data_point,point)                        #测量当前目标节点和测试点之间的距离
                    if len(nearest_nodes) < k:                                      #判断附近点的数量是否达到目标值
                        nearest_nodes.append(data_point)
                        correspond_label.append(label)
                        nearest_distances.append(distance)
                    else:                                       
                        max_distance = max(nearest_distances)                       #提取存储距离中的最大值                        
                        if distance < max_distance:                                 #两者做对比，如果新距离更小，用新数据替换旧数据
                            max_index = nearest_distances.index(max_distance)
                            nearest_distances[max_index] = distance
                            nearest_nodes[max_index] = data_point
                            correspond_label[max_index] = label

        return nearest_nodes, nearest_distances, correspond_label

定义KNN分类函数，和找到合适k值的函数

In [60]:
# KNN分类函数
def knn_classify(kd_tree, point, k):
    #使用查找函数得到k个最邻近的点和对应标签
    nearest_nodes, nearest_distances, nearest_labels = kd_tree.searchKDtree(point, k)
    # 将标签转换为元组并计数
    label_counts = Counter(tuple(label) for label in nearest_labels)

    # 找到最常见的标签
    most_common_label = max(label_counts, key=label_counts.get)
    # 计算每个邻居的权重，这里使用距离的倒数作为权重
    #nearest_distances = np.array(nearest_distances).reshape(-1)
    #nearest_labels = np.array(nearest_labels).reshape(-1)
    #weights = 1 / (nearest_distances + 1e-10)  # 添加一个很小的数值，避免除以零
    # 对标签进行加权求和
    #weighted_sum = np.dot(nearest_labels, weights)
    # 对权重求和，得到归一化系数
    #normalization_factor = np.sum(weights)
    # 计算加权平均后的预测标签
    #predicted_label = round(weighted_sum / normalization_factor)
    predicted_label = most_common_label
    return predicted_label

def kfold(X_train, y_train):
    # 选择不同的k值进行交叉验证
    best_k = None
    best_score = 0
     
    #使用 KFold 进行交叉验证
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    for k in range(1, 100):
        #初始化准确度总和
        accuracy_sum = 0
       
       #对训练集进行交叉验证
        for train_index, val_index in kf.split(X_train):
            X_train_fold, X_val_fold = X_train[train_index], X_train[val_index]
            y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]
           
           #构建KD树
            kd_tree = kdtree(X_train_fold, y_train_fold)
           
           #在验证集上评估模型
            y_pred = [knn_classify(kd_tree, point, k) for point in X_val_fold]
            accuracy = np.mean(y_pred == y_val_fold)
            accuracy_sum += accuracy
       
        #计算平均准确度
        avg_accuracy = accuracy_sum / kf.get_n_splits(X_train)
       #更新最佳 k 值和最高准确度
        if avg_accuracy > best_score:
            best_score = avg_accuracy
            best_k = k
    return best_k, best_score


得到最适合的k值，并进行模型评估

In [55]:
import h5py
from sklearn.preprocessing import MinMaxScaler
from sklearn import metrics
from sklearn.model_selection import cross_val_score
import time 
start_time = time.time()


file = h5py.File('DB2_S1_feature_200_0.h5','r')
featureData   = file['featureData'][:]
featureLabel  = file['featureLabel'][:]
file.close()

featureData = MinMaxScaler().fit_transform(featureData) # 缩放到[0, 1]
train_x, test_x, train_y, test_y = train_test_split(featureData, featureLabel, test_size=0.2)
print(train_y.shape)



(9531, 1)


In [61]:
best_k, best_score = kfold(train_x, train_y)

#用best_k进行一次模型评估
#将数据和标签分训练集和测试集
#Data_train, Data_test, Label_train, Label_test = train_test_split(data, label, test_size=0.2, random_state=42)

#构建KD树
kd_tree = kdtree(train_x, train_y)
#对测试集进行批量预测
Label_pred = [knn_classify(kd_tree, point, best_k) for point in test_x]


In [66]:

test_y = np.array(test_y)
Label_pred_array = np.array(Label_pred)


#评估模型性能
accuracy = accuracy_score(test_y, Label_pred)
print(f"Accuracy: {accuracy}")


Accuracy: 0.15190935795216115


sklearn库中的knn对比精度

In [62]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [63]:

knn_model = KNeighborsClassifier(n_neighbors=7)
knn_model.fit(train_x, train_y)
y_pred = knn_model.predict(test_x)
accuracy = accuracy_score(test_y, y_pred)

print(f"Accuracy: {accuracy}")
    

Accuracy: 0.6017624842635334


  return self._fit(X, y)


In [None]:
print(f"Accuracy: {accuracy}")

Accuracy: 0.6722618548048678
