Skip to content

Commit

Permalink
Fail on ES errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Bob Corsaro committed Feb 10, 2016
1 parent 3ab6685 commit 3f9a3cd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
17 changes: 16 additions & 1 deletion tubing/ext/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,30 @@ def transform(self, chunk):

PrepareBulkUpdate = pipes.MakePipe(BulkUpdateTransformer, default_chunk_size=2 ** 10)

def BulkUpdate(base_url, index, username=None, password=None, chunk_size=50, chunks_per_post=20):
def BulkUpdate(base_url, index, username=None, password=None, chunk_size=50, chunks_per_post=20, fail_on_error=True):
"""
Docs per post is chunk_size * chunks_per_post.
"""
url = "%s/%s/_bulk" % (base_url, index)
def response_handler(resp):
try:
try:
resp_obj = json.loads(resp.text)
except ValueError:
raise ElasticSearchError("invalid response: '%s'" % (resp.text))

if resp_obj['errors']:
raise ElasticSearchError("errors in response: '%s'" % (resp.text))
except:
logger.exception(resp)
if fail_on_error:
raise

return sinks.HTTPPost(
url=url,
username=username,
password=password,
chunk_size=chunk_size,
chunks_per_post=chunks_per_post,
response_handler=response_handler,
)
5 changes: 3 additions & 2 deletions tubing/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,14 @@ class HTTPPost(object):
"""
Expects a stream of byte strings.
"""
def __init__(self, url, username=None, password=None, chunk_size=2 ** 4, chunks_per_post=2 ** 10):
def __init__(self, url, username=None, password=None, chunk_size=2 ** 4, chunks_per_post=2 ** 10, response_handler=lambda _: None):
self.url = url
self.auth = None
if username and password:
self.auth = username, password
self.chunk_size = chunk_size
self.per_post = chunks_per_post
self.response_handler = response_handler

def __call__(self, source):
posts = GeneratorGeneratorSink(
Expand All @@ -175,4 +176,4 @@ def __call__(self, source):
source=source
)
for post in posts:
resp = requests.post(self.url, data=post, auth=self.auth)
self.response_handler(requests.post(self.url, data=post, auth=self.auth))

0 comments on commit 3f9a3cd

Please sign in to comment.