Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid potentially blocking calls to metadata on critical threads #12339

Merged
merged 6 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -322,48 +324,43 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
}

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
boolean remove) {
CompletableFuture<Void> result = new CompletableFuture<>();

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles,
boolean remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
return FutureUtil.failedFuture(e);
}

try {
Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace)
.orElseThrow(() -> new NotFoundException(namespace + " not found"));
if (remove) {
if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
}else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
return result;
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies);
CompletableFuture<Void> future =
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> {
if (remove) {
Set<String> subscriptionAuth =
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
if (subscriptionAuth != null) {
subscriptionAuth.removeAll(roles);
} else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace,
merlimat marked this conversation as resolved.
Show resolved Hide resolved
roles, subscriptionName);
throw new IllegalArgumentException("couldn't find subscription");
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
return policies;
}).thenRun(() -> {
log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName,
roles);
});

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
} catch (NotFoundException e) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on metadata path: " + namespace + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e);
result.completeExceptionally(
new IllegalStateException("Failed to get permissions for namespace " + namespace));
}
future.exceptionally(ex -> {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace,
ex);
return null;
});

return result;
return future;
}

private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,18 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
updatePublishDispatcher();
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
}
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
Expand Down Expand Up @@ -208,21 +195,12 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
Expand Down Expand Up @@ -789,10 +767,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
updatePublishDispatcher();
}

private void updatePublishDispatcher(Policies policies) {
private void updatePublishDispatcher() {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
Expand All @@ -802,9 +780,19 @@ private void updatePublishDispatcher(Policies policies) {
return;
}

Policies policies;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this was added in the most recent commit to help with a mocked test. This block doesn't throw any checked exceptions. Do we expect RuntimeExceptions from it? If we do, should we wrap other similar code blocks in this PR with try and catch? If we don't, perhaps we should update the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many tests are using mocked versions which would throw exception... then the try/catch was the thing that was making all these tests to work..

} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2535,18 +2535,12 @@ public int getDefaultNumPartitions(final TopicName topicName) {
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
log.debug("No autoTopicCreateOverride policy found for {}", topicName);
return null;
Expand All @@ -2568,18 +2562,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoSubscriptionCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,7 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {

public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
try {
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace);
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
return Optional.empty();
}
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -120,7 +121,13 @@ public void setup() throws Exception {
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setTransactionCoordinatorEnabled(true);
pulsarMock = spy(new PulsarService(svcConfig));
doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources();
PulsarResources pulsarResources = mock(PulsarResources.class);
doReturn(pulsarResources).when(pulsarMock).getPulsarResources();
NamespaceResources namespaceResources = mock(NamespaceResources.class);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();

doReturn(Optional.of(new Policies())).when(namespaceResources).getPoliciesIfCached(any());

doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
doReturn(new TransactionPendingAckStoreProvider() {
@Override
Expand Down