
Share EventLoopGroup between Broker and BK client #2603

Merged
merged 4 commits into apache:master from share-io-threads on May 20, 2021

Conversation

merlimat (Contributor)

Motivation

Share the Netty EventLoopGroup thread pool between the Pulsar broker and the BookKeeper client.

Since the broker and the BookKeeper client run in the same process, we can reduce the number of threads needed by having them share the same I/O thread pool.
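A minimal sketch of the idea, using plain Netty APIs (the pool name and the factory class here are illustrative, not Pulsar's actual wiring):

```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

public final class SharedIoThreads {

    // One I/O thread pool for the whole process, created once at broker startup.
    public static EventLoopGroup newSharedGroup(int numIoThreads) {
        return new NioEventLoopGroup(numIoThreads, new DefaultThreadFactory("pulsar-io"));
    }
}
```

The broker's Netty server bootstrap and the BookKeeper client are then both handed this single group, instead of each allocating its own set of I/O threads.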

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 17, 2018
@merlimat merlimat added this to the 2.3.0-incubating milestone Sep 17, 2018
@merlimat merlimat self-assigned this Sep 17, 2018
@rdhabalia rdhabalia (Contributor) left a comment

LGTM..

```diff
@@ -28,6 +30,6 @@
 /**
  * Provider of a new BookKeeper client instance
  */
 public interface BookKeeperClientFactory {
-    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException;
+    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup) throws IOException;
```
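An implementation of the new signature might look roughly like this. This is a sketch: the class name and the configuration mapping are illustrative, and the exact BookKeeper builder hooks depend on the client version.

```java
import java.io.IOException;

import io.netty.channel.EventLoopGroup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.zookeeper.ZooKeeper;

public class SharedEventLoopBookKeeperClientFactory implements BookKeeperClientFactory {

    @Override
    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
                             EventLoopGroup eventLoopGroup) throws IOException {
        ClientConfiguration bkConf = new ClientConfiguration();
        // ... map the relevant broker settings from `conf` onto `bkConf` ...
        try {
            // Hand the broker's event loop group to the BookKeeper client so it
            // reuses the broker's I/O threads instead of allocating its own.
            return BookKeeper.forConfig(bkConf)
                    .zk(zkClient)
                    .eventLoopGroup(eventLoopGroup)
                    .build();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (BKException e) {
            throw new IOException(e);
        }
    }
}
```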

Yes, this will help avoid context switching while publishing and dispatching messages. However, do you see a contention issue where heavy dispatching could impact publishing?

@ivankelly (Contributor)

Is this just about reducing the number of threads? I don't see how it would reduce context switches.

@merlimat merlimat modified the milestones: 2.3.0, 2.4.0 Feb 14, 2019
@sijie sijie modified the milestones: 2.4.0, 2.5.0 Jun 9, 2019
@sijie sijie modified the milestones: 2.5.0, 2.6.0 Nov 25, 2019
@codelipenghui (Contributor)

@merlimat Please take a look at the comment. Does this PR need to be included in the 2.6.0 release?

@codelipenghui (Contributor)

move to 2.7.0

@codelipenghui codelipenghui modified the milestones: 2.6.0, 2.7.0 May 25, 2020
@codelipenghui (Contributor)

move to 2.8.0

@codelipenghui codelipenghui modified the milestones: 2.7.0, 2.8.0 Nov 4, 2020
@eolivelli eolivelli (Contributor) left a comment

overall LGTM

I left a comment; I probably misunderstood some part of the story, please check.

@eolivelli eolivelli (Contributor) left a comment

LGTM

@eolivelli eolivelli merged commit 2845d47 into apache:master May 20, 2021
@merlimat merlimat deleted the share-io-threads branch May 20, 2021 13:45
BewareMyPower pushed a commit to streamnative/kop that referenced this pull request May 26, 2021
### Motivation
When running KoP on a Pulsar build from 2.8.0-rc-202105202205 or later, it throws the following exception:
```
java.lang.NoSuchMethodError: org.apache.pulsar.broker.service.BrokerService.executor()Ljava/util/concurrent/ScheduledExecutorService;
        at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.<init>(KafkaTopicManager.java:87) ~[?:?]
        at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.<init>(KafkaRequestHandler.java:246) ~[?:?]
        at io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.initChannel(KafkaChannelInitializer.java:92) ~[?:?]
        at io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.initChannel(KafkaChannelInitializer.java:34) ~[?:?]
        at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:938) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.DefaultChannelPipeline.access$100(DefaultChannelPipeline.java:46) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:514) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486) [io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [io.netty-netty-transport-native-epoll-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
```
It was caused by apache/pulsar#2603, which changed the return type of `BrokerService.executor()` from `ScheduledExecutorService` to `EventLoopGroup`.
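For context: Netty's `EventLoopGroup` extends `ScheduledExecutorService`, so the change is source-compatible; the failure is a binary one, because the return type is part of the method's descriptor in the class file. An illustrative fragment (`brokerService` stands in for the real field):

```java
// This line compiles against both the old and the new Pulsar API, since
// EventLoopGroup is itself a ScheduledExecutorService:
ScheduledExecutorService executor = brokerService.executor();

// But KoP's previously compiled bytecode still references the old descriptor
//     executor()Ljava/util/concurrent/ScheduledExecutorService;
// while the new broker class file only provides
//     executor()Lio/netty/channel/EventLoopGroup;
// so the JVM throws NoSuchMethodError until KoP is rebuilt against the new API.
```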

### Modification
Upgrade the Pulsar dependency to 2.8.0-rc-202105251229 to resolve the exception.
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022