Skip to content

Commit

Permalink
Avoid potentially blocking calls to metadata on critical threads (#12339
Browse files Browse the repository at this point in the history
)

* Avoid potentially blocking calls to metadata on critical threads

* Fixed log arguments order

* Addressed comments

* Fixed mock in PersistentSubscriptionTest

* Fixed issue in mocked tests

* Fixed test that was force policies modification under the hood
  • Loading branch information
merlimat committed Oct 14, 2021
1 parent ba7469b commit 37aca83
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -322,48 +324,43 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
}

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
boolean remove) {
CompletableFuture<Void> result = new CompletableFuture<>();

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles,
boolean remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
return FutureUtil.failedFuture(e);
}

try {
Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace)
.orElseThrow(() -> new NotFoundException(namespace + " not found"));
if (remove) {
if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
}else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
return result;
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies);
CompletableFuture<Void> future =
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> {
if (remove) {
Set<String> subscriptionAuth =
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
if (subscriptionAuth != null) {
subscriptionAuth.removeAll(roles);
} else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace,
roles, subscriptionName);
throw new IllegalArgumentException("couldn't find subscription");
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
return policies;
}).thenRun(() -> {
log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName,
roles);
});

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
} catch (NotFoundException e) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on metadata path: " + namespace + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e);
result.completeExceptionally(
new IllegalStateException("Failed to get permissions for namespace " + namespace));
}
future.exceptionally(ex -> {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace,
ex);
return null;
});

return result;
return future;
}

private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,18 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
updatePublishDispatcher(Optional.empty());
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
}
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
Expand Down Expand Up @@ -208,21 +195,12 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
Expand Down Expand Up @@ -789,10 +767,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
updatePublishDispatcher(Optional.of(policies));
}

private void updatePublishDispatcher(Policies policies) {
private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
Expand All @@ -802,9 +780,23 @@ private void updatePublishDispatcher(Policies policies) {
return;
}

Policies policies;
try {
if (optPolicies.isPresent()) {
policies = optPolicies.get();
} else {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2535,18 +2535,12 @@ public int getDefaultNumPartitions(final TopicName topicName) {
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
log.debug("No autoTopicCreateOverride policy found for {}", topicName);
return null;
Expand All @@ -2568,18 +2562,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoSubscriptionCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,7 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {

public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
try {
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace);
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
return Optional.empty();
}
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -120,7 +121,13 @@ public void setup() throws Exception {
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setTransactionCoordinatorEnabled(true);
pulsarMock = spy(new PulsarService(svcConfig));
doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources();
PulsarResources pulsarResources = mock(PulsarResources.class);
doReturn(pulsarResources).when(pulsarMock).getPulsarResources();
NamespaceResources namespaceResources = mock(NamespaceResources.class);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();

doReturn(Optional.of(new Policies())).when(namespaceResources).getPoliciesIfCached(any());

doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
doReturn(new TransactionPendingAckStoreProvider() {
@Override
Expand Down

0 comments on commit 37aca83

Please sign in to comment.