Skip to content

Commit

Permalink
[fix][broker] avoid offload system topic (apache#22497)
Browse files Browse the repository at this point in the history
Co-authored-by: 道君 <daojun@apache.org>
(cherry picked from commit 3114199)
  • Loading branch information
mattisonchao authored and lhotari committed May 14, 2024
1 parent e5515c5 commit 20483f5
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,13 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
if (NamespaceService.isSystemServiceNamespace(namespace.toString())
|| SystemTopicNames.isSystemTopic(topicName)) {
/*
Avoid setting broker internal system topics using off-loader because some of them are the
preconditions of other topics. The slow replying log speed will cause a delay in all the topic
loading.(timeout)
*/
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
} else {
if (topicLevelOffloadPolicies != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
Expand Down Expand Up @@ -111,6 +114,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -1772,4 +1777,92 @@ public void testMetricsNonPersistentTopicLoadFails() throws Exception {
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}


@Test
public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
final String role = "test-role";
final String roleSessionName = "test-role-session-name";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
final Integer maxBlockSizeInBytes = 5;
final Integer readBufferSizeInBytes = 2;
final Long offloadThresholdInBytes = 10L;
final Long offloadThresholdInSeconds = 1000L;
final Long offloadDeletionLagInMillis = 5L;

final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
driver,
region,
bucket,
endPoint,
role,
roleSessionName,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
offloadThresholdInSeconds,
offloadDeletionLagInMillis,
OffloadedReadPriority.TIERED_STORAGE_FIRST
);

var fakeOffloader = new LedgerOffloader() {
@Override
public String getOffloadDriverName() {
return driver;
}

@Override
public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public OffloadPoliciesImpl getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void close() {
}
};

final BrokerService brokerService = pulsar.getBrokerService();
final String namespace = "prop/" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);

// Inject the cache to avoid real load off-loader jar
final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = pulsar.getLedgerOffloaderMap();
ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader);

// (1) test normal topic
final String normalTopic = "persistent://" + namespace + "/" + UUID.randomUUID();
var managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join();

Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), fakeOffloader);

// (2) test system topic
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
}
}
}

0 comments on commit 20483f5

Please sign in to comment.