Skip to content

Commit

Permalink
cascadedconsumer: use exception_hook to store error in db
Browse files Browse the repository at this point in the history
  • Loading branch information
markokr committed Sep 10, 2009
1 parent 381c53f commit 0daf50c
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions python/pgq/cascade/consumer.py
Expand Up @@ -150,6 +150,10 @@ def refresh_state(self, dst_db, full_logic = True):
q = "select * from pgq_node.set_consumer_uptodate(%s, %s, true)"
self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])

if state['cur_error'] and self.work_state != -1:
q = "select * from pgq_node.set_consumer_error(%s, %s, NULL)"
self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])

if not state['paused'] or not full_logic:
break
time.sleep(self.loop_delay)
Expand Down Expand Up @@ -211,3 +215,12 @@ def finish_remote_batch(self, src_db, dst_db, tick_id):
q = "select * from pgq_node.set_consumer_completed(%s, %s, %s)"
self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name, tick_id ])

def exception_hook(self, det, emsg, cname):
try:
dst_db = self.get_database(self.target_db)
q = "select * from pgq_node.set_consumer_error(%s, %s, %s)"
self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name, emsg ])
except:
self.log.warning("Failure to call pgq_node.set_consumer_error()")
self.reset()

0 comments on commit 0daf50c

Please sign in to comment.