Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix usage of reply_map to avoid spurious RecursiveOperationDetected.

  • Loading branch information...
commit 2e30b67570ef62e31c684fed1d85ccc6bcdcd9e6 1 parent 2d25505
@tonyg tonyg authored
Showing with 91 additions and 4 deletions.
  1. +82 −0 examples/demo_get.py
  2. +9 −4 pika/channel.py
View
82 examples/demo_get.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+# ***** BEGIN LICENSE BLOCK *****
+# Version: MPL 1.1/GPL 2.0
+#
+# The contents of this file are subject to the Mozilla Public License
+# Version 1.1 (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://www.mozilla.org/MPL/
+#
+# Software distributed under the License is distributed on an "AS IS"
+# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+# the License for the specific language governing rights and
+# limitations under the License.
+#
+# The Original Code is Pika.
+#
+# The Initial Developers of the Original Code are LShift Ltd, Cohesive
+# Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+# created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+# Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+# (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+# Rabbit Technologies Ltd.
+#
+# Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+# Ltd. Portions created by Cohesive Financial Technologies LLC are
+# Copyright (C) 2007-2009 Cohesive Financial Technologies
+# LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+# 2007-2009 Rabbit Technologies Ltd.
+#
+# Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+# LShift Ltd and Tony Garnock-Jones.
+#
+# All Rights Reserved.
+#
+# Contributor(s): ______________________________________.
+#
+# Alternatively, the contents of this file may be used under the terms
+# of the GNU General Public License Version 2 or later (the "GPL"), in
+# which case the provisions of the GPL are applicable instead of those
+# above. If you wish to allow use of your version of this file only
+# under the terms of the GPL, and not to allow others to use your
+# version of this file under the terms of the MPL, indicate your
+# decision by deleting the provisions above and replace them with the
+# notice and other provisions required by the GPL. If you do not
+# delete the provisions above, a recipient may use your version of
+# this file under the terms of any one of the MPL or the GPL.
+#
+# ***** END LICENSE BLOCK *****
+
+'''
+Example of the use of basic_get. NOT RECOMMENDED - use
+basic_consume instead if at all possible!
+'''
+
+import sys
+import pika
+import asyncore
+import time
+
+conn = pika.AsyncoreConnection(pika.ConnectionParameters(
+ (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',
+ credentials = pika.PlainCredentials('guest', 'guest')))
+
+print 'Connected to %r' % (conn.server_properties,)
+
+qname = (len(sys.argv) > 2) and sys.argv[2] or 'test'
+
+ch = conn.channel()
+ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False)
+
+while conn.is_alive():
+ result = ch.basic_get(queue = qname)
+ print result
+ if isinstance(result, pika.spec.Basic.GetEmpty):
+ pass
+ elif isinstance(result, pika.spec.Basic.GetOk):
+ ch.basic_ack(result.delivery_tag)
+ else:
+ raise Exception("Hmm, that's unexpected. basic_get should have returned either "
+ "Basic.GetOk or Basic.GetEmpty",
+ result)
+ time.sleep(1)
View
13 pika/channel.py
@@ -57,7 +57,7 @@ def __init__(self, connection, channel_number = None):
self.frame_handler = self._handle_method
self.channel_close = None
self.async_map = {}
- self.reply_map = {}
+ self.reply_map = None
self.flow_active = True ## we are permitted to transmit, so True.
self.channel_state_change_event = event.Event()
@@ -102,13 +102,18 @@ def wait_for_reply(self, acceptable_replies):
if not acceptable_replies:
# One-way.
return
+
+ if self.reply_map is not None:
+ raise RecursiveOperationDetected([p.NAME for p in acceptable_replies])
+
reply = [None]
def set_reply(r):
reply[0] = r
+
+ self.reply_map = {}
for possibility in acceptable_replies:
- if possibility in self.reply_map:
- raise RecursiveOperationDetected(possibility.NAME)
self.reply_map[possibility] = set_reply
+
while True:
self._ensure()
self.connection.drain_events()
@@ -122,7 +127,7 @@ def _handle_async(self, method_frame, header_frame, body):
if header_frame is not None:
method._set_content(header_frame.properties, body)
handler = self.reply_map[methodClass]
- del self.reply_map[methodClass]
+ self.reply_map = None
handler(method)
elif methodClass in self.async_map:
self.async_map[methodClass](method_frame, header_frame, body)
Please sign in to comment.
Something went wrong with that request. Please try again.