Autofaiss supports creation of partitioned indexes (#126)
This refactoring breaks the training logic into smaller functions and adds the possibility to create thousands of indices in a distributed way from a partitioned dataset, using the `autofaiss build_partitioned_indexes` command!
nateagr committed Jul 21, 2022
1 parent 79d7c39 commit be5a88d
Showing 15 changed files with 1,169 additions and 236 deletions.
27 changes: 16 additions & 11 deletions README.md
@@ -89,22 +89,22 @@ numpy array and then call .reconstruct_from_offset() with your custom direct_map

## Using autofaiss with pyspark

Autofaiss allows users to build indices in Spark, you need to do the following steps:
Autofaiss allows you to build indices with Spark for the following two use cases:
- To build a big index in a distributed way
- To build one index per partition of a partitioned dataset of embeddings, in parallel and in a distributed way

1. Install pyspark by `pip install pyspark`.
2. Prepare your embeddings files.
3. Create a spark session before using `build_index` (optional), if you don't create it, a default session would
be created with the least configuration.
Prerequisites:

Also see [distributed_autofaiss.md](docs/distributed/distributed_autofaiss.md) for a full guide of how to use autofaiss in distributed mode.
1. Install pyspark: `pip install pyspark`.
2. Prepare your embeddings files (partitioned or not).
3. Create a Spark session before calling autofaiss, as shown in the sketch below. If no Spark session exists, a default session will be created with a minimal configuration.
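
For step 3, creating the session typically looks like the following minimal sketch (the app name and driver memory are illustrative values, not settings required by autofaiss):

```python
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session before calling autofaiss.
# The app name and driver memory below are illustrative, not required values.
spark = (
    SparkSession.builder.appName("autofaiss")
    .config("spark.driver.memory", "16G")
    .getOrCreate()
)
```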

### Creating a big index in a distributed way

### Producing N indices
See [distributed_autofaiss.md](docs/distributed/distributed_autofaiss.md) for a complete guide.
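
As a quick illustration, a distributed build looks roughly like the sketch below; the paths are placeholders and the exact keyword arguments should be checked against the guide linked above.

```python
from autofaiss import build_index

# Build one big index over Spark. All paths and memory budgets below are
# placeholders; the embeddings must be reachable by the Spark executors.
build_index(
    embeddings="hdfs://root/path/to/embeddings",
    index_path="hdfs://root/path/to/knn.index",
    index_infos_path="hdfs://root/path/to/infos.json",
    max_index_memory_usage="16G",
    current_memory_available="32G",
    distributed="pyspark",
    temporary_indices_folder="hdfs://root/tmp/distributed_autofaiss_indices",
)
```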

In the distributed mode, you can generate a set of indices with the total memory larger than your current available
memory by setting `nb_indices_to_keep` different from 1.
For example, if you set `nb_indices_to_keep` to 10 and your `index_path` is `knn.index`, you are expected to produce 10
indices at the end of `build_index` with the followings names:
It is possible to generate an index that would require more memory than is available. To do so, you can control the number of index splits that will compose your index with `nb_indices_to_keep`, as sketched below.
For example, if `nb_indices_to_keep` is 10 and `index_path` is `knn.index`, the final index will be decomposed into 10 smaller indexes:
- `knn.index01`
- `knn.index02`
- `knn.index03`
@@ -113,6 +113,11 @@ indices at the end of `build_index` with the followings names:

A [concrete example](examples/distributed_autofaiss_n_indices.py) shows how to produce N indices and how to use them.
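
For reference, a sketch of such a call (the paths are placeholders); each resulting split is a plain faiss index that can be loaded and searched on its own:

```python
import faiss
import numpy as np
from autofaiss import build_index

# Produce 10 index splits instead of one file; with index_path="knn.index",
# the splits are written as knn.index01 ... knn.index10 (paths are placeholders).
build_index(
    embeddings="hdfs://root/path/to/embeddings",
    index_path="hdfs://root/path/to/knn.index",
    index_infos_path="hdfs://root/path/to/infos.json",
    distributed="pyspark",
    nb_indices_to_keep=10,
)

# After copying a split locally, load and search it independently;
# results from the different splits are then merged by distance.
index_split = faiss.read_index("knn.index01")
queries = np.random.rand(5, index_split.d).astype("float32")  # dummy queries
distances, ids = index_split.search(queries, 10)
```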

### Creating partitioned indexes

Given a partitioned dataset of embeddings, it is possible to create one index per partition by calling the `build_partitioned_indexes` function, as sketched below.

See this [example](examples/partitioned_indexes.py) that shows how to create partitioned indexes.
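
A minimal sketch of such a call; the partition paths and output directory are placeholders, and the exact keyword arguments should be checked against the linked example.

```python
from autofaiss import build_partitioned_indexes

# One index is built per parquet partition; an active Spark session is required.
# All paths below are placeholders.
partitions = [
    "hdfs://root/path/to/embeddings/partition_key=0",
    "hdfs://root/path/to/embeddings/partition_key=1",
]
build_partitioned_indexes(
    partitions=partitions,
    output_root_dir="hdfs://root/path/to/indexes",
    embedding_column_name="embedding",
)
```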

## Using the command line

2 changes: 1 addition & 1 deletion autofaiss/__init__.py
@@ -1,5 +1,5 @@
# pylint: disable=unused-import,missing-docstring

from autofaiss.external.quantize import build_index, score_index, tune_index
from autofaiss.external.quantize import build_index, score_index, tune_index, build_partitioned_indexes

from autofaiss.version import __author__, __version__
238 changes: 121 additions & 117 deletions autofaiss/external/build.py
@@ -1,28 +1,21 @@
""" gather functions necessary to build an index """

import logging
from typing import Dict, Optional, Tuple, Union, Callable, Any
from typing import Dict, Optional, Tuple, Union, Callable, Any, List

import faiss
import pandas as pd
from faiss import extract_index_ivf
from embedding_reader import EmbeddingReader

from autofaiss.external.metadata import IndexMetadata
from autofaiss.external.optimize import (
check_if_index_needs_training,
get_optimal_batch_size,
get_optimal_index_keys_v2,
get_optimal_train_size,
)
from autofaiss.indices.index_factory import index_factory
from autofaiss.utils.cast import (
cast_bytes_to_memory_string,
cast_memory_to_bytes,
to_faiss_metric_type,
to_readable_time,
)
from autofaiss.external.optimize import check_if_index_needs_training, get_optimal_index_keys_v2, get_optimal_train_size
from autofaiss.utils.cast import cast_bytes_to_memory_string, cast_memory_to_bytes, to_readable_time
from autofaiss.utils.decorators import Timeit
from autofaiss.indices.distributed import run
from autofaiss.indices import distributed
from autofaiss.indices.index_utils import initialize_direct_map
from autofaiss.indices.training import create_and_train_new_index
from autofaiss.indices.build import add_embeddings_to_index_local


logger = logging.getLogger("autofaiss")

@@ -92,6 +85,55 @@ def get_estimated_construction_time_infos(nb_vectors: int, vec_dim: int, indent:
return infos


def add_embeddings_to_index(
embedding_reader: EmbeddingReader,
trained_index_or_path: Union[str, faiss.Index],
metadata: IndexMetadata,
current_memory_available: str,
embedding_ids_df_handler: Optional[Callable[[pd.DataFrame, int], Any]] = None,
distributed_engine: Optional[str] = None,
temporary_indices_folder: str = "hdfs://root/tmp/distributed_autofaiss_indices",
nb_indices_to_keep: int = 1,
index_optimizer: Callable = None,
) -> Tuple[Optional[faiss.Index], Optional[Dict[str, str]]]:
"""Add embeddings to the index"""

with Timeit("-> Adding the vectors to the index", indent=2):

# Estimate memory available for adding embeddings to index
size_per_index = metadata.estimated_index_size_in_bytes() / nb_indices_to_keep
memory_available_for_adding = cast_bytes_to_memory_string(
cast_memory_to_bytes(current_memory_available) - size_per_index
)
logger.info(
f"The memory available for adding the vectors is {memory_available_for_adding}"
"(total available - used by the index)"
)

if distributed_engine is None:
return add_embeddings_to_index_local(
embedding_reader=embedding_reader,
trained_index_or_path=trained_index_or_path,
memory_available_for_adding=memory_available_for_adding,
embedding_ids_df_handler=embedding_ids_df_handler,
index_optimizer=index_optimizer,
add_embeddings_with_ids=False,
)

elif distributed_engine == "pyspark":
return distributed.add_embeddings_to_index_distributed(
trained_index_or_path=trained_index_or_path,
embedding_reader=embedding_reader,
memory_available_for_adding=memory_available_for_adding,
embedding_ids_df_handler=embedding_ids_df_handler,
temporary_indices_folder=temporary_indices_folder,
nb_indices_to_keep=nb_indices_to_keep,
index_optimizer=index_optimizer,
)
else:
raise ValueError(f'Distributed by {distributed_engine} is not supported, only "pyspark" is supported')


def create_index(
embedding_reader: EmbeddingReader,
index_key: str,
@@ -100,120 +142,82 @@
embedding_ids_df_handler: Optional[Callable[[pd.DataFrame, int], Any]] = None,
use_gpu: bool = False,
make_direct_map: bool = False,
distributed: Optional[str] = None,
distributed_engine: Optional[str] = None,
temporary_indices_folder: str = "hdfs://root/tmp/distributed_autofaiss_indices",
nb_indices_to_keep: int = 1,
index_optimizer: Callable = None,
) -> Tuple[Optional[faiss.Index], Dict[str, str]]:
) -> Tuple[Optional[faiss.Index], Optional[Dict[str, str]]]:
"""
Function that returns an index on the numpy arrays stored on disk in the embeddings_path path.
Create an index and add embeddings to the index
"""

# Instanciate the index
with Timeit(f"-> Instanciate the index {index_key}", indent=2):

# Convert metric_type to faiss type
metric_type = to_faiss_metric_type(metric_type)

vec_dim = embedding_reader.dimension

# Instanciate the index
index = index_factory(vec_dim, index_key, metric_type)
metadata = IndexMetadata(index_key, embedding_reader.count, embedding_reader.dimension, make_direct_map)

metadata = IndexMetadata(index_key, embedding_reader.count, vec_dim, make_direct_map)

logger.info(
f"The index size will be approximately {cast_bytes_to_memory_string(metadata.estimated_index_size_in_bytes())}"
# Create and train index
trained_index = create_and_train_new_index(
embedding_reader, index_key, metadata, metric_type, current_memory_available, use_gpu
)

index_needs_training = check_if_index_needs_training(index_key)

if index_needs_training:
# Add embeddings to index
index, metrics = add_embeddings_to_index(
embedding_reader,
trained_index,
metadata,
current_memory_available,
embedding_ids_df_handler,
distributed_engine,
temporary_indices_folder,
nb_indices_to_keep,
index_optimizer,
)

# Extract training vectors
with Timeit("-> Extract training vectors", indent=2):
if make_direct_map:
initialize_direct_map(index)

memory_available_for_training = cast_bytes_to_memory_string(cast_memory_to_bytes(current_memory_available))
return index, metrics

# Determine the number of vectors necessary to train the index
train_size = get_optimal_train_size(
embedding_reader.count, index_key, memory_available_for_training, vec_dim
)
memory_needed_for_training = metadata.compute_memory_necessary_for_training(train_size)
logger.info(
f"Will use {train_size} vectors to train the index, "
f"that will use {cast_bytes_to_memory_string(memory_needed_for_training)} of memory"
)

# Extract training vectors
train_vectors, _ = next(embedding_reader(batch_size=train_size, start=0, end=train_size))

# Instanciate the index and train it
# pylint: disable=no-member
if use_gpu:
# if this fails, it means that the GPU version was not comp.
assert (
faiss.StandardGpuResources
), "FAISS was not compiled with GPU support, or loading _swigfaiss_gpu.so failed"
res = faiss.StandardGpuResources()
dev_no = 0
# transfer to GPU (may be partial).
index = faiss.index_cpu_to_gpu(res, dev_no, index)

with Timeit(
f"-> Training the index with {train_vectors.shape[0]} vectors of dim {train_vectors.shape[1]}", indent=2
):
index.train(train_vectors)

del train_vectors
else:
train_size = 0

size_per_index = metadata.estimated_index_size_in_bytes() / nb_indices_to_keep
def create_partitioned_indexes(
partitions: List[str],
output_root_dir: str,
embedding_column_name: str = "embedding",
id_columns: Optional[List[str]] = None,
should_be_memory_mappable: bool = False,
max_index_query_time_ms: float = 10.0,
max_index_memory_usage: str = "16G",
min_nearest_neighbors_to_retrieve: int = 20,
current_memory_available: str = "32G",
use_gpu: bool = False,
metric_type: str = "ip",
nb_cores: Optional[int] = None,
make_direct_map: bool = False,
temp_root_dir: str = "hdfs://root/tmp/distributed_autofaiss_indices",
big_index_threshold: int = 5_000_000,
nb_splits_per_big_index: int = 1,
maximum_nb_threads: int = 256,
) -> List[Optional[Dict[str, str]]]:
"""
Create partitioned indexes from a list of parquet partitions, i.e. create one index per parquet partition
memory_available_for_adding = cast_bytes_to_memory_string(
cast_memory_to_bytes(current_memory_available) - size_per_index
)
Only supported with Pyspark. An active PySpark session must exist before calling this method
"""

logger.info(
f"The memory available for adding the vectors is {memory_available_for_adding}"
"(total available - used by the index)"
return distributed.create_partitioned_indexes(
partitions=partitions,
big_index_threshold=big_index_threshold,
output_root_dir=output_root_dir,
nb_cores=nb_cores,
nb_splits_per_big_index=nb_splits_per_big_index,
id_columns=id_columns,
max_index_query_time_ms=max_index_query_time_ms,
min_nearest_neighbors_to_retrieve=min_nearest_neighbors_to_retrieve,
embedding_column_name=embedding_column_name,
max_index_memory_usage=max_index_memory_usage,
current_memory_available=current_memory_available,
use_gpu=use_gpu,
metric_type=metric_type,
make_direct_map=make_direct_map,
should_be_memory_mappable=should_be_memory_mappable,
temp_root_dir=temp_root_dir,
maximum_nb_threads=maximum_nb_threads,
)
logger.info("Will be using at most 1GB of ram for adding")
# Add the vectors to the index.
with Timeit("-> Adding the vectors to the index", indent=2):
batch_size = get_optimal_batch_size(vec_dim, memory_available_for_adding)
logger.info(
f"Using a batch size of {batch_size} (memory overhead {cast_bytes_to_memory_string(batch_size*vec_dim*4)})"
)

if make_direct_map:
# Retrieve the embedded index if we are in an IndexPreTransform state
if isinstance(index, faiss.swigfaiss.IndexPreTransform):
embedded_index = extract_index_ivf(index)
else:
embedded_index = index

# Make direct map is only implemented for IndexIVF and IndexBinaryIVF, see built file faiss/swigfaiss.py
if isinstance(embedded_index, (faiss.swigfaiss.IndexIVF, faiss.swigfaiss.IndexBinaryIVF)):
embedded_index.make_direct_map()
if distributed is None:
for batch_id, (vec_batch, ids_batch) in enumerate(embedding_reader(batch_size=batch_size)):
index.add(vec_batch)
if embedding_ids_df_handler:
embedding_ids_df_handler(ids_batch, batch_id)
metric_infos = index_optimizer(index, "") # type: ignore
elif distributed == "pyspark":
index, metric_infos = run(
faiss_index=index,
embedding_reader=embedding_reader,
memory_available_for_adding=memory_available_for_adding,
embedding_ids_df_handler=embedding_ids_df_handler,
temporary_indices_folder=temporary_indices_folder,
nb_indices_to_keep=nb_indices_to_keep,
index_optimizer=index_optimizer,
)
else:
raise ValueError(f'Distributed by {distributed} is not supported, only "pyspark" is supported')
# return the index.
return index, metric_infos
10 changes: 3 additions & 7 deletions autofaiss/external/optimize.py
@@ -57,7 +57,7 @@ def index_key_to_nb_cluster(index_key: str) -> int:


def get_optimal_train_size(
nb_vectors: int, index_key: str, current_memory_available: Optional[str], vec_dim: Optional[int],
nb_vectors: int, index_key: str, current_memory_available: Optional[str], vec_dim: Optional[int]
) -> int:
"""
Function that determines the number of training points necessary to
@@ -436,7 +436,7 @@ def binary_search_on_param(
timout_s = 15 * max_speed_ms / 1000

get_speed = partial(
speed_test_ms_per_query, query=query_vectors, ksearch=40, timout_s=min(max_timeout_per_iteration_s, timout_s),
speed_test_ms_per_query, query=query_vectors, ksearch=40, timout_s=min(max_timeout_per_iteration_s, timout_s)
)

def is_not_acceptable_speed(rank: int) -> bool:
@@ -563,11 +563,7 @@ def optimize_and_measure_index(
# Set search hyperparameters for the index
set_search_hyperparameters(index, index_param, use_gpu)
logger.info(f"The best hyperparameters are: {index_param}")
metric_infos = {
"index_key": index_key,
"index_param": index_param,
"index_path": index_path,
}
metric_infos = {"index_key": index_key, "index_param": index_param, "index_path": index_path}
with Timeit("Compute fast metrics", indent=1):
metric_infos.update(compute_fast_metrics(embedding_reader, index))
if save_on_disk: