# HNSW

Hierarchical Navigable Small World，分层可导航小世界

是一种基于图结构的近似最近邻（ANN）搜索算法

## 核心思想

先在高层图中快速找到大致方向，再在底层图中精确定位目标。




In [None]:
import numpy as np
import random
import math
import matplotlib.pyplot as plt
import time
from collections import defaultdict

In [3]:
# 强制设置中文字体，解决中文显示问题
plt.rcParams['font.sans-serif'] = ['PingFang SC', 'Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 设置随机种子以保证结果可重现
np.random.seed(41)


In [5]:
class SimpleHNSW:
    '''
    HSNW 索引的简化实现
    '''

    def __init__(self, max_elements=1000, M=10, ef_construction=50, max_layers=6) -> None:
        '''
        初始化HNSW索引

        参数
        - max_elements: 最大元素数量
        - M: 每个节点的最大连接数
        - ef_construction: 构建时的搜索范围
        - max_layers: 最大层数
        '''
        self.max_elements = max_elements
        self.M = M 
        self.ef_confstruction = ef_construction
        self.max_layers = max_layers

        # 存储所有数据点
        self.data_points = []  # point_id -> vector
        # 每层的图结构（领接表）
        # 图是一个dict, key: 节点id; value:是邻居列表
        self.layers = [defaultdict(list) for _ in range(max_layers)]

        # 全局入口点（最高层节点）
        self.entry_point = None
        self.entry_level = -1 # 入口点所在的最高层级

    def _random_level(self):
        '''
        随机生成节点的层级（指数分布）
        '''
        level = 0
        while random.random() < 0.5 and level < self.max_layers - 1:
            level += 1
        return level

    def _euclidean_distance(self, a, b):
        ''' 计算欧氏距离 '''
        return np.sqrt(np.sum((a - b) ** 2))

    def _search_layer(self, query, entry_point, ef, layer):
        '''
        在指定层搜索最近邻
        
        参数:
        - query: 查询向量
        - entry_point: 搜索的起始点
        - ef: 搜索范围（返回的候选点数量）
        - layer: 搜索的层级
        '''
        if entry_point is None or entry_point not in self.layers[layer]:
            return []
        
        visited = set([entry_point])

        # 候选集: 保存（距离，节点ID）元组，从入口开始
        candidates = [(self._euclidean_distance(query, self.data_points[entry_point]), entry_point)]
        # 使用堆来维护候选集 TODO:这里使用列表排序
        results = []

        while candidates and len(results) < ef:
            # 获取距离最近的candidate
            candidates.sort(key=lambda x:x[0])      # 按照距离排序
            current_dist, current_point = candidates.pop(0)

            # 检查是否应该将当前点加入结果
            if not results or current_dist < results[-1][0]:
                results.append((current_dist, current_point))
                results.sort(key=lambda x:x[0])
                if len(results) > ef:
                    results = results[:ef]

            # 探索当前点的所有邻居节点
            for neighbor in self.layers[layer][current_point]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    dist = self._euclidean_distance(query, self.data_points[neighbor])
                    candidates.append((dist, neighbor))
            
        return results

    def add_point(self, point):
        '''
        向HNSW中添加新节点

        参数:
        - point: 要添加的数据点向量
        ''' 
        if len(self.data_points) >= self.max_elements:
            raise ValueError('too many elements')
        
        point_id = len(self.data_points)
        self.data_points.append(point)

        # 确定新节点的层级
        level = self._random_level()

        # 如果是第一个点，设为入口点
        if self.entry_point is None:
            self.entry_point = point_id
            self.entry_level = level
            for l in range(level+1):
                self.layers[l][point_id] = [] # 在新点的每一层创建空邻居列表
            return 
        
        # 从最高层开始搜索，找到每层的入口点
        current_point = self.entry_point
        current_max_level = self.entry_level

        # 从顶层开始搜索，找到到每层的入口点
        for l in range(current_max_level, level, -1):
            if l < len(self.layers):
                results = self._search_layer(point, current_point, 1, l)
                if results:
                    current_point = results[0][1] # 更新为更近的点

        # 从新节点的最高层开始，逐层向下插入并建立连接
        for l in range(min(current_max_level, level), -1, -1):
            # 在当前层搜索ef_construction个最近领
            results = self._search_layer(point, current_point, self.ef_confstruction, l)

            # 选择前M个最近领作为连接
            neighbors = [idx for _, idx in results[:self.M]] 

            # 在新节点的当前层创建连接
            self.layers[l][point_id] = neighbors.copy()

            # 双向连接：邻居也要连接到新节点
            for neighbor in neighbors:
                if len(self.layers[l][neighbor]) < self.M:
                    # 邻居连接数未满，直接添加
                    self.layers[l][neighbor].append(point_id)
                else:
                    # 如果邻居连接数已满，替换最远的连接
                    neighbor_neighbors = self.layers[l][neighbor]
                    distances = [self._euclidean_distance(
                        self.data_points[neighbor], self.data_points[n]) 
                        for n in neighbor_neighbors
                    ]
                    max_idx = np.argmax(distances) # 找到最远的邻居
                    # 如果新点更近，则替换最远的邻居
                    if self._euclidean_distance(self.data_points[neighbor], point) < distances[max_idx]:
                        neighbor_neighbors[max_idx] = point_id

        # 如果新节点的层级比当前入口点更高，更新entry_point和entry_level
        if level > self.entry_level:
            self.entry_point = point_id
            self.entry_level = level
        
        



    def search(self, query, k=5, ef_search=50):
        '''
        在HNSW中搜索最近邻

        参数:
        - query: 查询向量
        - k: 返回的最近邻数量
        - ef_search: 搜索时的候选集大小（越大精度越高但速度越慢）

        返回:
        - List<节点Id,距离> 按照距离从近到远排序
        '''
        if self.entry_point is None:
            return []
        
        current_point = self.entry_point
        current_level = self.entry_level

        # 从顶层开始搜索
        for l in range(current_level, 0, -1):
            results = self._search_layer(query, current_point, 1, l)
            if results:
                current_point = results[0][1] # 更新为每层的入口点

        # 在最底层进行精细搜索
        results = self._search_layer(query, current_point, ef_search, 0)

        # 返回前k个结果
        return [(idx, dist) for dist, idx in results[:k]]
