Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix broker dispatch byte rate limiter. #11135

Merged

Conversation

congbobo184
Copy link
Contributor

Motivation

fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use ServiceConfiguration.dispatcherMaxReadSizeBytes to read. It will cause that dispatcher read entries by ServiceConfiguration.dispatcherMaxReadSizeBytes to read every time.

implement

when cursor read entries size need to calculate, the calculate result by dispatcher bytes limiter.

Verifying this change

Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

@@ -276,8 +278,10 @@ public synchronized void readMoreEntries() {
}
}

protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
// left pair is messagesToRead, right pair is bytesToRead
protected MutablePair<Integer, Integer> calculateToRead(int currentTotalAvailablePermits) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need MutablePair here? we'd better do not allow to modify it outside?

@@ -349,7 +352,7 @@ protected void readMoreEntries(Consumer consumer) {
}
}

protected int calculateNumOfMessageToRead(Consumer consumer) {
protected MutablePair<Integer, Integer> calculateToRead(Consumer consumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move to AbstractBaseDispatcher? Looks many duplicated code with the PersistentDispatcherMultipleConsumers.

for (int i = 0; i < numProducedMessages; i++) {
producer.send(new byte[byteRate / 10]);
producer.send(new byte[99]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use asynchronous API? If we use synchronous API, this means for each message we need to ensure the publish latency is lower than 50ms, otherwise it should be a flaky test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And what's the reason change to new byte[99]

}

latch.await();
Assert.assertEquals(totalReceived.get(), numProducedMessages);
Awaitility.await().atLeast(3, TimeUnit.SECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need atLeast here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to check when the time less than 3 second consumer con't receive messages more than 6.

@codelipenghui codelipenghui added this to the 2.9.0 milestone Jun 29, 2021
@eolivelli
Copy link
Contributor

@lhotari PTAL

Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ImmutablePair ?

@@ -234,6 +238,19 @@ public void resetCloseFuture() {
// noop
}

protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a better name like computeReadLimits ?
Can parameters be ordered as: messagesToRead, availableMsgPermits, bytesToRead, availableBytePermits just to have a consistent grouping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion.

}

if (availablePermitsOnByte > 0) {
bytesToRead = availablePermitsOnByte;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatcherMaxReadSizeBytes was used to limit max number of bytes that can be read from bookies in one cursor read op. With this change if user configures a dispatch rate of say 100MB/s, it will mean that bytesToRead can be 100MB (since we do not do Math.min here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

}
Pair<Integer, Integer> calculateResult = calculateToRead(messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
(int) topicRateLimiter.getAvailableDispatchRateLimitOnByte(), bytesToRead);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do type conversion for bytes from long to int? Could cause bugs when user sets dispatch rate higher than 2GB (unlikely but possible)

@@ -263,8 +264,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytesToRead can potentially be -1 now (like at line 315 when new MutablePair<>(-1, -1); is returned) when dispatch limit is not set. This changes the default behaviour because earlier it was capped by serviceConfig.getDispatcherMaxReadSizeBytes()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in line 230, when bytesToRead or messageToRead is -1, it will not process read operation.

Copy link
Contributor Author

@congbobo184 congbobo184 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank for you review, I left some question and change the code. please review again. thanks.

@@ -234,6 +238,19 @@ public void resetCloseFuture() {
// noop
}

protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion.

}

if (availablePermitsOnByte > 0) {
bytesToRead = availablePermitsOnByte;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@@ -263,8 +264,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in line 230, when bytesToRead or messageToRead is -1, it will not process read operation.

@congbobo184
Copy link
Contributor Author

Why not use ImmutablePair ?

changed

Comment on lines 316 to 318
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
topicRateLimiter.getAvailableDispatchRateLimitOnByte(), bytesToRead);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have not changed the order of arguments here (after changing the order in the function definition). It should be

 computeReadLimits(
    messagesToRead,
    (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
    bytesToRead,
    topicRateLimiter.getAvailableDispatchRateLimitOnByte());

Please fix at other places too where this function is called.

Copy link
Contributor Author

@congbobo184 congbobo184 Jun 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, could you please review again? if don't have any other comment, please approve this pr then we will merge it :)

@codelipenghui
Copy link
Contributor

@anvinjain Do you want to take a final look?

@codelipenghui codelipenghui merged commit ce6be12 into apache:master Jul 6, 2021
codelipenghui pushed a commit that referenced this pull request Jul 7, 2021
## Motivation
fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.

(cherry picked from commit ce6be12)
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jul 7, 2021
congbobo184 pushed a commit that referenced this pull request Jul 9, 2021
The PR #11135 couldn't be cherry-picked to branch-2.7, because there are too many conflicts.

## Motivation

fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.

## implement

when cursor read entries size need to calculate, the calculate result by dispatcher bytes limiter.
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jul 12, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
## Motivation
fix apache#11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
@congbobo184 congbobo184 deleted the congbobo184_fix_dispatcher_byte_limit branch March 24, 2022 04:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life cherry-picked/branch-2.8 Archived: 2.8 is end of life release/2.7.3 release/2.8.1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Dispatch rate discrepancies when byte rates used
6 participants