8080import java .util .function .Consumer ;
8181import java .util .function .Predicate ;
8282import lombok .AccessLevel ;
83- import lombok .AllArgsConstructor ;
8483import lombok .Getter ;
8584import lombok .Setter ;
8685import org .apache .bookkeeper .common .util .OrderedExecutor ;
9695import org .apache .bookkeeper .mledger .util .Futures ;
9796import org .apache .commons .collections4 .MapUtils ;
9897import org .apache .commons .lang3 .StringUtils ;
98+ import org .apache .commons .lang3 .mutable .MutableBoolean ;
9999import org .apache .commons .lang3 .tuple .ImmutablePair ;
100100import org .apache .pulsar .bookie .rackawareness .IsolatedBookieEnsemblePlacementPolicy ;
101101import org .apache .pulsar .broker .PulsarServerException ;
195195import org .apache .pulsar .policies .data .loadbalancer .NamespaceBundleStats ;
196196import org .apache .pulsar .transaction .coordinator .TransactionMetadataStore ;
197197import org .jspecify .annotations .NonNull ;
198+ import org .jspecify .annotations .Nullable ;
198199import org .slf4j .Logger ;
199200import org .slf4j .LoggerFactory ;
200201
@@ -1042,7 +1043,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
10421043 }
10431044
10441045 public CompletableFuture <Optional <Topic >> getTopic (final String topic , boolean createIfMissing ,
1045- Map <String , String > properties ) {
1046+ @ Nullable Map <String , String > properties ) {
10461047 return getTopic (TopicName .get (topic ), createIfMissing , properties );
10471048 }
10481049
@@ -1062,7 +1063,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
10621063 * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
10631064 */
10641065 public CompletableFuture <Optional <Topic >> getTopic (final TopicName topicName , boolean createIfMissing ,
1065- Map <String , String > properties ) {
1066+ @ Nullable Map <String , String > properties ) {
10661067 try {
10671068 // If topic future exists in the cache returned directly regardless of whether it fails or timeout.
10681069 CompletableFuture <Optional <Topic >> tp = topics .get (topicName .toString ());
@@ -1078,13 +1079,31 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10781079 return FutureUtil .failedFuture (new NotAllowedException (
10791080 "Broker is unable to load persistent topic" ));
10801081 }
1081- return checkNonPartitionedTopicExists (topicName ).thenCompose (exists -> {
1082+ final CompletableFuture <Optional <Topic >> topicFuture = FutureUtil .createFutureWithTimeout (
1083+ Duration .ofSeconds (pulsar .getConfiguration ().getTopicLoadTimeoutSeconds ()), executor (),
1084+ () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION );
1085+ final var context = new TopicLoadingContext (topicName , createIfMissing , topicFuture );
1086+ if (properties != null ) {
1087+ context .setProperties (properties );
1088+ }
1089+ topicFuture .exceptionally (t -> {
1090+ final var now = System .nanoTime ();
1091+ if (FutureUtil .unwrapCompletionException (t ) instanceof TimeoutException ) {
1092+ log .warn ("Failed to load {} after {} ms" , topicName , context .latencyMs (now ));
1093+ } else {
1094+ log .warn ("Failed to load {} after {} ms" , topicName , context .latencyString (now ), t );
1095+ }
1096+ pulsarStats .recordTopicLoadFailed ();
1097+ return Optional .empty ();
1098+ });
1099+ checkNonPartitionedTopicExists (topicName ).thenAccept (exists -> {
10821100 if (!exists && !createIfMissing ) {
1083- return CompletableFuture .completedFuture (Optional .empty ());
1101+ topicFuture .complete (Optional .empty ());
1102+ return ;
10841103 }
10851104 // The topic level policies are not needed now, but the meaning of calling
10861105 // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization.
1087- return getTopicPoliciesBypassSystemTopic (topicName , TopicPoliciesService .GetType .LOCAL_ONLY )
1106+ getTopicPoliciesBypassSystemTopic (topicName , TopicPoliciesService .GetType .LOCAL_ONLY )
10881107 .exceptionally (ex -> {
10891108 final Throwable rc = FutureUtil .unwrapCompletionException (ex );
10901109 final String errorInfo = String .format ("Topic creation encountered an exception by initialize"
@@ -1093,7 +1112,6 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10931112 log .error (errorInfo , rc );
10941113 throw FutureUtil .wrapToCompletionException (new ServiceUnitNotReadyException (errorInfo ));
10951114 }).thenCompose (optionalTopicPolicies -> {
1096- final TopicPolicies topicPolicies = optionalTopicPolicies .orElse (null );
10971115 if (topicName .isPartitioned ()) {
10981116 final TopicName topicNameEntity = TopicName .get (topicName .getPartitionedTopicName ());
10991117 return fetchPartitionedTopicMetadataAsync (topicNameEntity )
@@ -1103,8 +1121,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11031121 if (metadata .partitions == 0
11041122 || topicName .getPartitionIndex () < metadata .partitions ) {
11051123 return topics .computeIfAbsent (topicName .toString (), (tpName ) ->
1106- loadOrCreatePersistentTopic (tpName ,
1107- createIfMissing , properties ));
1124+ loadOrCreatePersistentTopic (context ));
11081125 } else {
11091126 final String errorMsg =
11101127 String .format ("Illegal topic partition name %s with max allowed "
@@ -1116,10 +1133,31 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11161133 });
11171134 } else {
11181135 return topics .computeIfAbsent (topicName .toString (), (tpName ) ->
1119- loadOrCreatePersistentTopic (tpName , createIfMissing , properties ));
1136+ loadOrCreatePersistentTopic (context ));
1137+ }
1138+ }).thenRun (() -> {
1139+ final var inserted = new MutableBoolean (false );
1140+ final var cachedFuture = topics .computeIfAbsent (topicName .toString (), ___ -> {
1141+ inserted .setTrue ();
1142+ return loadOrCreatePersistentTopic (context );
1143+ });
1144+ if (inserted .isFalse ()) {
1145+ // This case should happen rarely when the same topic is loaded concurrently because we
1146+ // checked if the `topics` cache includes this topic before, so the latency is not the
1147+ // actual loading latency that should not be recorded in metrics.
1148+ log .info ("[{}] Finished loading from other concurrent loading task (latency: {})" ,
1149+ topicName , context .latencyString (System .nanoTime ()));
1150+ cachedFuture .whenComplete ((optTopic , e ) -> {
1151+ if (e == null ) {
1152+ topicFuture .complete (optTopic );
1153+ } else {
1154+ topicFuture .completeExceptionally (e );
1155+ }
1156+ });
11201157 }
11211158 });
11221159 });
1160+ return topicFuture ;
11231161 } else {
11241162 if (!pulsar .getConfiguration ().isEnableNonPersistentTopics ()) {
11251163 if (log .isDebugEnabled ()) {
@@ -1619,29 +1657,16 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
16191657 /**
16201658 * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic
16211659 * loading and puts them into queue once in-process topics are created.
1622- *
1623- * @param topic persistent-topic name
1624- * @return CompletableFuture<Topic>
1625- * @throws RuntimeException
16261660 */
1627- protected CompletableFuture <Optional <Topic >> loadOrCreatePersistentTopic (final String topic ,
1628- boolean createIfMissing , Map <String , String > properties ) {
1629- final CompletableFuture <Optional <Topic >> topicFuture = FutureUtil .createFutureWithTimeout (
1630- Duration .ofSeconds (pulsar .getConfiguration ().getTopicLoadTimeoutSeconds ()), executor (),
1631- () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION );
1632-
1633- topicFuture .exceptionally (t -> {
1634- pulsarStats .recordTopicLoadFailed ();
1635- return null ;
1636- });
1637-
1661+ protected CompletableFuture <Optional <Topic >> loadOrCreatePersistentTopic (TopicLoadingContext context ) {
1662+ final var topic = context .getTopicName ().toString ();
1663+ final var topicFuture = context .getTopicFuture ();
16381664 checkTopicNsOwnership (topic )
16391665 .thenRun (() -> {
16401666 final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore .get ();
16411667
16421668 if (topicLoadSemaphore .tryAcquire ()) {
1643- checkOwnershipAndCreatePersistentTopic (topic , createIfMissing , topicFuture ,
1644- properties );
1669+ checkOwnershipAndCreatePersistentTopic (context );
16451670 topicFuture .handle ((persistentTopic , ex ) -> {
16461671 // release permit and process pending topic
16471672 topicLoadSemaphore .release ();
@@ -1656,8 +1681,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
16561681 return null ;
16571682 });
16581683 } else {
1659- pendingTopicLoadingQueue .add (new TopicLoadingContext (topic ,
1660- createIfMissing , topicFuture , properties ));
1684+ pendingTopicLoadingQueue .add (context );
16611685 if (log .isDebugEnabled ()) {
16621686 log .debug ("topic-loading for {} added into pending queue" , topic );
16631687 }
@@ -1700,23 +1724,23 @@ protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(Topic
17001724 }
17011725 }
17021726
1703- private void checkOwnershipAndCreatePersistentTopic (final String topic , boolean createIfMissing ,
1704- CompletableFuture < Optional < Topic >> topicFuture ,
1705- Map < String , String > properties ) {
1706- TopicName topicName = TopicName . get ( topic );
1727+ private void checkOwnershipAndCreatePersistentTopic (TopicLoadingContext context ) {
1728+ TopicName topicName = context . getTopicName ();
1729+ final var topic = topicName . toString ();
1730+ final var topicFuture = context . getTopicFuture ( );
17071731 checkTopicNsOwnership (topic ).thenRun (() -> {
17081732 CompletableFuture <Map <String , String >> propertiesFuture ;
1709- if (properties == null ) {
1733+ if (context . getProperties () == null ) {
17101734 //Read properties from storage when loading topic.
17111735 propertiesFuture = fetchTopicPropertiesAsync (topicName );
17121736 } else {
1713- propertiesFuture = CompletableFuture .completedFuture (properties );
1737+ propertiesFuture = CompletableFuture .completedFuture (context . getProperties () );
17141738 }
1715- propertiesFuture .thenAccept (finalProperties ->
1716- //TODO add topicName in properties?
1717- createPersistentTopic0 ( topic , createIfMissing , topicFuture ,
1718- finalProperties )
1719- ).exceptionally (throwable -> {
1739+ propertiesFuture .thenAccept (finalProperties -> {
1740+ context . setProperties ( finalProperties );
1741+ //TODO add topicName in properties?
1742+ createPersistentTopic0 ( context );
1743+ } ).exceptionally (throwable -> {
17201744 log .warn ("[{}] Read topic property failed" , topic , throwable );
17211745 pulsar .getExecutor ().execute (() -> topics .remove (topic , topicFuture ));
17221746 topicFuture .completeExceptionally (throwable );
@@ -1730,11 +1754,11 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
17301754 }
17311755
17321756 @ VisibleForTesting
1733- public void createPersistentTopic0 (final String topic , boolean createIfMissing ,
1734- CompletableFuture < Optional < Topic >> topicFuture ,
1735- Map < String , String > properties ) {
1736- TopicName topicName = TopicName . get ( topic );
1737- final long topicCreateTimeMs = TimeUnit . NANOSECONDS . toMillis ( System . nanoTime () );
1757+ public void createPersistentTopic0 (TopicLoadingContext context ) {
1758+ TopicName topicName = context . getTopicName ();
1759+ final var topic = topicName . toString ();
1760+ final var topicFuture = context . getTopicFuture ( );
1761+ final var createIfMissing = context . isCreateIfMissing ( );
17381762
17391763 if (isTransactionInternalName (topicName )) {
17401764 String msg = String .format ("Can not create transaction system topic %s" , topic );
@@ -1768,7 +1792,9 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing,
17681792 new ManagedLedgerInterceptorImpl (interceptors , brokerEntryPayloadProcessors ));
17691793 }
17701794 managedLedgerConfig .setCreateIfMissing (createIfMissing );
1771- managedLedgerConfig .setProperties (properties );
1795+ if (context .getProperties () != null ) {
1796+ managedLedgerConfig .setProperties (context .getProperties ());
1797+ }
17721798 String shadowSource = managedLedgerConfig .getShadowSource ();
17731799 if (shadowSource != null ) {
17741800 managedLedgerConfig .setShadowSourceName (TopicName .get (shadowSource ).getPersistenceNamingEncoding ());
@@ -1813,10 +1839,11 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
18131839 return persistentTopic .checkDeduplicationStatus ();
18141840 })
18151841 .thenRun (() -> {
1816- log .info ("Created topic {} - dedup is {}" , topic ,
1817- persistentTopic .isDeduplicationEnabled () ? "enabled" : "disabled" );
1818- long topicLoadLatencyMs = TimeUnit .NANOSECONDS .toMillis (System .nanoTime ())
1819- - topicCreateTimeMs ;
1842+ long nowInNanos = System .nanoTime ();
1843+ long topicLoadLatencyMs = context .latencyMs (nowInNanos );
1844+ log .info ("Created topic {} - dedup is {} (latency: {})" , topic ,
1845+ persistentTopic .isDeduplicationEnabled () ? "enabled" : "disabled" ,
1846+ context .latencyString (nowInNanos ));
18201847 pulsarStats .recordTopicLoadTimeValue (topic , topicLoadLatencyMs );
18211848 if (!topicFuture .complete (Optional .of (persistentTopic ))) {
18221849 // Check create persistent topic timeout.
@@ -3211,15 +3238,13 @@ private void createPendingLoadTopic() {
32113238 return ;
32123239 }
32133240
3214- final String topic = pendingTopic .getTopic ();
3241+ pendingTopic .polledFromQueue ();
3242+ final String topic = pendingTopic .getTopicName ().toString ();
32153243 checkTopicNsOwnership (topic ).thenRun (() -> {
32163244 CompletableFuture <Optional <Topic >> pendingFuture = pendingTopic .getTopicFuture ();
32173245 final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore .get ();
32183246 final boolean acquiredPermit = topicLoadSemaphore .tryAcquire ();
3219- checkOwnershipAndCreatePersistentTopic (topic ,
3220- pendingTopic .isCreateIfMissing (),
3221- pendingFuture ,
3222- pendingTopic .getProperties ());
3247+ checkOwnershipAndCreatePersistentTopic (pendingTopic );
32233248 pendingFuture .handle ((persistentTopic , ex ) -> {
32243249 // release permit and process next pending topic
32253250 if (acquiredPermit ) {
@@ -3787,13 +3812,4 @@ private TopicFactory createPersistentTopicFactory() throws Exception {
37873812 public void setPulsarChannelInitializerFactory (PulsarChannelInitializer .Factory factory ) {
37883813 this .pulsarChannelInitFactory = factory ;
37893814 }
3790-
3791- @ AllArgsConstructor
3792- @ Getter
3793- private static class TopicLoadingContext {
3794- private final String topic ;
3795- private final boolean createIfMissing ;
3796- private final CompletableFuture <Optional <Topic >> topicFuture ;
3797- private final Map <String , String > properties ;
3798- }
37993815}
0 commit comments