Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make sure that errors from user code bubble up correctly.

  • Loading branch information...
commit fd946bc681b12c9531ed2321162d546a7d0fab3f 1 parent 7410ced
Dan Fairs authored
View
2  datasift/__init__.py
@@ -588,7 +588,7 @@ def _on_error(self, message):
self._event_handler.on_error(self, message)
def _on_warning(self, message):
- self._event_handler.on_error(self, message)
+ self._event_handler.on_warning(self, message)
def _on_disconnect(self):
self._event_handler.on_disconnect(self)
View
2  datasift/streamconsumer_http.py
@@ -107,7 +107,7 @@ def run(self):
self._consumer._on_error('Received %s response, no more retries' % (resp_code))
break
self._consumer._on_warning('Received %s response, retrying in %s seconds' % (resp_code, connection_delay))
- except Exception, exception:
+ except urllib2.HTTPError:
if connection_delay < 16:
connection_delay += 1
self._consumer._on_warning('Connection failed (%s), retrying in %s seconds' % (exception, connection_delay))
View
2  setup.py
@@ -1,6 +1,5 @@
# encoding: utf-8
-import os
from setuptools import setup
from datasift import __version__
@@ -15,6 +14,7 @@
license = "Copyright (C) 2012 by MediaSift Ltd. All Rights Reserved. See LICENSE for the full license.",
url = "https://github.com/datasift/datasift-python",
packages=['datasift', 'tests'],
+ tests_require=['mock>=0.8.0'],
include_package_data = True,
platforms='any',
classifiers=[
View
1  tests/run_tests.py
@@ -4,6 +4,7 @@
# Import the tests
from test_user import TestUser
from test_definition import TestDefinition
+from test_http_stream import TestHttpStreamErrors
# Run the tests
unittest.main()
View
138 tests/test_http_stream.py
@@ -0,0 +1,138 @@
+import datasift
+import mock
+import unittest
+
+
+class TestHttpStreamErrors(unittest.TestCase):
+ """ Tests to ensure that the HTTP streamer implementation does not
+ swallow errors raised by user-supplied event handler classes."""
+
+ def _make_stream(self, broken_method, is_running):
+ from datasift.streamconsumer_http import (StreamConsumer_HTTP,
+ StreamConsumer_HTTP_Thread)
+ import testdata
+ user = datasift.User('fake', 'user')
+ client = datasift.mockapiclient.MockApiClient()
+ response = {
+ 'response_code': 200,
+ 'data': {
+ 'hash': testdata.definition_hash,
+ 'created_at': '2011-12-13 14:15:16',
+ 'dpu': 10,
+ },
+ 'rate_limit': 200,
+ 'rate_limit_remaining': 150,
+ }
+ client.set_response(response)
+ user.set_api_client(client)
+ definition = datasift.Definition(user, 'some cdsl')
+ handler = BrokenHandler(broken_method)
+ consumer = StreamConsumer_HTTP(user, definition, handler)
+ if is_running:
+ consumer._state = consumer.STATE_RUNNING
+ return StreamConsumer_HTTP_Thread(consumer)
+
+ def _check(self, sc):
+ # Prefer self.assertRaises in future Python versions
+ try:
+ sc.run()
+ except UserException:
+ pass
+ else:
+ self.fail('UserException not raised')
+
+ def _setup_mocks(self, request, urlopen):
+ request.return_value = mock.Mock(name='request')
+ response = mock.Mock(name='response')
+ urlopen.return_value = response
+ response.getcode.return_value = 200
+ return response
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_connect_exception(self, request, urlopen):
+ self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_connect', True)
+ self._check(sc)
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_interaction_exception(self, request, urlopen):
+ response = self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_interaction', True)
+ response.readline.return_value = '{"interaction": "json"}'
+ self._check(sc)
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_deleted_exception(self, request, urlopen):
+ response = self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_deleted', True)
+ response.readline.return_value = '{"interaction": "x", "deleted": "1"}'
+ self._check(sc)
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_warning_exception(self, request, urlopen):
+ response = self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_warning', True)
+ response.readline.return_value = (
+ '{"status": "warning", "message":'' "foo"}'
+ )
+ self._check(sc)
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_error_exception(self, request, urlopen):
+ response = self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_error', True)
+ response.readline.return_value = (
+ '{"status": "error", "message":'' "foo"}'
+ )
+ self._check(sc)
+
+ @mock.patch('urllib2.urlopen')
+ @mock.patch('urllib2.Request')
+ def test_disconnect_exception(self, request, urlopen):
+ self._setup_mocks(request, urlopen)
+ sc = self._make_stream('on_disconnect', False)
+ self._check(sc)
+
+
+class UserException(Exception):
+ """ Custom exception that we can explicitly test for """
+ pass
+
+
+class BrokenHandler(datasift.StreamConsumerEventHandler):
+
+ def __init__(self, broken_method):
+ self.broken_method = broken_method
+
+ def on_connect(self, consumer):
+ if self.broken_method == 'on_connect':
+ raise UserException()
+
+ def on_interaction(self, consumer, interaction, hash):
+ if self.broken_method == 'on_interaction':
+ raise UserException()
+
+ def on_deleted(self, consumer, interaction, hash):
+ if self.broken_method == 'on_deleted':
+ raise UserException()
+
+ def on_warning(self, consumer, msg):
+ if self.broken_method == 'on_warning':
+ raise UserException()
+
+ def on_error(self, consumer, msg):
+ if self.broken_method == 'on_error':
+ raise UserException()
+
+ def on_disconnect(self, consumer):
+ if self.broken_method == 'on_disconnect':
+ raise UserException()
+
+
+if __name__ == '__main__':
+ unittest.main()
Please sign in to comment.
Something went wrong with that request. Please try again.