Skip to content

Commit

Permalink
[pulsar-broker] configure namespace anti-affinity in local policies (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and flowchartsman committed Nov 17, 2020
1 parent 45037f4 commit 163c343
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -2181,40 +2182,56 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
}

Map.Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().antiAffinityGroup = antiAffinityGroup;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
Stat nodeStat = new Stat();

log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}", clientAppId(),
antiAffinityGroup, namespaceName);
LocalPolicies localPolicies = null;
int version = -1;
try {
byte[] content = pulsar().getLocalZkCache().getZooKeeper().getData(path, false, nodeStat);
localPolicies = jsonMapper().readValue(content, LocalPolicies.class);
version = nodeStat.getVersion();
} catch (KeeperException.NoNodeException e) {
log.info("local-policies for {} is not setup at path {}", this.namespaceName, path);
// if policies is not present into localZk then create new policies
this.pulsar().getLocalZkCacheService().createPolicies(path, false)
.get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
localPolicies = new LocalPolicies();
}
localPolicies.namespaceAntiAffinityGroup = antiAffinityGroup;
byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
pulsar().getLocalZkCache().getZooKeeper().setData(path, data, Math.toIntExact(version));
// invalidate namespace's local-policies
pulsar().getLocalZkCacheService().policiesCache().invalidate(path);
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(localPolicies));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}: does not exist", clientAppId(),
log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the antiAffinityGroup on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

log.warn("[{}] Failed to update policies for namespace {}: concurrent modification", clientAppId(),
namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the antiAffinityGroup on namespace {}", clientAppId(), namespaceName, e);
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

protected String internalGetNamespaceAntiAffinityGroup() {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).antiAffinityGroup;

try {
return localPoliciesCache()
.get(AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()))
.orElse(new LocalPolicies()).namespaceAntiAffinityGroup;
} catch (Exception e) {
log.error("[{}] Failed to get the antiAffinityGroup of namespace {}", clientAppId(), namespaceName, e);
throw new RestException(Status.NOT_FOUND, "Couldn't find namespace policies");
}
}

protected void internalRemoveNamespaceAntiAffinityGroup() {
Expand All @@ -2225,12 +2242,12 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.antiAffinityGroup = null;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
final String path = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString());
byte[] content = localZk().getData(path, null, nodeStat);
LocalPolicies policies = jsonMapper().readValue(content, LocalPolicies.class);
policies.namespaceAntiAffinityGroup = null;
localZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);

} catch (KeeperException.NoNodeException e) {
Expand Down Expand Up @@ -2265,14 +2282,14 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
List<String> namespaces = getListOfNamespaces(tenant);

return namespaces.stream().filter(ns -> {
Optional<Policies> policies;
Optional<LocalPolicies> policies;
try {
policies = policiesCache().get(AdminResource.path(POLICIES, ns));
policies = localPoliciesCache().get(AdminResource.joinPath(LOCAL_POLICIES_ROOT, ns));
} catch (Exception e) {
throw new RuntimeException(e);
}

String storedAntiAffinityGroup = policies.orElse(new Policies()).antiAffinityGroup;
String storedAntiAffinityGroup = policies.orElse(new LocalPolicies()).namespaceAntiAffinityGroup;
return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
}).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.pulsar.broker.loadbalance.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;

import java.io.IOException;
Expand All @@ -45,7 +45,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
Expand Down Expand Up @@ -421,14 +421,14 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {

CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
ZooKeeperDataCache<LocalPolicies> policiesCache = pulsar.getLocalZkCacheService().policiesCache();

policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
if (!policies.isPresent() || StringUtils.isBlank(policies.get().antiAffinityGroup)) {
policiesCache.getAsync(joinPath(LOCAL_POLICIES_ROOT, namespaceName)).thenAccept(policies -> {
if (!policies.isPresent() || StringUtils.isBlank(policies.get().namespaceAntiAffinityGroup)) {
antiAffinityNsBrokersResult.complete(null);
return;
}
final String antiAffinityGroup = policies.get().antiAffinityGroup;
final String antiAffinityGroup = policies.get().namespaceAntiAffinityGroup;
final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
Expand All @@ -439,8 +439,8 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw

CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) {
policiesCache.getAsync(joinPath(LOCAL_POLICIES_ROOT, ns)).thenAccept(nsPolicies -> {
if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().namespaceAntiAffinityGroup)) {
brokerToAntiAffinityNamespaceCount.compute(broker,
(brokerName, count) -> count == null ? 1 : count + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -69,7 +69,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -655,9 +655,9 @@ private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnlo

public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
try {
Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
.get(path(POLICIES, namespace));
if (!nsPolicies.isPresent() || StringUtils.isBlank(nsPolicies.get().antiAffinityGroup)) {
Optional<LocalPolicies> nsPolicies = pulsar.getLocalZkCacheService().policiesCache()
.get(joinPath(LOCAL_POLICIES_ROOT, namespace));
if (!nsPolicies.isPresent() || StringUtils.isBlank(nsPolicies.get().namespaceAntiAffinityGroup)) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
public class LocalPolicies {

public BundlesData bundles;
// bookie affinity group for bookie-isolation
public BookieAffinityGroupData bookieAffinityGroup;
// namespace anti-affinity-group
public String namespaceAntiAffinityGroup;

public LocalPolicies() {
bundles = defaultBundle();
Expand All @@ -44,7 +47,8 @@ public boolean equals(Object obj) {
if (obj instanceof LocalPolicies) {
LocalPolicies other = (LocalPolicies) obj;
return Objects.equal(bundles, other.bundles)
&& Objects.equal(bookieAffinityGroup, other.bookieAffinityGroup);
&& Objects.equal(bookieAffinityGroup, other.bookieAffinityGroup)
&& Objects.equal(namespaceAntiAffinityGroup, other.namespaceAntiAffinityGroup);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public RetentionPolicies retention_policies = null;
public boolean deleted = false;
public String antiAffinityGroup;

public static final String FIRST_BOUNDARY = "0x00000000";
public static final String LAST_BOUNDARY = "0xffffffff";

Expand Down Expand Up @@ -124,7 +122,7 @@ public int hashCode() {
message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies,
encryption_required, delayed_delivery_policies, inactive_topic_policies,
subscription_auth_mode,
antiAffinityGroup, max_producers_per_topic,
max_producers_per_topic,
max_consumers_per_topic, max_consumers_per_subscription,
max_unacked_messages_per_consumer, max_unacked_messages_per_subscription,
compaction_threshold, offload_threshold,
Expand Down Expand Up @@ -162,7 +160,6 @@ public boolean equals(Object obj) {
&& Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
&& Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
&& Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
&& max_producers_per_topic == other.max_producers_per_topic
&& max_consumers_per_topic == other.max_consumers_per_topic
&& max_consumers_per_subscription == other.max_consumers_per_subscription
Expand Down Expand Up @@ -214,7 +211,6 @@ public String toString() {
.add("clusterSubscribeRate", clusterSubscribeRate)
.add("publishMaxMessageRate", publishMaxMessageRate)
.add("latency_stats_sample_rate", latency_stats_sample_rate)
.add("antiAffinityGroup", antiAffinityGroup)
.add("message_ttl_in_seconds", message_ttl_in_seconds)
.add("subscription_expiration_time_minutes", subscription_expiration_time_minutes)
.add("retention_policies", retention_policies)
Expand Down

0 comments on commit 163c343

Please sign in to comment.