diff --git a/CHANGES.rst b/CHANGES.rst index e7dfea4d..4685d739 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,14 @@ Changelog 0.3.x ~~~~~ +Version 0.3.11 +------------- + +Released on September 26, 2018. + +- support asyncio +- support tornado 5.x + Version 0.3.10 ------------- diff --git a/setup.py b/setup.py index 95030c06..fadf84d2 100644 --- a/setup.py +++ b/setup.py @@ -18,10 +18,17 @@ ] tornado_requires = [ - "tornado>=4.0,<5.0", - "toro>=0.6" + "tornado>=4.0,<6.0", ] +try: + from tornado import version as tornado_version + if tornado_version < '5.0': + tornado_requires.append("toro>=0.6") +except ImportError: + # tornado will now only get installed and we'll get the newer one + pass + dev_requires = [ "cython>=0.28.4", "flake8>=2.5", diff --git a/tests/test_framed_transport.py b/tests/test_framed_transport.py index 38ee2d90..0d7eefd0 100644 --- a/tests/test_framed_transport.py +++ b/tests/test_framed_transport.py @@ -20,6 +20,11 @@ from thriftpy.transport.framed import TFramedTransportFactory from thriftpy.protocol.binary import TBinaryProtocolFactory +try: + import asyncio +except ImportError: + asyncio = None + from thriftpy._compat import CYTHON logging.basicConfig(level=logging.INFO) @@ -55,14 +60,9 @@ class FramedTransportTestCase(TestCase): PROTOCOL_FACTORY = TBinaryProtocolFactory() def mk_server(self): - self.io_loop = ioloop.IOLoop() - server = make_server(addressbook.AddressBookService, - Dispatcher(self.io_loop), io_loop=self.io_loop) - - self.server = server sock = self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('localhost', 0)) + sock.bind(('127.0.0.1', 0)) sock.setblocking(0) self.port = sock.getsockname()[-1] self.server_thread = threading.Thread(target=self.listen) @@ -71,7 +71,14 @@ def mk_server(self): def listen(self): self.server_sock.listen(128) - self.server.add_socket(self.server_sock) + if asyncio: + # In Tornado 5.0+, the asyncio event loop will be used + # automatically by default + asyncio.set_event_loop(asyncio.new_event_loop()) + self.io_loop = ioloop.IOLoop.current() + server = make_server(addressbook.AddressBookService, + Dispatcher(self.io_loop), io_loop=self.io_loop) + server.add_socket(self.server_sock) self.io_loop.start() def mk_client(self): @@ -85,6 +92,9 @@ def setUp(self): time.sleep(0.1) self.client = self.mk_client() + def tearDown(self): + self.io_loop.stop() + @pytest.mark.skipif(sys.version_info[:2] == (2, 6), reason="not support") def test_able_to_communicate(self): dennis = addressbook.Person(name='Dennis Ritchie') diff --git a/thriftpy/__init__.py b/thriftpy/__init__.py index 28a00b6d..e466b902 100644 --- a/thriftpy/__init__.py +++ b/thriftpy/__init__.py @@ -5,7 +5,7 @@ from .hook import install_import_hook, remove_import_hook from .parser import load, load_module, load_fp -__version__ = '0.3.10' +__version__ = '0.3.11' __python__ = sys.version_info __all__ = ["install_import_hook", "remove_import_hook", "load", "load_module", "load_fp"] diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index f99bc76a..8ae8216a 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -287,9 +287,8 @@ def handle_exception(self, e, result): _, exc_name, exc_cls, _ = result.thrift_spec[k] if isinstance(e, exc_cls): setattr(result, exc_name, e) - break - else: - raise + return True + return False def process(self, iprot, oprot): api, seqid, result, call = self.process_in(iprot) @@ -301,7 +300,8 @@ def process(self, iprot, oprot): result.success = call() except Exception as e: # raise if api don't have throws - self.handle_exception(e, result) + if not self.handle_exception(e, result): + raise if not result.oneway: self.send_result(oprot, api, result, seqid) diff --git a/thriftpy/tornado.py b/thriftpy/tornado.py index fd53ebf4..b312ed4e 100644 --- a/thriftpy/tornado.py +++ b/thriftpy/tornado.py @@ -18,7 +18,8 @@ from __future__ import absolute_import from contextlib import contextmanager -from tornado import tcpserver, ioloop, iostream, gen +from tornado import tcpserver, iostream, gen +from tornado import version as tornado_version from io import BytesIO from datetime import timedelta @@ -32,7 +33,15 @@ import logging import socket import struct -import toro + +try: + from tornado.locks import Lock +except ImportError: + try: + from toro import Lock + except ImportError: + raise RuntimeError('With tornado {}, you need to install ' + '"toro"'.format(tornado_version)) logger = logging.getLogger(__name__) @@ -47,12 +56,12 @@ def __init__(self, host, port, stream=None, io_loop=None, ssl_options=None, read_timeout=DEFAULT_READ_TIMEOUT): self.host = host self.port = port - self.io_loop = io_loop or ioloop.IOLoop.current() + self.io_loop = io_loop self.read_timeout = read_timeout self.is_queuing_reads = False self.read_queue = [] self.__wbuf = BytesIO() - self._read_lock = toro.Lock() + self._read_lock = Lock() self.ssl_options = ssl_options # servers provide a ready-to-go stream @@ -60,8 +69,12 @@ def __init__(self, host, port, stream=None, io_loop=None, ssl_options=None, if self.stream is not None: self._set_close_callback() - def with_timeout(self, timeout, future): - return gen.with_timeout(timeout, future, self.io_loop) + if tornado_version >= '5.0': + def with_timeout(self, timeout, future): + return gen.with_timeout(timeout, future) + else: + def with_timeout(self, timeout, future): + return gen.with_timeout(timeout, future, self.io_loop) @gen.coroutine def open(self, timeout=DEFAULT_CONNECT_TIMEOUT): @@ -158,12 +171,15 @@ def __init__(self, processor, iprot_factory, oprot_factory=None, else iprot_factory) self.transport_read_timeout = transport_read_timeout + # `io_loop` has been deprecated since tornado 4.1 and removed in 5.0 + self.__io_loop = getattr(self, 'io_loop', None) + @gen.coroutine def handle_stream(self, stream, address): host, port = address trans = TTornadoStreamTransport( host=host, port=port, stream=stream, - io_loop=self.io_loop, read_timeout=self.transport_read_timeout) + io_loop=self.__io_loop, read_timeout=self.transport_read_timeout) try: oprot = self._oprot_factory.get_protocol(trans) iprot = self._iprot_factory.get_protocol(TMemoryBuffer()) @@ -214,9 +230,14 @@ def make_server( io_loop=None, ssl_options=None, transport_read_timeout=TTornadoStreamTransport.DEFAULT_READ_TIMEOUT): processor = TProcessor(service, handler) - server = TTornadoServer(processor, iprot_factory=proto_factory, - transport_read_timeout=transport_read_timeout, - io_loop=io_loop, ssl_options=ssl_options) + if tornado_version >= '5.0': + server = TTornadoServer(processor, iprot_factory=proto_factory, + transport_read_timeout=transport_read_timeout, + ssl_options=ssl_options) + else: + server = TTornadoServer(processor, iprot_factory=proto_factory, + transport_read_timeout=transport_read_timeout, + io_loop=io_loop, ssl_options=ssl_options) return server diff --git a/tox.ini b/tox.ini index 8e5fd5b8..72c69c38 100644 --- a/tox.ini +++ b/tox.ini @@ -7,14 +7,12 @@ changedir = tests commands = - py.test [] + py.test -W ignore [] deps = pytest - tornado==4.0 - toro + tornado>=4.0,<6.0 cython - py26: ordereddict py35,py36: pytest_asyncio [testenv:flake8]