diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 066592a38ab35..9f85359234b58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -40,10 +40,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; -import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; @@ -780,19 +778,6 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn return future; } - protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { - Throwable realCause = FutureUtil.unwrapCompletionException(throwable); - if (realCause instanceof WebApplicationException) { - asyncResponse.resume(realCause); - } else if (realCause instanceof BrokerServiceException.NotAllowedException) { - asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); - } else if (realCause instanceof PulsarAdminException) { - asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); - } else { - asyncResponse.resume(new RestException(realCause)); - } - } - protected CompletableFuture getSchemaCompatibilityStrategyAsync() { return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> { SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index ad4a6ffa9b4f6..15303b1b45d33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -24,9 +24,12 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -47,9 +50,10 @@ import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -300,102 +304,133 @@ public void isReady(@Suspended AsyncResponse asyncResponse) { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")}) - public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception { + public void healthcheck(@Suspended AsyncResponse asyncResponse) { + validateSuperUserAccess(); + internalRunHealthCheck() + .thenAccept(__ -> { + LOG.info("[{}] Successfully run health check.", clientAppId()); + asyncResponse.resume("ok"); + }).exceptionally(ex -> { + LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + private CompletableFuture internalRunHealthCheck() { String topic; PulsarClient client; try { - validateSuperUserAccess(); String heartbeatNamespace = NamespaceService.getHeartbeatNamespace( pulsar().getAdvertisedAddress(), pulsar().getConfiguration()); topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX); LOG.info("Running healthCheck with topic={}", topic); client = pulsar().getClient(); - } catch (Exception e) { - LOG.error("Error getting heathcheck topic info", e); - throw new PulsarServerException(e); + } catch (PulsarServerException e) { + LOG.error("[{}] Fail to run health check while get client.", clientAppId()); + throw new RestException(e); } String messageStr = UUID.randomUUID().toString(); + final String subscriptionName = "healthCheck-" + messageStr; // create non-partitioned topic manually and close the previous reader if present. - try { - pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> { - t.getSubscriptions().forEach((__, value) -> { - try { - value.deleteForcefully(); - } catch (Exception e) { - LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e); - } - }); - }); - } catch (Exception e) { - LOG.warn("Failed to try to delete subscriptions for health check", e); - } - CompletableFuture> producerFuture = - client.newProducer(Schema.STRING).topic(topic).createAsync(); - CompletableFuture> readerFuture = client.newReader(Schema.STRING) - .topic(topic).startMessageId(MessageId.latest).createAsync(); - - CompletableFuture completePromise = new CompletableFuture<>(); - - CompletableFuture.allOf(producerFuture, readerFuture).whenComplete( - (ignore, exception) -> { - if (exception != null) { - completePromise.completeExceptionally(exception); - } else { - producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr)) - .whenComplete((ignore2, exception2) -> { - if (exception2 != null) { - completePromise.completeExceptionally(exception2); - } + return pulsar().getBrokerService().getTopic(topic, true).thenCompose(topicOptional -> { + if (!topicOptional.isPresent()) { + LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", + clientAppId(), topic); + throw new RestException(Status.NOT_FOUND, + String.format("Topic [%s] not found after create.", topic)); + } + CompletableFuture resultFuture = new CompletableFuture<>(); + client.newProducer(Schema.STRING).topic(topic).createAsync() + .thenCompose(producer -> client.newReader(Schema.STRING) + .topic(topic) + .subscriptionName(subscriptionName) + .startMessageId(MessageId.latest) + .createAsync() + .exceptionally(createException -> { + producer.closeAsync().exceptionally(ex -> { + LOG.error("[{}] Close producer fail while heath check.", clientAppId()); + return null; }); + throw FutureUtil.wrapToCompletionException(createException); + }) + .thenCompose(reader -> producer.sendAsync(messageStr) + .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr)) + .whenComplete((__, ex) -> { + closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName) + .whenComplete((unused, innerEx) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + } else { + resultFuture.complete(null); + } + }); + })) + ).exceptionally(ex -> { + resultFuture.completeExceptionally(ex); + return null; + }); + return resultFuture; + }); + } - healthcheckReadLoop(readerFuture, completePromise, messageStr); - - // timeout read loop after 10 seconds - FutureUtil.addTimeoutHandling(completePromise, - HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(), - () -> FutureUtil.createTimeoutException("Timed out reading", getClass(), - "healthcheck(...)")); + private CompletableFuture healthCheckRecursiveReadNext(Reader reader, String content) { + return reader.readNextAsync() + .thenCompose(msg -> { + if (!Objects.equals(content, msg.getValue())) { + return healthCheckRecursiveReadNext(reader, content); } + return CompletableFuture.completedFuture(null); }); - - completePromise.whenComplete((ignore, exception) -> { - producerFuture.thenAccept((producer) -> { - producer.closeAsync().whenComplete((ignore2, exception2) -> { - if (exception2 != null) { - LOG.warn("Error closing producer for healthcheck", exception2); - } - }); - }); - readerFuture.thenAccept((reader) -> { - reader.closeAsync().whenComplete((ignore2, exception2) -> { - if (exception2 != null) { - LOG.warn("Error closing reader for healthcheck", exception2); - } - }); - }); - if (exception != null) { - asyncResponse.resume(new RestException(exception)); - } else { - asyncResponse.resume("ok"); - } - }); } - private void healthcheckReadLoop(CompletableFuture> readerFuture, - CompletableFuture completablePromise, - String messageStr) { - readerFuture.thenAccept((reader) -> { - CompletableFuture> readFuture = reader.readNextAsync() - .whenComplete((m, exception) -> { - if (exception != null) { - completablePromise.completeExceptionally(exception); - } else if (m.getValue().equals(messageStr)) { - completablePromise.complete(null); - } else { - healthcheckReadLoop(readerFuture, completablePromise, messageStr); - } - }); - }); + /** + * Close producer and reader and then to re-check if this operation is success. + * + * Re-check + * - Producer: If close fails we will print error log to notify user. + * - Consumer: If close fails we will force delete subscription. + * + * @param producer Producer + * @param reader Reader + * @param topic Topic + * @param subscriptionName Subscription name + */ + private CompletableFuture closeAndReCheck(Producer producer, Reader reader, + Topic topic, String subscriptionName) { + // no matter exception or success, we still need to + // close producer/reader + CompletableFuture producerFuture = producer.closeAsync(); + CompletableFuture readerFuture = reader.closeAsync(); + List> futures = new ArrayList<>(2); + futures.add(producerFuture); + futures.add(readerFuture); + return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) + .exceptionally(closeException -> { + if (readerFuture.isCompletedExceptionally()) { + LOG.error("[{}] Close reader fail while heath check.", clientAppId()); + Subscription subscription = + topic.getSubscription(subscriptionName); + // re-check subscription after reader close + if (subscription != null) { + LOG.warn("[{}] Force delete subscription {} " + + "when it still exists after the" + + " reader is closed.", + clientAppId(), subscription); + subscription.deleteForcefully() + .exceptionally(ex -> { + LOG.error("[{}] Force delete subscription fail" + + " while health check", + clientAppId(), ex); + return null; + }); + } + } else { + // producer future fail. + LOG.error("[{}] Close producer fail while heath check.", clientAppId()); + } + return null; + }); } private synchronized void deleteDynamicConfigurationOnZk(String configName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 71eae45fc94a6..8fe63753b177c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -43,6 +43,7 @@ import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -67,6 +68,8 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.ResourceGroupResources; import org.apache.pulsar.broker.resources.TenantResources; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.common.naming.Constants; @@ -1207,4 +1210,17 @@ public static List listSubTreeBFS(BaseResources resources, final String } return tree; } + + protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { + Throwable realCause = FutureUtil.unwrapCompletionException(exception); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else if (realCause instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); + } else if (realCause instanceof PulsarAdminException) { + asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); + } else { + asyncResponse.resume(new RestException(realCause)); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java new file mode 100644 index 0000000000000..c44d771d62ddf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.compaction.Compactor; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest { + + @BeforeMethod + @Override + public void setup() throws Exception { + resetConfig(); + super.internalSetup(); + admin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl( + Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("pulsar", tenantInfo); + admin.namespaces().createNamespace("pulsar/system", Sets.newHashSet("test")); + admin.tenants().createTenant("public", tenantInfo); + admin.namespaces().createNamespace("public/default", Sets.newHashSet("test")); + } + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testHealthCheckup() throws Exception { + final int times = 30; + CompletableFuture future = new CompletableFuture<>(); + pulsar.getExecutor().execute(() -> { + try { + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(); + } + future.complete(null); + } catch (PulsarAdminException e) { + future.completeExceptionally(e); + } + }); + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(); + } + // To ensure we don't have any subscription + final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck", + pulsar.getConfig().getWebServicePort().get()); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(future.isCompletedExceptionally()); + }); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(CollectionUtils.isEmpty(admin.topics() + .getSubscriptions(testHealthCheckTopic).stream() + // All system topics are using compaction, even though is not explicitly set in the policies. + .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION)) + .collect(Collectors.toList()) + )) + ); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 45fd7f79c34d8..7d9fd79aa7fbb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -78,8 +78,7 @@ public static CompletableFuture waitForAllAndSupportCancel(List Optional getException(CompletableFuture future) } return Optional.empty(); } + + /** + * Wrap throwable exception to CompletionException if that exception is not an instance of CompletionException. + * + * @param throwable Exception + * @return CompletionException + */ + public static CompletionException wrapToCompletionException(Throwable throwable) { + if (throwable instanceof CompletionException) { + return (CompletionException) throwable; + } else { + return new CompletionException(throwable); + } + } }