Fix server outbound write failure creating zombie channels#17845
Conversation
When writeAndFlush() fails (e.g. direct memory OOM), the channel is half-closed by Netty but channelInactive() never fires, creating a zombie channel that accepts queries but never sends responses. The broker sees silent timeouts and keeps routing to the broken channel. Add f.isSuccess() check in the writeAndFlush listener in InstanceRequestHandler.sendResponse(). On failure: log the error, increment NETTY_CONNECTION_SEND_RESPONSE_EXCEPTIONS metric, and close the channel via ctx.close() to trigger proper cleanup. Since sendErrorResponse() delegates to sendResponse(), this covers all outbound writes.
❌ 1 Tests Failed:
View the top 2 failed test(s) by shortest run time
View the full list of 1 ❄️ flaky test(s)
To view more test analytics, go to the Test Analytics Dashboard |
There was a problem hiding this comment.
Pull request overview
Fixes a server-side Netty edge case where outbound writeAndFlush() failures can leave channels half-closed (“zombie” channels) that still accept requests but never return responses, by explicitly closing the channel on write failure and tracking the failure via a new metric.
Changes:
- Add a
ChannelFuturelistener success/failure branch inInstanceRequestHandler.sendResponse(); on failure, log, meter, andctx.close(). - Introduce a new server meter
NETTY_CONNECTION_SEND_RESPONSE_FAILURES. - Add a unit test asserting the channel is closed when the write future completes unsuccessfully.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java |
Close the Netty channel and record a metric when response writes fail, preventing “zombie” channels. |
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java |
Add a new server meter for outbound send-response failures. |
pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java |
Add unit coverage for the failed-write listener behavior (verifies ctx.close()). |
| Throwable cause = f.cause(); | ||
| LOGGER.error("Failed to send response for request: {} table: {}", requestId, tableNameWithType, cause); | ||
| _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_SEND_RESPONSE_FAILURES, 1); | ||
| ctx.close(); |
There was a problem hiding this comment.
The PR description mentions incrementing NETTY_CONNECTION_SEND_RESPONSE_EXCEPTIONS, but the implementation adds/increments NETTY_CONNECTION_SEND_RESPONSE_FAILURES. Please align the metric name (either rename the meter for consistency with existing *_EXCEPTIONS meters, or update the PR description and any dashboards/alerts that expect the old name).
| @BeforeClass | ||
| public void setUp() { | ||
| PinotMetricUtils.init(new PinotConfiguration()); | ||
| PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry(); | ||
| ServerMetrics.register(new ServerMetrics(registry)); | ||
| } |
There was a problem hiding this comment.
PinotMetricUtils.init(...) performs global metrics initialization (including default registration listeners such as the JMX reporter) and ServerMetrics.register(...) mutates a global singleton. This test doesn’t assert on metrics, and InstanceRequestHandler works with the NOOP ServerMetrics by default, so this setup appears unnecessary and can leak state across the test suite. Consider removing this setup, or adding an @AfterClass cleanup (e.g., ServerMetrics.deregister() and PinotMetricUtils.cleanUp()) to avoid cross-test interference.
| ChannelFuture writeFuture = mock(ChannelFuture.class); | ||
| when(ctx.writeAndFlush(any())).thenReturn(writeFuture); | ||
|
|
||
| ArgumentCaptor<GenericFutureListener> listenerCaptor = ArgumentCaptor.forClass(GenericFutureListener.class); |
There was a problem hiding this comment.
In testWriteFailureClosesChannel, the test uses a raw GenericFutureListener captor and suppresses unchecked warnings. It would be safer and clearer to parameterize the captor with the expected Netty listener type (matching ChannelFuture.addListener(...)) so the compiler can enforce the correct future type and the suppression can be removed.
| ArgumentCaptor<GenericFutureListener> listenerCaptor = ArgumentCaptor.forClass(GenericFutureListener.class); | |
| ArgumentCaptor<GenericFutureListener<Future<Void>>> listenerCaptor = | |
| ArgumentCaptor.forClass((Class<GenericFutureListener<Future<Void>>>) (Class<?>) GenericFutureListener.class); |
Summary
writeAndFlush()fails on the server (e.g. direct memory OOM), Netty half-closes the channel viashutdownOutput()butchannelInactive()never fires, creating a zombie channel that accepts queries but never sends responsesf.isSuccess()check in thewriteAndFlushlistener inInstanceRequestHandler.sendResponse(). On failure: log the error, incrementNETTY_CONNECTION_SEND_RESPONSE_EXCEPTIONSmetric, and close the channel viactx.close()to trigger proper cleanupsendErrorResponse()delegates tosendResponse(), this single change covers all outbound writesTest plan
testWriteFailureClosesChannelunit test that captures the write listener, invokes it with a failed future, and verifiesctx.close()is calledtestCancelQuerytest passes (no regression)InstanceRequestHandlerTest: 2 tests, 0 failures