
fix(elastic): update offset2ids management #416

Merged
10 commits merged on Jun 27, 2022
16 changes: 16 additions & 0 deletions docarray/array/storage/elastic/backend.py
@@ -179,6 +179,22 @@ def _update_offset2ids_meta(self):
        r = bulk(self._client, requests)
        self._client.indices.refresh(index=self._index_name_offset2id)

        # Clean trailing unused offsets
        offset_count = self._client.count(index=self._index_name_offset2id)
        unused_offsets = range(len(self._offset2ids.ids), offset_count['count'])

        if len(unused_offsets) > 0:
            requests = [
                {
                    '_op_type': 'delete',
                    '_id': offset_,  # the offset serves as the document _id, since it is the key entries are looked up by
                    '_index': self._index_name_offset2id,
                }
                for offset_ in unused_offsets
            ]
            r = bulk(self._client, requests)
            self._client.indices.refresh(index=self._index_name_offset2id)

    def _get_offset2ids_meta(self) -> List:
        """Return the offset2ids stored in elastic

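For context on the hunk above: deleting documents shrinks the in-memory offset2ids list, but the offset documents previously written for the now-unused trailing positions stay in the offset2id index until they are removed explicitly. Below is a minimal standalone sketch of the same count-then-bulk-delete pattern, assuming a local Elasticsearch instance; the host, index name, and ids are illustrative and not taken from the PR.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

client = Elasticsearch(hosts='http://localhost:9200')  # assumed local instance
index_name = 'demo-offset2id'                          # hypothetical offset2id index

ids_in_memory = ['r2', 'r3', 'r4']                     # offsets 0..2 are still valid
indexed = client.count(index=index_name)['count']      # e.g. 8 before the cleanup

# every offset document whose _id is >= len(ids_in_memory) is stale
actions = [
    {'_op_type': 'delete', '_index': index_name, '_id': offset}
    for offset in range(len(ids_in_memory), indexed)
]
if actions:
    bulk(client, actions)
    client.indices.refresh(index=index_name)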
4 changes: 3 additions & 1 deletion docarray/array/storage/elastic/seqlike.py
@@ -72,4 +72,6 @@ def _upload_batch(self, docs: Iterable['Document']):
     def extend(self, docs: Iterable['Document']):
         docs = list(docs)
         self._upload_batch(docs)
-        self._offset2ids.extend([doc.id for doc in docs])
+        self._offset2ids.extend(
+            [doc.id for doc in docs if doc.id not in self._offset2ids.ids]
+        )
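A quick usage sketch of the deduplication added above, assuming a local Elasticsearch instance and an illustrative config: extending with an id that already exists re-uploads that document but does not append a second offset for it, so the length reflects unique ids only.

from docarray import Document, DocumentArray

da = DocumentArray(
    storage='elasticsearch',
    config={'n_dim': 3, 'index_name': 'extend_dedup_demo'},  # illustrative config
)
with da:
    da.extend([Document(id='a', embedding=[0, 0, 0])])
with da:
    da.extend(
        [
            Document(id='a', embedding=[0, 0, 0]),  # duplicate id, offset not re-added
            Document(id='b', embedding=[1, 1, 1]),
        ]
    )
assert len(da) == 2  # 'a' counted once, plus 'b'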
42 changes: 42 additions & 0 deletions tests/unit/array/storage/elastic/test_add.py
@@ -0,0 +1,42 @@
from docarray import Document, DocumentArray


def test_add_ignore_existing_doc_id(start_storage):
    elastic_doc = DocumentArray(
        storage='elasticsearch',
        config={
            'n_dim': 3,
            'columns': [('price', 'int')],
            'distance': 'l2_norm',
            'index_name': 'test_add',
Member: Can you perhaps randomize the index name, or give the index the exact same name as the test function?

Contributor Author: Oh sure, will do.

(A sketch of this suggestion follows this file's diff.)

        },
    )

    with elastic_doc:
        elastic_doc.extend(
            [
                Document(id='r0', embedding=[0, 0, 0]),
                Document(id='r1', embedding=[1, 1, 1]),
                Document(id='r2', embedding=[2, 2, 2]),
                Document(id='r3', embedding=[3, 3, 3]),
                Document(id='r4', embedding=[4, 4, 4]),
            ]
        )

    with elastic_doc:
        elastic_doc.extend(
            [
                Document(id='r0', embedding=[0, 0, 0]),
                Document(id='r2', embedding=[2, 2, 2]),
                Document(id='r4', embedding=[4, 4, 4]),
                Document(id='r5', embedding=[2, 2, 2]),
                Document(id='r6', embedding=[4, 4, 4]),
            ]
        )

    indexed_offset_count = elastic_doc._client.count(
        index=elastic_doc._index_name_offset2id
    )['count']

    assert len(elastic_doc) == len(elastic_doc[:, 'embedding'])
Member: I would also assert that len(elastic_doc) == 7 for extra security; otherwise this test would pass even with wrong behavior.

Contributor Author: Okay, noted.

(Covered in the same sketch after this file's diff.)

    assert len(elastic_doc) == indexed_offset_count
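Putting the two review suggestions on this file together, here is a hedged sketch of how the test could be tightened: a per-run unique index name (randomized with uuid here, which is an assumption rather than code from the PR) and an explicit length assertion next to the existing checks. The loop-built documents are shorthand for the ones listed above.

import uuid

from docarray import Document, DocumentArray


def test_add_ignore_existing_doc_id(start_storage):
    elastic_doc = DocumentArray(
        storage='elasticsearch',
        config={
            'n_dim': 3,
            'columns': [('price', 'int')],
            'distance': 'l2_norm',
            # unique per run, so repeated runs never collide on the same index
            'index_name': f'test_add_ignore_existing_doc_id_{uuid.uuid4().hex}',
        },
    )

    with elastic_doc:
        elastic_doc.extend([Document(id=f'r{i}', embedding=[i, i, i]) for i in range(5)])

    with elastic_doc:
        elastic_doc.extend([Document(id=f'r{i}', embedding=[i, i, i]) for i in (0, 2, 4, 5, 6)])

    indexed_offset_count = elastic_doc._client.count(
        index=elastic_doc._index_name_offset2id
    )['count']

    # 5 initial ids plus the genuinely new 'r5' and 'r6'
    assert len(elastic_doc) == 7
    assert len(elastic_doc) == len(elastic_doc[:, 'embedding'])
    assert len(elastic_doc) == indexed_offset_count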
47 changes: 47 additions & 0 deletions tests/unit/array/storage/elastic/test_del.py
@@ -0,0 +1,47 @@
from docarray import Document, DocumentArray
import pytest


@pytest.mark.parametrize('deleted_elmnts', [[0, 1], ['r0', 'r1']])
def test_delete_offset_success_sync_es_offset_index(deleted_elmnts, start_storage):
    elastic_doc = DocumentArray(
        storage='elasticsearch',
        config={
            'n_dim': 3,
            'columns': [('price', 'int')],
            'distance': 'l2_norm',
            'index_name': 'test_delete',
        },
    )

    with elastic_doc:
        elastic_doc.extend(
            [
                Document(id='r0', embedding=[0, 0, 0]),
                Document(id='r1', embedding=[1, 1, 1]),
                Document(id='r2', embedding=[2, 2, 2]),
                Document(id='r3', embedding=[3, 3, 3]),
                Document(id='r4', embedding=[4, 4, 4]),
                Document(id='r5', embedding=[5, 5, 5]),
                Document(id='r6', embedding=[6, 6, 6]),
                Document(id='r7', embedding=[7, 7, 7]),
            ]
        )

    expected_offset_after_del = ['r2', 'r3', 'r4', 'r5', 'r6', 'r7']

    with elastic_doc:
        del elastic_doc[deleted_elmnts]

    indexed_offset_count = elastic_doc._client.count(
        index=elastic_doc._index_name_offset2id
    )['count']

    assert len(elastic_doc._offset2ids.ids) == indexed_offset_count
Member: Same here, what should the length be?

Contributor Author: Yep, will be updated.

(A sketch of the explicit check follows at the end of this diff.)


    for id in expected_offset_after_del:
        expected_offset = str(expected_offset_after_del.index(id))
        actual_offset_index = elastic_doc._client.search(
            index=elastic_doc._index_name_offset2id, query={'match': {'blob': id}}
        )['hits']['hits'][0]['_id']
        assert actual_offset_index == expected_offset
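On the reviewer's question above about the expected length: eight documents are extended and two are deleted, so a sketch of the explicit checks that could sit next to the existing equality assertion would be:

    # 8 documents extended, 2 deleted, so 6 offsets should remain on both sides
    assert len(elastic_doc._offset2ids.ids) == 6
    assert indexed_offset_count == 6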