1818 */
1919package org .apache .pulsar .broker .service ;
2020
21+ import com .google .common .util .concurrent .MoreExecutors ;
22+ import io .netty .util .concurrent .DefaultThreadFactory ;
2123import java .time .Duration ;
2224import java .util .ArrayList ;
23- import java .util .Collections ;
2425import java .util .HashSet ;
2526import java .util .List ;
2627import java .util .Objects ;
2728import java .util .Optional ;
2829import java .util .Set ;
2930import java .util .UUID ;
3031import java .util .concurrent .CompletableFuture ;
32+ import java .util .concurrent .Executors ;
33+ import java .util .concurrent .ScheduledExecutorService ;
34+ import java .util .concurrent .TimeUnit ;
3135import java .util .concurrent .TimeoutException ;
3236import lombok .extern .slf4j .Slf4j ;
3337import org .apache .bookkeeper .mledger .ManagedLedgerException ;
5155 * The HealthChecker class provides functionality to monitor and verify the health of a Pulsar broker.
5256 * It performs health checks by creating test topics, producing and consuming messages to verify broker functionality.
5357 * This class implements AutoCloseable to ensure proper cleanup of resources when the broker is shut down.
58+ * Tests are in AdminApiHealthCheckTest class.
5459 */
5560@ Slf4j
5661public class HealthChecker implements AutoCloseable {
@@ -102,6 +107,11 @@ public class HealthChecker implements AutoCloseable{
102107 */
103108 private final Set <CompletableFuture <Void >> pendingFutures = new HashSet <>();
104109
110+ /**
111+ * Executor for health check operations.
112+ */
113+ private final ScheduledExecutorService healthCheckExecutor ;
114+
105115 private final Duration timeout = DEFAULT_HEALTH_CHECK_READ_TIMEOUT ;
106116
107117 public HealthChecker (PulsarService pulsar ) throws PulsarServerException {
@@ -112,6 +122,8 @@ public HealthChecker(PulsarService pulsar) throws PulsarServerException {
112122 new ScheduledExecutorProvider (1 , "health-checker-client-lookup-executor" );
113123 this .scheduledExecutorProvider =
114124 new ScheduledExecutorProvider (1 , "health-checker-client-scheduled-executor" );
125+ this .healthCheckExecutor =
126+ Executors .newSingleThreadScheduledExecutor (new DefaultThreadFactory ("health-checker-executor" ));
115127 try {
116128 this .client = pulsar .createClientImpl (builder -> {
117129 builder .lookupExecutorProvider (lookupExecutor );
@@ -145,23 +157,30 @@ public CompletableFuture<Void> checkHealth(TopicVersion topicVersion, String cli
145157 log .info ("[{}] Running healthCheck with topic={}" , clientAppId , topicName );
146158 final String messageStr = UUID .randomUUID ().toString ();
147159 final String subscriptionName = "healthCheck-" + messageStr ;
148- // create non-partitioned topic manually and close the previous reader if present.
160+
149161 CompletableFuture <Void > resultFuture = new CompletableFuture <>();
162+ healthCheckExecutor .execute (
163+ () -> doHealthCheck (clientAppId , resultFuture , topicName , subscriptionName , messageStr ));
164+ return resultFuture ;
165+ }
166+
167+ private void doHealthCheck (String clientAppId , CompletableFuture <Void > resultFuture , String topicName ,
168+ String subscriptionName , String messageStr ) {
150169 addToPending (resultFuture );
151- resultFuture .whenComplete ((result , ex ) -> {
170+ resultFuture .whenCompleteAsync ((result , ex ) -> {
152171 removeFromPending (resultFuture );
153- });
172+ }, healthCheckExecutor );
154173 try {
155174 pulsar .getBrokerService ().getTopic (topicName , true )
156- .thenCompose (topicOptional -> {
175+ .thenComposeAsync (topicOptional -> {
157176 if (!topicOptional .isPresent ()) {
158177 log .error ("[{}] Fail to run health check while get topic {}. because get null value." ,
159178 clientAppId , topicName );
160179 return CompletableFuture .failedFuture (new BrokerServiceException .TopicNotFoundException (
161180 String .format ("Topic [%s] not found after create." , topicName )));
162181 }
163182 return doHealthCheck (clientAppId , topicName , subscriptionName , messageStr , resultFuture );
164- }).whenComplete ((result , t ) -> {
183+ }, healthCheckExecutor ).whenComplete ((result , t ) -> {
165184 if (t != null ) {
166185 resultFuture .completeExceptionally (t );
167186 } else {
@@ -175,7 +194,6 @@ public CompletableFuture<Void> checkHealth(TopicVersion topicVersion, String cli
175194 clientAppId , topicName , e );
176195 resultFuture .completeExceptionally (e );
177196 }
178- return resultFuture ;
179197 }
180198
181199 private synchronized void addToPending (CompletableFuture <Void > resultFuture ) {
@@ -188,34 +206,38 @@ private synchronized void removeFromPending(CompletableFuture<Void> resultFuture
188206
189207 private CompletableFuture <Void > doHealthCheck (String clientAppId , String topicName , String subscriptionName ,
190208 String messageStr , CompletableFuture <Void > resultFuture ) {
191- return client .newProducer (Schema .STRING ).topic (topicName ).createAsync ()
192- .thenCompose (producer -> client .newReader (Schema .STRING ).topic (topicName )
209+ return client .newProducer (Schema .STRING )
210+ .topic (topicName )
211+ .sendTimeout ((int ) timeout .toMillis (), TimeUnit .MILLISECONDS )
212+ .enableBatching (false )
213+ .createAsync ()
214+ .thenCompose (producer -> client .newReader (Schema .STRING )
215+ .topic (topicName )
193216 .subscriptionName (subscriptionName )
194217 .startMessageId (MessageId .latest )
195- .createAsync ().exceptionally (createException -> {
218+ .createAsync ()
219+ .exceptionally (createException -> {
196220 producer .closeAsync ().exceptionally (ex -> {
197221 log .error ("[{}] Close producer fail while heath check." , clientAppId );
198222 return null ;
199223 });
200224 throw FutureUtil .wrapToCompletionException (createException );
201- }).thenCompose (reader -> producer .sendAsync (messageStr )
225+ })
226+ .thenCompose (reader -> producer .sendAsync (messageStr )
202227 .thenCompose (__ -> FutureUtil .addTimeoutHandling (
203228 healthCheckRecursiveReadNext (reader , messageStr ),
204- timeout , pulsar . getBrokerService (). executor () ,
229+ timeout , healthCheckExecutor ,
205230 () -> HEALTH_CHECK_TIMEOUT_EXCEPTION ))
206- .whenComplete ((__ , ex ) -> {
207- closeAndReCheck (producer , reader , topicName ,
208- subscriptionName ,
209- clientAppId )
210- .whenComplete ((unused , innerEx ) -> {
211- if (ex != null ) {
212- resultFuture .completeExceptionally (ex );
213- } else {
214- resultFuture .complete (null );
215- }
216- });
217- }
218- ))
231+ .whenCompleteAsync ((__ , ex ) -> {
232+ closeAndReCheck (producer , reader , topicName , subscriptionName , clientAppId )
233+ .whenComplete ((unused , innerEx ) -> {
234+ if (ex != null ) {
235+ resultFuture .completeExceptionally (ex );
236+ } else {
237+ resultFuture .complete (null );
238+ }
239+ });
240+ }, healthCheckExecutor ))
219241 ).exceptionally (ex -> {
220242 resultFuture .completeExceptionally (ex );
221243 return null ;
@@ -237,14 +259,14 @@ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reade
237259 String topicName , String subscriptionName , String clientAppId ) {
238260 // no matter exception or success, we still need to
239261 // close producer/reader
240- CompletableFuture <Void > producerFuture = producer .closeAsync ();
241- CompletableFuture <Void > readerFuture = reader .closeAsync ();
262+ CompletableFuture <Void > producerCloseFuture = producer .closeAsync ();
263+ CompletableFuture <Void > readerCloseFuture = reader .closeAsync ();
242264 List <CompletableFuture <Void >> futures = new ArrayList <>(2 );
243- futures .add (producerFuture );
244- futures .add (readerFuture );
245- return FutureUtil .waitForAll (Collections . unmodifiableList ( futures ) )
246- .exceptionally (closeException -> {
247- if (readerFuture .isCompletedExceptionally ()) {
265+ futures .add (producerCloseFuture );
266+ futures .add (readerCloseFuture );
267+ return FutureUtil .waitForAll (futures )
268+ .exceptionallyAsync (closeException -> {
269+ if (readerCloseFuture .isCompletedExceptionally ()) {
248270 log .error ("[{}] Close reader fail while health check." , clientAppId );
249271 Optional <Topic > topic = pulsar .getBrokerService ().getTopicReference (topicName );
250272 if (topic .isPresent ()) {
@@ -270,7 +292,7 @@ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reade
270292 log .error ("[{}] Close producer fail while heath check." , clientAppId );
271293 }
272294 return null ;
273- });
295+ }, healthCheckExecutor );
274296 }
275297
276298 private static CompletableFuture <Void > healthCheckRecursiveReadNext (Reader <String > reader , String content ) {
@@ -290,14 +312,15 @@ private void deleteHeartbeatTopics() {
290312 log .info ("finish forcefully deleting heartbeat topics" );
291313 }
292314
293- private void deleteTopic (String heartbeatTopicV1 ) {
315+ private void deleteTopic (String topicName ) {
294316 try {
295- pulsar .getBrokerService ().deleteTopic (heartbeatTopicV1 , true ).get ();
317+ pulsar .getBrokerService ().deleteTopic (topicName , true )
318+ .get (pulsar .getConfiguration ().getMetadataStoreOperationTimeoutSeconds (), TimeUnit .SECONDS );
296319 } catch (Exception e ) {
297320 Throwable realCause = e .getCause ();
298321 if (!(realCause instanceof ManagedLedgerException .MetadataNotFoundException
299322 || realCause instanceof MetadataStoreException .NotFoundException )) {
300- log .error ("Errors in deleting heartbeat topic [{}]" , heartbeatTopicV1 , e );
323+ log .error ("Errors in deleting heartbeat topic [{}]" , topicName , e );
301324 }
302325 }
303326 }
@@ -321,10 +344,20 @@ public synchronized void close() throws Exception {
321344 }
322345 for (CompletableFuture <Void > pendingFuture : new ArrayList <>(pendingFutures )) {
323346 if (!pendingFuture .isDone ()) {
324- pendingFuture .completeExceptionally (
325- new PulsarClientException .AlreadyClosedException ("HealthChecker is closed" ));
347+ healthCheckExecutor .submit (() -> {
348+ try {
349+ pendingFuture .completeExceptionally (
350+ new PulsarClientException .AlreadyClosedException ("HealthChecker is closed" ));
351+ } catch (Exception e ) {
352+ log .warn ("Failed to complete pending future" , e );
353+ }
354+ });
326355 }
327356 }
357+ boolean terminated = MoreExecutors .shutdownAndAwaitTermination (healthCheckExecutor , 10 , TimeUnit .SECONDS );
358+ if (!terminated ) {
359+ log .warn ("Failed to shutdown health check executor in 10 seconds" );
360+ }
328361 deleteHeartbeatTopics ();
329362 }
330363}
0 commit comments