Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Consumer updates and misc documentation updates

Consumer updates:
 - Remove _initialize method
 - Change Consumer.publish_message to Consumer._publish_message
 - Change Consumer.reply to Consumer._reply
test_consumer.py:
 - Remove test_initialize_called
 - Update a few tests
LICENSE:
 - Reflect Insider Guides, Inc. -> MeetMe, Inc. name change
 - Update copyright duration
setup.py:
 - Change extras to main requirements
Others:
 - Change my email address
 - Bump version
  • Loading branch information...
commit dad7bb8de431fb057e25ddd8764764b74c8cb971 1 parent e39123a
Gavin M. Roy authored
View
34 LICENSE
@@ -1,25 +1,25 @@
-Copyright (c) 2009, Insider Guides, Inc
+Copyright (c) 2009-2012, MeetMe, Inc.
All rights reserved.
-Redistribution and use in source and binary forms, with or without modification,
+Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
- * Redistributions of source code must retain the above copyright notice, this
+ * Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- * Neither the name of the Insider Guides, Inc. nor the names of its
- contributors may be used to endorse or promote products derived from this
+ * Neither the name of the MeetMe, Inc. nor the names of its
+ contributors may be used to endorse or promote products derived from this
software without specific prior written permission.
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
-IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
-ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View
4 rejected/__init__.py
@@ -2,6 +2,6 @@
Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon
"""
-__author__ = 'Gavin M. Roy <gmr@myyearbook.com>'
+__author__ = 'Gavin M. Roy <gmr@meetme.com>'
__since__ = "2009-09-10"
-__version__ = "3.0.27"
+__version__ = "3.0.28"
View
238 rejected/consumer.py
@@ -87,33 +87,11 @@ def __init__(self, configuration):
# Default channel attribute
self._channel = None
-
+
# Each message received will be carried as an attribute
self._message = None
self._message_body = None
- # Call the initialize method
- self._initialize()
-
- def _initialize(self):
- """Extend this method for actions to take in initializing the Consumer
- instead of extending __init__.
-
- """
- pass
-
- def _process(self):
- """Extend this method for implementing your Consumer logic.
-
- If the message can not be processed and the Consumer should stop after
- n failures to process messages, raise the ConsumerException.
-
- :raises: ConsumerException
- :raises: NotImplementedError
-
- """
- raise NotImplementedError
-
def _auto_encode(self, content_encoding, value):
"""Based upon the value of the content_encoding, encode the value.
@@ -416,6 +394,119 @@ def _load_yaml_value(self, value):
"""
return yaml.load(value)
+ def _process(self):
+ """Extend this method for implementing your Consumer logic.
+
+ If the message can not be processed and the Consumer should stop after
+ n failures to process messages, raise the ConsumerException.
+
+ :raises: ConsumerException
+ :raises: NotImplementedError
+
+ """
+ raise NotImplementedError
+
+ def _publish_message(self, exchange, routing_key, properties, body,
+ no_serialization=False, no_encoding=False):
+ """Publish a message to RabbitMQ on the same channel the original
+ message was received on.
+
+ By default, if you pass a non-string object to the body and the
+ properties have a supported content-type set, the body will be
+ auto-serialized in the specified content-type.
+
+ If the properties do not have a timestamp set, it will be set to the
+ current time.
+
+ If you specify a content-encoding in the properties and the encoding is
+ supported, the body will be auto-encoded.
+
+ Both of these behaviors can be disabled by setting no_serialization or
+ no_encoding to True.
+
+ :param str exchange: The exchange to publish to
+ :param str routing_key: The routing key to publish with
+ :param rejected.data.Properties: The message properties
+ :param no_serialization: Turn off auto-serialization of the body
+ :param no_encoding: Turn off auto-encoding of the body
+
+ """
+ # Convert the rejected.data.Properties object to a pika.BasicProperties
+ logger.debug('Converting properties')
+ properties_out = self._get_pika_properties(properties)
+
+ # Auto-serialize the content if needed
+ if (not no_serialization and not isinstance(body, basestring) and
+ properties.content_type):
+ logger.debug('Auto-serializing message body')
+ body = self._auto_serialize(properties.content_type, body)
+
+ # Auto-encode the message body if needed
+ if not no_encoding and properties.content_encoding:
+ logger.debug('Auto-encoding message body')
+ body = self._auto_encode(properties.content_encoding, body)
+
+ # Publish the message
+ logger.debug('Publishing message to %s:%s', exchange, routing_key)
+ self._channel.basic_publish(exchange=exchange,
+ routing_key=routing_key,
+ properties=properties_out,
+ body=body)
+
+ def _reply(self, response_body, properties, auto_id=True,
+ exchange=None, reply_to=None):
+ """Reply to the received message.
+
+ If auto_id is True, a new uuid4 value will be generated for the
+ message_id and correlation_id will be set to the message_id of the
+ original message. In addition, the timestamp will be assigned the
+ current time of the message. If auto_id is False, neither the message_id
+ or the correlation_id will be changed in the properties.
+
+ If exchange is not set, the exchange the message was received on will
+ be used.
+
+ If reply_to is set in the original properties,
+ it will be used as the routing key. If the reply_to is not set
+ in the properties and it is not passed in, a ValueException will be
+ raised. If reply to is set in the properties, it will be cleared out
+ prior to the message being republished.
+
+ Like with the publish method, if you pass in a non-String object and
+ content-type is set to a supported content type, the content will
+ be auto-serialized. In addition, if the content-encoding is set to a
+ supported encoding, it will be auto-encoded.
+
+ :param any response_body: The message body to send
+ :param rejected.data.Properties properties: Message properties to use
+ :param bool auto_id: Automatically shuffle message_id and correlation_id
+ :param str reply_to: Override the reply_to in the properties
+ :raises: ValueError
+
+ """
+ if not properties.reply_to and not reply_to:
+ raise ValueError('Missing reply_to in properties or as argument')
+
+ if auto_id and properties.message_id:
+ properties.app_id = __name__
+ properties.correlation_id = properties.message_id
+ properties.message_id = str(uuid.uuid4())
+ properties.timestamp = int(time.time())
+ logger.debug('New message_id: %s', properties.message_id)
+ logger.debug('Correlation_id: %s', properties.correlation_id)
+
+ # Redefine the reply to if needed
+ reply_to = reply_to or properties.reply_to
+
+ # Wipe out reply_to if it's set
+ if properties.reply_to:
+ properties.reply_to = None
+
+ self._publish_message(exchange or self._message.exchange,
+ reply_to,
+ properties,
+ response_body)
+
@property
def message_app_id(self):
"""Return the app-id from the message properties.
@@ -658,107 +749,6 @@ def properties(self):
"""
return copy.copy(self._message.properties)
- def publish_message(self, exchange, routing_key, properties, body,
- no_serialization=False, no_encoding=False):
- """Publish a message to RabbitMQ on the same channel the original
- message was received on.
-
- By default, if you pass a non-string object to the body and the
- properties have a supported content-type set, the body will be
- auto-serialized in the specified content-type.
-
- If the properties do not have a timestamp set, it will be set to the
- current time.
-
- If you specify a content-encoding in the properties and the encoding is
- supported, the body will be auto-encoded.
-
- Both of these behaviors can be disabled by setting no_serialization or
- no_encoding to True.
-
- :param str exchange: The exchange to publish to
- :param str routing_key: The routing key to publish with
- :param rejected.data.Properties: The message properties
- :param no_serialization: Turn off auto-serialization of the body
- :param no_encoding: Turn off auto-encoding of the body
-
- """
- # Convert the rejected.data.Properties object to a pika.BasicProperties
- logger.debug('Converting properties')
- properties_out = self._get_pika_properties(properties)
-
- # Auto-serialize the content if needed
- if (not no_serialization and not isinstance(body, basestring) and
- properties.content_type):
- logger.debug('Auto-serializing message body')
- body = self._auto_serialize(properties.content_type, body)
-
- # Auto-encode the message body if needed
- if not no_encoding and properties.content_encoding:
- logger.debug('Auto-encoding message body')
- body = self._auto_encode(properties.content_encoding, body)
-
- # Publish the message
- logger.debug('Publishing message to %s:%s', exchange, routing_key)
- self._channel.basic_publish(exchange=exchange,
- routing_key=routing_key,
- properties=properties_out,
- body=body)
-
- def reply(self, response_body, properties, auto_id=True,
- exchange=None, reply_to=None):
- """Reply to the received message.
-
- If auto_id is True, a new uuid4 value will be generated for the
- message_id and correlation_id will be set to the message_id of the
- original message. In addition, the timestamp will be assigned the
- current time of the message. If auto_id is False, neither the message_id
- or the correlation_id will be changed in the properties.
-
- If exchange is not set, the exchange the message was received on will
- be used.
-
- If reply_to is set in the original properties,
- it will be used as the routing key. If the reply_to is not set
- in the properties and it is not passed in, a ValueException will be
- raised. If reply to is set in the properties, it will be cleared out
- prior to the message being republished.
-
- Like with the publish method, if you pass in a non-String object and
- content-type is set to a supported content type, the content will
- be auto-serialized. In addition, if the content-encoding is set to a
- supported encoding, it will be auto-encoded.
-
- :param any response_body: The message body to send
- :param rejected.data.Properties properties: Message properties to use
- :param bool auto_id: Automatically shuffle message_id and correlation_id
- :param str reply_to: Override the reply_to in the properties
- :raises: ValueError
-
- """
- if not properties.reply_to and not reply_to:
- raise ValueError('Missing reply_to in properties or as argument')
-
- if auto_id and properties.message_id:
- properties.app_id = __name__
- properties.correlation_id = properties.message_id
- properties.message_id = str(uuid.uuid4())
- properties.timestamp = int(time.time())
- logger.debug('New message_id: %s', properties.message_id)
- logger.debug('Correlation_id: %s', properties.correlation_id)
-
- # Redefine the reply to if needed
- reply_to = reply_to or properties.reply_to
-
- # Wipe out reply_to if it's set
- if properties.reply_to:
- properties.reply_to = None
-
- self.publish_message(exchange or self._message.exchange,
- reply_to,
- properties,
- response_body)
-
def set_channel(self, channel):
"""Assign the _channel attribute to the channel that was passed in.
View
12 setup.py
@@ -18,17 +18,17 @@
],
keywords='amqp rabbitmq',
author='Gavin M. Roy',
- author_email='gmr@myyearbook.com',
+ author_email='gmr@meetme.com',
url='http://github.com/gmr/rejected',
license='BSD',
packages=['rejected'],
- install_requires=['clihelper',
+ install_requires=['beautifulsoup4',
+ 'clihelper',
'pika',
+ 'pgsql_wrapper',
+ 'pyyaml',
+ 'redis',
'tornado'],
- extras_require={'HTML': 'beautifulsoup4',
- 'PostgreSQL': 'pgsql_wrapper',
- 'Redis': 'redis',
- 'YAML': 'pyyaml'},
tests_require=['mock', 'nose', 'unittests2'],
entry_points=dict(console_scripts=['rejected=rejected.controller:main']),
zip_safe=True)
View
17 tests/test_consumer.py
@@ -1,10 +1,7 @@
# coding=utf-8
"""Tests for rejected.consumer"""
import sys
-try:
- import bs4
-except ImportError:
- bs4 = None
+import bs4
import bz2
import csv
import datetime
@@ -149,15 +146,9 @@ def setUp(self):
def tearDown(self):
del self._obj
- def test_initialize_called(self):
- @mock.patch.object(consumer.Consumer, '_initialize')
- def validate_method_called(mock_method=None):
- obj = consumer.Consumer({})
- mock_method.assertCalled()
- validate_method_called()
-
def test_underscore_process_raises_exception(self):
- self.assertRaises(NotImplementedError, self._obj._process)
+ consumer_ = consumer.Consumer({'config': True})
+ self.assertRaises(NotImplementedError, consumer_._process)
def test_bz2_decompress(self):
value = ('BZh91AY&SY\xe7\xdex,\x00\x00E\x9f\x80\x10\x07\x7f\xf0\x00'
@@ -566,5 +557,5 @@ def test_message_body_called_twice(self):
message.body = self._PLIST
message.properties.content_type = None
self._obj.process(message)
- unused_body_for_first_call = self._obj.message_body
+ _first_call = self._obj.message_body
self.assertEqual(self._obj.message_body, self._PLIST)
Please sign in to comment.
Something went wrong with that request. Please try again.