Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] avoid backpressure by skipping dispatching if consumer channel is not writable #6740

Merged
merged 2 commits into from
Apr 15, 2020

Conversation

rdhabalia
Copy link
Contributor

@rdhabalia rdhabalia commented Apr 14, 2020

Motivation

Broker can crash with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]

Modification

In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for #5896 as well.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. The Key_Shared consumer also needs this change.

@rdhabalia
Copy link
Contributor Author

The Key_Shared consumer also needs this change.

@codelipenghui added check for Key_Shared as well.

@sijie sijie added this to the 2.6.0 milestone Apr 15, 2020
@sijie sijie merged commit 1fd1b2b into apache:master Apr 15, 2020
@rdhabalia rdhabalia deleted the cons_writable branch April 17, 2020 19:39
addisonj pushed a commit to instructure/pulsar that referenced this pull request May 7, 2020
…r channel is not writable (apache#6740)

### Motivation
Recently we are seeing broker is crashing with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:
```
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
```

### Modification
In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for apache#5896 as well.
jiazhai pushed a commit that referenced this pull request May 8, 2020
…r channel is not writable (#6740)

### Motivation
Recently we are seeing broker is crashing with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:
```
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
```

### Modification
In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for #5896 as well.
(cherry picked from commit 1fd1b2b)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…r channel is not writable (apache#6740)

### Motivation
Recently we are seeing broker is crashing with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:
```
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
```

### Modification
In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for apache#5896 as well.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants