diff --git a/docs/sphinx/api_helpers.rst b/docs/sphinx/api_helpers.rst
index 31d5b241d..aeb791a80 100644
--- a/docs/sphinx/api_helpers.rst
+++ b/docs/sphinx/api_helpers.rst
@@ -17,6 +17,10 @@ Bulk
 ----
 .. autofunction:: bulk
 
+Dense Vector packing
+--------------------
+.. autofunction:: pack_dense_vector
+
 Scan
 ----
 .. autofunction:: scan
diff --git a/elasticsearch/dsl/__init__.py b/elasticsearch/dsl/__init__.py
index 59294d632..a74ddfc54 100644
--- a/elasticsearch/dsl/__init__.py
+++ b/elasticsearch/dsl/__init__.py
@@ -73,6 +73,7 @@
     MatchOnlyText,
     Murmur3,
     Nested,
+    NumpyDenseVector,
     Object,
     Passthrough,
     Percolator,
@@ -189,6 +190,7 @@
     "Murmur3",
     "Nested",
     "NestedFacet",
+    "NumpyDenseVector",
     "Object",
     "Passthrough",
     "Percolator",
diff --git a/elasticsearch/dsl/field.py b/elasticsearch/dsl/field.py
index 3b5075287..6b73be4ac 100644
--- a/elasticsearch/dsl/field.py
+++ b/elasticsearch/dsl/field.py
@@ -1616,11 +1616,33 @@ def __init__(
             kwargs["multi"] = True
         super().__init__(*args, **kwargs)
 
-    def _deserialize(self, data: Any) -> Any:
-        if self._element_type == "float":
-            return float(data)
-        elif self._element_type == "byte":
-            return int(data)
+
+class NumpyDenseVector(DenseVector):
+    """A dense vector field that uses numpy arrays.
+
+    Accepts the same arguments as class ``DenseVector`` plus:
+
+    :arg dtype: The numpy data type to use for the array. If not given, numpy will select the type based on the data.
+    """
+
+    def __init__(self, *args: Any, dtype: Optional[type] = None, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self._dtype = dtype
+
+    def deserialize(self, data: Any) -> Any:
+        if isinstance(data, list):
+            import numpy as np
+
+            return np.array(data, dtype=self._dtype)
+        return super().deserialize(data)
+
+    def clean(self, data: Any) -> Any:
+        # this method does the same as the one in the parent classes, but it
+        # avoids comparisons that do not work for numpy arrays
+        if data is not None:
+            data = self.deserialize(data)
+        if (data is None or len(data) == 0) and self._required:
+            raise ValidationException("Value required for this field.")
         return data
diff --git a/elasticsearch/dsl/utils.py b/elasticsearch/dsl/utils.py
index cce3c052c..b78f36000 100644
--- a/elasticsearch/dsl/utils.py
+++ b/elasticsearch/dsl/utils.py
@@ -612,8 +612,17 @@ def to_dict(self, skip_empty: bool = True) -> Dict[str, Any]:
             if skip_empty:
                 # don't serialize empty values
                 # careful not to include numeric zeros
-                if v in ([], {}, None):
-                    continue
+                try:
+                    if v in ([], {}, None):
+                        continue
+                except ValueError:
+                    # the above fails when v is a numpy array
+                    # try using len() instead
+                    try:
+                        if len(v) == 0:
+                            continue
+                    except TypeError:
+                        pass
 
             out[k] = v
         return out
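The `to_dict()` change above exists because numpy arrays cannot be used in the `v in ([], {}, None)` membership test. A minimal sketch of the failure mode and of the length-based fallback (illustration only, not part of this change; assumes a recent numpy):

import numpy as np

v = np.array([0.1, 0.2, 0.3], dtype=np.float32)

try:
    # the tuple membership test truth-tests `v == None`, which numpy evaluates
    # element-wise, producing a boolean array with an ambiguous truth value
    print(v in ([], {}, None))
except ValueError:
    print("membership test raised ValueError")  # the branch to_dict() now catches

# the fallback used by to_dict(): a plain length check works for lists and arrays alike
print(len(v) == 0)  # False, so the value is kept when serializing
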
diff --git a/elasticsearch/helpers/__init__.py b/elasticsearch/helpers/__init__.py
index 6f8f24c21..2c343df3c 100644
--- a/elasticsearch/helpers/__init__.py
+++ b/elasticsearch/helpers/__init__.py
@@ -23,6 +23,7 @@
     BULK_FLUSH,
     bulk,
     expand_action,
+    pack_dense_vector,
     parallel_bulk,
     reindex,
     scan,
@@ -37,6 +38,7 @@
     "expand_action",
     "streaming_bulk",
     "bulk",
+    "pack_dense_vector",
     "parallel_bulk",
     "scan",
     "reindex",
diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py
index 22d4ebe22..93eb2214e 100644
--- a/elasticsearch/helpers/actions.py
+++ b/elasticsearch/helpers/actions.py
@@ -15,12 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import base64
 import logging
 import queue
 import time
 from enum import Enum
 from operator import methodcaller
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Collection,
@@ -31,6 +33,7 @@
     Mapping,
     MutableMapping,
     Optional,
+    Sequence,
     Tuple,
     Union,
 )
@@ -43,6 +46,9 @@
 from ..serializer import Serializer
 from .errors import BulkIndexError, ScanError
 
+if TYPE_CHECKING:
+    import numpy as np
+
 logger = logging.getLogger("elasticsearch.helpers")
 
 
@@ -708,6 +714,21 @@ def _setup_queues(self) -> None:
         pool.join()
 
 
+def pack_dense_vector(vector: Union["np.ndarray", Sequence[float]]) -> str:
+    """Helper function that packs a dense vector for efficient uploading.
+
+    :arg vector: the list or numpy array to pack.
+    """
+    import numpy as np
+
+    if type(vector) is not np.ndarray:
+        vector = np.array(vector, dtype=np.float32)
+    elif vector.dtype != np.float32:
+        raise ValueError("Only arrays of type float32 can be packed")
+    byte_array = vector.byteswap().tobytes()
+    return base64.b64encode(byte_array).decode()
+
+
 def scan(
     client: Elasticsearch,
     query: Optional[Any] = None,
diff --git a/examples/quotes/backend/quotes.py b/examples/quotes/backend/quotes.py
index 4492d5e7e..6e7fb28b2 100644
--- a/examples/quotes/backend/quotes.py
+++ b/examples/quotes/backend/quotes.py
@@ -5,26 +5,38 @@
 from typing import Annotated
 
 from fastapi import FastAPI, HTTPException
-from pydantic import BaseModel, Field, ValidationError
+import numpy as np
+from pydantic import BaseModel, Field, PlainSerializer
 from sentence_transformers import SentenceTransformer
 
-from elasticsearch import NotFoundError
+from elasticsearch import NotFoundError, OrjsonSerializer
 from elasticsearch.dsl.pydantic import AsyncBaseESModel
 from elasticsearch import dsl
+from elasticsearch.helpers import pack_dense_vector
 
 model = SentenceTransformer("all-MiniLM-L6-v2")
-dsl.async_connections.create_connection(hosts=[os.environ['ELASTICSEARCH_URL']])
+dsl.async_connections.create_connection(hosts=[os.environ['ELASTICSEARCH_URL']], serializer=OrjsonSerializer())
 
 
 class Quote(AsyncBaseESModel):
     quote: str
     author: Annotated[str, dsl.Keyword()]
     tags: Annotated[list[str], dsl.Keyword()]
-    embedding: Annotated[list[float], dsl.DenseVector()] = Field(init=False, default=[])
+    embedding: Annotated[
+        np.ndarray,
+        PlainSerializer(lambda v: v.tolist()),
+        dsl.NumpyDenseVector(dtype=np.float32)
+    ] = Field(init=False, default_factory=lambda: np.array([], dtype=np.float32))
+
+    class Config:
+        arbitrary_types_allowed = True
 
     class Index:
         name = 'quotes'
 
+    def clean(self):
+        # pack the embedding for efficient uploading
+        self.embedding = pack_dense_vector(self.embedding)
 
 class Tag(BaseModel):
     tag: str
@@ -135,7 +147,7 @@ async def search_quotes(req: SearchRequest) -> SearchResponse:
 def embed_quotes(quotes):
     embeddings = model.encode([q.quote for q in quotes])
     for q, e in zip(quotes, embeddings):
-        q.embedding = e.tolist()
+        q.embedding = e
 
 
 async def ingest_quotes():
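To make the effect of `pack_dense_vector()` concrete, a small round-trip sketch (illustration only, not part of this change; the decode step assumes a little-endian host, which is what the `byteswap()` call above compensates for):

import base64

import numpy as np

from elasticsearch.helpers import pack_dense_vector

vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)

# the helper returns a base64 string that is more compact on the wire than
# the equivalent JSON list of floats
packed = pack_dense_vector(vec)
print(len(packed), "characters packed vs", len(str(vec.tolist())), "as a float list")

# decode what would be sent: byteswapped (big-endian) float32 values, base64-encoded
decoded = np.frombuffer(base64.b64decode(packed), dtype=">f4")
assert np.allclose(decoded, vec)
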
diff --git a/test_elasticsearch/test_dsl/test_integration/_async/test_document.py b/test_elasticsearch/test_dsl/test_integration/_async/test_document.py
index 0dd0de0ca..3170c6958 100644
--- a/test_elasticsearch/test_dsl/test_integration/_async/test_document.py
+++ b/test_elasticsearch/test_dsl/test_integration/_async/test_document.py
@@ -25,6 +25,7 @@
 from ipaddress import ip_address
 from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional, Tuple, Union
 
+import numpy as np
 import pytest
 from pytest import raises
 from pytz import timezone
@@ -47,6 +48,7 @@
     Mapping,
     MetaField,
     Nested,
+    NumpyDenseVector,
     Object,
     Q,
     RankFeatures,
@@ -57,6 +59,7 @@
 from elasticsearch.dsl.query import Match
 from elasticsearch.dsl.types import MatchQuery
 from elasticsearch.dsl.utils import AttrList
+from elasticsearch.helpers import pack_dense_vector
 from elasticsearch.helpers.errors import BulkIndexError
 
 snowball = analyzer("my_snow", tokenizer="standard", filter=["lowercase", "snowball"])
@@ -865,25 +868,47 @@ class Doc(AsyncDocument):
         float_vector: List[float] = mapped_field(DenseVector())
         byte_vector: List[int] = mapped_field(DenseVector(element_type="byte"))
         bit_vector: List[int] = mapped_field(DenseVector(element_type="bit"))
+        numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
+        packed_float_vector: List[float] = mapped_field(DenseVector())
+        packed_numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
 
         class Index:
             name = "vectors"
 
+        def clean(self):
+            # pack the dense vectors before they are sent to Elasticsearch
+            self.packed_float_vector = pack_dense_vector(self.packed_float_vector)
+            self.packed_numpy_float_vector = pack_dense_vector(
+                self.packed_numpy_float_vector
+            )
+
     await Doc._index.delete(ignore_unavailable=True)
     await Doc.init()
 
+    test_float_vector = [1.0, 1.2, 2.3]
+    test_byte_vector = [12, 23, 34, 45]
+    test_bit_vector = [18, -43, -112]
+
     doc = Doc(
-        float_vector=[1.0, 1.2, 2.3],
-        byte_vector=[12, 23, 34, 45],
-        bit_vector=[18, -43, -112],
+        float_vector=test_float_vector,
+        byte_vector=test_byte_vector,
+        bit_vector=test_bit_vector,
+        numpy_float_vector=np.array(test_float_vector),
+        packed_float_vector=test_float_vector,
+        packed_numpy_float_vector=np.array(test_float_vector, dtype=np.float32),
     )
     await doc.save(refresh=True)
 
     docs = await Doc.search().execute()
     assert len(docs) == 1
-    assert [round(v, 1) for v in docs[0].float_vector] == doc.float_vector
-    assert docs[0].byte_vector == doc.byte_vector
-    assert docs[0].bit_vector == doc.bit_vector
+    assert [round(v, 1) for v in docs[0].float_vector] == test_float_vector
+    assert docs[0].byte_vector == test_byte_vector
+    assert docs[0].bit_vector == test_bit_vector
+    assert type(docs[0].numpy_float_vector) is np.ndarray
+    assert [round(v, 1) for v in docs[0].numpy_float_vector] == test_float_vector
+    assert [round(v, 1) for v in docs[0].packed_float_vector] == test_float_vector
+    assert type(docs[0].packed_numpy_float_vector) is np.ndarray
+    assert [round(v, 1) for v in docs[0].packed_numpy_float_vector] == test_float_vector
 
 
 @pytest.mark.anyio
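One practical consequence exercised by the test above: hits deserialized through `NumpyDenseVector` come back as `np.ndarray` values, so client-side vector math needs no list-to-array conversion. A hedged sketch (the `Doc` class and field names mirror the test and are illustrative only):

import numpy as np


def cosine(a: np.ndarray, b: np.ndarray) -> float:
    # cosine similarity on the arrays returned by NumpyDenseVector fields
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


query_vec = np.array([1.0, 1.2, 2.3], dtype=np.float32)

# docs = await Doc.search().execute()          # as in the test above
# best = max(docs, key=lambda hit: cosine(query_vec, hit.numpy_float_vector))
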
diff --git a/test_elasticsearch/test_dsl/test_integration/_sync/test_document.py b/test_elasticsearch/test_dsl/test_integration/_sync/test_document.py
index ce6ac03ad..57fde0d30 100644
--- a/test_elasticsearch/test_dsl/test_integration/_sync/test_document.py
+++ b/test_elasticsearch/test_dsl/test_integration/_sync/test_document.py
@@ -25,6 +25,7 @@
 from ipaddress import ip_address
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union
 
+import numpy as np
 import pytest
 from pytest import raises
 from pytz import timezone
@@ -46,6 +47,7 @@
     Mapping,
     MetaField,
     Nested,
+    NumpyDenseVector,
     Object,
     Q,
     RankFeatures,
@@ -57,6 +59,7 @@
 from elasticsearch.dsl.query import Match
 from elasticsearch.dsl.types import MatchQuery
 from elasticsearch.dsl.utils import AttrList
+from elasticsearch.helpers import pack_dense_vector
 from elasticsearch.helpers.errors import BulkIndexError
 
 snowball = analyzer("my_snow", tokenizer="standard", filter=["lowercase", "snowball"])
@@ -853,25 +856,47 @@ class Doc(Document):
         float_vector: List[float] = mapped_field(DenseVector())
         byte_vector: List[int] = mapped_field(DenseVector(element_type="byte"))
         bit_vector: List[int] = mapped_field(DenseVector(element_type="bit"))
+        numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
+        packed_float_vector: List[float] = mapped_field(DenseVector())
+        packed_numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
 
         class Index:
             name = "vectors"
 
+        def clean(self):
+            # pack the dense vectors before they are sent to Elasticsearch
+            self.packed_float_vector = pack_dense_vector(self.packed_float_vector)
+            self.packed_numpy_float_vector = pack_dense_vector(
+                self.packed_numpy_float_vector
+            )
+
     Doc._index.delete(ignore_unavailable=True)
     Doc.init()
 
+    test_float_vector = [1.0, 1.2, 2.3]
+    test_byte_vector = [12, 23, 34, 45]
+    test_bit_vector = [18, -43, -112]
+
     doc = Doc(
-        float_vector=[1.0, 1.2, 2.3],
-        byte_vector=[12, 23, 34, 45],
-        bit_vector=[18, -43, -112],
+        float_vector=test_float_vector,
+        byte_vector=test_byte_vector,
+        bit_vector=test_bit_vector,
+        numpy_float_vector=np.array(test_float_vector),
+        packed_float_vector=test_float_vector,
+        packed_numpy_float_vector=np.array(test_float_vector, dtype=np.float32),
     )
     doc.save(refresh=True)
 
     docs = Doc.search().execute()
     assert len(docs) == 1
-    assert [round(v, 1) for v in docs[0].float_vector] == doc.float_vector
-    assert docs[0].byte_vector == doc.byte_vector
-    assert docs[0].bit_vector == doc.bit_vector
+    assert [round(v, 1) for v in docs[0].float_vector] == test_float_vector
+    assert docs[0].byte_vector == test_byte_vector
+    assert docs[0].bit_vector == test_bit_vector
+    assert type(docs[0].numpy_float_vector) is np.ndarray
+    assert [round(v, 1) for v in docs[0].numpy_float_vector] == test_float_vector
+    assert [round(v, 1) for v in docs[0].packed_float_vector] == test_float_vector
+    assert type(docs[0].packed_numpy_float_vector) is np.ndarray
+    assert [round(v, 1) for v in docs[0].packed_numpy_float_vector] == test_float_vector
 
 
 @pytest.mark.sync
diff --git a/utils/dense-vector-benchmark.py b/utils/dense-vector-benchmark.py
new file mode 100644
index 000000000..eb483bb45
--- /dev/null
+++ b/utils/dense-vector-benchmark.py
@@ -0,0 +1,141 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import asyncio
+import json
+import os
+import time
+
+import numpy as np
+
+from elasticsearch import OrjsonSerializer
+from elasticsearch.dsl import AsyncDocument, NumpyDenseVector, async_connections
+from elasticsearch.dsl.types import DenseVectorIndexOptions
+from elasticsearch.helpers import async_bulk, pack_dense_vector
+
+async_connections.create_connection(
+    hosts=[os.environ["ELASTICSEARCH_URL"]], serializer=OrjsonSerializer()
+)
+
+
+class Doc(AsyncDocument):
+    title: str
+    text: str
+    emb: np.ndarray = NumpyDenseVector(
+        dtype=np.float32, index_options=DenseVectorIndexOptions(type="flat")
+    )
+
+    class Index:
+        name = "benchmark"
+
+
+async def upload(data_file: str, chunk_size: int, pack: bool) -> tuple[float, float]:
+    with open(data_file, "rt") as f:
+        # read the data file, which comes in ndjson format, and convert it to JSON
+        json_data = "[" + f.read().strip().replace("\n", ",") + "]"
+        dataset = json.loads(json_data)
+
+    # replace the embedding lists with numpy arrays for performance
+    dataset = [
+        {
+            "docid": doc["docid"],
+            "title": doc["title"],
+            "text": doc["text"],
+            "emb": np.array(doc["emb"], dtype=np.float32),
+        }
+        for doc in dataset
+    ]
+
+    # create mapping and index
+    if await Doc._index.exists():
+        await Doc._index.delete()
+    await Doc.init()
+    await Doc._index.refresh()
+
+    async def get_next_document():
+        for doc in dataset:
+            yield {
+                "_index": "benchmark",
+                "_id": doc["docid"],
+                "_source": {
+                    "title": doc["title"],
+                    "text": doc["text"],
+                    "emb": doc["emb"],
+                },
+            }
+
+    async def get_next_document_packed():
+        for doc in dataset:
+            yield {
+                "_index": "benchmark",
+                "_id": doc["docid"],
+                "_source": {
+                    "title": doc["title"],
+                    "text": doc["text"],
+                    "emb": pack_dense_vector(doc["emb"]),
+                },
+            }
+
+    start = time.time()
+    result = await async_bulk(
+        client=async_connections.get_connection(),
+        chunk_size=chunk_size,
+        actions=get_next_document_packed() if pack else get_next_document(),
+        stats_only=True,
+    )
+    duration = time.time() - start
+    assert result[1] == 0
+    return result[0], duration
+
+
+async def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("data_file", metavar="JSON_DATA_FILE")
+    parser.add_argument(
+        "--chunk-sizes",
+        "-s",
+        nargs="+",
+        type=int,
+        help="Chunk size(s) for bulk uploader",
+    )
+    args = parser.parse_args()
+
+    for chunk_size in args.chunk_sizes:
+        print(f"Uploading '{args.data_file}' with chunk size {chunk_size}...")
+        runs = []
+        packed_runs = []
+        for _ in range(3):
+            runs.append(await upload(args.data_file, chunk_size, False))
+            packed_runs.append(await upload(args.data_file, chunk_size, True))
+
+        # ensure that all runs uploaded the same number of documents
+        size = runs[0][0]
+        for run in runs:
+            assert run[0] == size
+        for run in packed_runs:
+            assert run[0] == size
+
+        dur = sum([run[1] for run in runs]) / len(runs)
+        packed_dur = sum([run[1] for run in packed_runs]) / len(packed_runs)
+
+        print(f"Size: {size}")
+        print(f"float duration: {dur:.02f}s / {size / dur:.02f} docs/s")
+        print(
+            f"float base64 duration: {packed_dur:.02f}s / {size / packed_dur:.02f} docs/s"
+        )
+        print(f"Speed up: {dur / packed_dur:.02f}x")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
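The benchmark reads an ndjson file with one document per line carrying `docid`, `title`, `text` and `emb` fields, as the parsing code above shows. A hedged sketch for generating a small synthetic input file (file name, document count and embedding dimension are arbitrary choices, not part of this change):

import json

import numpy as np

rng = np.random.default_rng(42)

with open("synthetic.json", "wt") as f:
    for i in range(1000):
        doc = {
            "docid": f"doc-{i}",
            "title": f"Document {i}",
            "text": "lorem ipsum " * 20,
            # 384 dimensions, a common sentence-transformer embedding size
            "emb": rng.random(384, dtype=np.float32).tolist(),
        }
        f.write(json.dumps(doc) + "\n")

The resulting file can then be passed as JSON_DATA_FILE together with one or more --chunk-sizes values.
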
diff --git a/utils/templates/field.py.tpl b/utils/templates/field.py.tpl
index 43df1b5f0..29c1005c4 100644
--- a/utils/templates/field.py.tpl
+++ b/utils/templates/field.py.tpl
@@ -418,11 +418,30 @@ class {{ k.name }}({{ k.parent }}):
             kwargs["multi"] = True
         super().__init__(*args, **kwargs)
 
-    def _deserialize(self, data: Any) -> Any:
-        if self._element_type == "float":
-            return float(data)
-        elif self._element_type == "byte":
-            return int(data)
+class NumpyDenseVector(DenseVector):
+    """A dense vector field that uses numpy arrays.
+
+    Accepts the same arguments as class ``DenseVector`` plus:
+
+    :arg dtype: The numpy data type to use for the array. If not given, numpy will select the type based on the data.
+    """
+    def __init__(self, *args: Any, dtype: Optional[type] = None, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self._dtype = dtype
+
+    def deserialize(self, data: Any) -> Any:
+        if isinstance(data, list):
+            import numpy as np
+            return np.array(data, dtype=self._dtype)
+        return super().deserialize(data)
+
+    def clean(self, data: Any) -> Any:
+        # this method does the same as the one in the parent classes, but it
+        # avoids comparisons that do not work for numpy arrays
+        if data is not None:
+            data = self.deserialize(data)
+        if (data is None or len(data) == 0) and self._required:
+            raise ValidationException("Value required for this field.")
         return data
 {% elif k.field == "scaled_float" %}
         if 'scaling_factor' not in kwargs:
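Finally, a hedged sketch of querying the vectors written by the benchmark with an approximate kNN search (the `knn` search option is the standard Elasticsearch 8.x API and not part of this change; the index and field names come from the benchmark script above):

import numpy as np

from elasticsearch.dsl import async_connections


async def knn_example(query_vector: np.ndarray) -> None:
    client = async_connections.get_connection()
    # query vectors are still sent as plain float lists; pack_dense_vector()
    # is only used for documents being indexed
    resp = await client.search(
        index="benchmark",
        knn={
            "field": "emb",
            "query_vector": query_vector.tolist(),
            "k": 10,
            "num_candidates": 100,
        },
        source=["title"],
    )
    for hit in resp["hits"]["hits"]:
        print(hit["_score"], hit["_source"]["title"])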