Skip to content

Commit

Permalink
Add verbose flag to bulk() for better insights
Browse files Browse the repository at this point in the history
  • Loading branch information
odedlaz committed Mar 16, 2017
1 parent 897ad02 commit d0f2d4a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 17 deletions.
30 changes: 25 additions & 5 deletions tests/test_bulk_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
SOME_ID = "some_id"

ERROR_MSG = "error_msg"
SUCCESS = "success"
ITEM_FAILED = (False, ERROR_MSG)

ITEM_SUCCESS = (True, None)
ITEM_SUCCESS = (True, SUCCESS)


class TestBulkUtility(TestCase):
Expand All @@ -35,11 +36,30 @@ def test_bulk_stats_only(self):
self.assertEqual(1, faileds)

@inlineCallbacks
def test_bulk(self):
return_value = [succeed([ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED])]
def test_bulk_not_stats_only(self):
return_value = [succeed([ITEM_SUCCESS,
ITEM_SUCCESS,
ITEM_FAILED,
ITEM_FAILED,
ITEM_FAILED])]
self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
retval = yield self.bulk_utility.bulk(None, stats_only=False)
self.assertEqual([ERROR_MSG] * 3, retval)

@inlineCallbacks
def test_bulk_verbose_output(self):
output = [ITEM_SUCCESS,
ITEM_SUCCESS,
ITEM_FAILED,
ITEM_FAILED,
ITEM_FAILED]

return_value = [succeed(output)]

self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
errors = yield self.bulk_utility.bulk(None)
self.assertEqual((2, [ERROR_MSG]), errors)
inserted, errors = yield self.bulk_utility.bulk(None, verbose=True)
self.assertEqual([SUCCESS] * 2, inserted)
self.assertEqual([ERROR_MSG] * 3, errors)

def test_streaming_bulk(self):
self.bulk_utility._process_bulk_chunk = MagicMock()
Expand Down
28 changes: 16 additions & 12 deletions twistes/bulk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from operator import methodcaller

from twisted.internet.defer import inlineCallbacks, returnValue

from twistes.compatability import string_types
from twistes.consts import EsBulk, EsDocProperties
from twistes.exceptions import BulkIndexError, ConnectionTimeout
Expand Down Expand Up @@ -54,7 +53,7 @@ def __init__(self, es):
self.client = es

@inlineCallbacks
def bulk(self, actions, stats_only=False, **kwargs):
def bulk(self, actions, stats_only=False, verbose=False, **kwargs):
"""
Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
a more human friendly interface - it consumes an iterator of actions and
Expand All @@ -67,26 +66,29 @@ def bulk(self, actions, stats_only=False, **kwargs):
:arg stats_only: if `True` only report number of successful/failed
operations instead of just number of successful and a list of error responses
Any additional keyword arguments will be passed to
:arg verbose: return verbose data: (inserted, errors)
:func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
the operation.
"""
success, failed = 0, 0

# list of errors to be collected is not stats_only
inserted = []
errors = []

for deferred_bulk in self.streaming_bulk(actions, **kwargs):
bulk_results = yield deferred_bulk
for ok, item in bulk_results:
# go through request-response pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
else:
success += 1
summarized_results = success, failed if stats_only else errors
returnValue(summarized_results)
l = inserted if ok else errors
l.append(item)

if verbose:
returnValue((inserted, errors))

if stats_only:
returnValue((len(inserted), len(errors)))

# here for backwards compatibility
returnValue(errors)

def streaming_bulk(self, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
raise_on_error=True, expand_action_callback=ActionParser.expand_action,
Expand Down Expand Up @@ -137,6 +139,7 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes):
bulk_actions.append(action)
if data is not None:
bulk_actions.append(data)

size += cur_size
action_count += 1

Expand Down Expand Up @@ -201,6 +204,7 @@ def _handle_transport_error(bulk_actions, e, raise_on_error):
info = {"error": str(e), "exception": e}
if op_type != 'delete':
info['data'] = next(bulk_data)

info.update(action)
exc_errors.append({op_type: info})
except StopIteration:
Expand Down

0 comments on commit d0f2d4a

Please sign in to comment.