Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Dispatch batch messages according consumer permits #7266

Merged
merged 2 commits into from
May 17, 2021

Conversation

rdhabalia
Copy link
Contributor

Motivation

Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

Modification

  • Find out average number of batch messages in an entry
  • Dispatch messages considering total batch-messages in an entry

Result

It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.

@codelipenghui
Copy link
Contributor

@rdhabalia Looks like the same purpose as #6719

@rdhabalia
Copy link
Contributor Author

@codelipenghui
I saw #6719 but it doesn't solve fundamental problem of dispatching. #6719 tries to predict how many messages should be read by considering total-available permits and total batch messages. but it's still not controlling while dispatching messages.
for example: there are 10 consumers connected with 100 permits each and one batch message contains 100 messages. Now, broker reads 10 messages based on both factors but broker doesn't know number of batch-messages in each message so, it will dispatch all 10 messages to the consumer which has 100 permits which is the actual issue because broker dispatched 10*100=1K message to the consumer.

@codelipenghui
Copy link
Contributor

@rdhabalia Thanks, I now know this problem that you want to fix.

Comment on lines 68 to 102
/**
* Get number of batch messages into the entry.
* @return
*/
int getNumberOfBatchMessages();

/**
* Set number of batch messages into the entry.
* @param numberOfBatchMessages
*/
void setNumberOfBatchMessages(int numberOfBatchMessages);

/**
* Set data size of the entry.
* @param totalSizeInBytes
*/
void setTotalSizeInBytes(int totalSizeInBytes);

/**
* Set total number of chunked messages in entry.
* @param totalChunkedMessages
*/
void setTotalChunkedMessages(int totalChunkedMessages);

/**
* Get total data size of the entry.
* @return
*/
int getTotalSizeInBytes();

/**
* Get total number of chuncked messages.
* @return
*/
int getTotalChunkedMessages();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we better not add message related concepts to the entry. Maybe we can add a context carrier in the entry. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @codelipenghui , could you please provide a little more detail on what you mean by a "context carrier"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @codelipenghui
it is unfortunate to add these methods here.
we are going to use them (and the underlying fields) only on the Dispatcher.

it would be better (for instance) to have some EntryWithMetatadata object in Dispatcher that wraps the Entry and adds these fields

@Huanli-Meng
Copy link
Contributor

I. think it should open a doc issue to update docs for this PR. @jennifer88huang , could you help open an doc issue for this PR?

@jiazhai
Copy link
Member

jiazhai commented Jun 28, 2020

Thanks @rdhabalia. overall lgtm. Would you please help on @codelipenghui 's last comments?

@Jennifer88huang-zz Jennifer88huang-zz added the doc-required Your PR changes impact docs and you will update later. label Jun 28, 2020
@Jennifer88huang-zz
Copy link
Contributor

Jennifer88huang-zz commented Jun 28, 2020

I. think it should open a doc issue to update docs for this PR. @jennifer88huang , could you help open an doc issue for this PR?

@Huanli-Meng Thanks for informing me on this. We can add a label "doc-required" in the Labels list, I've created the issue.

@Anonymitaet
Copy link
Member

@rdhabalia thanks for your work.

  1. Any progress for this PR?
  2. Could you please help add docs accordingly? Thanks

@Anonymitaet
Copy link
Member

@rdhabalia thanks for your work.

  1. Any progress for this PR?
  2. Could you please help add docs accordingly? Thanks

@rdhabalia
Copy link
Contributor Author

I will rebase this soon.

@Anonymitaet
Copy link
Member

@rdhabalia could you please help add docs accordingly? Then I would like to help review, thanks.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@eolivelli
Copy link
Contributor

@merlimat do you want to take a look as well?

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari
Copy link
Member

lhotari commented May 17, 2021

Closing and reopening this PR to get the fix for a flaky test.

@lhotari lhotari closed this May 17, 2021
@lhotari lhotari reopened this May 17, 2021
@codelipenghui codelipenghui merged commit a6aed55 into apache:master May 17, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request May 17, 2021
…pache#7266)

Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.

(cherry picked from commit a6aed55)
eolivelli pushed a commit to datastax/pulsar that referenced this pull request May 17, 2021
…pache#7266)

Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.

(cherry picked from commit a6aed55)
eolivelli added a commit to datastax/pulsar that referenced this pull request May 17, 2021
@devinbost
Copy link
Contributor

@rdhabalia Nice work!

lhotari pushed a commit to lhotari/pulsar that referenced this pull request May 18, 2021
…pache#7266)

Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.

(cherry picked from commit a6aed55)
lhotari pushed a commit to datastax/pulsar that referenced this pull request May 18, 2021
…pache#7266)

Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.

(cherry picked from commit a6aed55)
codelipenghui added a commit that referenced this pull request Jun 4, 2021
Fixes #10813
The issue is introduced by #7266, it only affects the master branch.

