diff --git a/docs/reference/dsl_how_to_guides.md b/docs/reference/dsl_how_to_guides.md index ce128528a..5f0884c3c 100644 --- a/docs/reference/dsl_how_to_guides.md +++ b/docs/reference/dsl_how_to_guides.md @@ -1425,6 +1425,127 @@ print(response.took) If you want to inspect the contents of the `response` objects, just use its `to_dict` method to get access to the raw data for pretty printing. +## ES|QL Queries + +When working with `Document` classes, you can use the ES|QL query language to retrieve documents. For this you can use the `esql_from()` and `esql_execute()` methods available to all sub-classes of `Document`. + +Consider the following `Employee` document definition: + +```python +from elasticsearch.dsl import Document, InnerDoc, M + +class Address(InnerDoc): + address: M[str] + city: M[str] + zip_code: M[str] + +class Employee(Document): + emp_no: M[int] + first_name: M[str] + last_name: M[str] + height: M[float] + still_hired: M[bool] + address: M[Address] + + class Index: + name = 'employees' +``` + +The `esql_from()` method creates a base ES|QL query for the index associated with the document class. The following example creates a base query for the `Employee` class: + +```python +query = Employee.esql_from() +``` + +This query includes a `FROM` command with the index name, and a `KEEP` command that retrieves all the document attributes. + +To execute this query and receive the results, you can pass the query to the `esql_execute()` method: + +```python +for emp in Employee.esql_execute(query): + print(f"{emp.first_name} {emp.last_name} from {emp.address.city} is {emp.height:.2f}m tall") +``` + +In this example, the `esql_execute()` class method runs the query and returns all the documents in the index, up to the default limit of 1000 results that ES|QL applies to queries without an explicit limit. Here is a possible output from this example: + +``` +Kevin Macias from North Robert is 1.60m tall +Drew Harris from Boltonshire is 1.68m tall +Julie Williams from Maddoxshire is 1.99m tall +Christopher Jones from Stevenbury is 1.98m tall +Anthony Lopez from Port Sarahtown is 2.42m tall +Tricia Stone from North Sueshire is 2.39m tall +Katherine Ramirez from Kimberlyton is 1.83m tall +... +``` + +To search for specific documents you can extend the base query with additional ES|QL commands that narrow the search criteria. The next example narrows the search to employees that are taller than 2 meters, sorted by their last name. It also limits the results to 4 people: + +```python +query = ( + Employee.esql_from() + .where(Employee.height > 2) + .sort(Employee.last_name) + .limit(4) +) +``` + +When running this query with the same for-loop shown above, possible results would be: + +``` +Michael Adkins from North Stacey is 2.48m tall +Kimberly Allen from Toddside is 2.24m tall +Crystal Austin from East Michaelchester is 2.30m tall +Rebecca Berger from Lake Adrianside is 2.40m tall +``` + +### Additional fields + +ES|QL provides a few ways to add new fields to a query, for example through the `EVAL` command. The following example shows a query that adds an evaluated field: + +```python +from elasticsearch.esql import E, functions + +query = ( + Employee.esql_from() + .eval(height_cm=functions.round(Employee.height * 100)) + .where(E("height_cm") >= 200) + .sort(Employee.last_name) + .limit(10) +) +``` + +In this example we are adding the height in centimeters to the query, calculated from the `height` document field, which is in meters.
The `height_cm` calculated field is available to use in other query clauses, and in particular is referenced in `where()` in this example. Note how the new field is given as `E("height_cm")` in this clause. The `E()` wrapper tells the query builder that the argument is an ES|QL field name and not a string literal. This is done automatically for document fields that are given as class attributes, such as `Employee.height` in the `eval()`. The `E()` wrapper is only needed for fields that are not in the document. + +By default, the `esql_execute()` method returns only document instances. To receive any additional fields that are not part of the document in the query results, the `return_additional=True` argument can be passed to it, and then the results are returned as tuples with the document as first element, and a dictionary with the additional fields as second element: + +```python +for emp, additional in Employee.esql_execute(query, return_additional=True): + print(emp.first_name, emp.last_name, additional) +``` + +Example output from the query given above: + +``` +Michael Adkins {'height_cm': 248.0} +Kimberly Allen {'height_cm': 224.0} +Crystal Austin {'height_cm': 230.0} +Rebecca Berger {'height_cm': 240.0} +Katherine Blake {'height_cm': 214.0} +Edward Butler {'height_cm': 246.0} +Steven Carlson {'height_cm': 242.0} +Mark Carter {'height_cm': 240.0} +Joseph Castillo {'height_cm': 229.0} +Alexander Cohen {'height_cm': 245.0} +``` + +### Missing fields + +The base query returned by the `esql_from()` method includes a `KEEP` command with the complete list of fields that are part of the document. If any subsequent clauses added to the query remove fields that are part of the document, then the `esql_execute()` method will raise an exception, because it will not be able to construct complete document instances to return as results. + +To prevent errors, it is recommended that the `keep()` and `drop()` clauses are not used when working with `Document` instances. + +If a query has missing fields, it can be forced to execute without errors by passing the `ignore_missing_fields=True` argument to `esql_execute()`. When this option is used, returned documents will have any missing fields set to `None`. ## Using asyncio with Elasticsearch Python DSL [asyncio] diff --git a/docs/reference/dsl_tutorials.md b/docs/reference/dsl_tutorials.md index 77992587b..16224a13f 100644 --- a/docs/reference/dsl_tutorials.md +++ b/docs/reference/dsl_tutorials.md @@ -83,7 +83,7 @@ Let’s have a simple Python class representing an article in a blogging system: ```python from datetime import datetime -from elasticsearch.dsl import Document, Date, Integer, Keyword, Text, connections +from elasticsearch.dsl import Document, Date, Integer, Keyword, Text, connections, mapped_field # Define a default Elasticsearch client connections.create_connection(hosts="https://localhost:9200") @@ -91,7 +91,7 @@ connections.create_connection(hosts="https://localhost:9200") class Article(Document): title: str = mapped_field(Text(analyzer='snowball', fields={'raw': Keyword()})) body: str = mapped_field(Text(analyzer='snowball')) - tags: str = mapped_field(Keyword()) + tags: list[str] = mapped_field(Keyword()) published_from: datetime lines: int @@ -216,6 +216,20 @@ response = ubq.execute() As you can see, the `Update By Query` object provides many of the savings offered by the `Search` object, and additionally allows one to update the results of the search based on a script assigned in the same manner.
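For illustration, here is a minimal sketch of such a script-based update, assuming the `Article` documents shown above live in an index named `blogs` (the index name and the tag value are only examples):

```python
from elasticsearch.dsl import UpdateByQuery

ubq = (
    UpdateByQuery(index="blogs")
    .query("match", title="python")
    # the Painless script runs against every document matched by the query
    .script(source="ctx._source.tags.add(params.tag)", params={"tag": "python"})
)
response = ubq.execute()
```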
+## ES|QL Queries + +The DSL module features an integration with the ES|QL query builder, consisting of two methods available in all `Document` sub-classes: `esql_from()` and `esql_execute()`. Using the `Article` document from above, we can search for up to ten articles that include `"world"` in their titles with the following ES|QL query: + +```python +from elasticsearch.esql import functions + +query = Article.esql_from().where(functions.match(Article.title, 'world')).limit(10) +for a in Article.esql_execute(query): + print(a.title) +``` + +Review the [ES|QL Query Builder section](esql-query-builder.md) to learn more about building ES|QL queries in Python. + ## Migration from the standard client [_migration_from_the_standard_client] You don’t have to port your entire application to get the benefits of the DSL module, you can start gradually by creating a `Search` object from your existing `dict`, modifying it using the API and serializing it back to a `dict`: diff --git a/docs/reference/esql-query-builder.md b/docs/reference/esql-query-builder.md index 8390ea983..7ef9a0960 100644 --- a/docs/reference/esql-query-builder.md +++ b/docs/reference/esql-query-builder.md @@ -20,7 +20,7 @@ The ES|QL Query Builder allows you to construct ES|QL queries using Python synta You can then see the assembled ES|QL query by printing the resulting query object: ```python ->>> query +>>> print(query) FROM employees | SORT emp_no | KEEP first_name, last_name, height @@ -28,12 +28,12 @@ FROM employees | LIMIT 3 ``` -To execute this query, you can cast it to a string and pass the string to the `client.esql.query()` endpoint: +To execute this query, you can pass it to the `client.esql.query()` endpoint: ```python >>> from elasticsearch import Elasticsearch >>> client = Elasticsearch(hosts=[os.environ['ELASTICSEARCH_URL']]) ->>> response = client.esql.query(query=str(query)) +>>> response = client.esql.query(query=query) ``` The response body contains a `columns` attribute with the list of columns included in the results, and a `values` attribute with the list of results for the query, each given as a list of column values. Here is a possible response body returned by the example query given above: @@ -216,7 +216,7 @@ def find_employee_by_name(name): .keep("first_name", "last_name", "height") .where(E("first_name") == E("?")) ) - return client.esql.query(query=str(query), params=[name]) + return client.esql.query(query=query, params=[name]) ``` Here the part of the query in which the untrusted data needs to be inserted is replaced with a parameter, which in ES|QL is defined by the question mark. When using Python expressions, the parameter must be given as `E("?")` so that it is treated as an expression and not as a literal string. 
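For example, the query object assembled inside `find_employee_by_name()` renders with the placeholder left in the ES|QL text (the exact formatting below is illustrative), and a call such as `find_employee_by_name("Maria")` then supplies the value only through `params`, never by string interpolation:

```python
>>> print(query)  # the query built inside find_employee_by_name()
FROM employees
| KEEP first_name, last_name, height
| WHERE first_name == ?
```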
diff --git a/elasticsearch/dsl/_async/document.py b/elasticsearch/dsl/_async/document.py index de6e9eecc..53b4f12c3 100644 --- a/elasticsearch/dsl/_async/document.py +++ b/elasticsearch/dsl/_async/document.py @@ -20,6 +20,7 @@ TYPE_CHECKING, Any, AsyncIterable, + AsyncIterator, Dict, List, Optional, @@ -42,6 +43,7 @@ if TYPE_CHECKING: from elasticsearch import AsyncElasticsearch + from elasticsearch.esql.esql import ESQLBase class AsyncIndexMeta(DocumentMeta): @@ -520,3 +522,85 @@ async def __anext__(self) -> Dict[str, Any]: return action return await async_bulk(es, Generate(actions), **kwargs) + + @classmethod + async def esql_execute( + cls, + query: "ESQLBase", + return_additional: bool = False, + ignore_missing_fields: bool = False, + using: Optional[AsyncUsingType] = None, + **kwargs: Any, + ) -> AsyncIterator[Union[Self, Tuple[Self, Dict[str, Any]]]]: + """ + Execute the given ES|QL query and return an iterator of 2-element tuples, + where the first element is an instance of this ``Document`` and the + second a dictionary with any remaining columns requested in the query. + + :arg query: an ES|QL query object created with the ``esql_from()`` method. + :arg return_additional: if ``False`` (the default), this method returns + document objects. If set to ``True``, the method returns tuples with + a document in the first element and a dictionary with any additional + columns returned by the query in the second element. + :arg ignore_missing_fields: if ``False`` (the default), all the fields of + the document must be present in the query, or else an exception is + raised. Set to ``True`` to allow missing fields, which will result in + partially initialized document objects. + :arg using: connection alias to use, defaults to ``'default'`` + :arg kwargs: additional options for the ``client.esql.query()`` function. + """ + es = cls._get_connection(using) + response = await es.esql.query(query=str(query), **kwargs) + query_columns = [col["name"] for col in response.body.get("columns", [])] + + # Here we get the list of columns defined in the document, which are the + # columns that we will take from each result to assemble the document + # object. + # When `for_esql=False` is passed below by default, the list will include + # nested fields, which ES|QL does not return, causing an error. When passing + # `ignore_missing_fields=True` the list will be generated with + # `for_esql=True`, so the error will not occur, but the documents will + # not have any Nested objects in them. + doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields)) + if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)): + raise ValueError( + f"Not all fields of {cls.__name__} were returned by the query. " + "Make sure your document does not use Nested fields, which are " + "currently not supported in ES|QL. To force the query to be " + "evaluated in spite of the missing fields, pass the " + "ignore_missing_fields=True option in the esql_execute() call."
+ ) + non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"} + index_id = query_columns.index("_id") + + results = response.body.get("values", []) + for column_values in results: + # create a dictionary with all the document fields, expanding the + # dot notation returned by ES|QL into the recursive dictionaries + # used by Document.from_dict() + doc_dict: Dict[str, Any] = {} + for col, val in zip(query_columns, column_values): + if col in doc_fields: + cols = col.split(".") + d = doc_dict + for c in cols[:-1]: + if c not in d: + d[c] = {} + d = d[c] + d[cols[-1]] = val + + # create the document instance + obj = cls(meta={"_id": column_values[index_id]}) + obj._from_dict(doc_dict) + + if return_additional: + # build a dict with any other values included in the response + other = { + col: val + for col, val in zip(query_columns, column_values) + if col in non_doc_fields + } + + yield obj, other + else: + yield obj diff --git a/elasticsearch/dsl/_sync/document.py b/elasticsearch/dsl/_sync/document.py index f68be4aae..07bda6ec1 100644 --- a/elasticsearch/dsl/_sync/document.py +++ b/elasticsearch/dsl/_sync/document.py @@ -21,6 +21,7 @@ Any, Dict, Iterable, + Iterator, List, Optional, Tuple, @@ -42,6 +43,7 @@ if TYPE_CHECKING: from elasticsearch import Elasticsearch + from elasticsearch.esql.esql import ESQLBase class IndexMeta(DocumentMeta): @@ -512,3 +514,85 @@ def __next__(self) -> Dict[str, Any]: return action return bulk(es, Generate(actions), **kwargs) + + @classmethod + def esql_execute( + cls, + query: "ESQLBase", + return_additional: bool = False, + ignore_missing_fields: bool = False, + using: Optional[UsingType] = None, + **kwargs: Any, + ) -> Iterator[Union[Self, Tuple[Self, Dict[str, Any]]]]: + """ + Execute the given ES|QL query and return an iterator of 2-element tuples, + where the first element is an instance of this ``Document`` and the + second a dictionary with any remaining columns requested in the query. + + :arg query: an ES|QL query object created with the ``esql_from()`` method. + :arg return_additional: if ``False`` (the default), this method returns + document objects. If set to ``True``, the method returns tuples with + a document in the first element and a dictionary with any additional + columns returned by the query in the second element. + :arg ignore_missing_fields: if ``False`` (the default), all the fields of + the document must be present in the query, or else an exception is + raised. Set to ``True`` to allow missing fields, which will result in + partially initialized document objects. + :arg using: connection alias to use, defaults to ``'default'`` + :arg kwargs: additional options for the ``client.esql.query()`` function. + """ + es = cls._get_connection(using) + response = es.esql.query(query=str(query), **kwargs) + query_columns = [col["name"] for col in response.body.get("columns", [])] + + # Here we get the list of columns defined in the document, which are the + # columns that we will take from each result to assemble the document + # object. + # When `for_esql=False` is passed below by default, the list will include + # nested fields, which ES|QL does not return, causing an error. When passing + # `ignore_missing_fields=True` the list will be generated with + # `for_esql=True`, so the error will not occur, but the documents will + # not have any Nested objects in them. 
+ doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields)) + if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)): + raise ValueError( + f"Not all fields of {cls.__name__} were returned by the query. " + "Make sure your document does not use Nested fields, which are " + "currently not supported in ES|QL. To force the query to be " + "evaluated in spite of the missing fields, pass the " + "ignore_missing_fields=True option in the esql_execute() call." + ) + non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"} + index_id = query_columns.index("_id") + + results = response.body.get("values", []) + for column_values in results: + # create a dictionary with all the document fields, expanding the + # dot notation returned by ES|QL into the recursive dictionaries + # used by Document.from_dict() + doc_dict: Dict[str, Any] = {} + for col, val in zip(query_columns, column_values): + if col in doc_fields: + cols = col.split(".") + d = doc_dict + for c in cols[:-1]: + if c not in d: + d[c] = {} + d = d[c] + d[cols[-1]] = val + + # create the document instance + obj = cls(meta={"_id": column_values[index_id]}) + obj._from_dict(doc_dict) + + if return_additional: + # build a dict with any other values included in the response + other = { + col: val + for col, val in zip(query_columns, column_values) + if col in non_doc_fields + } + + yield obj, other + else: + yield obj diff --git a/elasticsearch/dsl/document_base.py b/elasticsearch/dsl/document_base.py index 09da7d459..4df900a39 100644 --- a/elasticsearch/dsl/document_base.py +++ b/elasticsearch/dsl/document_base.py @@ -49,6 +49,7 @@ if TYPE_CHECKING: from elastic_transport import ObjectApiResponse + from ..esql.esql import ESQLBase from .index_base import IndexBase @@ -602,3 +603,44 @@ def to_dict(self, include_meta: bool = False, skip_empty: bool = True) -> Dict[s meta["_source"] = d return meta + + @classmethod + def _get_field_names( + cls, for_esql: bool = False, nested_class: Optional[type[InnerDoc]] = None + ) -> List[str]: + """Return the list of field names used by this document. + If the document has nested objects, their fields are reported using dot + notation. If the ``for_esql`` argument is set to ``True``, the list omits + nested fields, which are currently unsupported in ES|QL. + """ + fields = [] + class_ = nested_class or cls + for field_name in class_._doc_type.mapping: + field = class_._doc_type.mapping[field_name] + if isinstance(field, Object): + if for_esql and isinstance(field, Nested): + # ES|QL does not recognize Nested fields at this time + continue + sub_fields = cls._get_field_names( + for_esql=for_esql, nested_class=field._doc_class + ) + for sub_field in sub_fields: + fields.append(f"{field_name}.{sub_field}") + else: + fields.append(field_name) + return fields + + @classmethod + def esql_from(cls) -> "ESQLBase": + """Return a base ES|QL query for instances of this document class. + + The returned query is initialized with ``FROM`` and ``KEEP`` statements, + and can be completed as desired. + """ + from ..esql import ESQL # here to avoid circular imports + + return ( + ESQL.from_(cls) + .metadata("_id") + .keep("_id", *tuple(cls._get_field_names(for_esql=True))) + ) diff --git a/elasticsearch/esql/__init__.py b/elasticsearch/esql/__init__.py index 8da8f852a..4a843ad59 100644 --- a/elasticsearch/esql/__init__.py +++ b/elasticsearch/esql/__init__.py @@ -16,4 +16,4 @@ # under the License.
from ..dsl import E # noqa: F401 -from .esql import ESQL, and_, not_, or_ # noqa: F401 +from .esql import ESQL, ESQLBase, and_, not_, or_ # noqa: F401 diff --git a/elasticsearch/esql/functions.py b/elasticsearch/esql/functions.py index 91f18d2d8..ada6c2911 100644 --- a/elasticsearch/esql/functions.py +++ b/elasticsearch/esql/functions.py @@ -661,11 +661,11 @@ def multi_match( """ if options is not None: return InstrumentedExpression( - f"MULTI_MATCH({_render(query)}, {_render(fields)}, {_render(options)})" + f'MULTI_MATCH({_render(query)}, {", ".join([_render(c) for c in fields])}, {_render(options)})' ) else: return InstrumentedExpression( - f"MULTI_MATCH({_render(query)}, {_render(fields)})" + f'MULTI_MATCH({_render(query)}, {", ".join([_render(c) for c in fields])})' ) diff --git a/examples/dsl/async/esql_employees.py b/examples/dsl/async/esql_employees.py new file mode 100644 index 000000000..986c84235 --- /dev/null +++ b/examples/dsl/async/esql_employees.py @@ -0,0 +1,170 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +# ES|QL query builder example + +Requirements: + +$ pip install "elasticsearch[async]" faker + +To run the example: + +$ python esql_employees.py "name to search" + +The index will be created automatically with a list of 1000 randomly generated +employees if it does not exist. Add `--recreate-index` or `-r` to the command +to regenerate it. 
+ + +Examples: + +$ python esql_employees "Mark" # employees named Mark (first or last names) +$ python esql_employees "Sarah" --limit 10 # up to 10 employees named Sarah +$ python esql_employees "Sam" --sort height # sort results by height +$ python esql_employees "Sam" --sort name # sort results by last name +""" + +import argparse +import asyncio +import os +import random + +from faker import Faker + +from elasticsearch.dsl import AsyncDocument, InnerDoc, M, async_connections +from elasticsearch.esql import ESQLBase +from elasticsearch.esql.functions import concat, multi_match + +fake = Faker() + + +class Address(InnerDoc): + address: M[str] + city: M[str] + zip_code: M[str] + + +class Employee(AsyncDocument): + emp_no: M[int] + first_name: M[str] + last_name: M[str] + height: M[float] + still_hired: M[bool] + address: M[Address] + + class Index: + name = "employees" + + @property + def name(self) -> str: + return f"{self.first_name} {self.last_name}" + + def __repr__(self) -> str: + return f"" + + +async def create(num_employees: int = 1000) -> None: + print("Creating a new employee index...") + if await Employee._index.exists(): + await Employee._index.delete() + await Employee.init() + + for i in range(num_employees): + address = Address( + address=fake.address(), city=fake.city(), zip_code=fake.zipcode() + ) + emp = Employee( + emp_no=10000 + i, + first_name=fake.first_name(), + last_name=fake.last_name(), + height=int((random.random() * 0.8 + 1.5) * 1000) / 1000, + still_hired=random.random() >= 0.5, + address=address, + ) + await emp.save() + await Employee._index.refresh() + + +async def search(query: str, limit: int, sort: str) -> None: + q: ESQLBase = ( + Employee.esql_from() + .where(multi_match(query, Employee.first_name, Employee.last_name)) + .eval(full_name=concat(Employee.first_name, " ", Employee.last_name)) + ) + if sort == "height": + q = q.sort(Employee.height.desc()) + elif sort == "name": + q = q.sort(Employee.last_name.asc()) + q = q.limit(limit) + async for result in Employee.esql_execute(q, return_additional=True): + assert type(result) == tuple + employee = result[0] + full_name = result[1]["full_name"] + print( + f"{full_name:<20}", + f"{'Hired' if employee.still_hired else 'Not hired':<10}", + f"{employee.height:5.2f}m", + f"{employee.address.city:<20}", + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Employee ES|QL example") + parser.add_argument( + "--recreate-index", + "-r", + action="store_true", + help="Recreate and populate the index", + ) + parser.add_argument( + "--limit", + action="store", + type=int, + default=100, + help="Maximum number of employees to return (default: 100)", + ) + parser.add_argument( + "--sort", + action="store", + type=str, + default=None, + help='Sort by "name" (ascending) or by "height" (descending) (default: no sorting)', + ) + parser.add_argument( + "query", action="store", help="The name or partial name to search for" + ) + return parser.parse_args() + + +async def main() -> None: + args = parse_args() + + # initiate the default connection to elasticsearch + async_connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]]) + + if args.recreate_index or not await Employee._index.exists(): + await create() + await Employee.init() + + await search(args.query, args.limit, args.sort) + + # close the connection + await async_connections.get_connection().close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/dsl/esql_employees.py
b/examples/dsl/esql_employees.py new file mode 100644 index 000000000..364a2c2d5 --- /dev/null +++ b/examples/dsl/esql_employees.py @@ -0,0 +1,169 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +# ES|QL query builder example + +Requirements: + +$ pip install elasticsearch faker + +To run the example: + +$ python esql_employees.py "name to search" + +The index will be created automatically with a list of 1000 randomly generated +employees if it does not exist. Add `--recreate-index` or `-r` to the command +to regenerate it. + +Examples: + +$ python esql_employees "Mark" # employees named Mark (first or last names) +$ python esql_employees "Sarah" --limit 10 # up to 10 employees named Sarah +$ python esql_employees "Sam" --sort height # sort results by height +$ python esql_employees "Sam" --sort name # sort results by last name +""" + +import argparse +import os +import random + +from faker import Faker + +from elasticsearch.dsl import Document, InnerDoc, M, connections +from elasticsearch.esql import ESQLBase +from elasticsearch.esql.functions import concat, multi_match + +fake = Faker() + + +class Address(InnerDoc): + address: M[str] + city: M[str] + zip_code: M[str] + + +class Employee(Document): + emp_no: M[int] + first_name: M[str] + last_name: M[str] + height: M[float] + still_hired: M[bool] + address: M[Address] + + class Index: + name = "employees" + + @property + def name(self) -> str: + return f"{self.first_name} {self.last_name}" + + def __repr__(self) -> str: + return f"" + + +def create(num_employees: int = 1000) -> None: + print("Creating a new employee index...") + if Employee._index.exists(): + Employee._index.delete() + Employee.init() + + for i in range(num_employees): + address = Address( + address=fake.address(), city=fake.city(), zip_code=fake.zipcode() + ) + emp = Employee( + emp_no=10000 + i, + first_name=fake.first_name(), + last_name=fake.last_name(), + height=int((random.random() * 0.8 + 1.5) * 1000) / 1000, + still_hired=random.random() >= 0.5, + address=address, + ) + emp.save() + Employee._index.refresh() + + +def search(query: str, limit: int, sort: str) -> None: + q: ESQLBase = ( + Employee.esql_from() + .where(multi_match(query, Employee.first_name, Employee.last_name)) + .eval(full_name=concat(Employee.first_name, " ", Employee.last_name)) + ) + if sort == "height": + q = q.sort(Employee.height.desc()) + elif sort == "name": + q = q.sort(Employee.last_name.asc()) + q = q.limit(limit) + for result in Employee.esql_execute(q, return_additional=True): + assert type(result) == tuple + employee = result[0] + full_name = result[1]["full_name"] + print( + f"{full_name:<20}", + f"{'Hired' if employee.still_hired else 'Not hired':<10}", + f"{employee.height:5.2f}m", + f"{employee.address.city:<20}", + ) + + +def parse_args() -> 
argparse.Namespace: + parser = argparse.ArgumentParser(description="Employee ES|QL example") + parser.add_argument( + "--recreate-index", + "-r", + action="store_true", + help="Recreate and populate the index", + ) + parser.add_argument( + "--limit", + action="store", + type=int, + default=100, + help="Maximum number of employees to return (default: 100)", + ) + parser.add_argument( + "--sort", + action="store", + type=str, + default=None, + help='Sort by "name" (ascending) or by "height" (descending) (default: no sorting)', + ) + parser.add_argument( + "query", action="store", help="The name or partial name to search for" + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + + # initiate the default connection to elasticsearch + connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]]) + + if args.recreate_index or not Employee._index.exists(): + create() + Employee.init() + + search(args.query, args.limit, args.sort) + + # close the connection + connections.get_connection().close() + + +if __name__ == "__main__": + main() diff --git a/examples/dsl/semantic_text.py b/examples/dsl/semantic_text.py index 8d552a2aa..6f4bb8f7d 100644 --- a/examples/dsl/semantic_text.py +++ b/examples/dsl/semantic_text.py @@ -21,7 +21,7 @@ Requirements: -$ pip install "elasticsearch" tqdm +$ pip install elasticsearch tqdm Before running this example, an ELSER inference endpoint must be created in the Elasticsearch cluster. This can be done manually from Kibana, or with the diff --git a/examples/dsl/sparse_vectors.py b/examples/dsl/sparse_vectors.py index a92e82026..c328769eb 100644 --- a/examples/dsl/sparse_vectors.py +++ b/examples/dsl/sparse_vectors.py @@ -20,7 +20,7 @@ Requirements: -$ pip install nltk tqdm "elasticsearch" +$ pip install nltk tqdm elasticsearch Before running this example, the ELSER v2 model must be downloaded and deployed to the Elasticsearch cluster, and an ingest pipeline must be defined.
This can diff --git a/examples/dsl/vectors.py b/examples/dsl/vectors.py index 3afd76991..b4c700b71 100644 --- a/examples/dsl/vectors.py +++ b/examples/dsl/vectors.py @@ -20,7 +20,7 @@ Requirements: -$ pip install nltk sentence_transformers tqdm "elasticsearch" +$ pip install nltk sentence_transformers tqdm elasticsearch To run the example: diff --git a/test_elasticsearch/test_dsl/test_integration/_async/test_esql.py b/test_elasticsearch/test_dsl/test_integration/_async/test_esql.py index 27d26ca99..ae99873f8 100644 --- a/test_elasticsearch/test_dsl/test_integration/_async/test_esql.py +++ b/test_elasticsearch/test_dsl/test_integration/_async/test_esql.py @@ -17,8 +17,13 @@ import pytest -from elasticsearch.dsl import AsyncDocument, E, M -from elasticsearch.esql import ESQL, functions +from elasticsearch.dsl import AsyncDocument, InnerDoc, M +from elasticsearch.esql import ESQL, E, functions + + +class Address(InnerDoc): + address: M[str] + city: M[str] class Employee(AsyncDocument): @@ -27,6 +32,7 @@ class Employee(AsyncDocument): last_name: M[str] height: M[float] still_hired: M[bool] + address: M[Address] class Index: name = "employees" @@ -34,16 +40,86 @@ class Index: async def load_db(): data = [ - [10000, "Joseph", "Wall", 2.2, True], - [10001, "Stephanie", "Ward", 1.749, True], - [10002, "David", "Keller", 1.872, True], - [10003, "Roger", "Hinton", 1.694, False], - [10004, "Joshua", "Garcia", 1.661, False], - [10005, "Matthew", "Richards", 1.633, False], - [10006, "Maria", "Luna", 1.893, True], - [10007, "Angela", "Navarro", 1.604, False], - [10008, "Maria", "Cannon", 2.079, False], - [10009, "Joseph", "Sutton", 2.025, True], + [ + 10000, + "Joseph", + "Wall", + 2.2, + True, + Address(address="8875 Long Shoals Suite 441", city="Marcville, TX"), + ], + [ + 10001, + "Stephanie", + "Ward", + 1.749, + True, + Address(address="90162 Carter Harbor Suite 099", city="Davisborough, DE"), + ], + [ + 10002, + "David", + "Keller", + 1.872, + True, + Address(address="6697 Patrick Union Suite 797", city="Fuentesmouth, SD"), + ], + [ + 10003, + "Roger", + "Hinton", + 1.694, + False, + Address(address="809 Kelly Mountains", city="South Megan, DE"), + ], + [ + 10004, + "Joshua", + "Garcia", + 1.661, + False, + Address(address="718 Angela Forks", city="Port Erinland, MA"), + ], + [ + 10005, + "Matthew", + "Richards", + 1.633, + False, + Address(address="2869 Brown Mountains", city="New Debra, NH"), + ], + [ + 10006, + "Maria", + "Luna", + 1.893, + True, + Address(address="5861 Morgan Springs", city="Lake Daniel, WI"), + ], + [ + 10007, + "Angela", + "Navarro", + 1.604, + False, + Address(address="2848 Allen Station", city="Saint Joseph, OR"), + ], + [ + 10008, + "Maria", + "Cannon", + 2.079, + False, + Address(address="322 NW Johnston", city="Bakerburgh, MP"), + ], + [ + 10009, + "Joseph", + "Sutton", + 2.025, + True, + Address(address="77 Cardinal E", city="Lakestown, IL"), + ], ] if await Employee._index.exists(): await Employee._index.delete() @@ -51,7 +127,12 @@ async def load_db(): for e in data: employee = Employee( - emp_no=e[0], first_name=e[1], last_name=e[2], height=e[3], still_hired=e[4] + emp_no=e[0], + first_name=e[1], + last_name=e[2], + height=e[3], + still_hired=e[4], + address=e[5], ) await employee.save() await Employee._index.refresh() @@ -64,9 +145,9 @@ async def test_esql(async_client): # get the full names of the employees query = ( ESQL.from_(Employee) - .eval(name=functions.concat(Employee.first_name, " ", Employee.last_name)) - .keep("name") - .sort("name") + 
.eval(full_name=functions.concat(Employee.first_name, " ", Employee.last_name)) + .keep("full_name") + .sort("full_name") .limit(10) ) r = await async_client.esql.query(query=str(query)) @@ -101,3 +182,73 @@ async def test_esql(async_client): ) r = await async_client.esql.query(query=str(query), params=["Maria"]) assert r.body["values"] == [["Luna"], ["Cannon"]] + + +@pytest.mark.asyncio +async def test_esql_dsl(async_client): + await load_db() + + # get employees with first name "Maria" + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .sort("last_name") + .limit(10) + ) + marias = [] + async for emp in Employee.esql_execute(query): + marias.append(emp) + assert len(marias) == 2 + assert marias[0].last_name == "Cannon" + assert marias[0].address.address == "322 NW Johnston" + assert marias[0].address.city == "Bakerburgh, MP" + assert marias[1].last_name == "Luna" + assert marias[1].address.address == "5861 Morgan Springs" + assert marias[1].address.city == "Lake Daniel, WI" + + # run a query with a missing field + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .drop(Employee.address.city) + .sort("last_name") + .limit(10) + ) + with pytest.raises(ValueError): + await Employee.esql_execute(query).__anext__() + marias = [] + async for emp in Employee.esql_execute(query, ignore_missing_fields=True): + marias.append(emp) + assert marias[0].last_name == "Cannon" + assert marias[0].address.address == "322 NW Johnston" + assert marias[0].address.city is None + assert marias[1].last_name == "Luna" + assert marias[1].address.address == "5861 Morgan Springs" + assert marias[1].address.city is None + + # run a query with additional calculated fields + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .eval( + full_name=functions.concat(Employee.first_name, " ", Employee.last_name), + height_cm=functions.to_integer(Employee.height * 100), + ) + .sort("last_name") + .limit(10) + ) + assert isinstance(await Employee.esql_execute(query).__anext__(), Employee) + assert isinstance( + await Employee.esql_execute(query, return_additional=True).__anext__(), tuple + ) + marias = [] + async for emp, extra in Employee.esql_execute(query, return_additional=True): + marias.append([emp, extra]) + assert marias[0][0].last_name == "Cannon" + assert marias[0][0].address.address == "322 NW Johnston" + assert marias[0][0].address.city == "Bakerburgh, MP" + assert marias[0][1] == {"full_name": "Maria Cannon", "height_cm": 208} + assert marias[1][0].last_name == "Luna" + assert marias[1][0].address.address == "5861 Morgan Springs" + assert marias[1][0].address.city == "Lake Daniel, WI" + assert marias[1][1] == {"full_name": "Maria Luna", "height_cm": 189} diff --git a/test_elasticsearch/test_dsl/test_integration/_sync/test_esql.py b/test_elasticsearch/test_dsl/test_integration/_sync/test_esql.py index 85ceee5ae..d02484013 100644 --- a/test_elasticsearch/test_dsl/test_integration/_sync/test_esql.py +++ b/test_elasticsearch/test_dsl/test_integration/_sync/test_esql.py @@ -17,8 +17,13 @@ import pytest -from elasticsearch.dsl import Document, E, M -from elasticsearch.esql import ESQL, functions +from elasticsearch.dsl import Document, InnerDoc, M +from elasticsearch.esql import ESQL, E, functions + + +class Address(InnerDoc): + address: M[str] + city: M[str] class Employee(Document): @@ -27,6 +32,7 @@ class Employee(Document): last_name: M[str] height: M[float] still_hired: M[bool] + address: M[Address] class Index: name = "employees" @@ 
-34,16 +40,86 @@ class Index: def load_db(): data = [ - [10000, "Joseph", "Wall", 2.2, True], - [10001, "Stephanie", "Ward", 1.749, True], - [10002, "David", "Keller", 1.872, True], - [10003, "Roger", "Hinton", 1.694, False], - [10004, "Joshua", "Garcia", 1.661, False], - [10005, "Matthew", "Richards", 1.633, False], - [10006, "Maria", "Luna", 1.893, True], - [10007, "Angela", "Navarro", 1.604, False], - [10008, "Maria", "Cannon", 2.079, False], - [10009, "Joseph", "Sutton", 2.025, True], + [ + 10000, + "Joseph", + "Wall", + 2.2, + True, + Address(address="8875 Long Shoals Suite 441", city="Marcville, TX"), + ], + [ + 10001, + "Stephanie", + "Ward", + 1.749, + True, + Address(address="90162 Carter Harbor Suite 099", city="Davisborough, DE"), + ], + [ + 10002, + "David", + "Keller", + 1.872, + True, + Address(address="6697 Patrick Union Suite 797", city="Fuentesmouth, SD"), + ], + [ + 10003, + "Roger", + "Hinton", + 1.694, + False, + Address(address="809 Kelly Mountains", city="South Megan, DE"), + ], + [ + 10004, + "Joshua", + "Garcia", + 1.661, + False, + Address(address="718 Angela Forks", city="Port Erinland, MA"), + ], + [ + 10005, + "Matthew", + "Richards", + 1.633, + False, + Address(address="2869 Brown Mountains", city="New Debra, NH"), + ], + [ + 10006, + "Maria", + "Luna", + 1.893, + True, + Address(address="5861 Morgan Springs", city="Lake Daniel, WI"), + ], + [ + 10007, + "Angela", + "Navarro", + 1.604, + False, + Address(address="2848 Allen Station", city="Saint Joseph, OR"), + ], + [ + 10008, + "Maria", + "Cannon", + 2.079, + False, + Address(address="322 NW Johnston", city="Bakerburgh, MP"), + ], + [ + 10009, + "Joseph", + "Sutton", + 2.025, + True, + Address(address="77 Cardinal E", city="Lakestown, IL"), + ], ] if Employee._index.exists(): Employee._index.delete() @@ -51,7 +127,12 @@ def load_db(): for e in data: employee = Employee( - emp_no=e[0], first_name=e[1], last_name=e[2], height=e[3], still_hired=e[4] + emp_no=e[0], + first_name=e[1], + last_name=e[2], + height=e[3], + still_hired=e[4], + address=e[5], ) employee.save() Employee._index.refresh() @@ -64,9 +145,9 @@ def test_esql(client): # get the full names of the employees query = ( ESQL.from_(Employee) - .eval(name=functions.concat(Employee.first_name, " ", Employee.last_name)) - .keep("name") - .sort("name") + .eval(full_name=functions.concat(Employee.first_name, " ", Employee.last_name)) + .keep("full_name") + .sort("full_name") .limit(10) ) r = client.esql.query(query=str(query)) @@ -101,3 +182,73 @@ def test_esql(client): ) r = client.esql.query(query=str(query), params=["Maria"]) assert r.body["values"] == [["Luna"], ["Cannon"]] + + +@pytest.mark.sync +def test_esql_dsl(client): + load_db() + + # get employees with first name "Maria" + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .sort("last_name") + .limit(10) + ) + marias = [] + for emp in Employee.esql_execute(query): + marias.append(emp) + assert len(marias) == 2 + assert marias[0].last_name == "Cannon" + assert marias[0].address.address == "322 NW Johnston" + assert marias[0].address.city == "Bakerburgh, MP" + assert marias[1].last_name == "Luna" + assert marias[1].address.address == "5861 Morgan Springs" + assert marias[1].address.city == "Lake Daniel, WI" + + # run a query with a missing field + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .drop(Employee.address.city) + .sort("last_name") + .limit(10) + ) + with pytest.raises(ValueError): + Employee.esql_execute(query).__next__() + marias = 
[] + for emp in Employee.esql_execute(query, ignore_missing_fields=True): + marias.append(emp) + assert marias[0].last_name == "Cannon" + assert marias[0].address.address == "322 NW Johnston" + assert marias[0].address.city is None + assert marias[1].last_name == "Luna" + assert marias[1].address.address == "5861 Morgan Springs" + assert marias[1].address.city is None + + # run a query with additional calculated fields + query = ( + Employee.esql_from() + .where(Employee.first_name == "Maria") + .eval( + full_name=functions.concat(Employee.first_name, " ", Employee.last_name), + height_cm=functions.to_integer(Employee.height * 100), + ) + .sort("last_name") + .limit(10) + ) + assert isinstance(Employee.esql_execute(query).__next__(), Employee) + assert isinstance( + Employee.esql_execute(query, return_additional=True).__next__(), tuple + ) + marias = [] + for emp, extra in Employee.esql_execute(query, return_additional=True): + marias.append([emp, extra]) + assert marias[0][0].last_name == "Cannon" + assert marias[0][0].address.address == "322 NW Johnston" + assert marias[0][0].address.city == "Bakerburgh, MP" + assert marias[0][1] == {"full_name": "Maria Cannon", "height_cm": 208} + assert marias[1][0].last_name == "Luna" + assert marias[1][0].address.address == "5861 Morgan Springs" + assert marias[1][0].address.city == "Lake Daniel, WI" + assert marias[1][1] == {"full_name": "Maria Luna", "height_cm": 189} diff --git a/utils/run-unasync-dsl.py b/utils/run-unasync-dsl.py index 59c0b05bc..b74c748fa 100644 --- a/utils/run-unasync-dsl.py +++ b/utils/run-unasync-dsl.py @@ -121,7 +121,7 @@ def main(check=False): [ "sed", "-i.bak", - "s/elasticsearch\\[async\\]/elasticsearch/", + 's/"elasticsearch\\[async\\]"/elasticsearch/', f"{output_dir}{file}", ] )
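With the quotes now included in the pattern, the substitution rewrites the async requirements line in the example docstrings into its sync form; an illustrative before/after, based on the example docstrings above:

```
$ pip install "elasticsearch[async]" faker    # async example, before the rewrite
$ pip install elasticsearch faker             # sync example, after the rewrite
```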