
Conversation

@gortiz gortiz commented Feb 9, 2026

We have found some issues in the way the SSE handles exceptions thrown/caught in the Netty channel handlers.

A Netty handler should always either call the next handler in the chain or abort the execution with a response. However, there were several cases where we just caught the exception, logged it, and did nothing with the channel. If the exception occurs during a channelRead, the channel remains idle until we receive a new message, which may never happen. The other, even more problematic case was DataTableHandler.exceptionCaught, which only logged the exception without calling the next handler. As a result, the channel stays in an undefined state, which may leak byte buffers.
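To make the failure mode concrete, here is a minimal, hypothetical handler (not the actual Pinot code) showing the pattern we want: propagate instead of swallow. The process method is just a placeholder for the real deserialization step.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ExampleResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
    try {
      process(msg); // placeholder for the real decoding/handling step
    } catch (Exception e) {
      // Just logging here would leave the channel idle until the next read (which may
      // never come) and in an undefined state. Propagating lets a downstream handler or
      // the pipeline tail close the channel and release its resources.
      ctx.fireExceptionCaught(e);
    }
  }

  private void process(ByteBuf msg) {
    // stand-in for DataTable deserialization etc.
  }
}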

I also found another issue: since we introduced PooledByteBufAllocatorWithLimits, brokers have been creating one allocator per server connection. This means the actual memory limit was num_servers * max_memory_limit, which is obviously wrong. Also, since we registered the stats at connection time, we exported only the stats for the last server. Before the introduction of PooledByteBufAllocatorWithLimits that was not an issue, because even if we overrode the old stats, the value used was the same. I fixed that problem as well, although TBH after #16939 we shouldn't need PooledByteBufAllocatorWithLimits at all, as these limits can be set globally with Netty JAVA_OPTS. The reason we introduced PooledByteBufAllocatorWithLimits is that those JAVA_OPTS were being ignored and we didn't know why (as shown in #16939, the reason is that we needed to change the property names to add the shading prefix we use).
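A minimal sketch of the intended setup, assuming the broker configures a client Bootstrap per server connection; PooledByteBufAllocator.DEFAULT stands in for Pinot's PooledByteBufAllocatorWithLimits, and the class/method names are invented for illustration.

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;

public class SharedAllocatorSketch {

  // Created once per broker process. With a single instance, the configured
  // direct-memory limit and the exported allocator stats apply to all server
  // connections instead of one allocator (and one limit) per connection.
  private final ByteBufAllocator _sharedAllocator = PooledByteBufAllocator.DEFAULT;

  public Bootstrap configure(Bootstrap bootstrap) {
    // Every channel built from this bootstrap allocates from the shared pool.
    return bootstrap.option(ChannelOption.ALLOCATOR, _sharedAllocator);
  }
}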

Copilot AI left a comment

Pull request overview

Fixes Netty SSE channel handler behavior by ensuring exceptions are propagated down the pipeline (so channels are closed / resources released) and adjusts ByteBuf allocator usage to avoid per-connection allocators and incorrect memory limiting/metrics.

Changes:

  • Share a single PooledByteBufAllocatorWithLimits across broker-to-server channels and register gauges once per ServerChannels.
  • Propagate exceptions in several Netty handlers to avoid idle/broken channels and potential buffer leaks.
  • Remove disconnected channels from _allChannels to avoid leaking channel references.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Summary per file:

  • pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java: Creates and reuses a shared pooled allocator with limits; registers allocator metrics once; uses the shared allocator for all server channels.
  • pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java: Adds a documentation warning about allocator/metrics behavior when multiple servers run in one JVM.
  • pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java: Propagates exceptions after deserialization/handling failures to let the pipeline close and release resources.
  • pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java: Removes inactive channels from the shared channel set to avoid leaking references (see the sketch below).
  • pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java: Propagates exceptions to ensure channel close/cleanup and avoid hanging queries.
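A minimal, hypothetical sketch of the DirectOOMHandler change (removing disconnected channels from the shared channel set), assuming the handler holds a shared Set<Channel> named _allChannels as in the PR description; this is not the actual DirectOOMHandler code.

import java.util.Set;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ChannelTrackingHandler extends ChannelInboundHandlerAdapter {

  private final Set<Channel> _allChannels;

  public ChannelTrackingHandler(Set<Channel> allChannels) {
    _allChannels = allChannels;
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) {
    // Drop the reference once the connection is gone; otherwise the shared set keeps
    // growing and pins dead Channel objects (and whatever they reference) in memory.
    _allChannels.remove(ctx.channel());
    ctx.fireChannelInactive();
  }
}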


codecov-commenter commented Feb 9, 2026

Codecov Report

❌ Patch coverage is 83.33333% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.22%. Comparing base (4ef8e3e) to head (278effc).
⚠️ Report is 1 commit behind head on master.

Files with missing lines (patch coverage, lines missing):
  • ...e/pinot/core/transport/InstanceRequestHandler.java: 0.00%, 2 missing ⚠️
  • .../apache/pinot/core/transport/DataTableHandler.java: 50.00%, 1 missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17667      +/-   ##
============================================
- Coverage     63.25%   63.22%   -0.03%     
  Complexity     1499     1499              
============================================
  Files          3174     3174              
  Lines        190323   190327       +4     
  Branches      29080    29081       +1     
============================================
- Hits         120381   120336      -45     
- Misses        60606    60648      +42     
- Partials       9336     9343       +7     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.20% <83.33%> (-0.01%) ⬇️
java-21 63.19% <83.33%> (-0.03%) ⬇️
temurin 63.22% <83.33%> (-0.03%) ⬇️
unittests 63.22% <83.33%> (-0.03%) ⬇️
unittests1 55.60% <83.33%> (-0.03%) ⬇️
unittests2 34.08% <61.11%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.


@gortiz gortiz requested review from yashmayya February 9, 2026 17:39
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1);
// Propagate so the pipeline closes the channel and channelInactive runs (markServerDown), otherwise
// the query would hang waiting for a response that will never be delivered.
ctx.fireExceptionCaught(e);
gortiz (Contributor, Author) commented:

Maybe instead of closing the channel here we want to tell the router that the query has finished incorrectly. But we need to do something beyond dropping the message we weren't able to parse.
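A purely hypothetical sketch of that alternative, to make the trade-off explicit; the QueryFailureListener callback is invented here and is not the actual QueryRouter API.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class FailQueryOnBadResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {

  /** Invented callback standing in for "tell the router the query finished incorrectly". */
  public interface QueryFailureListener {
    void onQueryFailed(Exception cause);
  }

  private final QueryFailureListener _listener;

  public FailQueryOnBadResponseHandler(QueryFailureListener listener) {
    _listener = listener;
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
    try {
      deserialize(msg); // placeholder for the real DataTable deserialization
    } catch (Exception e) {
      // Keep the connection open, but unblock the query that is waiting for this response.
      _listener.onQueryFailed(e);
      // Caveat: this only works if the byte stream is still correctly framed after the bad
      // payload; if framing is broken, closing the channel (as this PR does) is safer.
    }
  }

  private void deserialize(ByteBuf msg) {
    // stand-in for the real parsing logic
  }
}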

_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESPONSE_FETCH_EXCEPTIONS, 1);
// Propagate so the pipeline can close the channel and release resources; otherwise the channel
// may be left in an inconsistent state and leak.
ctx.fireExceptionCaught(cause);
gortiz (Contributor, Author) commented:

This method is called when we run out of direct memory upstream. We need to ensure that memory is released:

ERROR [DataTableHandler] [nioEventLoopGroup-x-y] Caught exception while handling response from server: ZZZZZZ
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate X byte(s) of direct memory (used: Y, max: Z)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:880)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:809)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:718)
        at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:707)
        at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:224)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:142)
        at io.netty.buffer.PoolArena.reallocate(PoolArena.java:317)
        at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:123)
        at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305)
        at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:280)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1103)
        at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:105)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:288)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1519)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1377)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1428)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1583)

Here I decided to close the channel. I'm open to other alternatives, but we need to do something here as well.
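For completeness, a minimal, self-contained sketch of the behavior this amounts to (not the actual DataTableHandler code): forward the exception and close the channel so the decoder's cumulation buffer that triggered the OutOfDirectMemoryError is released.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClosingExceptionHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // Forwarding lets later handlers observe the failure (e.g. to mark the server down) ...
    ctx.fireExceptionCaught(cause);
    // ... and closing guarantees the read path is torn down and its buffers freed even if
    // nothing downstream reacts to the exception.
    ctx.close();
  }
}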
