In [51]:
import time

import numpy as np
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

Here two utility functions
1) hamming : Compute the hamming distance between two binary vectors
2) process_block: Is the function executed by a thread in our ThreadPool and is responsible of updating the SharedHeap for the corresponding query

In [52]:
def hamming_distance(a, b):
    return np.count_nonzero(a != b)


def process_block(queries, database_block, shared_heap, start_index):
    for i, query in enumerate(queries):
        for j, db_vector in enumerate(database_block):
            distance = hamming_distance(query, db_vector)
            shared_heap.update(i, distance, start_index + j)
    return "Block processed"  # Return a value to indicate completion

Here we present the implementation of a SharedHeapDataStracture useful for implementing a Faiss-Like LSH approach

Heap is a datastructure based on binary three that could be easily implemented by using arrays. In particular we realize a Max-Heap.
Max-Heap means that in the root of our binary three we will have the highest value with the property that each node must have descendents with a lower value
This datastructure is modified concurrently by different thread(the reason of using Locks) and at the end will contain the top k(approximate) closest neighbour.

In [54]:
class SharedMaxHeap:
    def __init__(self, n_queries, k):
        self.n_queries = n_queries
        self.k = k
        self.distances = np.full((n_queries, k), np.inf)
        self.indices = np.full((n_queries, k), -1, dtype=np.int64)
        self.locks = [threading.Lock() for _ in range(n_queries)]

    def update(self, query_idx, distance, index):
        with self.locks[query_idx]:
            if distance < self.distances[query_idx, 0]:
                self.distances[query_idx, 0] = distance
                self.indices[query_idx, 0] = index
                # Maintain heap property
                i = 0
                while 2 * i + 1 < self.k:  # Ha almeno un figlio
                    left = 2 * i + 1
                    right = 2 * i + 2
                    largest = i
                    if left < self.k and self.distances[query_idx, left] > self.distances[query_idx, largest]:
                        largest = left
                    if right < self.k and self.distances[query_idx, right] > self.distances[query_idx, largest]:
                        largest = right
                    if largest == i:  # Se invece il maggiore è quello in cima stiamo bene
                        break
                    self.distances[query_idx, i], self.distances[query_idx, largest] = \
                        self.distances[query_idx, largest], self.distances[query_idx, i]
                    self.indices[query_idx, i], self.indices[query_idx, largest] = \
                        self.indices[query_idx, largest], self.indices[query_idx, i]
                    i = largest

In [55]:
def knn_hamming_search(queries, database, k, block_size=12):
    n_queries, n_database = len(queries), len(database)
    shared_heap = SharedMaxHeap(n_queries, k)

    futures = []
    with ThreadPoolExecutor() as executor:  # Default n_workers=Logical CPU Cores+ 4
        for i in range(0, n_database, block_size):  # For each block
            database_block = database[i:min(i + block_size, n_database)]  # Pick a block from the index
            future = executor.submit(process_block, queries, database_block, shared_heap, i)  # Submit a
            futures.append(future)

        # Wait for all tasks to complete
        for future in as_completed(futures):
            future.result()  # This will raise an exception if the task failed

    # Sort results for each query
    results = []
    for i in range(n_queries):
        indices = shared_heap.indices[i]
        distances = shared_heap.distances[i]
        sorted_pairs = sorted(zip(distances, indices))
        sorted_distances, sorted_indices = zip(*sorted_pairs)
        results.append((list(sorted_indices), list(sorted_distances)))

    return results

In [60]:
# Example usage
if __name__ == "__main__":
    np.random.seed(0)
    queries = np.random.randint(0, 2, (100, 512))  # 100 query vectors of 64 bits each
    database = np.random.randint(0, 2, (20000, 512))  # 10000 database vectors of 64 bits each
    k = 5  # find top 5 nearest neighbors

    prima = time.time()
    results = knn_hamming_search(queries, queries, k)
    print(time.time() - prima)

    # Print results for the first query
    print("Top 5 matches for the first query:")
    print("Global Indices:", results[0][0])
    print("Distances:", results[0][1])

    print("Search completed.")




0.07957983016967773
Top 5 matches for the first query:
Global Indices: [0, 77, 34, 74, 84]
Distances: [0.0, 225.0, 236.0, 237.0, 238.0]
Search completed.
