Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion elasticsearch_dsl/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
# specific language governing permissions and limitations
# under the License.

from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch, __version__
from six import string_types

from .serializer import serializer

# The 'body' parameter was deprecated in favor of named
# body parameters in version 7.15.0 of the client. The relevant APIs
# affected include 'search', 'index', 'update', and 'indices.create'
CLIENT_HAS_NAMED_BODY_PARAMS = __version__ >= (7, 15, 0)


class Connections(object):
"""
Expand Down
38 changes: 23 additions & 15 deletions elasticsearch_dsl/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from elasticsearch.exceptions import NotFoundError, RequestError
from six import add_metaclass, iteritems

from .connections import get_connection
from .connections import CLIENT_HAS_NAMED_BODY_PARAMS, get_connection
from .exceptions import IllegalOperation, ValidationException
from .field import Field
from .index import Index
Expand Down Expand Up @@ -416,22 +416,29 @@ def update(
body["doc"] = {k: values.get(k) for k in fields.keys()}

# extract routing etc from meta
doc_meta = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}
params = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

if retry_on_conflict is not None:
doc_meta["retry_on_conflict"] = retry_on_conflict
params["retry_on_conflict"] = retry_on_conflict

# Optimistic concurrency control
if (
retry_on_conflict in (None, 0)
and "seq_no" in self.meta
and "primary_term" in self.meta
):
doc_meta["if_seq_no"] = self.meta["seq_no"]
doc_meta["if_primary_term"] = self.meta["primary_term"]
params["if_seq_no"] = self.meta["seq_no"]
params["if_primary_term"] = self.meta["primary_term"]

params["refresh"] = refresh

if CLIENT_HAS_NAMED_BODY_PARAMS:
params.update(body)
else:
params["body"] = body

meta = self._get_connection(using).update(
index=self._get_index(index), body=body, refresh=refresh, **doc_meta
index=self._get_index(index), **params
)
# update meta information from ES
for k in META_FIELDS:
Expand Down Expand Up @@ -474,19 +481,20 @@ def save(

es = self._get_connection(using)
# extract routing etc from meta
doc_meta = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}
params = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

# Optimistic concurrency control
if "seq_no" in self.meta and "primary_term" in self.meta:
doc_meta["if_seq_no"] = self.meta["seq_no"]
doc_meta["if_primary_term"] = self.meta["primary_term"]
params["if_seq_no"] = self.meta["seq_no"]
params["if_primary_term"] = self.meta["primary_term"]

doc_meta.update(kwargs)
meta = es.index(
index=self._get_index(index),
body=self.to_dict(skip_empty=skip_empty),
**doc_meta
)
if CLIENT_HAS_NAMED_BODY_PARAMS:
params["document"] = self.to_dict(skip_empty=skip_empty)
else:
params["body"] = self.to_dict(skip_empty=skip_empty)

params.update(kwargs)
meta = es.index(index=self._get_index(index), **params)
# update meta information from ES
for k in META_FIELDS:
if "_" + k in meta:
Expand Down
14 changes: 10 additions & 4 deletions elasticsearch_dsl/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

from . import analysis
from .connections import get_connection
from .connections import CLIENT_HAS_NAMED_BODY_PARAMS, get_connection
from .exceptions import IllegalOperation
from .mapping import Mapping
from .search import Search
Expand Down Expand Up @@ -276,9 +276,15 @@ def create(self, using=None, **kwargs):
Any additional keyword arguments will be passed to
``Elasticsearch.indices.create`` unchanged.
"""
return self._get_connection(using).indices.create(
index=self._name, body=self.to_dict(), **kwargs
)
es = self._get_connection(using)

if CLIENT_HAS_NAMED_BODY_PARAMS:
params = self.to_dict()
else:
params = {"body": self.to_dict()}
params.update(kwargs)

return es.indices.create(index=self._name, **params)

