Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-client] Process partitioned-topic messages on different listener-threads #10017

Merged
merged 2 commits into from
Mar 24, 2021

Conversation

rdhabalia
Copy link
Contributor

Motivation

Right now, Consumer of partitioned topic always uses only 1 listener thread to call listener on received message even though pulsar-client is created with multiple listener-theads. So, if user creates Pulsar-client with 10 listener threads and creates consumer for partitoned-topic with 10 partitions then consumer will use only 1 listener threads to serve all 10 partitions instead utilizing all 10 threads. This creates a bottleneck if one of the partition is slow, CPU spins on only one thread and there is no way to make overall process faster.
Therefore, consumer should use each partition's listener-thread to process message received by that partition. This helps:

  • slow partition processing will not impact other partition message processing
  • distribute loads across multiple listener-threads
  • still provide ordering guarantee per partition
  • utilize CPU and make overall process faster

Modification

allow listener-processing of each partition on its dedicated listener-thread instead on common partitioned-topic's listener thread.

@rdhabalia rdhabalia added this to the 2.8.0 milestone Mar 23, 2021
@rdhabalia rdhabalia self-assigned this Mar 23, 2021
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM

I left a couple of comments

@@ -32,11 +32,15 @@

private final Message<T> msg;
private final TopicMessageIdImpl messageId;
// consumer if this message is received by that consumer
ConsumerImpl receivedByconsumer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final ?

this.topicPartitionName = topicPartitionName;
this.receivedByconsumer = receivedByConsumer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add an assertion that receivedByConsumer is non null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receivedByConsumer can be null if you want to create a message without consumer for some other purpose. so, it's intentional.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rdhabalia
Copy link
Contributor Author

/pulsarbot run-failure-checks

@eolivelli eolivelli merged commit 8568054 into apache:master Mar 24, 2021
@rdhabalia rdhabalia deleted the part_listener branch March 24, 2021 17:26
@rdhabalia rdhabalia restored the part_listener branch March 25, 2021 17:44
@michaeljmarshall
Copy link
Member

Cherry picking this to branch-2.7 because #11455 relies on part of it, and it simplifies the conflict resolution for the other commit.

michaeljmarshall pushed a commit that referenced this pull request Dec 10, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants