Skip to content

Commit

Permalink
[fix][broker] avoid offload system topic (apache#22497)
Browse files Browse the repository at this point in the history
Co-authored-by: 道君 <daojun@apache.org>
  • Loading branch information
mattisonchao and dao-jun committed May 8, 2024
1 parent 80d4675 commit 3114199
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1963,7 +1963,13 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
if (NamespaceService.isSystemServiceNamespace(namespace.toString())
|| SystemTopicNames.isSystemTopic(topicName)) {
/*
Avoid setting broker internal system topics using off-loader because some of them are the
preconditions of other topics. The slow replying log speed will cause a delay in all the topic
loading.(timeout)
*/
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
} else {
if (topicLevelOffloadPolicies != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -112,6 +115,9 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -1745,4 +1751,92 @@ public void testUnsubscribeNonDurableSub() throws Exception {
fail("Unsubscribe failed");
}
}


@Test
public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
final String role = "test-role";
final String roleSessionName = "test-role-session-name";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
final Integer maxBlockSizeInBytes = 5;
final Integer readBufferSizeInBytes = 2;
final Long offloadThresholdInBytes = 10L;
final Long offloadThresholdInSeconds = 1000L;
final Long offloadDeletionLagInMillis = 5L;

final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
driver,
region,
bucket,
endPoint,
role,
roleSessionName,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
offloadThresholdInSeconds,
offloadDeletionLagInMillis,
OffloadedReadPriority.TIERED_STORAGE_FIRST
);

var fakeOffloader = new LedgerOffloader() {
@Override
public String getOffloadDriverName() {
return driver;
}

@Override
public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public OffloadPolicies getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void close() {
}
};

final BrokerService brokerService = pulsar.getBrokerService();
final String namespace = "prop/" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);

// Inject the cache to avoid real load off-loader jar
final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = pulsar.getLedgerOffloaderMap();
ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader);

// (1) test normal topic
final String normalTopic = "persistent://" + namespace + "/" + UUID.randomUUID();
var managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join();

Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), fakeOffloader);

// (2) test system topic
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
}
}
}

0 comments on commit 3114199

Please sign in to comment.