Skip to content

Commit

Permalink
[Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Mar 10, 2022
1 parent 0a5d41f commit 32f8065
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
Expand Down Expand Up @@ -780,19 +778,6 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
return future;
}

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
Throwable realCause = FutureUtil.unwrapCompletionException(throwable);
if (realCause instanceof WebApplicationException) {
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
asyncResponse.resume(new RestException(realCause));
}
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -47,9 +50,10 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -300,102 +304,133 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
public void healthcheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccess();
internalRunHealthCheck()
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume("ok");
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private CompletableFuture<Void> internalRunHealthCheck() {
String topic;
PulsarClient client;
try {
validateSuperUserAccess();
String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("Running healthCheck with topic={}", topic);
client = pulsar().getClient();
} catch (Exception e) {
LOG.error("Error getting heathcheck topic info", e);
throw new PulsarServerException(e);
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId());
throw new RestException(e);
}
String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
try {
pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
t.getSubscriptions().forEach((__, value) -> {
try {
value.deleteForcefully();
} catch (Exception e) {
LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
}
});
});
} catch (Exception e) {
LOG.warn("Failed to try to delete subscriptions for health check", e);
}
CompletableFuture<Producer<String>> producerFuture =
client.newProducer(Schema.STRING).topic(topic).createAsync();
CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
.topic(topic).startMessageId(MessageId.latest).createAsync();

CompletableFuture<Void> completePromise = new CompletableFuture<>();

CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
(ignore, exception) -> {
if (exception != null) {
completePromise.completeExceptionally(exception);
} else {
producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr))
.whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
completePromise.completeExceptionally(exception2);
}
return pulsar().getBrokerService().getTopic(topic, true).thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId(), topic);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after create.", topic));
}
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
client.newProducer(Schema.STRING).topic(topic).createAsync()
.thenCompose(producer -> client.newReader(Schema.STRING)
.topic(topic)
.subscriptionName(subscriptionName)
.startMessageId(MessageId.latest)
.createAsync()
.exceptionally(createException -> {
producer.closeAsync().exceptionally(ex -> {
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
return null;
});
throw FutureUtil.wrapToCompletionException(createException);
})
.thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
.whenComplete((unused, innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
} else {
resultFuture.complete(null);
}
});
}))
).exceptionally(ex -> {
resultFuture.completeExceptionally(ex);
return null;
});
return resultFuture;
});
}

healthcheckReadLoop(readerFuture, completePromise, messageStr);

// timeout read loop after 10 seconds
FutureUtil.addTimeoutHandling(completePromise,
HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
() -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
"healthcheck(...)"));
private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
return reader.readNextAsync()
.thenCompose(msg -> {
if (!Objects.equals(content, msg.getValue())) {
return healthCheckRecursiveReadNext(reader, content);
}
return CompletableFuture.completedFuture(null);
});

completePromise.whenComplete((ignore, exception) -> {
producerFuture.thenAccept((producer) -> {
producer.closeAsync().whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
LOG.warn("Error closing producer for healthcheck", exception2);
}
});
});
readerFuture.thenAccept((reader) -> {
reader.closeAsync().whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
LOG.warn("Error closing reader for healthcheck", exception2);
}
});
});
if (exception != null) {
asyncResponse.resume(new RestException(exception));
} else {
asyncResponse.resume("ok");
}
});
}

private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
CompletableFuture<?> completablePromise,
String messageStr) {
readerFuture.thenAccept((reader) -> {
CompletableFuture<Message<String>> readFuture = reader.readNextAsync()
.whenComplete((m, exception) -> {
if (exception != null) {
completablePromise.completeExceptionally(exception);
} else if (m.getValue().equals(messageStr)) {
completablePromise.complete(null);
} else {
healthcheckReadLoop(readerFuture, completablePromise, messageStr);
}
});
});
/**
* Close producer and reader and then to re-check if this operation is success.
*
* Re-check
* - Producer: If close fails we will print error log to notify user.
* - Consumer: If close fails we will force delete subscription.
*
* @param producer Producer
* @param reader Reader
* @param topic Topic
* @param subscriptionName Subscription name
*/
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
CompletableFuture<Void> readerFuture = reader.closeAsync();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(producerFuture);
futures.add(readerFuture);
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
LOG.error("[{}] Close reader fail while heath check.", clientAppId());
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
if (subscription != null) {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
clientAppId(), subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete subscription fail"
+ " while health check",
clientAppId(), ex);
return null;
});
}
} else {
// producer future fail.
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
}
return null;
});
}

private synchronized void deleteDynamicConfigurationOnZk(String configName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
Expand All @@ -67,6 +68,8 @@
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
Expand Down Expand Up @@ -1207,4 +1210,17 @@ public static List<String> listSubTreeBFS(BaseResources resources, final String
}
return tree;
}

protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
asyncResponse.resume(new RestException(realCause));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {

@BeforeMethod
@Override
public void setup() throws Exception {
resetConfig();
super.internalSetup();
admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(
Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("pulsar", tenantInfo);
admin.namespaces().createNamespace("pulsar/system", Sets.newHashSet("test"));
admin.tenants().createTenant("public", tenantInfo);
admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testHealthCheckup() throws Exception {
final int times = 30;
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar.getExecutor().execute(() -> {
try {
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck();
}
future.complete(null);
} catch (PulsarAdminException e) {
future.completeExceptionally(e);
}
});
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck();
}
// To ensure we don't have any subscription
final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
pulsar.getConfig().getWebServicePort().get());
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(future.isCompletedExceptionally());
});
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
.getSubscriptions(testHealthCheckTopic).stream()
// All system topics are using compaction, even though is not explicitly set in the policies.
.filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
.collect(Collectors.toList())
))
);
}
}
Loading

0 comments on commit 32f8065

Please sign in to comment.