Migrate from tornado gen.coroutine to native async/await #54
- Replace @gen.coroutine + yield with async def + await throughout
- Replace tornado.locks.Lock/Event with asyncio.Lock/Event
- Replace tornado.ioloop.IOLoop with asyncio event loop
- Replace tornado.concurrent.Future with asyncio.Future
- Replace pika.adapters.tornado_connection with asyncio_connection
- Replace tornado.iostream.IOStream with stdlib socket (statsd TCP)
- Replace tornado.testing.AsyncTestCase + gen_test with unittest.IsolatedAsyncioTestCase
- Fix asyncTearDown deadlock in TCPTestCase: close statsd socket before calling server.wait_closed()
- Remove tornado from dependencies

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
📝 Walkthrough
The codebase undergoes a comprehensive migration from Tornado to native asyncio. Core library modules (consumer, process, mixins, and statsd) convert all async primitives to native async/await syntax, asyncio locks, and asyncio event loops. Test infrastructure is rebased to unittest.IsolatedAsyncioTestCase, and the Tornado dependency is removed from the project configuration.
Sequence Diagram
sequenceDiagram
participant Client
participant Process
participant Consumer
participant EventLoop
participant RabbitMQ
Note over Process,EventLoop: Asyncio-Based Message Processing Flow (New)
RabbitMQ->>Process: on_message_callback(message)
Process->>EventLoop: asyncio.ensure_future(invoke_consumer)
activate EventLoop
EventLoop->>Process: invoke_consumer()
activate Process
Process->>Process: async with consumer_lock
Process->>Consumer: await execute(message)
activate Consumer
Consumer->>Consumer: await prepare()
Consumer->>Consumer: await process()
Consumer->>Consumer: await on_finish()
Consumer-->>Process: return
deactivate Consumer
Process-->>EventLoop: complete
deactivate Process
deactivate EventLoop
EventLoop->>Client: task complete
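The flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the library's code: the `Process` class, `on_message`, `handled`, and `_tasks` names are hypothetical stand-ins, and only `asyncio.ensure_future`, `consumer_lock`, and `invoke_consumer` come from the diagram itself.

```python
import asyncio


class Process:
    """Hypothetical stand-in for the process that owns a consumer."""

    def __init__(self):
        self.consumer_lock = asyncio.Lock()
        self.handled = []
        self._tasks = set()

    def on_message(self, message):
        # Synchronous callback: it cannot await, so it schedules the coroutine.
        task = asyncio.ensure_future(self.invoke_consumer(message))
        # Hold a reference so the task is not garbage collected while pending.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def invoke_consumer(self, message):
        # Only one message is processed at a time.
        async with self.consumer_lock:
            await asyncio.sleep(0)  # placeholder for `await consumer.execute(message)`
            self.handled.append(message)


async def main():
    process = Process()
    tasks = [process.on_message(number) for number in range(3)]
    await asyncio.gather(*tasks)
    return process.handled


handled = asyncio.run(main())
```

Keeping a reference to each scheduled task is a design choice in this sketch; the event loop itself only holds weak references to tasks.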
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~80 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
…async
- Consumer.initialize, prepare, process, finish, on_finish, on_blocked, and on_unblocked are now async def
- initialize() is no longer called from __init__ (can't await there); execute() calls await self.initialize() lazily on first invocation, tracked by self._initialized
- execute() now directly awaits prepare(), process(), and finish() instead of using the iscoroutine/isfuture guard pattern
- finish() awaits on_finish(); on_finish() remains overridable as async
- _get_exc_info(result) replaced with sys.exc_info() since exceptions now propagate normally through await rather than via Future state
- Remove io_loop property (asyncio.get_event_loop() is deprecated; use asyncio.get_running_loop() directly where needed)
- process.on_connection_blocked/unblocked schedule consumer callbacks via asyncio.ensure_future() since pika callbacks are sync
- Fix pre-existing bug: on_connection_unblocked was calling on_blocked instead of on_unblocked
- mixins.GarbageCollectionMixin.on_finish is now async def
- testing.asyncTearDown awaits consumer.finish()
- Tests updated: TestConsumer/TestSmartConsumer.process → async def; mock.patch.object for process uses AsyncMock; initialize test updated to check _initialized flag; process raises test uses async assertRaises

Co-Authored-By: Gavin M. Roy <gavinmroy@gmail.com>
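The lazy-initialization pattern described in this commit might look like the sketch below. It is a simplified illustration only: the real execute() also handles errors and message state, the `calls` list is a hypothetical addition used to make the order visible, and setting `_initialized` after `initialize()` returns is an assumption.

```python
import asyncio


class Consumer:
    """Simplified stand-in; only the call ordering is modelled."""

    def __init__(self):
        # initialize() cannot be awaited here, so it is deferred.
        self._initialized = False
        self.calls = []  # hypothetical: records the order of hook calls

    async def initialize(self):
        self.calls.append('initialize')

    async def prepare(self):
        self.calls.append('prepare')

    async def process(self):
        self.calls.append('process')

    async def on_finish(self):
        self.calls.append('on_finish')

    async def finish(self):
        self.calls.append('finish')
        await self.on_finish()

    async def execute(self):
        # Run initialize() once, on the first invocation only.
        if not self._initialized:
            await self.initialize()
            self._initialized = True
        await self.prepare()
        await self.process()
        await self.finish()


async def main():
    consumer = Consumer()
    await consumer.execute()
    await consumer.execute()
    return consumer.calls


calls = asyncio.run(main())
```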
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rejected/statsd.py (1)
146-172: ⚠️ Potential issue | 🟠 Major
_tcp_on_closed() is orphan code: TCP reconnection after disconnect is not implemented.
The _tcp_on_closed() method is defined but never invoked anywhere in the codebase. In the previous Tornado implementation, this would have been wired via IOStream.set_close_callback(). After migration to raw sockets, this callback integration is missing, so TCP reconnection on disconnect will not occur.
When _tcp_socket() fails and returns None (line 169), _tcp_on_closed() assigns it directly to _tcp_writer without checking (line 151). However, this is handled safely in _send() by the guard clause on line 119 (if self._tcp_writer:), which falls back to UDP when _tcp_writer is None. Still, once a TCP connection fails, the system silently switches to UDP instead of attempting to reconnect.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/statsd.py` around lines 146 - 172, _tcp_on_closed is never invoked after migrating away from Tornado, so TCP reconnection never happens; update the TCP path so socket-close detection triggers _tcp_on_closed and make _tcp_on_closed perform retry/backoff reconnects instead of simply assigning the return of _tcp_socket to _tcp_writer. Specifically, wire a reader/monitor or non-blocking recv/selector callback (or spawn a short-lived thread) that detects socket shutdown and calls _tcp_on_closed, modify _tcp_on_closed to cancel/clean the old socket, schedule repeated attempts to call _tcp_socket with exponential backoff until it returns a socket (setting self._tcp_writer only on success), and change _tcp_socket so connection failures do not call the global shutdown callback immediately but instead return None (or raise a controlled exception) so the reconnection logic can handle retries while _send continues to fall back to UDP.
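The retry-with-backoff idea in this comment could be sketched as below. This is an illustration under stated assumptions, not the library's implementation: `reconnect_with_backoff`, its parameters, and the injectable `sleep` are hypothetical, and `connect` stands in for a `_tcp_socket()`-like callable that returns None on failure.

```python
import asyncio


async def reconnect_with_backoff(connect, *, initial=0.1, maximum=5.0,
                                 attempts=5, sleep=asyncio.sleep):
    """Call connect() until it returns a socket, doubling the delay each time.

    `connect` is assumed to be synchronous and to return None on failure.
    """
    delay = initial
    for attempt in range(attempts):
        sock = connect()
        if sock is not None:
            return sock
        if attempt < attempts - 1:
            await sleep(delay)
            delay = min(delay * 2, maximum)
    return None  # caller keeps falling back to UDP


async def demo():
    outcomes = iter([None, None, 'socket'])  # two failures, then success
    delays = []

    async def fake_sleep(seconds):
        delays.append(seconds)

    result = await reconnect_with_backoff(
        lambda: next(outcomes), sleep=fake_sleep)
    return result, delays


result, delays = asyncio.run(demo())
```

The sleep function is injectable only so the demo and tests do not wait on wall-clock time.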
🧹 Nitpick comments (3)
rejected/statsd.py (2)
119-127: TCP error handling invokes _failure_callback() but does not clean up _tcp_writer.
When an OSError occurs during _send() on the TCP path, _failure_callback() is called but _tcp_writer is not cleared. This leaves a potentially broken socket reference. Subsequent sends would continue to fail and repeatedly call _failure_callback().
Consider clearing _tcp_writer when an error occurs:
♻️ Proposed fix
         except OSError as error:  # pragma: nocover
             if self._connected:
                 LOGGER.exception('Error sending statsd metric: %s', error)
                 self._connected = False
+                if self._tcp_writer:
+                    try:
+                        self._tcp_writer.close()
+                    except OSError:
+                        pass
+                    self._tcp_writer = None
                 self._failure_callback()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/statsd.py` around lines 119 - 127, The OSError handler in _send() clears _connected and calls _failure_callback() but leaves the broken _tcp_writer reference; update the except block to also clean up the TCP writer (e.g., close/terminate and set self._tcp_writer = None) when self._tcp_writer was being used, so future sends don't reuse a bad socket; ensure the logic touches the same symbols (_tcp_writer, _connected, _failure_callback, _send) and still calls _failure_callback() after safely disposing the writer.
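As a self-contained illustration of the cleanup suggested here, the sketch below discards a broken writer before invoking the failure callback. The `Sender` class and `_discard_writer` helper are hypothetical, and unlike the real `_send()`, this version returns False instead of falling back to UDP when no writer is available.

```python
import socket


class Sender:
    """Hypothetical, reduced model of the TCP send path."""

    def __init__(self, tcp_writer, failure_callback):
        self._tcp_writer = tcp_writer
        self._connected = True
        self._failure_callback = failure_callback

    def send(self, payload: bytes) -> bool:
        if self._tcp_writer is None:
            return False  # the real client would fall back to UDP here
        try:
            self._tcp_writer.sendall(payload)
            return True
        except OSError:
            self._connected = False
            self._discard_writer()
            self._failure_callback()
            return False

    def _discard_writer(self):
        # Drop the reference first so a failing close() cannot leave it set.
        writer, self._tcp_writer = self._tcp_writer, None
        if writer is not None:
            try:
                writer.close()
            except OSError:
                pass


left, right = socket.socketpair()
left.close()  # simulate a broken connection
failures = []
sender = Sender(left, lambda: failures.append('failed'))
first = sender.send(b'foo:1|c\n')
second = sender.send(b'foo:1|c\n')
right.close()
```

After the first failure the writer is gone, so the second send does not trigger the failure callback again.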
159-172: Blocking socket.connect() will stall the event loop if reconnection is restored.
If the _tcp_on_closed() reconnection logic is fixed to be invoked (e.g., via error handling), the synchronous socket.connect() call will block the asyncio event loop. Consider using loop.sock_connect() for non-blocking TCP connections, or document that TCP statsd is best-effort and connection failures trigger shutdown.
Given that _failure_callback() triggers process shutdown (per context snippet in rejected/process.py:1129-1133), the current behavior may be intentional: failure means shutdown, no reconnection. If so, _tcp_on_closed() can be removed as dead code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/statsd.py` around lines 159 - 172, The current synchronous socket.connect(self._address) in the TCP connection path blocks the asyncio event loop; change the implementation in the statsd TCP connect routine (referenced by self._address, self._failure_callback and _tcp_on_closed) to perform a non-blocking connect via the event loop (use loop = asyncio.get_running_loop(), set the socket non-blocking and await loop.sock_connect(sock, self._address)), catch exceptions and call self._failure_callback() the same way, set self._connected and return the socket after the await; alternatively, if TCP failures are intended to be terminal, add a clear comment and remove the now-dead _tcp_on_closed reconnection logic instead of leaving blocking connect in place.
rejected/consumer.py (1)
230-234: Minor docstring inconsistency: still mentions Future semantics.
The docstring mentions "If this method returns a coroutine or asyncio.Future" but now prepare() is always an async def and must be awaited. Consider simplifying the docstring since the method is now natively async.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/consumer.py` around lines 230 - 234, The docstring for prepare() still references "If this method returns a coroutine or asyncio.Future" which is outdated because prepare() is now always defined as async; update the prepare() docstring to remove the Future/coroutine conditional and instead state that prepare() is an async def method that must be awaited and runs before consumption starts (keep the "Asynchronous support" note but simplify wording to reflect native async semantics). Make the change in the prepare() method's docstring so the language clarifies it's natively asynchronous and requires await.
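For the non-blocking connect suggested for rejected/statsd.py (lines 159-172), a rough sketch using `loop.sock_connect()` is shown below. The `tcp_connect` function and its parameters are hypothetical; the demo connects to a throwaway local listener rather than a real statsd server.

```python
import asyncio
import socket


async def tcp_connect(address, failure_callback):
    """Connect without blocking the event loop; return None on failure."""
    loop = asyncio.get_running_loop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)  # required by loop.sock_connect()
    try:
        await loop.sock_connect(sock, address)
    except OSError:
        sock.close()
        failure_callback()
        return None
    return sock


async def demo():
    # A plain listening socket is enough for the TCP handshake to complete.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    port = listener.getsockname()[1]

    failures = []
    sock = await tcp_connect(('127.0.0.1', port), lambda: failures.append(1))
    connected = sock is not None
    if sock is not None:
        sock.close()
    listener.close()
    return connected, failures


connected, failures = asyncio.run(demo())
```

Making the connect a coroutine means callers such as a constructor can no longer invoke it directly, which is one reason the alternative in the comment (treat failure as terminal and drop the dead code) may be simpler.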
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/test_statsd.py`:
- Around line 225-226: The test currently bypasses the client's reconnect logic
by directly assigning self.statsd._tcp_writer = self.statsd._tcp_socket() before
calling set_gauge('bar', 10); instead, simulate a broken connection (e.g.,
replace _tcp_writer with a writer that will fail on write or close it) and let
set_gauge('bar', 10) trigger the client's reconnect-on-send path, or monkeypatch
_tcp_socket() to provide a new socket when the client attempts to reconnect; do
not directly reassign _tcp_writer in the test so the client's own
broken-connection detection and reconnect logic (methods involving _tcp_writer
and _tcp_socket()) are exercised.
- Around line 152-164: The test currently reserves an ephemeral port by creating
and closing a temporary socket (sock.bind(('127.0.0.1', 0))) before calling
loop.create_server, which opens a race; instead let create_server bind to port 0
directly and then read the assigned port from the server socket(s). Replace the
temporary-socket block with calling loop.create_server(lambda:
self._server_protocol, '127.0.0.1', 0), await it to set self._server, and then
set self.port = self._server.sockets[0].getsockname()[1] so the test uses the
actual bound port; keep using StatsdServer, self._server_protocol and
self._server as-is.
- Around line 171-172: Remove the nondeterministic asyncio.sleep calls and
replace them with deterministic event waits: delete the short await
asyncio.sleep(0.01) after creating the statsd Client because _tcp_socket() is
synchronous and the connection is already established; for the later 2-second
sleep (after the server receives the 'reconnect' packet and reconnect_receive is
set), replace it with an explicit await on the concrete synchronization
primitive (e.g., await asyncio.wait_for(self._server_protocol.event.wait(),
timeout=...) or await asyncio.wait_for(reconnect_receive.wait(), timeout=...))
so the test waits for the actual state change instead of wall-clock time.
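The two test suggestions above (bind to port 0 directly, and wait on an event rather than sleeping) can be sketched together. This is an illustration only: the `StatsdServer` body here is a hypothetical minimal protocol, not the one in tests/test_statsd.py, and the client is a plain asyncio stream instead of the statsd Client.

```python
import asyncio


class StatsdServer(asyncio.Protocol):
    """Hypothetical test protocol that records received payloads."""

    def __init__(self):
        self.event = asyncio.Event()
        self.data = b''

    def data_received(self, data):
        self.data += data
        self.event.set()


async def demo():
    loop = asyncio.get_running_loop()
    protocol = StatsdServer()
    # Let the OS pick a free port, then read it back from the bound socket.
    server = await loop.create_server(lambda: protocol, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'bar:10|g\n')
    await writer.drain()
    # Wait for the actual state change, bounded by a timeout.
    await asyncio.wait_for(protocol.event.wait(), timeout=5)

    # Close the client before waiting for the server to finish closing.
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return port, protocol.data


port, data = asyncio.run(demo())
```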
⛔ Files ignored due to path filters (1)
- uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
- pyproject.toml
- rejected/consumer.py
- rejected/mixins.py
- rejected/process.py
- rejected/statsd.py
- rejected/testing.py
- tests/test_consumer.py
- tests/test_process.py
- tests/test_statsd.py
- tests/test_testing.py
💤 Files with no reviewable changes (1)
- pyproject.toml
…ry sleep
- Bind the test server directly to port 0 and read the assigned port from `self._server.sockets[0].getsockname()[1]`, eliminating the TOCTOU window where a temporary socket was closed before the server could bind.
- Remove the `asyncio.sleep(0.01)` after constructing the statsd Client since `_tcp_socket()` is a synchronous blocking connect; the connection is fully established before the constructor returns.

Co-Authored-By: Gavin M. Roy <gavinmroy@gmail.com>
🤖 This comment was posted by Claude on behalf of @gmr
Response to CodeRabbit's Review
Re: […] Fixed in 7d2fbfd. The server now binds directly to port 0 and the assigned port is read from […]
Re: […] Also fixed in 7d2fbfd. Since […]
Re: […] Respectfully disagreeing here. The statsd […]
Re: […] This import block is pre-existing code unchanged by this PR. The PR's scope is migrating […]
Summary
Remove the tornado dependency and replace all tornado-specific async patterns with Python's native asyncio throughout the library.
Problem
The library used tornado's @gen.coroutine/yield pattern for async execution, which predates Python 3.5's async def/await syntax. This created a hard dependency on tornado for consumers that don't otherwise need it, and prevented interoperability with pure asyncio codebases.
Solution
- Replace @gen.coroutine + yield with async def + await
- Replace tornado.locks.Lock/Event with asyncio.Lock/asyncio.Event
- Replace tornado.ioloop.IOLoop with the asyncio event loop
- Replace tornado.concurrent.Future with asyncio.Future
- Replace pika.adapters.tornado_connection with pika.adapters.asyncio_connection
- Replace tornado.iostream.IOStream with stdlib socket for the statsd TCP path
- Replace tornado.testing.AsyncTestCase + gen_test with unittest.IsolatedAsyncioTestCase
- Remove tornado from dependencies entirely
Also fixed a deadlock in the statsd TCP test teardown: wait_closed() was called before the client socket was closed, causing it to hang indefinitely.
Impact
Breaking change for consumers that subclass Consumer or SmartConsumer using @gen.coroutine/yield: these must be updated to async def/await. The public API shape is otherwise identical.
Consumers that were already using async def/await (compatible with tornado since Python 3.5) are unaffected.
All 225 tests pass.
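The teardown ordering described in the Solution section can be illustrated with a small unittest.IsolatedAsyncioTestCase. This is a sketch under assumptions, not the project's test: the class body, `RecordingProtocol`, and the raw client socket are hypothetical; only the ordering (close the client socket, then await wait_closed()) comes from the description above.

```python
import asyncio
import socket
import unittest


class RecordingProtocol(asyncio.Protocol):
    """Hypothetical server-side protocol that flags the first payload."""

    def __init__(self, received):
        self.received = received

    def data_received(self, data):
        self.received.set()


class TCPTestCase(unittest.IsolatedAsyncioTestCase):

    async def asyncSetUp(self):
        self.received = asyncio.Event()
        loop = asyncio.get_running_loop()
        self.server = await loop.create_server(
            lambda: RecordingProtocol(self.received), '127.0.0.1', 0)
        port = self.server.sockets[0].getsockname()[1]
        # Stand-in for the statsd client's blocking TCP socket.
        self.client = socket.create_connection(('127.0.0.1', port))

    async def asyncTearDown(self):
        # Close the client first; on newer Python versions wait_closed()
        # waits for open connections and would otherwise never return.
        self.client.close()
        self.server.close()
        await self.server.wait_closed()

    async def test_send(self):
        self.client.sendall(b'foo:1|c\n')
        await asyncio.wait_for(self.received.wait(), timeout=5)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TCPTestCase)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```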
Summary by CodeRabbit