Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] The topic might reference a closed ledger #22860

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,38 +1001,38 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
return getTopic(TopicName.get(topic), createIfMissing, properties);
}

/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException.
* 1. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 3. If the topic metadata not exists, and {@code createIfMissing} is false,
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
* 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* Any exceptions will remove the topicFuture from the map.
*
* @param topicName The name of the topic, potentially including partition information.
* @param createIfMissing If true, creates the topic if it does not exist.
* @param properties Topic configuration properties used during creation.
* @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
*/
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
Map<String, String> properties) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topicName.toString(), topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent creating a topic
if (createIfMissing) {
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
return topicFuture;
} else {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
return getTopic(topicName, createIfMissing, properties);
} else {
// in-progress future completed successfully
return CompletableFuture.completedFuture(value);
}
});
}
} else {
return topicFuture;
}
}
// If topic future exists in the cache returned directly regardless of whether it fails or timeout.
CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString());
if (tp != null) {
return tp;
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
Expand All @@ -1047,44 +1047,48 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies));
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
shibd marked this conversation as resolved.
Show resolved Hide resolved
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
}
});
} else {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies));
}
});
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
}
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

Expand All @@ -1095,11 +1099,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
});
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
Expand All @@ -1109,11 +1115,15 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
} else {
});
} else {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture == null) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
topicFuture = CompletableFuture.completedFuture(Optional.empty());
}
});
return topicFuture;
}
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
Expand Down Expand Up @@ -1252,15 +1262,9 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
return FutureUtil.failedFuture(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
try {
Expand All @@ -1283,7 +1287,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
}).exceptionally(ex -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex);
});
return null;
Expand Down Expand Up @@ -1534,14 +1537,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
topicFuture.completeExceptionally(new NotAllowedException(
"Broker is unable to load persistent topic"));
return topicFuture;
}

checkTopicNsOwnership(topic)
.thenRun(() -> {
Expand All @@ -1556,6 +1551,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
// do not recreate topic if topic is already migrated and deleted by broker
// so, avoid creating a new topic if migration is already started
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex.getCause());
return null;
}
Expand All @@ -1570,6 +1566,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
}
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex.getCause());
return null;
});
Expand Down Expand Up @@ -1744,6 +1741,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand All @@ -1760,6 +1758,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic, topicFuture);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
shibd marked this conversation as resolved.
Show resolved Hide resolved
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
Expand All @@ -1452,6 +1443,7 @@ public void testCleanupTopic() throws Exception {
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

// Re-create topic will success.
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Loading
Loading