
Refactor exceptions to be more similar to elasticsearch-py (#31)
* Handle bad request, and better handle unknown errors

* Refactor exceptions to be more similar to elasticsearch-py
odedlaz authored and avihad committed Aug 14, 2016
1 parent cd5ccd7 commit b4118af
Showing 5 changed files with 282 additions and 92 deletions.
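The headline change: error handling now mirrors elasticsearch-py, where every error derives from a common ElasticsearchException base and HTTP failures carry their status code. Below is a minimal sketch of that style of hierarchy; the class names match the imports this commit's tests use, but the exact bases and attributes in twistes/exceptions.py (one of the two changed files not shown in this extract) are assumptions modeled on elasticsearch-py.

# Hypothetical sketch of an elasticsearch-py-style exception hierarchy;
# bases and attributes here are assumptions, only the names are from the diff.

class ElasticsearchException(Exception):
    """Base class for every error the client raises."""


class TransportError(ElasticsearchException):
    """HTTP-level failure carrying the status code and raw error body."""

    @property
    def status_code(self):
        return self.args[0]

    @property
    def error(self):
        return self.args[1]


class ConnectionTimeout(TransportError):
    """The request to the cluster timed out."""


class NotFoundError(TransportError):
    """HTTP 404."""


class RequestError(TransportError):
    """HTTP 400, i.e. the request itself was malformed."""


class BulkIndexError(ElasticsearchException):
    """One or more bulk actions failed; per-action errors ride in args[1]."""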
141 changes: 95 additions & 46 deletions tests/test_bulk_utility.py
@@ -22,51 +22,62 @@
 
 
 class TestBulkUtility(TestCase):
 
     def setUp(self):
         self.bulk_utility = BulkUtility(MagicMock())
 
     @inlineCallbacks
     def test_bulk_stats_only(self):
-        self.bulk_utility.streaming_bulk = MagicMock(return_value=[succeed([ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED])])
+        return_value = [succeed([ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED])]
+        self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
         success, faileds = yield self.bulk_utility.bulk(None, stats_only=True)
         self.assertEqual(2, success)
         self.assertEqual(1, faileds)
 
     @inlineCallbacks
     def test_bulk(self):
-        self.bulk_utility.streaming_bulk = MagicMock(return_value=[succeed([ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED])])
+        return_value = [succeed([ITEM_SUCCESS, ITEM_SUCCESS, ITEM_FAILED])]
+        self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
         errors = yield self.bulk_utility.bulk(None)
         self.assertEqual((2, [ERROR_MSG]), errors)
 
     def test_streaming_bulk(self):
         self.bulk_utility._process_bulk_chunk = MagicMock()
         num_of_actions = 30
-        actions = [i for i in range(num_of_actions)]
         mock_sub_actions = [1, 2, 3]
         num_of_chunks = 4
-        self.bulk_utility._chunk_actions = MagicMock(return_value=[mock_sub_actions for i in range(num_of_chunks)])
-        expand_action_callback = MagicMock()
-        for d in self.bulk_utility.streaming_bulk(actions, expand_action_callback=expand_action_callback):
-            pass
+        return_value = [mock_sub_actions for i in range(num_of_chunks)]
+        self.bulk_utility._chunk_actions = MagicMock(return_value=return_value)
+
+        cb = MagicMock()
 
-        self.assertEqual(num_of_actions, expand_action_callback.call_count)
-        self.assertEqual(num_of_chunks, self.bulk_utility._process_bulk_chunk.call_count)
+        bulk = list(self.bulk_utility.streaming_bulk(range(num_of_actions),
+                                                     expand_action_callback=cb))
+
+        self.assertEqual(num_of_actions,
+                         cb.call_count)
+        self.assertEqual(num_of_chunks,
+                         self.bulk_utility._process_bulk_chunk.call_count)
 
     def test_action_parser(self):
-        update_record = {EsBulk.OP_TYPE: EsBulk.UPDATE, EsDocProperties.INDEX: SOME_INDEX,
-                         EsDocProperties.TYPE: SOME_DOC_TYPE, EsDocProperties.ID: SOME_ID,
+        update_record = {EsBulk.OP_TYPE: EsBulk.UPDATE,
+                         EsDocProperties.INDEX: SOME_INDEX,
+                         EsDocProperties.TYPE: SOME_DOC_TYPE,
+                         EsDocProperties.ID: SOME_ID,
                          EsDocProperties.SOURCE: SOME_DOC}
 
         result = ActionParser.expand_action(update_record)
 
-        expected = (self._create_action_row(EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), SOME_DOC)
+        row = [EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        expected = (self._create_action_row(*row), SOME_DOC)
 
         self.assertEqual(expected, result)
 
     @staticmethod
     def _create_action_row(op_type, index, doc_type, id):
-        return {op_type: {EsDocProperties.INDEX: index, EsDocProperties.TYPE: doc_type,
-                          EsDocProperties.ID: id}}
+        return {op_type: {EsDocProperties.INDEX: index,
+                          EsDocProperties.TYPE: doc_type,
+                          EsDocProperties.ID: id}
+                }
 
     def test__expand_action_doc_str(self):
         doc_dumps = json.dumps(SOME_DOC)
@@ -78,37 +89,53 @@ def test__expand_action_doc_str(self):
         self.assertEqual(expected, result)
 
     def test__expand_action_delete(self):
-        delete_record = {EsBulk.OP_TYPE: EsBulk.DELETE, EsDocProperties.INDEX: SOME_INDEX,
-                         EsDocProperties.TYPE: SOME_DOC_TYPE, EsDocProperties.ID: SOME_ID}
+        delete_record = {EsBulk.OP_TYPE: EsBulk.DELETE,
+                         EsDocProperties.INDEX: SOME_INDEX,
+                         EsDocProperties.TYPE: SOME_DOC_TYPE,
+                         EsDocProperties.ID: SOME_ID}
 
         result = ActionParser.expand_action(delete_record)
 
-        expected = (self._create_action_row(EsBulk.DELETE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), None)
+        row = [EsBulk.DELETE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        expected = (self._create_action_row(*row), None)
 
         self.assertEqual(expected, result)
 
     def test__chunk_actions_by_chunk_size(self):
         num_of_tasks = 10
-        actions = [(self._create_action_row(EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), SOME_DOC) for i in
-                   range(num_of_tasks)]
-        num_of_tasks_per_chunk = 3
-        chunks = [c for c in
-                  self.bulk_utility._chunk_actions(actions, chunk_size=num_of_tasks_per_chunk, max_chunk_bytes=100000)]
-        self.assertEqual(math.ceil(num_of_tasks / num_of_tasks_per_chunk), len(chunks))
+        tasks_per_chunk = 3
+
+        row = [EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+
+        actions = [(self._create_action_row(*row), SOME_DOC)
+                   for i in range(num_of_tasks)]
+
+        chunks = list(self.bulk_utility._chunk_actions(actions,
+                                                       chunk_size=tasks_per_chunk,
+                                                       max_chunk_bytes=100000))
+        self.assertEqual(math.ceil(num_of_tasks / tasks_per_chunk),
+                         len(chunks))
 
     def test__chunk_actions_by_chunk_bytes(self):
         num_of_tasks = 10
-        actions = [(self._create_action_row(EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), SOME_DOC) for i in
+        row = [EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [(self._create_action_row(*row), SOME_DOC) for _ in
                    range(num_of_tasks)]
-        chunks = [c for c in self.bulk_utility._chunk_actions(actions, chunk_size=20, max_chunk_bytes=350)]
+
+        chunks = list(self.bulk_utility._chunk_actions(actions,
+                                                       chunk_size=20,
+                                                       max_chunk_bytes=350))
 
         # Every 2 records is about 350 bytes so we will have 5 chunks
         self.assertEqual(5, len(chunks))
 
     def test__chunk_actions_serialize(self):
         num_of_tasks = 10
-        actions = [(self._create_action_row(EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), SOME_DOC) for i in
+        row = [EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [(self._create_action_row(*row), SOME_DOC) for _ in
                    range(num_of_tasks)]
-        chunks = [c for c in self.bulk_utility._chunk_actions(actions, chunk_size=20, max_chunk_bytes=100000)]
+        chunks = list(self.bulk_utility._chunk_actions(actions,
+                                                       chunk_size=20,
+                                                       max_chunk_bytes=100000))
         # Every 2 records is about 350 bytes so we will have 5 chunks
         expected = []
         for a, d in actions:
@@ -123,25 +150,36 @@ def test__process_bulk_chunk_good_results(self):
         op_type2 = EsBulk.DELETE
         good_index_result = {op_type1: {'status': 200}}
         good_delete_result = {op_type2: {'status': 200}}
-        bulk_mock_result = {'items': [good_index_result.copy(), good_delete_result.copy()]}
+        bulk_mock_result = {'items': [good_index_result.copy(),
+                                      good_delete_result.copy()]}
 
-        self.bulk_utility.client.bulk = MagicMock(return_value=bulk_mock_result)
-        actions = [self._create_action_row(op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID)]
+        self.bulk_utility.client.bulk = MagicMock(
+            return_value=bulk_mock_result)
+
+        row = [op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [self._create_action_row(*row)]
+
         results = yield self.bulk_utility._process_bulk_chunk(json.dumps(actions))
-        self.assertEqual([(True, good_index_result), (True, good_delete_result)], results)
+        self.assertEqual([(True, good_index_result),
+                          (True, good_delete_result)], results)
 
     @inlineCallbacks
     def test__process_bulk_chunk(self):
         op_type1 = EsBulk.INDEX
         op_type2 = EsBulk.DELETE
         good_index_result = {op_type1: {'status': 200}}
         good_delete_result = {op_type2: {'status': 200}}
-        bulk_mock_result = {'items': [good_index_result.copy(), good_delete_result.copy()]}
+        bulk_mock_result = {'items': [good_index_result.copy(),
+                                      good_delete_result.copy()]}
 
-        self.bulk_utility.client.bulk = MagicMock(return_value=bulk_mock_result)
-        actions = [self._create_action_row(op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID)]
+        self.bulk_utility.client.bulk = MagicMock(
+            return_value=bulk_mock_result)
+        row = [op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [self._create_action_row(*row)]
         results = yield self.bulk_utility._process_bulk_chunk(json.dumps(actions))
-        self.assertEqual([(True, good_index_result), (True, good_delete_result)], results)
+        self.assertEqual([(True, good_index_result),
+                          (True, good_delete_result)],
+                         results)
 
     @inlineCallbacks
     def test__process_bulk_chunk_error(self):
@@ -151,30 +189,41 @@ def test__process_bulk_chunk_error(self):
         bad_delete_result = {op_type2: {'status': 500}}
         bulk_mock_result = {'items': [good_index_result, bad_delete_result]}
 
-        self.bulk_utility.client.bulk = MagicMock(return_value=bulk_mock_result)
-        actions = [self._create_action_row(op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID)]
-        yield self.assertFailure(self.bulk_utility._process_bulk_chunk(json.dumps(actions)), BulkIndexError)
+        self.bulk_utility.client.bulk = MagicMock(
+            return_value=bulk_mock_result)
+        row = [op_type1, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [self._create_action_row(*row)]
+        yield self.assertFailure(self.bulk_utility._process_bulk_chunk(json.dumps(actions)),
+                                 BulkIndexError)
 
     @inlineCallbacks
     def test__process_bulk_connection_timeout_raise(self):
-        self.bulk_utility.client.bulk = MagicMock(side_effect=ConnectionTimeout)
+        self.bulk_utility.client.bulk = MagicMock(
+            side_effect=ConnectionTimeout("test-timeout"))
         self.bulk_utility._handle_transport_error = MagicMock()
-        yield self.assertFailure(self.bulk_utility._process_bulk_chunk([]), ConnectionTimeout)
+        yield self.assertFailure(self.bulk_utility._process_bulk_chunk([]),
+                                 ConnectionTimeout)
 
     @inlineCallbacks
     def test__process_bulk_transport_error_raise_false(self):
-        self.bulk_utility.client.bulk = MagicMock(side_effect=ConnectionTimeout)
+        self.bulk_utility.client.bulk = MagicMock(
+            side_effect=ConnectionTimeout("test-timeout"))
         self.bulk_utility._handle_transport_error = MagicMock()
-        results = yield self.bulk_utility._process_bulk_chunk([], raise_on_exception=False)
+        results = yield self.bulk_utility._process_bulk_chunk([],
+                                                              raise_on_exception=False)
         self.assertEqual([], results)
 
     def test__handle_transport_error(self):
-        actions = [(self._create_action_row(EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID), SOME_DOC) for i in
+        row = [EsBulk.UPDATE, SOME_INDEX, SOME_DOC_TYPE, SOME_ID]
+        actions = [(self._create_action_row(*row), SOME_DOC) for i in
                    range(10)]
         dump_actions = []
         for a, d in actions:
             dump_actions.append(json.dumps(a))
             dump_actions.append(json.dumps(d))
 
-        self.assertRaises(BulkIndexError, self.bulk_utility._handle_transport_error, dump_actions, Exception(),
+        self.assertRaises(BulkIndexError,
+                          self.bulk_utility._handle_transport_error,
+                          dump_actions,
+                          Exception(),
                           raise_on_error=True)
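Taken together, the tests above pin down the BulkUtility surface: actions are dicts keyed by the EsBulk and EsDocProperties constants, and bulk() returns (successful, [error, ...]) by default or (successful, failed) counts with stats_only=True. Below is a hypothetical usage sketch under those assumptions; the import location of EsBulk and EsDocProperties is inferred from twistes.consts and not shown in this diff.

from twisted.internet.defer import inlineCallbacks, returnValue

from twistes.bulk_utils import BulkUtility
from twistes.consts import EsBulk, EsDocProperties  # assumed location


@inlineCallbacks
def index_users(es, names):
    # Build one index action per document, shaped like the records
    # that ActionParser.expand_action consumes in the tests above.
    actions = [{EsBulk.OP_TYPE: EsBulk.INDEX,
                EsDocProperties.INDEX: 'users',
                EsDocProperties.TYPE: 'user',
                EsDocProperties.ID: str(i),
                EsDocProperties.SOURCE: {'name': name}}
               for i, name in enumerate(names)]
    # stats_only=True yields (successful, failed) counts rather than
    # (successful, [error, ...])
    success, failed = yield BulkUtility(es).bulk(actions, stats_only=True)
    returnValue((success, failed))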
14 changes: 12 additions & 2 deletions tests/test_elasticsearch.py
@@ -8,7 +8,10 @@
 
 from twistes.client import Elasticsearch
 from twistes.consts import HttpMethod, EsConst, ResponseCodes, EsMethods
-from twistes.exceptions import NotFoundError, ConnectionTimeout
+from twistes.exceptions import (NotFoundError,
+                                ConnectionTimeout,
+                                ElasticsearchException,
+                                RequestError)
 from twistes.scroller import Scroller
 
 FIELD_2 = 'FIELD_2'
@@ -150,11 +153,18 @@ def test_get_not_found(self):
                                                    auth=(SOME_USER, SOME_PASS), data=None,
                                                    timeout=TIMEOUT)
 
+    @inlineCallbacks
+    def test_bad_request_raises_bad_request_error(self):
+        self.es._async_http_client.request = MagicMock(return_value=self.generate_response(400))
+
+        yield self.assertFailure(self.es.get(SOME_INDEX, id=SOME_ID), RequestError)
+
     @inlineCallbacks
     def test_get_unknown_error(self):
         self.es._async_http_client.request = MagicMock(return_value=self.generate_response(504))
 
-        yield self.assertFailure(self.es.get(SOME_INDEX, id=SOME_ID), Exception)
+        yield self.assertFailure(self.es.get(SOME_INDEX, id=SOME_ID), ElasticsearchException)
 
     @inlineCallbacks
     def test_get_connection_timeout(self):
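The two new assertions above fix the status-code dispatch: a 400 must surface as RequestError, and an unmapped code such as 504 must fall back to ElasticsearchException rather than a bare Exception. Below is a hypothetical sketch of that dispatch, modeled on elasticsearch-py's HTTP_EXCEPTIONS table; the real handler lives in twistes/client.py, whose diff is not shown here, and may be structured differently.

from twistes.exceptions import (ElasticsearchException,
                                NotFoundError,
                                RequestError)

# Status codes with a dedicated exception class; anything missing falls
# back to the base ElasticsearchException (the behavior the 504 test pins).
HTTP_EXCEPTIONS = {
    400: RequestError,
    404: NotFoundError,
}


def raise_for_status(status_code, error_body):
    error_cls = HTTP_EXCEPTIONS.get(status_code, ElasticsearchException)
    raise error_cls(status_code, error_body)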
19 changes: 13 additions & 6 deletions twistes/bulk_utils.py
@@ -49,6 +49,7 @@ def _get_relevant_action_params(data, op_type):
 
 
 class BulkUtility(object):
+
     def __init__(self, es):
         self.client = es
 
@@ -147,16 +148,18 @@ def _process_bulk_chunk(self, bulk_actions, raise_on_exception=True, raise_on_er
         """
         Send a bulk request to elasticsearch and process the output.
         """
-        # if raise on error is set, we need to collect errors per chunk before raising them
+        # if raise on error is set, we need to collect errors per chunk before
+        # raising them
 
         resp = None
         try:
             # send the actual request
-            resp = yield self.client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
+            actions = "{}\n".format('\n'.join(bulk_actions))
+            resp = yield self.client.bulk(actions, **kwargs)
         except ConnectionTimeout as e:
             # default behavior - just propagate exception
             if raise_on_exception:
-                raise e
+                raise
 
             self._handle_transport_error(bulk_actions, e, raise_on_error)
             returnValue([])
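One behavioral detail in this hunk is the swap from "raise e" to a bare "raise". Under Python 2, which this inlineCallbacks-era code targets, a bare raise re-raises the active exception with its original traceback, while "raise e" restarts the traceback at the raise site. A small self-contained illustration; fetch and the local ConnectionTimeout are stand-ins, not twistes code.

import traceback


class ConnectionTimeout(Exception):
    pass


def fetch():
    raise ConnectionTimeout("test-timeout")


def fetch_and_reraise():
    try:
        fetch()
    except ConnectionTimeout:
        # bare raise: the re-raised exception still points at fetch();
        # "raise e" would have restarted the traceback here (Python 2)
        raise


try:
    fetch_and_reraise()
except ConnectionTimeout:
    traceback.print_exc()  # innermost frame is fetch(), not the re-raise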
Expand All @@ -175,13 +178,15 @@ def _process_bulk_chunk(self, bulk_actions, raise_on_exception=True, raise_on_er
results.append((ok, {op_type: item}))

if errors:
raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
msg_fmt = '{num} document(s) failed to index.'
raise BulkIndexError(msg_fmt.format(num=len(errors)), errors)
else:
returnValue(results)

@staticmethod
def _handle_transport_error(bulk_actions, e, raise_on_error):
# if we are not propagating, mark all actions in current chunk as failed
# if we are not propagating, mark all actions in current chunk as
# failed
exc_errors = []
# deserialize the data back, this is expensive but only run on
# errors if raise_on_exception is false, so shouldn't be a real
@@ -203,4 +208,6 @@ def _handle_transport_error(bulk_actions, e, raise_on_error):
 
         # emulate standard behavior for failed actions
         if raise_on_error:
-            raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
+            msg_fmt = '{num} document(s) failed to index.'
+            raise BulkIndexError(msg_fmt.format(num=len(exc_errors)),
+                                 exc_errors)
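The payload change at the top of _process_bulk_chunk is worth spelling out: the Elasticsearch _bulk endpoint takes newline-delimited JSON and requires a trailing newline, which "{}\n".format('\n'.join(bulk_actions)) produces. A small sketch with a hand-written action pair; the index name and field here are illustrative only.

# bulk_actions holds already-serialized JSON lines: an action line,
# optionally followed by its source line.
bulk_actions = [
    '{"index": {"_index": "users", "_type": "user", "_id": "1"}}',
    '{"name": "user-1"}',
]

# Join with newlines and keep the trailing newline the _bulk API requires.
payload = "{}\n".format('\n'.join(bulk_actions))

assert payload == (
    '{"index": {"_index": "users", "_type": "user", "_id": "1"}}\n'
    '{"name": "user-1"}\n'
)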
