Skip to content

Commit

Permalink
[7.x] Add optimistic concurrency and update+_source options to bulk
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
  • Loading branch information
github-actions[bot] and sethmlarson committed Sep 28, 2020
1 parent 6250aac commit ff378f6
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 3 deletions.
7 changes: 7 additions & 0 deletions elasticsearch/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
map = map
from queue import Queue

# ``Mapping`` is defined in ``collections.abc`` on Python 3; the alias that
# used to be re-exported from ``collections`` was removed in Python 3.10, so
# the primary import must name the ``abc`` submodule correctly.
try:
    from collections.abc import Mapping
except ImportError:
    # Python 2 has no ``collections.abc``; the ABC lives on ``collections``.
    from collections import Mapping


__all__ = [
"string_types",
"quote_plus",
Expand All @@ -41,4 +47,5 @@
"urlparse",
"map",
"Queue",
"Mapping",
]
23 changes: 20 additions & 3 deletions elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import time

from ..exceptions import TransportError
from ..compat import map, string_types, Queue
from ..compat import map, string_types, Queue, Mapping

from .errors import ScanError, BulkIndexError

Expand All @@ -43,9 +43,22 @@ def expand_action(data):
data = data.copy()
op_type = data.pop("_op_type", "index")
action = {op_type: {}}

# If '_source' is a dict use it for source
# otherwise if op_type == 'update' then
# '_source' should be in the metadata.
if (
op_type == "update"
and "_source" in data
and not isinstance(data["_source"], Mapping)
):
action[op_type]["_source"] = data.pop("_source")

for key in (
"_id",
"_index",
"_if_seq_no",
"_if_primary_term",
"_parent",
"_percolate",
"_retry_on_conflict",
Expand All @@ -54,6 +67,8 @@ def expand_action(data):
"_type",
"_version",
"_version_type",
"if_seq_no",
"if_primary_term",
"parent",
"pipeline",
"retry_on_conflict",
Expand All @@ -62,13 +77,15 @@ def expand_action(data):
"version_type",
):
if key in data:
if key in [
if key in {
"_if_seq_no",
"_if_primary_term",
"_parent",
"_retry_on_conflict",
"_routing",
"_version",
"_version_type",
]:
}:
action[op_type][key[1:]] = data.pop(key)
else:
action[op_type][key] = data.pop(key)
Expand Down
97 changes: 97 additions & 0 deletions test_elasticsearch/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,103 @@ class TestChunkActions(TestCase):
def setup_method(self, _):
    # Build 100 (action, document) pairs; every pair gets its own dicts so
    # no state is shared between entries.
    pairs = []
    for i in range(100):
        metadata = {"index": {}}
        document = {"some": u"datá", "i": i}
        pairs.append((metadata, document))
    self.actions = pairs

def test_expand_action(self):
    """Documents without an ``_op_type`` expand to an empty ``index`` action."""
    cases = (
        ({}, ({"index": {}}, {})),
        ({"key": "val"}, ({"index": {}}, {"key": "val"})),
    )
    for document, expected in cases:
        self.assertEqual(helpers.expand_action(document), expected)

def test_expand_action_actions(self):
    """Check the (action, data) pair produced for each explicit ``_op_type``."""
    cases = [
        # ``delete`` is expected to carry no document body.
        (
            {"_op_type": "delete", "_id": "id", "_index": "index"},
            ({"delete": {"_id": "id", "_index": "index"}}, None),
        ),
        (
            {"_op_type": "update", "_id": "id", "_index": "index", "key": "val"},
            ({"update": {"_id": "id", "_index": "index"}}, {"key": "val"}),
        ),
        (
            {"_op_type": "create", "_id": "id", "_index": "index", "key": "val"},
            ({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}),
        ),
        # A dict under ``_source`` is expected to become the document body.
        (
            {
                "_op_type": "create",
                "_id": "id",
                "_index": "index",
                "_source": {"key": "val"},
            },
            ({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}),
        ),
    ]
    for raw_action, expected in cases:
        self.assertEqual(helpers.expand_action(raw_action), expected)

def test_expand_action_options(self):
    """Every supported option must land in the action metadata.

    Options in ``unchanged`` are expected under the same key; options in
    ``renamed`` are expected with their leading underscore removed.
    """
    unchanged = (
        "_id",
        "_index",
        "_percolate",
        "_timestamp",
        "_type",
        "if_seq_no",
        "if_primary_term",
        "parent",
        "pipeline",
        "retry_on_conflict",
        "routing",
        "version",
        "version_type",
    )
    renamed = (
        ("_parent", "parent"),
        ("_retry_on_conflict", "retry_on_conflict"),
        ("_routing", "routing"),
        ("_version", "version"),
        ("_version_type", "version_type"),
        ("_if_seq_no", "if_seq_no"),
        ("_if_primary_term", "if_primary_term"),
    )
    pairs = [(name, name) for name in unchanged]
    pairs.extend(renamed)

    for given, emitted in pairs:
        self.assertEqual(
            helpers.expand_action({"key": "val", given: 0}),
            ({"index": {emitted: 0}}, {"key": "val"}),
        )

def test__source_metadata_or_source(self):
    """``_source`` is either the document body or ``update`` metadata."""
    cases = [
        # A dict ``_source`` is expected to be used as the document itself.
        (
            {"_source": {"key": "val"}},
            ({"index": {}}, {"key": "val"}),
        ),
        # For updates, a non-dict ``_source`` is expected in the metadata.
        (
            {"_source": ["key"], "key": "val", "_op_type": "update"},
            ({"update": {"_source": ["key"]}}, {"key": "val"}),
        ),
        (
            {"_source": True, "key": "val", "_op_type": "update"},
            ({"update": {"_source": True}}, {"key": "val"}),
        ),
        # This case is only to ensure backwards compatibility with old
        # functionality.
        (
            {"_source": {"key2": "val2"}, "key": "val", "_op_type": "update"},
            ({"update": {}}, {"key2": "val2"}),
        ),
    ]
    for raw_action, expected in cases:
        self.assertEqual(helpers.expand_action(raw_action), expected)

def test_chunks_are_chopped_by_byte_size(self):
self.assertEqual(
100,
Expand Down

0 comments on commit ff378f6

Please sign in to comment.