Permalink
Browse files

Merge pull request #710 from vitaly-krugl/cleanup-async-publisher

Cleaned up async publisher example in docs
  • Loading branch information...
2 parents 0310ee6 + 3c3db29 commit 9f62cbe032e9b4fa0fe1842587ce0702c3926a3d @gmr gmr committed Mar 1, 2016
Showing with 173 additions and 200 deletions.
  1. +46 −62 docs/examples/asynchronous_publisher_example.rst
  2. +127 −138 examples/tmp.py
@@ -11,7 +11,7 @@ publisher.py::
import json
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
- '-35s %(lineno) -5d: %(message)s')
+ '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
@@ -43,13 +43,14 @@ publisher.py::
"""
self._connection = None
self._channel = None
- self._deliveries = []
- self._acked = 0
- self._nacked = 0
- self._message_number = 0
+
+ self._deliveries = None
+ self._acked = None
+ self._nacked = None
+ self._message_number = None
+
self._stopping = False
self._url = amqp_url
- self._closing = False
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
@@ -63,7 +64,8 @@ publisher.py::
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
- self.on_connection_open,
+ on_open_callback=self.on_connection_open,
+ on_close_callback=self.on_connection_closed,
stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
@@ -75,17 +77,8 @@ publisher.py::
"""
LOGGER.info('Connection opened')
- self.add_on_connection_close_callback()
self.open_channel()
- def add_on_connection_close_callback(self):
- """This method adds an on close callback that will be invoked by pika
- when RabbitMQ closes the connection to the publisher unexpectedly.
-
- """
- LOGGER.info('Adding connection close callback')
- self._connection.add_on_close_callback(self.on_connection_closed)
-
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
@@ -97,31 +90,12 @@ publisher.py::
"""
self._channel = None
- if self._closing:
+ if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
- self._connection.add_timeout(5, self.reconnect)
-
- def reconnect(self):
- """Will be invoked by the IOLoop timer if the connection is
- closed. See the on_connection_closed method.
-
- """
- self._deliveries = []
- self._acked = 0
- self._nacked = 0
- self._message_number = 0
-
- # This is the old connection IOLoop instance, stop its ioloop
- self._connection.ioloop.stop()
-
- # Create a new connection
- self._connection = self.connect()
-
- # There is now a new connection, needs a new ioloop to run
- self._connection.ioloop.start()
+ self._connection.add_timeout(5, self._connection.ioloop.stop)
def open_channel(self):
"""This method will open a new channel with RabbitMQ by issuing the
@@ -168,7 +142,8 @@ publisher.py::
"""
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
- if not self._closing:
+ self._channel = None
+ if not self._stopping:
self._connection.close()
def setup_exchange(self, exchange_name):
@@ -282,8 +257,6 @@ publisher.py::
message to be delivered in PUBLISH_INTERVAL seconds.
"""
- if self._stopping:
- return
LOGGER.info('Scheduling next message for %0.1f seconds',
self.PUBLISH_INTERVAL)
self._connection.add_timeout(self.PUBLISH_INTERVAL,
@@ -302,7 +275,7 @@ publisher.py::
class.
"""
- if self._stopping:
+ if self._channel is None or not self._channel.is_open:
return
message = {u'مفتاح': u' قيمة',
@@ -320,21 +293,28 @@ publisher.py::
LOGGER.info('Published message # %i', self._message_number)
self.schedule_next_message()
- def close_channel(self):
- """Invoke this command to close the channel with RabbitMQ by sending
- the Channel.Close RPC command.
-
- """
- LOGGER.info('Closing the channel')
- if self._channel:
- self._channel.close()
-
def run(self):
"""Run the example code by connecting and then starting the IOLoop.
"""
- self._connection = self.connect()
- self._connection.ioloop.start()
+ while not self._stopping:
+ self._connection = None
+ self._deliveries = []
+ self._acked = 0
+ self._nacked = 0
+ self._message_number = 0
+
+ try:
+ self._connection = self.connect()
+ self._connection.ioloop.start()
+ except KeyboardInterrupt:
+ self.stop()
+ if (self._connection is not None and
+ not self._connection.is_closed):
+ # Finish closing
+ self._connection.ioloop.start()
+
+ LOGGER.info('Stopped')
def stop(self):
"""Stop the example by closing the channel and connection. We
@@ -349,26 +329,30 @@ publisher.py::
self._stopping = True
self.close_channel()
self.close_connection()
- self._connection.ioloop.start()
- LOGGER.info('Stopped')
+
+ def close_channel(self):
+ """Invoke this command to close the channel with RabbitMQ by sending
+ the Channel.Close RPC command.
+
+ """
+ if self._channel is not None:
+ LOGGER.info('Closing the channel')
+ self._channel.close()
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
- LOGGER.info('Closing connection')
- self._closing = True
- self._connection.close()
+ if self._connection is not None:
+ LOGGER.info('Closing connection')
+ self._connection.close()
def main():
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
# Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
example = ExamplePublisher('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
- try:
- example.run()
- except KeyboardInterrupt:
- example.stop()
+ example.run()
+
if __name__ == '__main__':
main()
-
Oops, something went wrong.

0 comments on commit 9f62cbe

Please sign in to comment.