Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #827 from Parsely/bugfix/cleanup_fetch
Browse files Browse the repository at this point in the history
call cleanup() in fetch instead of autocommitter
  • Loading branch information
Emmett J. Butler committed Jul 19, 2018
2 parents 2419a49 + 2f5a96e commit e460fff
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ def autocommitter():
# surface all exceptions to the main thread
self._worker_exception = sys.exc_info()
break
self.cleanup()
log.debug("Autocommitter thread exiting")
log.debug("Starting autocommitter thread")
return self._cluster.handler.spawn(autocommitter, name="pykafka.SimpleConsumer.autocommiter")
Expand All @@ -447,6 +446,11 @@ def fetcher():
# surface all exceptions to the main thread
self._worker_exception = sys.exc_info()
break
try:
self.cleanup()
except ReferenceError as e:
log.debug("Attempt to cleanup consumer failed")
log.exception(e)
log.debug("Fetcher thread exiting")
log.info("Starting %s fetcher threads", self._num_consumer_fetchers)
return [self._cluster.handler.spawn(fetcher, name="pykafka.SimpleConsumer.fetcher")
Expand Down

0 comments on commit e460fff

Please sign in to comment.