Реализация построения Small world графа (SW) и алгоритма навигации по нему (Navigable small world).

In [4]:
from typing import Callable, Tuple, Dict, List

import numpy as np


def distance(pointA: np.ndarray, documents: np.ndarray) -> np.ndarray:
    """Рассчитывает Евклидово (L2) расстояние от точки pointA
    (вектор размера 1∗D, где D - размерность эмбеддинга) до всех объектов во
    множестве documents (матрица размера N∗D, где N - количество документов)

    Parameters
    ----------
    pointA : np.ndarray
        
    documents : np.ndarray
        

    Returns
    -------
    distance : np.ndarray - матрица дистанций между pointA и всеми объектами в documents
        

    
    """
    if documents.ndim == 2:
        distance = np.apply_along_axis(np.linalg.norm, 1, (pointA-documents))    
    else:
        distance = np.linalg.norm(pointA-documents)
    return distance

def create_sw_graph(
        data: np.ndarray,
        num_candidates_for_choice_long: int = 10,
        num_edges_long: int = 5,
        num_candidates_for_choice_short: int = 10,
        num_edges_short: int = 5,
        dist_f: Callable = distance
    ) -> Dict[int, List[int]]:
    """Создаёт граф связей в множестве документов data (матрица размеров N∗D)

    Parameters
    ----------
    data : np.ndarray
        
    num_candidates_for_choice_long : int
        (Default value = 10) - количество кандидатов, являющихся самыми далекими объектами
    num_edges_long : int
        (Default value = 5) - количество случайно отобранных точек для построения рёбер графа из всех далёких объектов
    num_candidates_for_choice_short : int
        (Default value = 10) - количество кандидатов, являющихся самыми близкими объектами
    num_edges_short : int
        (Default value = 5) - количество случайно отобранных точек для построения рёбер графа из всех близких объектов
    dist_f : Callable
        (Default value = distance) - функция расёта расстояния от точки до набора точек

    Returns
    -------
    SW : Dict - словарь, где ключ - индекс точки (и количество ключей равно количеству точек N),
        
    а значение — список индексов точек, которыми образованы связи в виде ребер (длинных и коротких совместно, без разделения)
        

    
    """
       
        
    distances = np.zeros((data.shape[0], data.shape[0]))
    longs = np.zeros((data.shape[0], num_edges_long))
    shorts = np.zeros((data.shape[0], num_edges_short))
    SW = {}
    for i in range(len(data)):
        distances[i]=dist_f(data[i], data)
        longs[i]=np.random.choice(np.sort(dist_f(data[i], data))[::-1][:num_candidates_for_choice_long],
                                      num_edges_long, replace=False)
        shorts[i]=np.random.choice(np.sort(dist_f(data[i], data))[1:num_candidates_for_choice_short],
                                       num_edges_short, replace=False)
        sorter=np.argsort(distances[i])
        long = sorter[np.searchsorted(distances[i], longs[i], sorter=sorter)].tolist()
        short = sorter[np.searchsorted(distances[i], shorts[i], sorter=sorter)].tolist()
        SW[i]=(long+short)
    
    return SW

def nsw(query_point: np.ndarray,
        all_documents: np.ndarray, 
        graph_edges: Dict[int, List[int]],
        search_k: int = 10, 
        num_start_points: int = 5,
        dist_f: Callable = distance) -> np.ndarray:
    """Поиск ближайших к pointA (вектор размера 1∗D, где D - размерность эмбеддинга) точек в графе.

    Parameters
    ----------
    query_point : np.ndarray
        
    all_documents : np.ndarray
        
    graph_edges : Dict[int, List[int]]
        
    search_k : int
        (Default value = 10) - количество соседей, которое необходимо вернуть
    num_start_points : int
        (Default value = 5) - количество случайно выбранных стартовых точек
    dist_f : Callable
        (Default value = distance)  - функция расёта расстояния от точки до набора точек

    Returns
    -------
    list : список из search_k точек в all_documents, ближайших к query_point
        

    
    """

    result_queue = []
    visited_set = set()
    
    hops = 0
    for _ in range(num_start_points):
        entry_node = np.random.randint(0, len(graph_edges) - 1)
        entry_dist = dist_f(query_point, all_documents[entry_node])
        candidate_queue = []
        candidate_queue.append((entry_dist, entry_node))

        temp_result_queue = []
        while candidate_queue:
            candidate_dist, candidate_idx = candidate_queue.pop()

            if len(result_queue) >= search_k:
                # if candidate is further than the k-th element from the result,
                # then we would break the repeat loop
                current_k_dist, current_k_idx = sorted(result_queue, key=lambda x: x[0])[0]
                if candidate_dist > current_k_dist:
                    break

            for friend_node in graph_edges[candidate_idx]:
                if friend_node not in visited_set:
                    visited_set.add(friend_node)
                
                    friend_dist = dist_f(query_point, all_documents[friend_node])
                    candidate_queue.append((friend_dist, friend_node))
                    temp_result_queue.append((friend_dist,  friend_node))

        result_queue = result_queue+temp_result_queue

    return [el[1] for el in sorted(result_queue, key=lambda x: x[0])[:search_k]]


query = np.random.rand(20)

data = np.random.rand(40,20)

graf=create_sw_graph(data=data)
nsw(query,
    data, 
    graf,
    10, 
    5,
    distance)

[29, 17, 10, 37, 26, 19, 2, 32, 20, 34]