Skip to content

Commit

Permalink
Support of skip_ids in merge_from_multiple function of OnDiskInverted…
Browse files Browse the repository at this point in the history
…Lists (#3327)

Summary:
Pull Request resolved: #3327

**Context**
1. [Issue 2621](#2621) discuss inconsistency between OnDiskInvertedList and InvertedList. OnDiskInvertedList is supposed to handle disk based multiple Index Shards. Thus, we should name it differently when merging invls from index shard.
2. [Issue 2876](#2876) provides usecase of shifting ids when merging invls from different shards.

**In this diff**,
1. To address #1 above, I renamed the merge_from function to merge_from_multiple without touching merge_from base class.
why so? To continue to allow merge invl from one index to ondiskinvl from other index.

2. To address #2 above, I have added support of shift_ids in merge_from_multiple to shift ids from different shards. This can be used when each shard has same set of ids but different data. This is not recommended if id is already unique across shards.

Reviewed By: mdouze

Differential Revision: D55482518

fbshipit-source-id: 95470c7449160488d2b45b024d134cbc037a2083
  • Loading branch information
kuarora authored and facebook-github-bot committed Apr 3, 2024
1 parent c9c86f0 commit da9f292
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 16 deletions.
4 changes: 2 additions & 2 deletions contrib/ondisk.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


def merge_ondisk(
trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str
trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str, shift_ids=False
) -> None:
"""Add the contents of the indexes stored in shard_fnames into the index
trained_index. The on-disk data is stored in ivfdata_fname"""
Expand Down Expand Up @@ -51,7 +51,7 @@ def merge_ondisk(
ivf_vector.push_back(ivf)

LOG.info("merge %d inverted lists " % ivf_vector.size())
ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size())
ntotal = invlists.merge_from_multiple(ivf_vector.data(), ivf_vector.size(), shift_ids)

# now replace the inverted lists in the output index
index.ntotal = index_ivf.ntotal = ntotal
Expand Down
23 changes: 19 additions & 4 deletions faiss/invlists/OnDiskInvertedLists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,22 +565,27 @@ void OnDiskInvertedLists::free_slot(size_t offset, size_t capacity) {
/*****************************************
* Compact form
*****************************************/

size_t OnDiskInvertedLists::merge_from(
size_t OnDiskInvertedLists::merge_from_multiple(
const InvertedLists** ils,
int n_il,
bool shift_ids,
bool verbose) {
FAISS_THROW_IF_NOT_MSG(
totsize == 0, "works only on an empty InvertedLists");

std::vector<size_t> sizes(nlist);
std::vector<size_t> shift_id_offsets(n_il);
for (int i = 0; i < n_il; i++) {
const InvertedLists* il = ils[i];
FAISS_THROW_IF_NOT(il->nlist == nlist && il->code_size == code_size);

for (size_t j = 0; j < nlist; j++) {
sizes[j] += il->list_size(j);
}

size_t il_totsize = il->compute_ntotal();
shift_id_offsets[i] =
(shift_ids && i > 0) ? shift_id_offsets[i - 1] + il_totsize : 0;
}

size_t cums = 0;
Expand All @@ -605,11 +610,21 @@ size_t OnDiskInvertedLists::merge_from(
const InvertedLists* il = ils[i];
size_t n_entry = il->list_size(j);
l.size += n_entry;
ScopedIds scope_ids(il, j);
const idx_t* scope_ids_data = scope_ids.get();
std::vector<idx_t> new_ids;
if (shift_ids) {
new_ids.resize(n_entry);
for (size_t k = 0; k < n_entry; k++) {
new_ids[k] = scope_ids[k] + shift_id_offsets[i];
}
scope_ids_data = new_ids.data();
}
update_entries(
j,
l.size - n_entry,
n_entry,
ScopedIds(il, j).get(),
scope_ids_data,
ScopedCodes(il, j).get());
}
assert(l.size == l.capacity);
Expand Down Expand Up @@ -638,7 +653,7 @@ size_t OnDiskInvertedLists::merge_from(
size_t OnDiskInvertedLists::merge_from_1(
const InvertedLists* ils,
bool verbose) {
return merge_from(&ils, 1, verbose);
return merge_from_multiple(&ils, 1, verbose);
}

void OnDiskInvertedLists::crop_invlists(size_t l0, size_t l1) {
Expand Down
3 changes: 2 additions & 1 deletion faiss/invlists/OnDiskInvertedLists.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ struct OnDiskInvertedLists : InvertedLists {

// copy all inverted lists into *this, in compact form (without
// allocating slots)
size_t merge_from(
size_t merge_from_multiple(
const InvertedLists** ils,
int n_il,
bool shift_ids = false,
bool verbose = false);

/// same as merge_from for a single invlist
Expand Down
73 changes: 66 additions & 7 deletions tests/test_contrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import platform
import os
import random
import shutil
import tempfile

from faiss.contrib import datasets
Expand All @@ -17,15 +18,13 @@
from faiss.contrib import ivf_tools
from faiss.contrib import clustering
from faiss.contrib import big_batch_search
from faiss.contrib.ondisk import merge_ondisk

from common_faiss_tests import get_dataset_2
try:
from faiss.contrib.exhaustive_search import \
knn_ground_truth, knn, range_ground_truth, \
range_search_max_results, exponential_query_iterator
except:
pass # Submodule import broken in python 2.

from faiss.contrib.exhaustive_search import \
knn_ground_truth, knn, range_ground_truth, \
range_search_max_results, exponential_query_iterator
from contextlib import contextmanager

@unittest.skipIf(platform.python_version_tuple()[0] < '3',
'Submodule import broken in python 2.')
Expand Down Expand Up @@ -674,3 +673,63 @@ def test_code_set(self):
np.testing.assert_equal(
np.sort(np.unique(codes, axis=0), axis=None),
np.sort(codes[inserted], axis=None))


@unittest.skipIf(platform.system() == 'Windows',
'OnDiskInvertedLists is unsupported on Windows.')
class TestMerge(unittest.TestCase):
@contextmanager
def temp_directory(self):
temp_dir = tempfile.mkdtemp()
try:
yield temp_dir
finally:
shutil.rmtree(temp_dir)

def do_test_ondisk_merge(self, shift_ids=False):
with self.temp_directory() as tmpdir:
# only train and add index to disk without adding elements.
# this will create empty inverted lists.
ds = datasets.SyntheticDataset(32, 2000, 200, 20)
index = faiss.index_factory(ds.d, "IVF32,Flat")
index.train(ds.get_train())
faiss.write_index(index, tmpdir + "/trained.index")

# create 4 shards and add elements to them
ns = 4 # number of shards

for bno in range(ns):
index = faiss.read_index(tmpdir + "/trained.index")
i0, i1 = int(bno * ds.nb / ns), int((bno + 1) * ds.nb / ns)
if shift_ids:
index.add_with_ids(ds.xb[i0:i1], np.arange(0, ds.nb / ns))
else:
index.add_with_ids(ds.xb[i0:i1], np.arange(i0, i1))
faiss.write_index(index, tmpdir + "/block_%d.index" % bno)

# construct the output index and merge them on disk
index = faiss.read_index(tmpdir + "/trained.index")
block_fnames = [tmpdir + "/block_%d.index" % bno for bno in range(4)]

merge_ondisk(
index, block_fnames, tmpdir + "/merged_index.ivfdata", shift_ids
)
faiss.write_index(index, tmpdir + "/populated.index")

# perform a search from index on disk
index = faiss.read_index(tmpdir + "/populated.index")
index.nprobe = 5
D, I = index.search(ds.xq, 5)

# ground-truth
gtI = ds.get_groundtruth(5)

recall_at_1 = (I[:, :1] == gtI[:, :1]).sum() / float(ds.xq.shape[0])
self.assertGreaterEqual(recall_at_1, 0.5)

def test_ondisk_merge(self):
self.do_test_ondisk_merge()

def test_ondisk_merge_with_shift_ids(self):
# verified that recall is same for test_ondisk_merge and
self.do_test_ondisk_merge(True)
28 changes: 26 additions & 2 deletions tests/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ size_t nq = 100;
int nindex = 4;
int k = 10;
int nlist = 40;
int shard_size = nb / nindex;

struct CommonData {
std::vector<float> database;
Expand Down Expand Up @@ -100,7 +101,7 @@ int compare_merged(
auto il = new faiss::OnDiskInvertedLists(
index0->nlist, index0->code_size, filename.c_str());

il->merge_from(lists.data(), lists.size());
il->merge_from_multiple(lists.data(), lists.size(), shift_ids);

index0->replace_invlists(il, true);
index0->ntotal = ntotal;
Expand All @@ -110,11 +111,14 @@ int compare_merged(
nq, cd.queries.data(), k, newD.data(), newI.data());

size_t ndiff = 0;
bool adjust_ids = shift_ids && !standard_merge;
for (size_t i = 0; i < k * nq; i++) {
if (refI[i] != newI[i]) {
idx_t new_id = adjust_ids ? refI[i] % shard_size : refI[i];
if (refI[i] != new_id) {
ndiff++;
}
}

return ndiff;
}

Expand Down Expand Up @@ -220,3 +224,23 @@ TEST(MERGE, merge_flat_ondisk_2) {
int ndiff = compare_merged(&index_shards, false, false);
EXPECT_GE(0, ndiff);
}

// now use ondisk specific merge and use shift ids
TEST(MERGE, merge_flat_ondisk_3) {
faiss::IndexShards index_shards(d, false, false);
index_shards.own_indices = true;

std::vector<idx_t> ids;
for (int i = 0; i < nb; ++i) {
int id = i % shard_size;
ids.push_back(id);
}
for (int i = 0; i < nindex; i++) {
index_shards.add_shard(
new faiss::IndexIVFFlat(&cd.quantizer, d, nlist));
}
EXPECT_TRUE(index_shards.is_trained);
index_shards.add_with_ids(nb, cd.database.data(), ids.data());
int ndiff = compare_merged(&index_shards, true, false);
EXPECT_GE(0, ndiff);
}

0 comments on commit da9f292

Please sign in to comment.