Browse files

Refactor timeouts a little bit to fix a API mismatch in BlockingConne…

…ction

Fix issue #176 while cleaning up the code enough to be able to add better unit tests for the timeout functionality in BlockingConnection
  • Loading branch information...
1 parent 5a00707 commit 6c9a07fa3921ed5b274ac9c7882a8430b0d3b0e3 Gavin M. Roy committed Oct 1, 2012
Showing with 27 additions and 12 deletions.
  1. +27 −12 pika/adapters/blocking_connection.py
View
39 pika/adapters/blocking_connection.py
@@ -35,12 +35,12 @@ def add_timeout(self, deadline, callback_method):
:param int deadline: The number of seconds to wait to call callback
:param method callback_method: The callback method
- :rtype: int
+ :rtype: str
"""
timeout_id = '%.8f' % time.time()
- self._timeouts[timeout_id] = {'deadline': deadline,
- 'handler': callback_method}
+ self._timeouts[timeout_id] = {'deadline': deadline + time.time(),
+ 'method': callback_method}
return timeout_id
def channel(self, channel_number=None):
@@ -104,20 +104,15 @@ def process_data_events(self):
def process_timeouts(self):
"""Process the self._timeouts event stack"""
- keys = self._timeouts.keys()
- start_time = time.time()
- for timeout_id in keys:
- if timeout_id in self._timeouts and \
- self._timeouts[timeout_id]['deadline'] <= start_time:
- LOGGER.debug('Timeout calling %s',
- self._timeouts[timeout_id]['handler'])
- self._timeouts.pop(timeout_id)['handler']()
+ for timeout_id in self._timeouts.keys():
+ if self._deadline_passed(timeout_id):
+ self._call_timeout_method(self._timeouts.pop(timeout_id))
def remove_timeout(self, timeout_id):
"""Remove the timeout from the IOLoop by the ID returned from
add_timeout.
- :rtype: int
+ :param str timeout_id: The id of the timeout to remove
"""
if timeout_id in self._timeouts:
@@ -153,6 +148,26 @@ def _adapter_disconnect(self):
self.disconnect()
self._check_state_on_disconnect()
+ def _call_timeout_method(self, timeout_value):
+ """Execute the method that was scheduled to be called.
+
+ :param dict timeout_value: The configuration for the timeout
+
+ """
+ LOGGER.debug('Invoking scheduled call of %s', timeout_value['method'])
+ timeout_value['method']()
+
+ def _deadline_passed(self, timeout_id):
+ """Returns True if the deadline has passed for the specified timeout_id.
+
+ :param str timeout_id: The id of the timeout to check
+ :rtype: bool
+
+ """
+ if timeout_id not in self._timeouts.keys():
+ return False
+ return self._timeouts[timeout_id]['deadline'] <= time.time()
+
def _handle_disconnect(self):
"""Called internally when the socket is disconnected already"""
self.disconnect()

0 comments on commit 6c9a07f

Please sign in to comment.