### Motivation

1. Add wrapperOffset to make sure get the correct batch size from the metadata
2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
   it should be (messages / avgBatchSizePerMsg)

### Verifying this change

     * The test case is to simulate dispatch batches with different batch size to the consumer.
     * 1. The consumer has 1000 available permits
     * 2. The producer send batches with different batch size
     *
     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Jun 7, 2021
Fixes apache#10813
The issue is introduced by apache#7266, it only affects the master branch.

### Motivation

1. Add wrapperOffset to make sure get the correct batch size from the metadata
2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
   it should be (messages / avgBatchSizePerMsg)

### Verifying this change

     * The test case is to simulate dispatch batches with different batch size to the consumer.
     * 1. The consumer has 1000 available permits
     * 2. The producer send batches with different batch size
     *
     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer

(cherry picked from commit 4f23767)
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
…pache#7266)

### Motivation
Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

### Modification
- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

### Result
It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
Fixes apache#10813
The issue is introduced by apache#7266, it only affects the master branch.

### Motivation

1. Add wrapperOffset to make sure get the correct batch size from the metadata
2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
   it should be (messages / avgBatchSizePerMsg)

### Verifying this change

     * The test case is to simulate dispatch batches with different batch size to the consumer.
     * 1. The consumer has 1000 available permits
     * 2. The producer send batches with different batch size
     *
     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…pache#7266)

### Motivation
Right now, dispatching of batch messages is not predictable for a shared-subscription. Dispatcher doesn't consider number of batch messages in an entry which can cause higher number of message delivery than consumer receiver-queue.
For example:
If a topic has message-entries with 100 batched message in an entry.
If consumer has receiver queue=40 and has available-permits=20. then broker dispatches 20 entries with total 2000 (20*100) messages in it. because of that consumer's will high number of messages than expected and we can see available-permits in negative.

### Modification
- Find out average number of batch messages in an entry
- Dispatch messages considering total batch-messages in an entry

### Result
It will avoid dispatching higher number of messages than consumer's available-permit and will fix -ve available-permits.
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
Fixes apache#10813
The issue is introduced by apache#7266, it only affects the master branch.

### Motivation

1. Add wrapperOffset to make sure get the correct batch size from the metadata
2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
   it should be (messages / avgBatchSizePerMsg)

### Verifying this change

     * The test case is to simulate dispatch batches with different batch size to the consumer.
     * 1. The consumer has 1000 available permits
     * 2. The producer send batches with different batch size
     *
     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 8, 2022
… instead

### Motivation

apache#7266 introduced the `EntryWrapper`
to store the `Entry` object and the associated `MessageMetadata` if it
exists. However, the `getEntry` field of `EntryWrapper` is never used.
There is no need to allocate memory for `EntryWrapper`, even if it's
allocated from the recycler pool.

### Modifications

- Calculate the remaining messages without creating `EntryWrapper`
  object, just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
  and add the JavaDocs for these two parameters.

After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses functional programming style to make code more simple and clear.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as
`testBatchMessageDispatchingAccordingToPermits`.

### Documentation

Check the box below or label this PR directly.

Need to update docs?

- [ ] `doc-required`
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed`
(Please explain why)

- [ ] `doc`
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)
BewareMyPower added a commit that referenced this pull request Jun 8, 2022
… instead (#15967)

### Motivation

#7266 introduced the `EntryWrapper`
to store the `Entry` object and the associated `MessageMetadata` if it
exists. However, the `getEntry` field of `EntryWrapper` is never used.
There is no need to allocate memory for `EntryWrapper`, even if it's
allocated from the recycler pool.

### Modifications

- Calculate the remaining messages without creating `EntryWrapper`
  object, just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
  and add the JavaDocs for these two parameters.

After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses functional programming style to make code more simple and clear.
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
… instead (apache#15967)

apache#7266 introduced the `EntryWrapper`
to store the `Entry` object and the associated `MessageMetadata` if it
exists. However, the `getEntry` field of `EntryWrapper` is never used.
There is no need to allocate memory for `EntryWrapper`, even if it's
allocated from the recycler pool.

- Calculate the remaining messages without creating `EntryWrapper`
  object, just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
  and add the JavaDocs for these two parameters.

After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses functional programming style to make code more simple and clear.

(cherry picked from commit c33b12d)
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
… instead (apache#15967)

apache#7266 introduced the `EntryWrapper`
to store the `Entry` object and the associated `MessageMetadata` if it
exists. However, the `getEntry` field of `EntryWrapper` is never used.
There is no need to allocate memory for `EntryWrapper`, even if it's
allocated from the recycler pool.

- Calculate the remaining messages without creating `EntryWrapper`
  object, just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
  and add the JavaDocs for these two parameters.

After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses functional programming style to make code more simple and clear.

(cherry picked from commit c33b12d)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants