Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #80: Use receiveAsync() to aggregate messages into shared queue for partitioned consumer #106

Merged

Conversation

merlimat
Copy link
Contributor

Motivation

Context can be found in #80. This is a slightly different approach from #83, using receiveAsync() and pausing and resuming reads depending on the shared queue utilization.

@yahoocla
Copy link

CLA is valid!

@merlimat merlimat self-assigned this Nov 11, 2016
@merlimat merlimat added this to the 1.16 milestone Nov 11, 2016
@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Nov 11, 2016
Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
ForkJoinPool.commonPool().execute(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use iothreads here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, changed that

this.consumers = Lists.newArrayListWithCapacity(numPartitions);
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't it be better if the threshold was maxReceiverQueueSeize - partitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having the threshold too tight would make flip-flopping easy: receiving a single message would make pausing the consumer and then when the application process it, resume the consumer. Using half-the queue size is to reduce that contention, and at the same time avoids the queue to get empty and starve the application

@merlimat merlimat force-pushed the issue-80-blocking-partitioned-consumer branch from 50adcf4 to ff8318b Compare November 14, 2016 03:38
@merlimat merlimat merged commit 72cc677 into apache:master Nov 14, 2016
@merlimat merlimat deleted the issue-80-blocking-partitioned-consumer branch November 17, 2016 00:05
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
Co-authored-by: yfuruta <yfuruta@yahoo-corp.jp>
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
Remove the paths restrict for the CI.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants