Skip to content

Commit

Permalink
Merge pull request #6 from gmr/master
Browse files Browse the repository at this point in the history
Updated rejected compatibility
  • Loading branch information
dave-shawley committed Feb 18, 2017
2 parents e0d1955 + 231bfac commit 481fa98
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 111 deletions.
4 changes: 4 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Configuration Related
Release History
===============

`Next Release`_
---------------
- Updated to work against rejected 3.17

`0.2.0`_ (10-Jan-2017)
----------------------
- Added ``url`` keyword to
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ with-coverage = 1
cover-package = vetoes
cover-erase = 1
cover-branches = 1
verbosity=2

[upload_docs]
upload-dir = build/sphinx/html
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
coverage
mock
nose
rejected<=3.12.2
rejected>=3.17,<3.18
164 changes: 71 additions & 93 deletions tests/service_mixin_tests.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import json
import socket

from rejected import consumer
from tornado import concurrent, gen, httpclient, testing
import helper.config
from rejected import consumer, testing
from tornado import concurrent, gen
import mock

from vetoes import service


class Consumer(service.HTTPServiceMixin):
class Consumer(service.HTTPServiceMixin,
consumer.Consumer):

def __init__(self, *args, **kwargs):
kwargs['service_map'] = {'fetch-stats': 'httpbin'}
super(Consumer, self).__init__(*args, **kwargs)
self.method = 'GET'
self.request_body = None
self.request_json = None

@gen.coroutine
def process(self):
yield self.call_http_service('fetch-stats', 'GET', 'stats')
yield self.call_http_service('fetch-stats', self.method, 'stats',
**{'body': self.request_body,
'json': self.request_json})

def get_service_url(self, service, *path, **kwargs):
return 'http://httpbin.org/status/200'
Expand All @@ -27,134 +32,107 @@ class HTTPServiceMixinTests(testing.AsyncTestCase):

def setUp(self):
super(HTTPServiceMixinTests, self).setUp()
self.rejected_process = mock.Mock()
self.consumer_config = helper.config.Data()
self.consumer = Consumer(self.consumer_config, self.rejected_process)

self.statsd_add_timing = mock.Mock()
self.statsd_incr = mock.Mock()
self.consumer._set_statsd(mock.Mock(incr=self.statsd_incr,
add_timing=self.statsd_add_timing))

self.sentry_client = mock.Mock()
self.rejected_process.sentry_client = self.sentry_client
self.sentry_client.tags = mock.Mock()

self.channel = mock.Mock()
self.channel.connection.ioloop = self.io_loop
self.consumer._channel = self.channel

self.http = mock.Mock()
self.consumer.http = mock.Mock()
self.http_response = mock.Mock(code=200, request_time=0)
self.consumer.http = self.http
response = concurrent.Future()
response.set_result(self.http_response)
self.http.fetch.return_value = response
self.consumer.http.fetch.return_value = concurrent.Future()
self.consumer.http.fetch.return_value.set_result(self.http_response)

@testing.gen_test
def run_consumer(self, message_body=None, correlation_id=None):
self.consumer._clear()
self.consumer._message = mock.Mock()
self.consumer._message.body = message_body or {}
self.consumer._message.properties.correlation_id = correlation_id
try:
maybe_future = self.consumer.prepare()
if concurrent.is_future(maybe_future):
yield maybe_future
if not self.consumer._finished:
maybe_future = self.consumer.process()
if concurrent.is_future(maybe_future):
yield maybe_future
finally:
if not self.consumer._finished:
self.consumer.finish()
def get_consumer(self):
return Consumer

@testing.gen_test
def test_that_sentry_context_is_managed(self):
self.run_consumer()
self.sentry_client.tags_context.assert_called_once_with(
{'service_invoked': 'httpbin'})
self.sentry_client.tags.pop.assert_called_once_with(
'service_invoked', None)
with mock.patch.multiple(self.consumer,
set_sentry_context=mock.DEFAULT,
unset_sentry_context=mock.DEFAULT) as context:
yield self.process_message()
context['set_sentry_context'].assert_called_once_with(
'service_invoked', 'httpbin')
context['unset_sentry_context'].assert_called_once_with(
'service_invoked')

@testing.gen_test
def test_that_metrics_are_emitted(self):
self.run_consumer()
self.statsd_add_timing.assert_any_call(
'http.fetch-stats.200', self.http_response.request_time)
measurement = yield self.process_message()
self.assertIn('http.fetch-stats.200', measurement.values)
self.assertEqual(measurement.values['http.fetch-stats.200'],
self.http_response.request_time)

@testing.gen_test
def test_that_timeout_result_in_processing_exceptions(self):
self.http_response.code = 599
with self.assertRaises(consumer.ProcessingException):
self.run_consumer()
self.statsd_add_timing.assert_any_call(
'http.fetch-stats.599', self.http_response.request_time)

measurement = yield self.process_message()
self.assertEqual(measurement.values['http.fetch-stats.599'],
self.http_response.request_time)
@testing.gen_test
def test_that_rate_limiting_result_in_processing_exceptions(self):
self.http_response.code = 429
with self.assertRaises(consumer.ProcessingException):
self.run_consumer(mock.Mock())
self.statsd_add_timing.assert_any_call(
'http.fetch-stats.429', self.http_response.request_time)
measurement = yield self.process_message()
self.assertEqual(measurement.values['http.fetch-stats.429'],
self.http_response.request_time)

@testing.gen_test
def test_that_call_http_service_accepts_body(self):
yield self.consumer.call_http_service('fetch-stats', 'POST',
body=mock.sentinel.body)
self.http.fetch.assert_called_once_with(
self.consumer.method = 'POST'
self.consumer.request_body = mock.sentinel.body
yield self.process_message()
self.consumer.http.fetch.assert_called_once_with(
self.consumer.get_service_url('fetch-stats'),
headers={'Correlation-Id': self.correlation_id},
method='POST', body=mock.sentinel.body, raise_error=False)

@testing.gen_test
def test_that_call_http_service_jsonifies(self):
yield self.consumer.call_http_service('fetch-stats', 'POST',
json={'one': 1})
self.http.fetch.assert_called_once_with(
self.consumer.method = 'POST'
self.consumer.request_json = {'one': 1}
yield self.process_message()
self.consumer.http.fetch.assert_called_once_with(
self.consumer.get_service_url('fetch-stats'),
method='POST', body=json.dumps({'one': 1}).encode('utf-8'),
headers={'Content-Type': 'application/json'}, raise_error=False)
headers={'Content-Type': 'application/json',
'Correlation-Id': self.correlation_id},
raise_error=False)

@testing.gen_test
def test_that_socket_errors_result_in_processing_exception(self):
future = concurrent.Future()
future.set_exception(socket.error(42, 'message'))
self.http.fetch.return_value = future
self.consumer.http.fetch.return_value = future

with self.assertRaises(consumer.ProcessingException):
self.run_consumer(mock.Mock())
self.statsd_add_timing.assert_any_call(
'http.fetch-stats.timeout', mock.ANY)
self.statsd_incr.assert_any_call('errors.socket.42', 1)

def test_that_correlation_id_from_message_is_passed_through(self):
self.run_consumer(correlation_id=mock.sentinel.correlation_id)
posn, kwargs = self.http.fetch.call_args_list[0]
self.assertEqual(kwargs['headers']['Correlation-ID'],
mock.sentinel.correlation_id)

def test_that_correlation_id_from_consumer_is_passed_through(self):
setattr(self.consumer, '_correlation_id', mock.sentinel.correlation_id)
self.run_consumer()
posn, kwargs = self.http.fetch.call_args_list[0]
self.assertEqual(kwargs['headers']['Correlation-ID'],
mock.sentinel.correlation_id)
yield self.process_message()
self.assertGreater(
self.consumer._measurement.values['http.fetch-stats.timeout'],
self.http_response.request_time)
self.assertEqual(
self.consumer._measurement.counters['errors.socket.42'], 1)

@testing.gen_test
def test_that_raise_error_can_be_overridden(self):
self.http_response.code = 500
self.http_response.rethrow.side_effect = RuntimeError

response = yield self.consumer.call_http_service(
'fetch-stats', 'GET', raise_error=False)

self.http.fetch.assert_called_once_with(
self.consumer.http.fetch.assert_called_once_with(
self.consumer.get_service_url('fetch-stats'),
method='GET', raise_error=False)
self.assertIs(response, self.http_response)

@testing.gen_test
def test_that_url_kwarg_skips_service_lookup(self):
response = yield self.consumer.call_http_service(
'frobinicate', 'GET', url='https://google.com')

self.http.fetch.assert_called_once_with(
'https://google.com', method='GET', raise_error=False)
self.assertIs(response, self.http_response)
self.sentry_client.tags_context.assert_called_once_with(
{'service_invoked': 'frobinicate'})
with mock.patch.multiple(self.consumer,
set_sentry_context=mock.DEFAULT,
unset_sentry_context=mock.DEFAULT) as context:
response = yield self.consumer.call_http_service(
'frobinicate', 'GET', url='https://google.com')
self.consumer.http.fetch.assert_called_once_with(
'https://google.com', method='GET', raise_error=False)
self.assertIs(response, self.http_response)
context['set_sentry_context'].assert_called_once_with(
'service_invoked', 'frobinicate')
context['unset_sentry_context'].assert_called_once_with(
'service_invoked')
71 changes: 54 additions & 17 deletions vetoes/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket

from rejected import consumer
from tornado import gen, httpclient, httputil
from tornado import gen, httpclient, httputil, ioloop


class HTTPServiceMixin(consumer.Consumer):
Expand Down Expand Up @@ -32,6 +32,37 @@ class HTTPServiceMixin(consumer.Consumer):
the **HTTP service** which is passed into :meth:`.get_service_url` to
construct the request URL.
HTTP client behavior is controlled via consumer level configuration under
the ``vetoes`` key. The following options are available:
+-----------------+-----------------------------------------------------+
| Key | Description |
+=================+=====================================================+
| max_clients | The max # of simultaneous requests that can be made |
+-----------------+-----------------------------------------------------+
| connect_timeout | Timeout for initial connection in seconds |
+-----------------+-----------------------------------------------------+
| request_time | Timeout for entire request in seconds |
+-----------------+-----------------------------------------------------+
*Example Configuration:*
.. code:: yaml
Application:
Consumers:
example:
consumer: rejected.example.Consumer
connections:
- name: rabbitmq1
qty: 2
queue: generated_messages
config:
vetoes:
max_clients: 10
connect_timeout: 5.0
request_timeout: 30.0
.. attribute:: http_headers
:class:`tornado.httputil.HTTPHeaders` instance of headers
Expand All @@ -49,9 +80,15 @@ def __init__(self, *args, **kwargs):
self.__service_map = kwargs.pop('service_map')
super(HTTPServiceMixin, self).__init__(*args, **kwargs)
self.http_headers = httputil.HTTPHeaders()
self.http = httpclient.AsyncHTTPClient()
self.http.defaults['connect_timeout'] = 5.0
self.http.defaults['request_timeout'] = 30.0
settings = self._settings.get('vetoes', {})
self.http = httpclient.AsyncHTTPClient(
force_instance=True,
max_clients=settings.get('max_clients'))
self.http.defaults['connect_timeout'] = \
settings.get('connect_timeout', 5.0)
self.http.defaults['request_timeout'] = \
settings.get('request_timeout', 30.0)
self.io_loop = ioloop.IOLoop.current()

@gen.coroutine
def call_http_service(self, function, method, *path, **kwargs):
Expand Down Expand Up @@ -87,16 +124,16 @@ def call_http_service(self, function, method, *path, **kwargs):
"""
headers = httputil.HTTPHeaders()
cid = getattr(self, '_correlation_id', self.correlation_id)
if cid:
headers['Correlation-ID'] = cid
if self.correlation_id:
headers['Correlation-Id'] = self.correlation_id
headers.update(self.http_headers)
headers.update(kwargs.pop('headers', {}))

if 'json' in kwargs:
body = json.dumps(kwargs.pop('json')).encode('utf-8')
kwargs['body'] = body
headers.setdefault('Content-Type', 'application/json')
if kwargs['json'] is not None:
headers.setdefault('Content-Type', 'application/json')
kwargs['body'] = json.dumps(kwargs['json']).encode('utf-8')
kwargs.pop('json', None)

if headers:
kwargs['headers'] = headers
Expand All @@ -109,31 +146,31 @@ def call_http_service(self, function, method, *path, **kwargs):
url = self.get_service_url(
service, *path, query_args=kwargs.pop('query_args', None))

self.sentry_client.tags_context({'service_invoked': service})
self.set_sentry_context('service_invoked', service)

self.logger.debug('sending %s request to %s', method, url)
raise_error = kwargs.pop('raise_error', True)
start_time = self._channel.connection.ioloop.time()
start_time = self.io_loop.time()

try:
response = yield self.http.fetch(url, method=method,
raise_error=False, **kwargs)
self.statsd_add_timing(
self.stats_add_timing(
'http.{0}.{1}'.format(function, response.code),
response.request_time)

except (OSError, select.error, socket.error) as e:
self.logger.exception('%s to %s failed', method, url)
self.statsd_add_timing(
self.stats_add_timing(
'http.{0}.timeout'.format(function),
self._channel.connection.ioloop.time() - start_time)
self.statsd_incr(
self.io_loop.time() - start_time)
self.stats_incr(
'errors.socket.{0}'.format(getattr(e, 'errno', 'unknown')))
raise consumer.ProcessingException(
'{0} connection failure - {1}'.format(service, e))

finally:
self.sentry_client.tags.pop('service_invoked', None)
self.unset_sentry_context('service_invoked')

if response.code == 429:
raise consumer.ProcessingException(
Expand Down

0 comments on commit 481fa98

Please sign in to comment.