Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
bugfixes from upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
hlindset committed Feb 16, 2016
1 parent d0f63fb commit 3792f35
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
11 changes: 7 additions & 4 deletions piped_database/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ def _fail_if_configuration_is_invalid(self):
if not self.engine_configuration.get('url'):
raise exceptions.ConfigurationError('missing database-URL')
if not self.ping_interval:
detail = 'currently set to [{0!r}]'.format(configuration['ping_interval'])
raise exceptions.ConfigurationError('Please specify a non-zero ping-interval', detail)

This comment has been minimized.

Copy link
@halvorgb

halvorgb Feb 16, 2016

Member

Gjør detta noe nyttig a? samme erroren bare med mindre info

raise exceptions.ConfigurationError('Please specify a non-zero ping-interval')

@defer.inlineCallbacks
def run(self):
Expand Down Expand Up @@ -398,8 +397,12 @@ def unlisten(self, queue):
if queue in queues:
queues.discard(queue)
if not queues:
yield self._run_exclusively(self._connection.runOperation, 'UNLISTEN "%s"' % listener_name)
logger.info('"%s"-listener is now NOT listening to "%s"' % (self.profile_name, listener_name))
try:
yield self._run_exclusively(self._connection.runOperation, 'UNLISTEN "%s"' % listener_name)
logger.info('"%s"-listener is now NOT listening to "%s"' % (self.profile_name, listener_name))
except Exception as e:
# This happens when a connection drops, in which case we're re-establishing it.
pass

def wait_for_txid_min(self, txid):
""" Returns a Deferred that is callbacked with the current
Expand Down
39 changes: 21 additions & 18 deletions piped_database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import random
import os

import sqlalchemy as sa
from piped import exceptions, service, util
Expand Down Expand Up @@ -56,33 +57,33 @@ def run_with_dependencies(self):
if not self.is_enabled():
return

while self.running:
try:
self.listener = yield self.cancellable(self.listener_dependency.wait_for_resource())
try:
self.listener = yield self.cancellable(self.listener_dependency.wait_for_resource())

if self.lock_name:
yield self.cancellable(self.listener.wait_for_advisory_lock(self.lock_name))

yield self.run_as_leader()
if self.lock_name:
yield self.cancellable(self.listener.wait_for_advisory_lock(self.lock_name))

except defer.CancelledError:
break
yield self.cancellable(self.run_as_leader())

except Exception as e:
logger.exception('unhandled exception')
except defer.CancelledError:
return

finally:
self.listener.release_lock(self.lock_name)
yield self.wait()
except Exception as e:
logger.exception('unhandled exception')

finally:
self.listener.release_lock(self.lock_name)
yield self.wait()

@defer.inlineCallbacks
def run_as_leader(self):
logger.info('Running as leader for service [{0}]'.format(self.service_name))
notification_queue = yield self.listener.listen(self.channels)

yield self.process_initial()
logger.info('Running as leader for service [{0}]. pid: [{1}]'.format(self.service_name, os.getpid()))

try:
notification_queue = yield self.cancellable(self.listener.listen(self.channels))

yield self.cancellable(self.process_initial())

while self.running:
event = yield self.cancellable(notification_queue.get())

Expand All @@ -98,6 +99,8 @@ def run_as_leader(self):

try:
result = yield handler(payload)
except defer.CancelledError:
raise
except Exception as e:
logger.exception('unhandled exception in run_as_leader')

Expand Down

0 comments on commit 3792f35

Please sign in to comment.