[pulsar-broker] Cache unack-messageId into openRangeSet #3819

Merged
merged 11 commits into from
May 21, 2019

Conversation

@rdhabalia rdhabalia commented Mar 14, 2019

Motivation

This PR builds on #3818. With these changes, the managed ledger (ML) can use OpenRangeSet to cache unacked message IDs. It also adds the option managedLedgerUnackedRangesOpenCacheSetEnabled to switch back to the existing RangeSet, so users can restore the original behavior without a rollback. We will remove the managedLedgerUnackedRangesOpenCacheSetEnabled option in a future release, once the feature is no longer experimental.

Result

The broker will not face GC pauses when a client generates a large number of unacked messages on the broker.

I ran performance and functional tests on the changes:

  1. With the existing Guava Range data structure and a large number of unacked messages:
  • high GC pauses
  • over 9M PositionImpl objects
  • CPU usage around 40%
  2. With the OpenRangeSet data structure and a large number of unacked messages:
  • very low GC pauses
  • around 50K PositionImpl objects in memory
  • CPU usage increases to 70%

@rdhabalia rdhabalia added this to the 2.4.0 milestone Mar 14, 2019
@rdhabalia rdhabalia self-assigned this Mar 14, 2019
@rdhabalia
Contributor Author

rerun java8 tests

@codelipenghui
Contributor

run java8 tests

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.assertj.core.util.Lists;
Contributor

This should be java.util

@@ -39,6 +39,7 @@
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
Contributor

Import shouldn't be needed here

-private final long ledgerId;
-private final long entryId;
+protected long ledgerId;
+protected long entryId;

public static PositionImpl earliest = new PositionImpl(-1, -1);
Contributor

Can you also make earliest/latest as final here? (Just happened to realize they were not already...)

@@ -1252,34 +1276,41 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}

long getNumIndividualDeletedEntriesToSkip(long numEntries) {
-long totalEntriesToSkip = 0;
-long deletedMessages = 0;
+AtomicLong totalEntriesToSkip = new AtomicLong(0);
Contributor

Instead of the 2 AtomicLong and 2 AtomicReference instances, we could use thread locals here, since it's just to be able to update them from within the lambda.

Contributor Author

sure.
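
To illustrate the trade-off discussed in this thread: a lambda can only capture effectively final locals, so a running total has to live in some mutable holder. The sketch below contrasts a per-call AtomicLong with a reusable thread-local holder. The names LambdaAccumulation, Counters, sumWithAtomic, and sumWithThreadLocal, and the inclusive [first, last] range layout, are assumptions made for illustration; this is not the code from this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class LambdaAccumulation {

    // Hypothetical mutable holder; one instance is kept per thread and reused.
    static final class Counters {
        long entriesToSkip;

        void reset() {
            entriesToSkip = 0;
        }
    }

    private static final ThreadLocal<Counters> COUNTERS =
            ThreadLocal.withInitial(Counters::new);

    // Variant A: allocate a new AtomicLong on every call just so the
    // lambda has something mutable to write into.
    public static long sumWithAtomic(List<long[]> ranges) {
        AtomicLong total = new AtomicLong(0);
        ranges.forEach(r -> total.addAndGet(r[1] - r[0] + 1));
        return total.get();
    }

    // Variant B: reuse a per-thread holder, so no allocation per call.
    // The holder must be reset before use because it outlives the call.
    public static long sumWithThreadLocal(List<long[]> ranges) {
        Counters c = COUNTERS.get();
        c.reset();
        ranges.forEach(r -> c.entriesToSkip += r[1] - r[0] + 1);
        return c.entriesToSkip;
    }

    public static void main(String[] args) {
        // Two assumed inclusive ranges: 0..4 (5 entries) and 10..11 (2 entries).
        List<long[]> ranges = Arrays.asList(new long[] {0, 4}, new long[] {10, 11});
        System.out.println(sumWithAtomic(ranges) + " " + sumWithThreadLocal(ranges)); // prints "7 7"
    }
}
```

The thread-local variant is only safe when the lambda runs synchronously on the calling thread, which is the situation the comment above describes.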

@@ -188,6 +200,9 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
this.config = config;
this.ledger = ledger;
this.name = cursorName;
this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<PositionImpl>(4096, positionRangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter);
Contributor

Are both cases being tested?

Contributor Author

Yes, I have tested both. However, let me add data providers to test both scenarios in the existing test cases.

@@ -62,13 +65,13 @@ public void testGetManagedLedgerInfoWithClose() throws Exception {
assertEquals(cursorInfo.markDelete.ledgerId, 3);
assertEquals(cursorInfo.markDelete.entryId, -1);

-assertEquals(cursorInfo.individualDeletedMessages.size(), 1);
+assertEquals(cursorInfo.individualDeletedMessages.size(), 2);
Contributor

Is this due to a change in behavior?

Contributor Author

Yes. ConcurrentOpenLongPairRangeSet keeps a separate BitSet for every ledger, and when creating ranges it doesn't merge ranges from different ledgers (it doesn't merge the last entry of the previous ledger with the first entry of the next ledger). So instead of one combined range, it returns multiple ranges when the ledgers differ.
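
The behavior described above can be sketched with a plain java.util.BitSet per ledger. This is a deliberately simplified model, not the actual ConcurrentOpenLongPairRangeSet: the class name PerLedgerRanges, its methods, and the inclusive "ledger:entry" range notation are assumptions for illustration, and the real class may differ in range bounds and internals.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerLedgerRanges {

    // One BitSet per ledger id; a set bit means "this entry is in the set".
    private final Map<Long, BitSet> ledgers = new TreeMap<>();

    public void add(long ledgerId, int entryId) {
        ledgers.computeIfAbsent(ledgerId, k -> new BitSet()).set(entryId);
    }

    // Emits one range per run of consecutive set bits, ledger by ledger.
    // Runs are never joined across ledgers, because each ledger is scanned
    // on its own and nothing here knows where a ledger ends.
    public List<String> ranges() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, BitSet> e : ledgers.entrySet()) {
            long ledgerId = e.getKey();
            BitSet bits = e.getValue();
            int start = bits.nextSetBit(0);
            while (start >= 0) {
                int end = bits.nextClearBit(start) - 1;
                out.add("[" + ledgerId + ":" + start + ".." + ledgerId + ":" + end + "]");
                start = bits.nextSetBit(end + 1);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PerLedgerRanges set = new PerLedgerRanges();
        // Last two entries of ledger 1, then first two entries of ledger 2.
        set.add(1, 8);
        set.add(1, 9);
        set.add(2, 0);
        set.add(2, 1);
        System.out.println(set.ranges()); // prints [[1:8..1:9], [2:0..2:1]]
    }
}
```

Even if entry 9 happened to be the last entry of ledger 1, the two runs stay separate, which matches the size change from 1 to 2 in the test assertion under discussion.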

@rdhabalia
Contributor Author

rerun integration tests
rerun java8 tests

@rdhabalia
Contributor Author

@merlimat I addressed your comments: I added a data provider to the existing ManagedCursor tests to cover both use cases, and also added a separate test that exercises ack holes and markDelete with both use cases.

@rdhabalia
Contributor Author

rerun cpp tests


@rdhabalia rdhabalia merged commit 5556f1b into apache:master May 21, 2019
sijie pushed a commit that referenced this pull request Jan 18, 2020
### Motivation
Some parameters were added to the `broker.conf` and `standalone.conf` files, but the docs were not updated to cover them.
See the following PRs for details: #4150, #4066, #4197, #3819, #4261, #4273, #4320.

### Modifications
Add info for those parameters, and sync the docs with the code.

The descriptions are left largely unchanged, for two reasons:
1. Keep doc content consistent with the code. We need to update the descriptions for those parameters in the code first, and then sync them to the docs.
2. We will adopt a generator to produce this content automatically in the near future.
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
3 participants