Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor waiter handling in channel #83

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 48 additions & 144 deletions aioamqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,31 @@ def server_channel_close(self, frame):
}
self.connection_closed(results['reply_code'], results['reply_text'])

@asyncio.coroutine
def _write_frame_awaiting_response(self, waiter_id, frame, request, no_wait, **kwargs):
'''Write a frame and set a waiter for the response (unless no_wait is set)'''
if no_wait:
yield from self._write_frame(frame, request, no_wait=True, **kwargs)
else:
f = self._set_waiter(waiter_id)
try:
yield from self._write_frame(frame, request, no_wait=False, **kwargs)
except Exception:
self._get_waiter(waiter_id)
f.cancel()
raise
return (yield from f)

@asyncio.coroutine
def flow(self, active, timeout=None):
frame = amqp_frame.AmqpRequest(self.protocol.writer, amqp_constants.TYPE_METHOD, self.channel_id)
frame.declare_method(
amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_FLOW)
request = amqp_frame.AmqpEncoder()
request.write_bits(active)
fut = self._set_waiter('flow')
try:
yield from self._write_frame(frame, request, no_wait=False, timeout=timeout, no_check_open=True)
except Exception:
self._get_waiter('flow')
fut.cancel()
raise
yield from fut
return fut.result()
return (yield from self._write_frame_awaiting_response(
'flow', frame, request, no_wait=False, timeout=timeout,
no_check_open=True))

@asyncio.coroutine
def flow_ok(self, frame):
Expand Down Expand Up @@ -234,17 +243,8 @@ def exchange_declare(self, exchange_name, type_name, passive=False, durable=Fals
request.write_bits(passive, durable, auto_delete, internal, no_wait)
request.write_table(arguments)

if not no_wait:
future = self._set_waiter('exchange_declare')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('exchange_declare')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'exchange_declare', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def exchange_declare_ok(self, frame):
Expand All @@ -264,17 +264,8 @@ def exchange_delete(self, exchange_name, if_unused=False, no_wait=False, timeout
request.write_shortstr(exchange_name)
request.write_bits(if_unused, no_wait)

if not no_wait:
future = self._set_waiter('exchange_delete')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('exchange_delete')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'exchange_delete', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def exchange_delete_ok(self, frame):
Expand All @@ -299,17 +290,8 @@ def exchange_bind(self, exchange_destination, exchange_source, routing_key,

request.write_bits(no_wait)
request.write_table(arguments)
if not no_wait:
future = self._set_waiter('exchange_bind')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('exchange_bind')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'exchange_bind', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def exchange_bind_ok(self, frame):
Expand All @@ -333,17 +315,8 @@ def exchange_unbind(self, exchange_destination, exchange_source, routing_key, no

request.write_bits(no_wait)
request.write_table(arguments)
if not no_wait:
future = self._set_waiter('exchange_unbind')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('exchange_unbind')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'exchange_unbind', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def exchange_unbind_ok(self, frame):
Expand Down Expand Up @@ -388,17 +361,8 @@ def queue_declare(self, queue_name=None, passive=False, durable=False,
request.write_shortstr(queue_name)
request.write_bits(passive, durable, exclusive, auto_delete, no_wait)
request.write_table(arguments)
if not no_wait:
future = self._set_waiter('queue_declare')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('queue_declare')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'queue_declare', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def queue_declare_ok(self, frame):
Expand Down Expand Up @@ -430,17 +394,8 @@ def queue_delete(self, queue_name, if_unused=False, if_empty=False, no_wait=Fals
request.write_short(0) # reserved
request.write_shortstr(queue_name)
request.write_bits(if_unused, if_empty, no_wait)
if not no_wait:
future = self._set_waiter('queue_delete')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('queue_delete')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'queue_delete', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def queue_delete_ok(self, frame):
Expand All @@ -465,17 +420,8 @@ def queue_bind(self, queue_name, exchange_name, routing_key, no_wait=False, argu
request.write_shortstr(routing_key)
request.write_octet(int(no_wait))
request.write_table(arguments)
if not no_wait:
future = self._set_waiter('queue_bind')
try:
yield from self._write_frame(frame, request, no_wait, timeout=timeout)
except Exception:
self._get_waiter('queue_bind')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'queue_bind', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def queue_bind_ok(self, frame):
Expand All @@ -498,14 +444,8 @@ def queue_unbind(self, queue_name, exchange_name, routing_key, arguments=None, t
request.write_shortstr(exchange_name)
request.write_shortstr(routing_key)
request.write_table(arguments)
future = self._set_waiter('queue_unbind')
try:
yield from self._write_frame(frame, request, no_wait=False, timeout=timeout)
except Exception:
self._get_waiter('queue_unbind')
future.cancel()
raise
return (yield from future)
return (yield from self._write_frame_awaiting_response(
'queue_unbind', frame, request, no_wait=False, timeout=timeout))

@asyncio.coroutine
def queue_unbind_ok(self, frame):
Expand Down Expand Up @@ -673,20 +613,13 @@ def basic_consume(self, callback, queue_name='', consumer_tag='', no_local=False
self.consumer_callbacks[consumer_tag] = callback
self.last_consumer_tag = consumer_tag

if not no_wait:
future = self._set_waiter('basic_consume')
try:
yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
except Exception:
self._get_waiter('basic_consume')
future.cancel()
raise
yield from future
return_value = yield from self._write_frame_awaiting_response(
'basic_consume', frame, request, no_wait, timeout=timeout)
if no_wait:
return_value = {'consumer_tag': consumer_tag}
else:
self._ctag_events[consumer_tag].set()
return future.result()

yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
return {'consumer_tag': consumer_tag}
return return_value

@asyncio.coroutine
def basic_consume_ok(self, frame):
Expand Down Expand Up @@ -742,17 +675,8 @@ def basic_cancel(self, consumer_tag, no_wait=False, timeout=None):
request = amqp_frame.AmqpEncoder()
request.write_shortstr(consumer_tag)
request.write_bits(no_wait)
if not no_wait:
future = self._set_waiter('basic_cancel')
try:
yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
except Exception:
self._get_waiter('basic_cancel')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'basic_cancel', frame, request, no_wait=no_wait, timeout=timeout))

@asyncio.coroutine
def basic_cancel_ok(self, frame):
Expand All @@ -773,14 +697,8 @@ def basic_get(self, queue_name='', no_ack=False, timeout=None):
request.write_short(0)
request.write_shortstr(queue_name)
request.write_bits(no_ack)
future = self._set_waiter('basic_get')
try:
yield from self._write_frame(frame, request, no_wait=False, timeout=timeout)
except Exception:
self._get_waiter('basic_get')
future.cancel()
raise
return (yield from future)
return (yield from self._write_frame_awaiting_response(
'basic_get', frame, request, no_wait=False, timeout=timeout))

@asyncio.coroutine
def basic_get_ok(self, frame):
Expand Down Expand Up @@ -867,13 +785,8 @@ def basic_recover(self, requeue=True, timeout=None):
amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER)
request = amqp_frame.AmqpEncoder()
request.write_bits(requeue)
future = self._set_waiter('basic_recover')

try:
yield from self._write_frame(frame, request, no_wait=False, timeout=timeout)
except Exception as exc:
future.set_exception(exc)
return (yield from future)
return (yield from self._write_frame_awaiting_response(
'basic_recover', frame, request, no_wait=False, timeout=timeout))

@asyncio.coroutine
def basic_recover_ok(self, frame):
Expand Down Expand Up @@ -949,17 +862,8 @@ def confirm_select(self, *, no_wait=False, timeout=None):
request = amqp_frame.AmqpEncoder()
request.write_shortstr('')

if not no_wait:
future = self._set_waiter('confirm_select')
try:
yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
except Exception:
self._get_waiter('confirm_select')
future.cancel()
raise
return (yield from future)

yield from self._write_frame(frame, request, no_wait=no_wait, timeout=timeout)
return (yield from self._write_frame_awaiting_response(
'confirm_select', frame, request, no_wait, timeout=timeout))

@asyncio.coroutine
def confirm_select_ok(self, frame):
Expand Down