-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Configurable data source for offloaded messages #8717
Changes from 33 commits
2b654e8
7f8ebff
7a6942d
ec43abe
9cd6b87
a697370
61a9561
969d164
0ba829c
d70bd00
a1031a6
fd224e1
1cb61da
d3148f4
7ee26cb
1fe3a3d
ad5c187
5eeafb4
0b6a92e
369d48c
52db96e
38d071f
a118a20
d9f08b2
d0505bf
32658b6
a486080
4d22a47
9c99302
142e1ed
abf10ab
bbd54c8
7506000
280715c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,19 +18,20 @@ | |
*/ | ||
package org.apache.bookkeeper.mledger.impl; | ||
|
||
import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyLong; | ||
import static org.mockito.ArgumentMatchers.anyMap; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.atLeastOnce; | ||
import static org.mockito.Mockito.never; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
||
import static org.testng.Assert.assertEquals; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.collect.Lists; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -39,7 +40,6 @@ | |
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.bookkeeper.client.api.DigestType; | ||
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; | ||
import org.apache.bookkeeper.client.api.LedgerEntries; | ||
|
@@ -52,10 +52,12 @@ | |
import org.apache.bookkeeper.mledger.LedgerOffloader; | ||
import org.apache.bookkeeper.mledger.ManagedCursor; | ||
import org.apache.bookkeeper.mledger.ManagedLedgerConfig; | ||
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; | ||
import org.apache.bookkeeper.mledger.util.MockClock; | ||
import org.apache.bookkeeper.net.BookieId; | ||
import org.apache.bookkeeper.net.BookieSocketAddress; | ||
import org.apache.bookkeeper.test.MockedBookKeeperTestCase; | ||
import org.apache.pulsar.common.policies.data.OffloadPolicies; | ||
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; | ||
import org.testng.Assert; | ||
import org.testng.annotations.Test; | ||
|
||
|
@@ -69,53 +71,140 @@ public void testOffloadRead() throws Exception { | |
config.setRetentionTime(10, TimeUnit.MINUTES); | ||
config.setRetentionSizeInMB(10); | ||
config.setLedgerOffloader(offloader); | ||
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); | ||
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config); | ||
|
||
for (int i = 0; i < 25; i++) { | ||
String content = "entry-" + i; | ||
ledger.addEntry(content.getBytes()); | ||
} | ||
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
|
||
ledger.offloadPrefix(ledger.getLastConfirmedEntry()); | ||
|
||
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
Assert.assertEquals(ledger.getLedgersInfoAsList().stream() | ||
.filter(e -> e.getOffloadContext().getComplete()).count(), 2); | ||
assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); | ||
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); | ||
|
||
UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(), | ||
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); | ||
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); | ||
UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(), | ||
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb()); | ||
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb()); | ||
|
||
ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); | ||
int i = 0; | ||
for (Entry e : cursor.readEntries(10)) { | ||
Assert.assertEquals(new String(e.getData()), "entry-" + i++); | ||
assertEquals(new String(e.getData()), "entry-" + i++); | ||
} | ||
verify(offloader, times(1)) | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap()); | ||
|
||
for (Entry e : cursor.readEntries(10)) { | ||
Assert.assertEquals(new String(e.getData()), "entry-" + i++); | ||
assertEquals(new String(e.getData()), "entry-" + i++); | ||
} | ||
verify(offloader, times(2)) | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); | ||
|
||
for (Entry e : cursor.readEntries(5)) { | ||
Assert.assertEquals(new String(e.getData()), "entry-" + i++); | ||
assertEquals(new String(e.getData()), "entry-" + i++); | ||
} | ||
verify(offloader, times(2)) | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
} | ||
|
||
@Test | ||
public void testBookkeeperFirstOffloadRead() throws Exception { | ||
MockLedgerOffloader offloader = spy(new MockLedgerOffloader()); | ||
MockClock clock = new MockClock(); | ||
offloader.getOffloadPolicies() | ||
.setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST); | ||
//delete after 5 minutes | ||
offloader.getOffloadPolicies() | ||
.setManagedLedgerOffloadDeletionLagInMillis(300000L); | ||
ManagedLedgerConfig config = new ManagedLedgerConfig(); | ||
config.setMaxEntriesPerLedger(10); | ||
config.setMinimumRolloverTime(0, TimeUnit.SECONDS); | ||
config.setRetentionTime(10, TimeUnit.MINUTES); | ||
config.setRetentionSizeInMB(10); | ||
config.setLedgerOffloader(offloader); | ||
config.setClock(clock); | ||
|
||
|
||
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_bookkeeper_first_test_ledger", config); | ||
|
||
for (int i = 0; i < 25; i++) { | ||
String content = "entry-" + i; | ||
ledger.addEntry(content.getBytes()); | ||
} | ||
assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
|
||
ledger.offloadPrefix(ledger.getLastConfirmedEntry()); | ||
|
||
assertEquals(ledger.getLedgersInfoAsList().size(), 3); | ||
assertEquals(ledger.getLedgersInfoAsList().stream() | ||
.filter(e -> e.getOffloadContext().getComplete()).count(), 2); | ||
|
||
LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0); | ||
Assert.assertTrue(firstLedger.getOffloadContext().getComplete()); | ||
LedgerInfo secondLedger; | ||
secondLedger = ledger.getLedgersInfoAsList().get(1); | ||
Assert.assertTrue(secondLedger.getOffloadContext().getComplete()); | ||
|
||
UUID firstLedgerUUID = new UUID(firstLedger.getOffloadContext().getUidMsb(), | ||
firstLedger.getOffloadContext().getUidLsb()); | ||
UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(), | ||
secondLedger.getOffloadContext().getUidLsb()); | ||
|
||
ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); | ||
int i = 0; | ||
for (Entry e : cursor.readEntries(10)) { | ||
Assert.assertEquals(new String(e.getData()), "entry-" + i++); | ||
} | ||
// For offloaded first and not deleted ledgers, they should be read from bookkeeper. | ||
verify(offloader, never()) | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
|
||
// Delete offladed message from bookkeeper | ||
assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId())); | ||
assertEventuallyTrue(() -> bkc.getLedgers().contains(secondLedger.getLedgerId())); | ||
clock.advance(6, TimeUnit.MINUTES); | ||
CompletableFuture<Void> promise = new CompletableFuture<>(); | ||
ledger.internalTrimConsumedLedgers(promise); | ||
promise.join(); | ||
|
||
// assert bk ledger is deleted | ||
assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedger.getLedgerId())); | ||
assertEventuallyTrue(() -> !bkc.getLedgers().contains(secondLedger.getLedgerId())); | ||
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted()); | ||
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted()); | ||
|
||
for (Entry e : cursor.readEntries(10)) { | ||
Assert.assertEquals(new String(e.getData()), "entry-" + i++); | ||
} | ||
|
||
// Ledgers deleted from bookkeeper, now should read from offloader | ||
verify(offloader, atLeastOnce()) | ||
.readOffloaded(anyLong(), any(), anyMap()); | ||
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); | ||
|
||
} | ||
|
||
|
||
static class MockLedgerOffloader implements LedgerOffloader { | ||
ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>(); | ||
|
||
|
||
OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "", | ||
null, null, | ||
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, | ||
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, | ||
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, | ||
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, | ||
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY); | ||
|
||
|
||
@Override | ||
public String getOffloadDriverName() { | ||
return "mock"; | ||
|
@@ -150,7 +239,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, | |
|
||
@Override | ||
public OffloadPolicies getOffloadPolicies() { | ||
return null; | ||
return offloadPolicies; | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,22 +23,19 @@ | |
import static org.testng.Assert.assertNotEquals; | ||
import static org.testng.Assert.assertTrue; | ||
import static org.testng.Assert.fail; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the blank line may not delete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The checkstyle https://github.com/apache/pulsar/blob/cc64889abe94d47f048e2f8e8fb10d6c37e695ec/buildtools/src/main/resources/pulsar/checkstyle.xml need us to leave no blank line between imports, though it's not enabled in all modules yet, but it will. So I think it's better to remove blank lines here. |
||
import com.google.common.collect.ImmutableSet; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the blank line may not delete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
import java.lang.reflect.Field; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.BooleanSupplier; | ||
import java.util.stream.Collectors; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same with upper. |
||
import org.apache.bookkeeper.client.BKException; | ||
import org.apache.bookkeeper.client.api.ReadHandle; | ||
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; | ||
|
@@ -50,11 +47,9 @@ | |
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; | ||
import org.apache.bookkeeper.test.MockedBookKeeperTestCase; | ||
import org.apache.commons.lang3.tuple.Pair; | ||
|
||
import org.apache.pulsar.common.policies.data.OffloadPolicies; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.testng.annotations.Test; | ||
|
||
public class OffloadPrefixTest extends MockedBookKeeperTestCase { | ||
|
@@ -995,7 +990,8 @@ Set<Long> deletedOffloads() { | |
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, | ||
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, | ||
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, | ||
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS); | ||
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, | ||
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY); | ||
|
||
@Override | ||
public String getOffloadDriverName() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,9 +21,7 @@ | |
|
||
import com.google.common.collect.Lists; | ||
import com.google.common.collect.Sets; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may not delete the new line. |
||
import io.netty.util.internal.PlatformDependent; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -35,14 +33,15 @@ | |
import lombok.Setter; | ||
import org.apache.bookkeeper.client.api.DigestType; | ||
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; | ||
import org.apache.pulsar.common.nar.NarClassLoader; | ||
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; | ||
import org.apache.pulsar.common.policies.data.TopicType; | ||
import org.apache.pulsar.common.protocol.Commands; | ||
import org.apache.pulsar.common.configuration.Category; | ||
import org.apache.pulsar.common.configuration.FieldContext; | ||
import org.apache.pulsar.common.configuration.PulsarConfiguration; | ||
import org.apache.pulsar.common.nar.NarClassLoader; | ||
import org.apache.pulsar.common.policies.data.BacklogQuota; | ||
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; | ||
import org.apache.pulsar.common.policies.data.OffloadPolicies; | ||
import org.apache.pulsar.common.policies.data.TopicType; | ||
import org.apache.pulsar.common.protocol.Commands; | ||
import org.apache.pulsar.common.sasl.SaslConstants; | ||
|
||
/** | ||
|
@@ -1459,17 +1458,22 @@ public class ServiceConfiguration implements PulsarConfiguration { | |
+ "Of course, this may degrade consumption throughput. Default is 10ms.") | ||
private int managedLedgerNewEntriesCheckDelayInMillis = 10; | ||
|
||
@FieldContext(category = CATEGORY_STORAGE_ML, | ||
doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.") | ||
private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST | ||
.getValue(); | ||
|
||
/*** --- Load balancer --- ****/ | ||
@FieldContext( | ||
category = CATEGORY_LOAD_BALANCER, | ||
doc = "Enable load balancer" | ||
category = CATEGORY_LOAD_BALANCER, | ||
doc = "Enable load balancer" | ||
) | ||
private boolean loadBalancerEnabled = true; | ||
@Deprecated | ||
@FieldContext( | ||
category = CATEGORY_LOAD_BALANCER, | ||
deprecated = true, | ||
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" | ||
category = CATEGORY_LOAD_BALANCER, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no blank |
||
deprecated = true, | ||
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" | ||
) | ||
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
OffloadPolicies
you get fromconfig
may not be updated as namespace or topic policy update.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, I will try to figure it out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hangc0276
I think
org.apache.pulsar.broker.admin.v2.PersistentTopics#setOffloadPolicies
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase#internalSetOffloadPolicies
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase#internalUpdateOffloadPolicies
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#setConfig
and
org.apache.pulsar.zookeeper.ZooKeeperCache#process(org.apache.zookeeper.WatchedEvent, org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater<T>)
org.apache.pulsar.zookeeper.ZooKeeperChildrenCache#reloadCache
org.apache.pulsar.broker.service.BrokerService#onUpdate
org.apache.pulsar.broker.service.persistent.PersistentTopic#onPoliciesUpdate
org.apache.pulsar.broker.service.persistent.PersistentTopic#checkPersistencePolicies
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#setConfig
are assurable chains to keep config up to date