Skip to content

Commit d28392d

Browse files
BewareMyPowersrinath-ctds
authored andcommitted
[fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts (apache#24947)
(cherry picked from commit 6e3d5d8)
1 parent 1510ecf commit d28392d

File tree

1 file changed

+25
-26
lines changed

1 file changed

+25
-26
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,31 +1111,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11111111
// The topic level policies are not needed now, but the meaning of calling
11121112
// "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization.
11131113
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY)
1114-
.thenCompose(optionalTopicPolicies -> {
1115-
if (topicName.isPartitioned()) {
1116-
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1117-
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
1118-
.thenCompose((metadata) -> {
1119-
// Allow creating non-partitioned persistent topic that name includes
1120-
// `partition`
1121-
if (metadata.partitions == 0
1122-
|| topicName.getPartitionIndex() < metadata.partitions) {
1123-
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1124-
loadOrCreatePersistentTopic(context));
1125-
} else {
1126-
final String errorMsg =
1127-
String.format("Illegal topic partition name %s with max allowed "
1128-
+ "%d partitions", topicName, metadata.partitions);
1129-
log.warn(errorMsg);
1130-
return FutureUtil.failedFuture(
1131-
new BrokerServiceException.NotAllowedException(errorMsg));
1132-
}
1133-
});
1134-
} else {
1135-
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1136-
loadOrCreatePersistentTopic(context));
1137-
}
1138-
}).thenRun(() -> {
1114+
.thenRun(() -> {
11391115
final var inserted = new MutableBoolean(false);
11401116
final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> {
11411117
inserted.setTrue();
@@ -1672,9 +1648,32 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
16721648
* loading and puts them into queue once in-process topics are created.
16731649
*/
16741650
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(TopicLoadingContext context) {
1651+
final var topicName = context.getTopicName();
16751652
final var topic = context.getTopicName().toString();
1653+
final CompletableFuture<Void> ownedFuture;
1654+
if (topicName.isPartitioned()) {
1655+
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1656+
ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity)
1657+
.thenCompose((metadata) -> {
1658+
// Allow creating non-partitioned persistent topic that name includes
1659+
// `partition`
1660+
if (metadata.partitions == 0
1661+
|| topicName.getPartitionIndex() < metadata.partitions) {
1662+
return checkTopicNsOwnership(topic);
1663+
} else {
1664+
final String errorMsg =
1665+
String.format("Illegal topic partition name %s with max allowed "
1666+
+ "%d partitions", topicName, metadata.partitions);
1667+
log.warn(errorMsg);
1668+
return FutureUtil.failedFuture(
1669+
new BrokerServiceException.NotAllowedException(errorMsg));
1670+
}
1671+
});
1672+
} else {
1673+
ownedFuture = checkTopicNsOwnership(topic);
1674+
}
16761675
final var topicFuture = context.getTopicFuture();
1677-
checkTopicNsOwnership(topic)
1676+
ownedFuture
16781677
.thenRun(() -> {
16791678
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
16801679

0 commit comments

Comments
 (0)