def is_closed(self, using=None):
state = self._get_connection(using).cluster.state(
Expand Down
12 changes: 10 additions & 2 deletions elasticsearch_dsl/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from six import iteritems, string_types

from .aggs import A, AggBase
from .connections import get_connection
from .connections import CLIENT_HAS_NAMED_BODY_PARAMS, get_connection
from .exceptions import IllegalOperation
from .query import Bool, Q
from .response import Hit, Response
Expand Down Expand Up @@ -711,8 +711,16 @@ def execute(self, ignore_cache=False):
if ignore_cache or not hasattr(self, "_response"):
es = get_connection(self._using)

if CLIENT_HAS_NAMED_BODY_PARAMS:
params = self.to_dict()
if "from" in params:
params["from_"] = params.pop("from")
else:
params = {"body": self.to_dict()}
params.update(self._params)

self._response = self._response_class(
self, es.search(index=self._index, body=self.to_dict(), **self._params)
self, es.search(index=self._index, **params)
)
return self._response

Expand Down
14 changes: 12 additions & 2 deletions tests/test_integration/test_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import warnings
from datetime import datetime
from ipaddress import ip_address

Expand Down Expand Up @@ -407,7 +408,11 @@ def test_mget_ignores_missing_docs_when_missing_param_is_skip(data_client):
def test_update_works_from_search_response(data_client):
elasticsearch_repo = Repository.search().execute()[0]

elasticsearch_repo.update(owner={"other_name": "elastic"})
with warnings.catch_warnings(record=True) as w:
elasticsearch_repo.update(owner={"other_name": "elastic"})
assert [
str(x.message) for x in w if issubclass(x.category, DeprecationWarning)
] == []
assert "elastic" == elasticsearch_repo.owner.other_name

new_version = Repository.get("elasticsearch-dsl-py")
Expand Down Expand Up @@ -442,7 +447,12 @@ def test_save_updates_existing_doc(data_client):

elasticsearch_repo.new_field = "testing-save"
old_seq_no = elasticsearch_repo.meta.seq_no
assert "updated" == elasticsearch_repo.save()

with warnings.catch_warnings(record=True) as w:
assert "updated" == elasticsearch_repo.save()
assert [
str(x.message) for x in w if issubclass(x.category, DeprecationWarning)
] == []

new_repo = data_client.get(index="git", id="elasticsearch-dsl-py")
assert "testing-save" == new_repo["_source"]["new_field"]
Expand Down
9 changes: 8 additions & 1 deletion tests/test_integration/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import warnings

from elasticsearch_dsl import Date, Document, Index, IndexTemplate, Text, analysis


Expand Down Expand Up @@ -65,7 +67,12 @@ def test_index_can_be_created_with_settings_and_mappings(write_client):
i = Index("test-blog", using=write_client)
i.document(Post)
i.settings(number_of_replicas=0, number_of_shards=1)
i.create()

with warnings.catch_warnings(record=True) as w:
i.create()
assert [
str(x.message) for x in w if issubclass(x.category, DeprecationWarning)
] == []

assert {
"test-blog": {
Expand Down
13 changes: 13 additions & 0 deletions tests/test_integration/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from __future__ import unicode_literals

import warnings

from elasticsearch import TransportError
from pytest import raises

Expand Down Expand Up @@ -169,3 +171,14 @@ def test_raw_subfield_can_be_used_in_aggs(data_client):
authors = r.aggregations.authors
assert 1 == len(authors)
assert {"key": "Honza Král", "doc_count": 52} == authors[0]


def test_no_deprecation_warnings(data_client):
s = Search(index="git")[0:0]
s.aggs.bucket("authors", "terms", field="author.name.raw", size=1)

with warnings.catch_warnings(record=True) as w:
s.execute()
assert [
str(x.message) for x in w if issubclass(x.category, DeprecationWarning)
] == []
11 changes: 9 additions & 2 deletions tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pytest import raises

from elasticsearch_dsl import Document, Q, query, search
from elasticsearch_dsl.connections import CLIENT_HAS_NAMED_BODY_PARAMS
from elasticsearch_dsl.exceptions import IllegalOperation


Expand All @@ -43,7 +44,10 @@ def test_cache_can_be_ignored(mock_client):
s._response = r
s.execute(ignore_cache=True)

mock_client.search.assert_called_once_with(index=None, body={})
if CLIENT_HAS_NAMED_BODY_PARAMS:
mock_client.search.assert_called_once_with(index=None)
else:
mock_client.search.assert_called_once_with(index=None, body={})


def test_iter_iterates_over_hits():
Expand Down Expand Up @@ -411,7 +415,10 @@ def test_params_being_passed_to_search(mock_client):
s = s.params(routing="42")
s.execute()

mock_client.search.assert_called_once_with(index=None, body={}, routing="42")
if CLIENT_HAS_NAMED_BODY_PARAMS:
mock_client.search.assert_called_once_with(index=None, routing="42")
else:
mock_client.search.assert_called_once_with(index=None, body={}, routing="42")


def test_source():
Expand Down