From a08921ca6cb1dad98935808c8f474b654f861263 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 19 Mar 2018 11:04:27 -0700 Subject: [PATCH] YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda) Change-Id: I63b4e4192a95bf7ded98c54e46a2871c72869700 --- .../records/AllocationTagNamespaceType.java | 29 -- .../yarn/api/records/AllocationTags.java | 50 ---- .../api/resource/PlacementConstraints.java | 58 +++- .../InvalidAllocationTagException.java | 34 --- .../constraint}/AllocationTagNamespace.java | 114 +++----- .../scheduler/constraint/AllocationTags.java | 82 ++++++ .../constraint/AllocationTagsManager.java | 146 +++++++--- .../scheduler/constraint}/Evaluable.java | 2 +- .../constraint/PlacementConstraintsUtil.java | 55 +--- .../constraint}/TargetApplications.java | 8 +- .../algorithm/LocalAllocationTagsManager.java | 27 +- ...SingleConstraintAppPlacementAllocator.java | 17 +- .../yarn/server/resourcemanager/MockAM.java | 11 +- .../rmcontainer/TestRMContainerImpl.java | 47 ++- ...tSchedulingRequestContainerAllocation.java | 126 ++++++++ .../constraint/TestAllocationTagsManager.java | 272 +++++++++++++++--- .../TestAllocationTagsNamespace.java | 36 +-- .../TestPlacementConstraintsUtil.java | 247 +++++++++++++++- .../TestLocalAllocationTagsManager.java | 33 ++- ...SingleConstraintAppPlacementAllocator.java | 9 +- 20 files changed, 1031 insertions(+), 372 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records => hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint}/AllocationTagNamespace.java (75%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records => hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint}/Evaluable.java (94%) rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records => hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint}/TargetApplications.java (92%) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java index 5e46cd0f3d94c..de5492ee25f24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -18,12 +18,6 @@ package org.apache.hadoop.yarn.api.records; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; - -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - /** * Class to describe all supported forms of namespaces for an allocation tag. */ @@ -44,29 +38,6 @@ public String getTypeKeyword() { return this.typeKeyword; } - /** - * Parses the namespace type from a given string. - * @param prefix namespace prefix. - * @return namespace type. - * @throws InvalidAllocationTagException - */ - public static AllocationTagNamespaceType fromString(String prefix) throws - InvalidAllocationTagException { - for (AllocationTagNamespaceType type : - AllocationTagNamespaceType.values()) { - if(type.getTypeKeyword().equals(prefix)) { - return type; - } - } - - Set values = Arrays.stream(AllocationTagNamespaceType.values()) - .map(AllocationTagNamespaceType::toString) - .collect(Collectors.toSet()); - throw new InvalidAllocationTagException( - "Invalid namespace prefix: " + prefix - + ", valid values are: " + String.join(",", values)); - } - @Override public String toString() { return this.getTypeKeyword(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java deleted file mode 100644 index 50bffc3551615..0000000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import java.util.Set; - -/** - * Allocation tags under same namespace. - */ -public class AllocationTags { - - private AllocationTagNamespace ns; - private Set tags; - - public AllocationTags(AllocationTagNamespace namespace, - Set allocationTags) { - this.ns = namespace; - this.tags = allocationTags; - } - - /** - * @return the namespace of these tags. - */ - public AllocationTagNamespace getNamespace() { - return this.ns; - } - - /** - * @return the allocation tags. - */ - public Set getTags() { - return this.tags; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index af70e2a7471d1..02138bd942932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -107,6 +107,25 @@ public static AbstractConstraint cardinality(String scope, int minCardinality, PlacementTargets.allocationTag(allocationTags)); } + /** + * Similar to {@link #cardinality(String, int, int, String...)}, but let you + * attach a namespace to the given allocation tags. + * + * @param scope the scope of the constraint + * @param namespace the namespace of the allocation tags + * @param minCardinality determines the minimum number of allocations within + * the scope + * @param maxCardinality determines the maximum number of allocations within + * the scope + * @param allocationTags allocation tags + * @return the resulting placement constraint + */ + public static AbstractConstraint cardinality(String scope, String namespace, + int minCardinality, int maxCardinality, String... allocationTags) { + return new SingleConstraint(scope, minCardinality, maxCardinality, + PlacementTargets.allocationTagWithNamespace(namespace, allocationTags)); + } + /** * Similar to {@link #cardinality(String, int, int, String...)}, but * determines only the minimum cardinality (the maximum cardinality is @@ -124,6 +143,23 @@ public static AbstractConstraint minCardinality(String scope, allocationTags); } + /** + * Similar to {@link #minCardinality(String, int, String...)}, but let you + * attach a namespace to the allocation tags. + * + * @param scope the scope of the constraint + * @param namespace the namespace of these tags + * @param minCardinality determines the minimum number of allocations within + * the scope + * @param allocationTags the constraint targets allocations with these tags + * @return the resulting placement constraint + */ + public static AbstractConstraint minCardinality(String scope, + String namespace, int minCardinality, String... allocationTags) { + return cardinality(scope, namespace, minCardinality, Integer.MAX_VALUE, + allocationTags); + } + /** * Similar to {@link #cardinality(String, int, int, String...)}, but * determines only the maximum cardinality (the minimum cardinality is 0). @@ -139,6 +175,23 @@ public static AbstractConstraint maxCardinality(String scope, return cardinality(scope, 0, maxCardinality, allocationTags); } + /** + * Similar to {@link #maxCardinality(String, int, String...)}, but let you + * specify a namespace for the tags, see supported namespaces in + * {@link AllocationTagNamespaceType}. + * + * @param scope the scope of the constraint + * @param tagNamespace the namespace of these tags + * @param maxCardinality determines the maximum number of allocations within + * the scope + * @param allocationTags allocation tags + * @return the resulting placement constraint + */ + public static AbstractConstraint maxCardinality(String scope, + String tagNamespace, int maxCardinality, String... allocationTags) { + return cardinality(scope, tagNamespace, 0, maxCardinality, allocationTags); + } + /** * This constraint generalizes the cardinality and target constraints. * @@ -242,9 +295,8 @@ public static TargetExpression allocationTagWithNamespace(String namespace, */ public static TargetExpression allocationTagToIntraApp( String... allocationTags) { - AllocationTagNamespace selfNs = new AllocationTagNamespace.Self(); return new TargetExpression(TargetType.ALLOCATION_TAG, - selfNs.toString(), allocationTags); + AllocationTagNamespaceType.SELF.toString(), allocationTags); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java deleted file mode 100644 index be8d881d7ae0b..0000000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.exceptions; - -/** - * This exception is thrown by - * {@link - * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)} - * when it fails to parse a namespace. - */ -public class InvalidAllocationTagException extends YarnException { - - private static final long serialVersionUID = 1L; - - public InvalidAllocationTagException(String message) { - super(message); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java similarity index 75% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java index 25f876156bde6..7b9f3bee19b51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java @@ -16,22 +16,24 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.api.records; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString; /** * Class to describe the namespace of an allocation tag. @@ -69,8 +71,6 @@ public AllocationTagNamespaceType getNamespaceType() { /** * Get the scope of the namespace, in form of a set of applications. - * Before calling this method, {@link #evaluate(TargetApplications)} - * must be called in prior to ensure the scope is proper evaluated. * * @return a set of applications. */ @@ -83,51 +83,20 @@ public Set getNamespaceScope() { return this.nsScope; } - @Override - public abstract void evaluate(TargetApplications target) - throws InvalidAllocationTagException; - /** - * @return true if the namespace is effective in all applications - * in this cluster. Specifically the namespace prefix should be - * "all". - */ - public boolean isGlobal() { - return AllocationTagNamespaceType.ALL.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective within a single application - * by its application ID, the namespace prefix should be "app-id"; - * false otherwise. - */ - public boolean isSingleInterApp() { - return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to the application itself, - * the namespace prefix should be "self"; false otherwise. - */ - public boolean isIntraApp() { - return AllocationTagNamespaceType.SELF.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to all applications except - * itself, the namespace prefix should be "not-self"; false otherwise. - */ - public boolean isNotSelf() { - return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to a group of applications - * identified by a application label, the namespace prefix should be - * "app-label"; false otherwise. + * Evaluate the namespace against given target applications + * if it is necessary. Only self/not-self/app-label namespace types + * require this evaluation step, because they are not binding to a + * specific scope during initiating. So we do lazy binding for them + * in this method. + * + * @param target a generic type target that impacts this evaluation. + * @throws InvalidAllocationTagsQueryException */ - public boolean isAppLabel() { - return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType()); + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagsQueryException { + // Sub-class needs to override this when it requires the eval step. } @Override @@ -146,9 +115,9 @@ public Self() { @Override public void evaluate(TargetApplications target) - throws InvalidAllocationTagException { + throws InvalidAllocationTagsQueryException { if (target == null || target.getCurrentApplicationId() == null) { - throw new InvalidAllocationTagException("Namespace Self must" + throw new InvalidAllocationTagsQueryException("Namespace Self must" + " be evaluated against a single application ID."); } ApplicationId applicationId = target.getCurrentApplicationId(); @@ -196,12 +165,6 @@ public static class All extends AllocationTagNamespace { public All() { super(ALL); } - - @Override - public void evaluate(TargetApplications target) { - Set allAppIds = target.getAllApplicationIds(); - setScopeIfNotNull(allAppIds); - } } /** @@ -229,10 +192,6 @@ public static class AppID extends AllocationTagNamespace { public AppID(ApplicationId applicationId) { super(APP_ID); this.targetAppId = applicationId; - } - - @Override - public void evaluate(TargetApplications target) { setScopeIfNotNull(ImmutableSet.of(targetAppId)); } @@ -248,11 +207,11 @@ public String toString() { * * @param namespaceStr namespace string. * @return an instance of {@link AllocationTagNamespace}. - * @throws InvalidAllocationTagException + * @throws InvalidAllocationTagsQueryException * if given string is not in valid format */ public static AllocationTagNamespace parse(String namespaceStr) - throws InvalidAllocationTagException { + throws InvalidAllocationTagsQueryException { // Return the default namespace if no valid string is given. if (Strings.isNullOrEmpty(namespaceStr)) { return new Self(); @@ -273,7 +232,7 @@ public static AllocationTagNamespace parse(String namespaceStr) return new All(); case APP_ID: if (nsValues.size() != 2) { - throw new InvalidAllocationTagException( + throw new InvalidAllocationTagsQueryException( "Missing the application ID in the namespace string: " + namespaceStr); } @@ -282,18 +241,35 @@ public static AllocationTagNamespace parse(String namespaceStr) case APP_LABEL: return new AppLabel(); default: - throw new InvalidAllocationTagException( + throw new InvalidAllocationTagsQueryException( "Invalid namespace string " + namespaceStr); } } + private static AllocationTagNamespaceType fromString(String prefix) throws + InvalidAllocationTagsQueryException { + for (AllocationTagNamespaceType type : + AllocationTagNamespaceType.values()) { + if(type.getTypeKeyword().equals(prefix)) { + return type; + } + } + + Set values = Arrays.stream(AllocationTagNamespaceType.values()) + .map(AllocationTagNamespaceType::toString) + .collect(Collectors.toSet()); + throw new InvalidAllocationTagsQueryException( + "Invalid namespace prefix: " + prefix + + ", valid values are: " + String.join(",", values)); + } + private static AllocationTagNamespace parseAppID(String appIDStr) - throws InvalidAllocationTagException { + throws InvalidAllocationTagsQueryException { try { ApplicationId applicationId = ApplicationId.fromString(appIDStr); return new AppID(applicationId); } catch (IllegalArgumentException e) { - throw new InvalidAllocationTagException( + throw new InvalidAllocationTagsQueryException( "Invalid application ID for " + APP_ID.getTypeKeyword() + ": " + appIDStr); } @@ -307,11 +283,11 @@ private static AllocationTagNamespace parseAppID(String appIDStr) * * @param namespaceStr namespace string. * @return a list of parsed strings. - * @throws InvalidAllocationTagException + * @throws InvalidAllocationTagsQueryException * if namespace format is unexpected. */ private static List normalize(String namespaceStr) - throws InvalidAllocationTagException { + throws InvalidAllocationTagsQueryException { List result = new ArrayList<>(); if (namespaceStr == null) { return result; @@ -326,7 +302,7 @@ private static List normalize(String namespaceStr) // Currently we only allow 1 or 2 values for a namespace string if (result.size() == 0 || result.size() > 2) { - throw new InvalidAllocationTagException("Invalid namespace string: " + throw new InvalidAllocationTagsQueryException("Invalid namespace string: " + namespaceStr + ", the syntax is or" + " /"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java new file mode 100644 index 0000000000000..dc0237ec77b04 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.Set; + +/** + * Allocation tags under same namespace. + */ +public final class AllocationTags { + + private AllocationTagNamespace ns; + private Set tags; + + private AllocationTags(AllocationTagNamespace namespace, + Set allocationTags) { + this.ns = namespace; + this.tags = allocationTags; + } + + /** + * @return the namespace of these tags. + */ + public AllocationTagNamespace getNamespace() { + return this.ns; + } + + /** + * @return the allocation tags. + */ + public Set getTags() { + return this.tags; + } + + @VisibleForTesting + public static AllocationTags createSingleAppAllocationTags( + ApplicationId appId, Set tags) { + AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId); + return new AllocationTags(namespace, tags); + } + + @VisibleForTesting + public static AllocationTags createGlobalAllocationTags(Set tags) { + AllocationTagNamespace namespace = new AllocationTagNamespace.All(); + return new AllocationTags(namespace, tags); + } + + @VisibleForTesting + public static AllocationTags createOtherAppAllocationTags( + ApplicationId currentApp, Set allIds, Set tags) + throws InvalidAllocationTagsQueryException { + AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf(); + TargetApplications ta = new TargetApplications(currentApp, allIds); + namespace.evaluate(ta); + return new AllocationTags(namespace, tags); + } + + public static AllocationTags newAllocationTags( + AllocationTagNamespace namespace, Set tags) { + return new AllocationTags(namespace, tags); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index fb2619afcfa4b..830566a625a4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -75,6 +78,12 @@ public static class TypeToCountedTags { // Map> private Map> typeToTagsWithCount = new HashMap<>(); + public TypeToCountedTags() {} + + private TypeToCountedTags(Map> tags) { + this.typeToTagsWithCount = tags; + } + // protected by external locks private void addTags(T type, Set tags) { Map innerMap = @@ -206,6 +215,52 @@ private boolean isEmpty() { public Map> getTypeToTagsWithCount() { return typeToTagsWithCount; } + + /** + * Absorbs the given {@link TypeToCountedTags} to current mapping, + * this will aggregate the count of the tags with same name. + * + * @param target a {@link TypeToCountedTags} to merge with. + */ + protected void absorb(final TypeToCountedTags target) { + // No opt if the given target is null. + if (target == null || target.getTypeToTagsWithCount() == null) { + return; + } + + // Merge the target. + Map> targetMap = target.getTypeToTagsWithCount(); + for (Map.Entry> targetEntry : + targetMap.entrySet()) { + // Get a mutable copy, do not modify the target reference. + Map copy = Maps.newHashMap(targetEntry.getValue()); + + // If the target type doesn't exist in the current mapping, + // add as a new entry. + Map existingMapping = + this.typeToTagsWithCount.putIfAbsent(targetEntry.getKey(), copy); + // There was a mapping for this target type, + // do proper merging on the operator. + if (existingMapping != null) { + Map localMap = + this.typeToTagsWithCount.get(targetEntry.getKey()); + // Merge the target map to the inner map. + Map targetValue = targetEntry.getValue(); + for (Map.Entry entry : targetValue.entrySet()) { + localMap.merge(entry.getKey(), entry.getValue(), + (a, b) -> Long.sum(a, b)); + } + } + } + } + + /** + * @return an immutable copy of current instance. + */ + protected TypeToCountedTags immutableCopy() { + return new TypeToCountedTags( + Collections.unmodifiableMap(this.typeToTagsWithCount)); + } } @VisibleForTesting @@ -235,6 +290,34 @@ public AllocationTagsManager(RMContext context) { rmContext = context; } + /** + * Aggregates multiple {@link TypeToCountedTags} to a single one based on + * a given set of application IDs, the values are properly merged. + * + * @param appIds a set of application IDs. + * @return an aggregated {@link TypeToCountedTags}. + */ + private TypeToCountedTags aggregateAllocationTags(Set appIds, + Map mapping) { + TypeToCountedTags result = new TypeToCountedTags(); + if (appIds != null) { + if (appIds.size() == 1) { + // If there is only one app, we simply return the mapping + // without any extra computation. + return mapping.get(appIds.iterator().next()); + } + + for (ApplicationId applicationId : appIds) { + TypeToCountedTags appIdTags = mapping.get(applicationId); + if (appIdTags != null) { + // Make sure ATM state won't be changed. + result.absorb(appIdTags.immutableCopy()); + } + } + } + return result; + } + /** * Notify container allocated on a node. * @@ -458,9 +541,8 @@ public boolean allocationTagExistsOnNode(NodeId nodeId, * to implement customized logic. * * @param nodeId nodeId, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all applications. - * @param tags allocation tags, see + * @param tags {@link AllocationTags}, allocation tags under a + * specific namespace. See * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality * depends on op. If a specified tag doesn't exist, 0 @@ -474,29 +556,28 @@ public boolean allocationTagExistsOnNode(NodeId nodeId, * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified */ - public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, - Set tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { + public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { readLock.lock(); - try { - if (nodeId == null || op == null) { + if (nodeId == null || op == null || tags == null) { throw new InvalidAllocationTagsQueryException( "Must specify nodeId/tags/op to query cardinality"); } TypeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppNodeMappings.get(applicationId); - } else { + if (AllocationTagNamespaceType.ALL.equals( + tags.getNamespace().getNamespaceType())) { mapping = globalNodeMapping; + } else { + // Aggregate app tags cardinality by applications. + mapping = aggregateAllocationTags( + tags.getNamespace().getNamespaceScope(), + perAppNodeMappings); } - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(nodeId, tags, op); + return mapping == null ? 0 : + mapping.getCardinality(nodeId, tags.getTags(), op); } finally { readLock.unlock(); } @@ -507,9 +588,8 @@ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, * to implement customized logic. * * @param rack rack, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all applications. - * @param tags allocation tags, see + * @param tags {@link AllocationTags}, allocation tags under a + * specific namespace. See * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality * depends on op. If a specified tag doesn't exist, 0 @@ -523,30 +603,28 @@ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified */ - @SuppressWarnings("unchecked") - public long getRackCardinalityByOp(String rack, ApplicationId applicationId, - Set tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { + public long getRackCardinalityByOp(String rack, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { readLock.lock(); - try { - if (rack == null || op == null) { + if (rack == null || op == null || tags == null) { throw new InvalidAllocationTagsQueryException( - "Must specify rack/tags/op to query cardinality"); + "Must specify nodeId/tags/op to query cardinality"); } TypeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppRackMappings.get(applicationId); - } else { + if (AllocationTagNamespaceType.ALL.equals( + tags.getNamespace().getNamespaceType())) { mapping = globalRackMapping; + } else { + // Aggregates cardinality by rack. + mapping = aggregateAllocationTags( + tags.getNamespace().getNamespaceScope(), + perAppRackMappings); } - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(rack, tags, op); + return mapping == null ? 0 : + mapping.getCardinality(rack, tags.getTags(), op); } finally { readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java similarity index 94% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java index 7a74002ea5d70..6a7e54e470814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.api.records; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import org.apache.hadoop.yarn.exceptions.YarnException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 2d0e95a9b9f51..389fc5cc908c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,11 +24,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.records.TargetApplications; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; @@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; @@ -70,43 +67,25 @@ private PlacementConstraintsUtil() { */ private static AllocationTagNamespace getAllocationTagNamespace( ApplicationId currentAppId, String targetKey, AllocationTagsManager atm) - throws InvalidAllocationTagException{ + throws InvalidAllocationTagsQueryException { // Parse to a valid namespace. AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); - // TODO remove such check once we support all forms of namespaces - if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) { - throw new InvalidAllocationTagException( - "Only support " + AllocationTagNamespaceType.SELF.toString() - + " and "+ AllocationTagNamespaceType.APP_ID + " now," - + namespace.toString() + " is not supported yet!"); + // TODO Complete remove this check once we support app-label. + if (AllocationTagNamespaceType.APP_LABEL + .equals(namespace.getNamespaceType())) { + throw new InvalidAllocationTagsQueryException( + namespace.toString() + " is not supported yet!"); } // Evaluate the namespace according to the given target // before it can be consumed. - TargetApplications ta = new TargetApplications(currentAppId, - atm.getAllApplicationIds()); + TargetApplications ta = + new TargetApplications(currentAppId, atm.getAllApplicationIds()); namespace.evaluate(ta); return namespace; } - // We return a single app Id now, because at present, - // only self and app-id namespace is supported. But moving on, - // this will return a set of application IDs. - // TODO support other forms of namespaces - private static ApplicationId getNamespaceScope( - AllocationTagNamespace namespace) - throws InvalidAllocationTagException { - if (namespace.getNamespaceScope() == null - || namespace.getNamespaceScope().size() != 1) { - throw new InvalidAllocationTagException( - "Invalid allocation tag namespace " + namespace.toString() - + ", expecting it is not null and only 1 application" - + " ID in the scope."); - } - return namespace.getNamespaceScope().iterator().next(); - } - /** * Returns true if single placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. @@ -128,14 +107,10 @@ private static boolean canSatisfySingleConstraintExpression( // Parse the allocation tag's namespace from the given target key, // then evaluate the namespace and get its scope, // which is represented by one or more application IDs. - ApplicationId effectiveAppID; - try { - AllocationTagNamespace namespace = getAllocationTagNamespace( + AllocationTagNamespace namespace = getAllocationTagNamespace( targetApplicationId, te.getTargetKey(), tm); - effectiveAppID = getNamespaceScope(namespace); - } catch (InvalidAllocationTagException e) { - throw new InvalidAllocationTagsQueryException(e); - } + AllocationTags allocationTags = AllocationTags + .newAllocationTags(namespace, te.getTargetValues()); long minScopeCardinality = 0; long maxScopeCardinality = 0; @@ -149,20 +124,20 @@ private static boolean canSatisfySingleConstraintExpression( if (sc.getScope().equals(PlacementConstraints.NODE)) { if (checkMinCardinality) { minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - effectiveAppID, te.getTargetValues(), Long::max); + allocationTags, Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - effectiveAppID, te.getTargetValues(), Long::min); + allocationTags, Long::min); } } else if (sc.getScope().equals(PlacementConstraints.RACK)) { if (checkMinCardinality) { minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - effectiveAppID, te.getTargetValues(), Long::max); + allocationTags, Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - effectiveAppID, te.getTargetValues(), Long::min); + allocationTags, Long::min); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java similarity index 92% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java index de0ea268b1039..0de7c9ec6b100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.api.records; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import java.util.Set; import java.util.stream.Collectors; @@ -37,10 +39,6 @@ public TargetApplications(ApplicationId currentApplicationId, this.allAppIds = allApplicationIds; } - public Set getAllApplicationIds() { - return this.allAppIds; - } - public ApplicationId getCurrentApplicationId() { return this.currentAppId; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java index 9472719ae6d82..1fce466d3ac83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -139,29 +140,27 @@ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, } @Override - public long getRackCardinality(String rack, ApplicationId applicationId, - String tag) throws InvalidAllocationTagsQueryException { - return tagsManager.getRackCardinality(rack, applicationId, tag); + public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { + return tagsManager.getNodeCardinalityByOp(nodeId, tags, op); } @Override - public boolean allocationTagExistsOnNode(NodeId nodeId, - ApplicationId applicationId, String tag) - throws InvalidAllocationTagsQueryException { - return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); + public long getRackCardinality(String rack, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinality(rack, applicationId, tag); } @Override - public long getNodeCardinalityByOp(NodeId nodeId, - ApplicationId applicationId, Set tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { - return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op); + public long getRackCardinalityByOp(String rack, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinalityByOp(rack, tags, op); } @Override - public long getRackCardinalityByOp(String rack, ApplicationId applicationId, - Set tags, LongBinaryOperator op) + public boolean allocationTagExistsOnNode(NodeId nodeId, + ApplicationId applicationId, String tag) throws InvalidAllocationTagsQueryException { - return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op); + return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 7e5506efddc04..9004110837e74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -339,18 +338,18 @@ private void validateAndSetSchedulingRequest(SchedulingRequest try { AllocationTagNamespace tagNS = AllocationTagNamespace.parse(targetExpression.getTargetKey()); - if (!AllocationTagNamespaceType.SELF + if (AllocationTagNamespaceType.APP_LABEL .equals(tagNS.getNamespaceType())) { throwExceptionWithMetaInfo( - "As of now, the only accepted target key for targetKey of " - + "allocation_tag target expression is: [" - + AllocationTagNamespaceType.SELF.toString() - + "]. Please make changes to placement constraints " - + "accordingly. If this is null, it will be set to " + "As of now, allocation tag namespace [" + + AllocationTagNamespaceType.APP_LABEL.toString() + + "] is not supported. Please make changes to placement " + + "constraints accordingly. If this is null, it will be " + + "set to " + AllocationTagNamespaceType.SELF.toString() + " by default."); } - } catch (InvalidAllocationTagException e) { + } catch (InvalidAllocationTagsQueryException e) { throwExceptionWithMetaInfo( "Invalid allocation tag namespace, message: " + e.getMessage()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 2ed201ce91875..5eb667e144bac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -305,6 +305,14 @@ public AllocateResponse allocate(List resourceRequest, public AllocateResponse allocateIntraAppAntiAffinity( ResourceSizing resourceSizing, Priority priority, long allocationId, Set allocationTags, String... targetTags) throws Exception { + return allocateAppAntiAffinity(resourceSizing, priority, allocationId, + null, allocationTags, targetTags); + } + + public AllocateResponse allocateAppAntiAffinity( + ResourceSizing resourceSizing, Priority priority, long allocationId, + String namespace, Set allocationTags, String... targetTags) + throws Exception { return this.allocate(null, Arrays.asList(SchedulingRequest.newBuilder().executionType( ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) @@ -313,7 +321,8 @@ public AllocateResponse allocateIntraAppAntiAffinity( PlacementConstraints .targetNotIn(PlacementConstraints.NODE, PlacementConstraints.PlacementTargets - .allocationTagToIntraApp(targetTags)).build()) + .allocationTagWithNamespace(namespace, targetTags)) + .build()) .resourceSizing(resourceSizing).build()), null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 27c5fbdb1f3ce..7a930cd361159 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -428,20 +430,27 @@ public void testContainerTransitionNotifyAllocationTagsManager() rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), null), + Long::max)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus .newInstance(containerId, ContainerState.COMPLETE, "", 0), RMContainerEventType.KILL)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Second container: ACQUIRED -> FINISHED */ rmContainer = new RMContainerImpl(container, @@ -449,14 +458,18 @@ public void testContainerTransitionNotifyAllocationTagsManager() nodeId, "user", rmContext); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.setAllocationTags(ImmutableSet.of("mapper")); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -466,7 +479,9 @@ public void testContainerTransitionNotifyAllocationTagsManager() RMContainerEventType.FINISHED)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Third container: RUNNING -> FINISHED */ rmContainer = new RMContainerImpl(container, @@ -475,13 +490,17 @@ public void testContainerTransitionNotifyAllocationTagsManager() rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -494,7 +513,9 @@ public void testContainerTransitionNotifyAllocationTagsManager() RMContainerEventType.FINISHED)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Fourth container: NEW -> RECOVERED */ rmContainer = new RMContainerImpl(container, @@ -503,7 +524,9 @@ public void testContainerTransitionNotifyAllocationTagsManager() rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); NMContainerStatus containerStatus = NMContainerStatus .newInstance(containerId, 0, ContainerState.NEW, @@ -514,6 +537,8 @@ public void testContainerTransitionNotifyAllocationTagsManager() .handle(new RMContainerRecoverEvent(containerId, containerStatus)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index 27d86611e3171..d7124bb7a9c6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -224,6 +225,131 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + /** + * This UT covers some basic end-to-end inter-app anti-affinity + * constraint tests. For comprehensive tests over different namespace + * types, see more in TestPlacementConstraintsUtil. + * @throws Exception + */ + @Test + public void testInterAppAntiAffinity() throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( + new Configuration()); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 4 NMs. + MockNM[] nms = new MockNM[4]; + RMNode[] rmNodes = new RMNode[4]; + for (int i = 0; i < 4; i++) { + nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + // app1 -> c + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // app1 asks for 3 anti-affinity containers for the same app. It should + // only get 3 containers allocated to 3 different nodes.. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + System.out.println("Mappers on HOST0: " + + rmNodes[0].getAllocationTagsWithCount().get("mapper")); + System.out.println("Mappers on HOST1: " + + rmNodes[1].getAllocationTagsWithCount().get("mapper")); + System.out.println("Mappers on HOST2: " + + rmNodes[2].getAllocationTagsWithCount().get("mapper")); + + // App1 should get 4 containers allocated (1 AM + 3 mappers). + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(4, schedulerApp.getLiveContainers().size()); + + // app2 -> c + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]); + + // App2 asks for 3 containers that anti-affinity with any mapper, + // since 3 out of 4 nodes already have mapper containers, all 3 + // containers will be allocated on the other node. + AllocationTagNamespace.All allNs = new AllocationTagNamespace.All(); + am2.allocateAppAntiAffinity( + ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, allNs.toString(), + ImmutableSet.of("foo"), "mapper"); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App2 should get 4 containers allocated (1 AM + 3 container). + Assert.assertEquals(4, schedulerApp2.getLiveContainers().size()); + + // The allocated node should not have mapper tag. + Assert.assertTrue(schedulerApp2.getLiveContainers() + .stream().allMatch(rmContainer -> { + // except the nm host + if (!rmContainer.getContainer().getNodeId().equals(rmNodes[0])) { + return !rmContainer.getAllocationTags().contains("mapper"); + } + return true; + })); + + // app3 -> c + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nms[0]); + + // App3 asks for 3 containers that anti-affinity with any mapper. + // Unlike the former case, since app3 source tags are also mapper, + // it will anti-affinity with itself too. So there will be only 1 + // container be allocated. + am3.allocateAppAntiAffinity( + ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, allNs.toString(), + ImmutableSet.of("mapper"), "mapper"); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt( + am3.getApplicationAttemptId()); + + // App3 should get 2 containers allocated (1 AM + 1 container). + Assert.assertEquals(2, schedulerApp3.getLiveContainers().size()); + + rm1.close(); + } + @Test public void testSchedulingRequestDisabledByDefault() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index 76f451e919be0..cbf59682648ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -96,7 +97,9 @@ public void testAllocationTagsManagerSimpleCases() // Get Node Cardinality of app1 on node1, with tag "mapper" Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper")), Long::max)); // Get Rack Cardinality of app1 on rack0, with tag "mapper" @@ -106,20 +109,26 @@ public void testAllocationTagsManagerSimpleCases() // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::min)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::min)); // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::max)); // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum Assert.assertEquals(3, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::sum)); // Get Node Cardinality by passing single tag. Assert.assertEquals(1, @@ -134,38 +143,52 @@ public void testAllocationTagsManagerSimpleCases() // op=min Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("no_existed", "reducer"), Long::min)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("no_existed", "reducer")), + Long::min)); // Get Node Cardinality of app1 on node2, with tag "", op=max // (Expect this returns #containers from app1 on node2) Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), null, Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), null), + Long::max)); // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), null, Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), null), + Long::max)); // Get Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), ImmutableSet.of()), + Long::max)); // Get Node Cardinality of all apps on node2, with empty tag set, op=sum Assert.assertEquals(4, atm.getNodeCardinalityByOp( - NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum)); + NodeId.fromString("host2:123"), + AllocationTags.createGlobalAllocationTags(ImmutableSet.of()), + Long::sum)); // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(3, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), ImmutableSet.of()), + Long::sum)); // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), ImmutableSet.of()), + Long::sum)); // Finish all containers: atm.removeContainer(NodeId.fromString("host1:123"), @@ -189,33 +212,42 @@ public void testAllocationTagsManagerSimpleCases() // Get Cardinality of app1 on node1, with tag "mapper" Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper")), Long::max)); // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::min)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::min)); // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::max)); // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer"), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")), + Long::sum)); // Get Node Cardinality of app1 on node2, with tag "", op=max // (Expect this returns #containers from app1 on node2) Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of(TestUtils.getMockApplicationId(1).toString())), Long::max)); Assert.assertEquals(0, @@ -226,21 +258,32 @@ public void testAllocationTagsManagerSimpleCases() // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of()), + Long::max)); // Get Node Cardinality of all apps on node2, with empty tag set, op=sum Assert.assertEquals(0, atm.getNodeCardinalityByOp( - NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum)); + NodeId.fromString("host2:123"), + AllocationTags.createGlobalAllocationTags(ImmutableSet.of()), + Long::sum)); // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of()), + Long::sum)); // Get Node Cardinality of app_2 on node2, with empty tag set, op=sum Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of()), + Long::sum)); } @@ -296,20 +339,26 @@ public void testAllocationTagsManagerRackMapping() // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0", - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of()), + Long::max)); // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0", - TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min)); + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of()), + Long::min)); // Get Rack Cardinality of all apps on rack0, with empty tag set, op=min - Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null, - ImmutableSet.of(), Long::max)); + Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", + AllocationTags.createGlobalAllocationTags(ImmutableSet.of()), + Long::max)); } @Test - public void testAllocationTagsManagerMemoryAfterCleanup() - throws InvalidAllocationTagsQueryException { + public void testAllocationTagsManagerMemoryAfterCleanup() { /** * Make sure YARN cleans up all memory once container/app finishes. */ @@ -362,8 +411,7 @@ public void testAllocationTagsManagerMemoryAfterCleanup() } @Test - public void testQueryCardinalityWithIllegalParameters() - throws InvalidAllocationTagsQueryException { + public void testQueryCardinalityWithIllegalParameters() { /** * Make sure YARN cleans up all memory once container/app finishes. */ @@ -391,9 +439,12 @@ public void testQueryCardinalityWithIllegalParameters() // No node-id boolean caughtException = false; try { - atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2), - ImmutableSet.of("mapper"), Long::min); - } catch (InvalidAllocationTagsQueryException e) { + atm.getNodeCardinalityByOp(null, + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), + ImmutableSet.of("mapper")), + Long::min); + } catch (InvalidAllocationTagsQueryException e1) { caughtException = true; } Assert.assertTrue("should fail because of nodeId specified", @@ -403,11 +454,150 @@ public void testQueryCardinalityWithIllegalParameters() caughtException = false; try { atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null); - } catch (InvalidAllocationTagsQueryException e) { + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), + ImmutableSet.of("mapper")), + null); + } catch (InvalidAllocationTagsQueryException e1) { caughtException = true; } Assert.assertTrue("should fail because of nodeId specified", caughtException); } + + @Test + public void testNodeAllocationTagsAggregation() + throws InvalidAllocationTagsQueryException { + + AllocationTagsManager atm = new AllocationTagsManager(rmContext); + ApplicationId app1 = TestUtils.getMockApplicationId(1); + ApplicationId app2 = TestUtils.getMockApplicationId(2); + ApplicationId app3 = TestUtils.getMockApplicationId(3); + NodeId host1 = NodeId.fromString("host1:123"); + NodeId host2 = NodeId.fromString("host2:123"); + NodeId host3 = NodeId.fromString("host3:123"); + + /** + * Node1 (rack0) + * app1/A(2) + * app1/B(1) + * app2/A(3) + * app3/A(1) + * + * Node2 (rack0) + * app2/A(1) + * app2/B(2) + * app1/C(1) + * app3/B(1) + * + * Node3 (rack1): + * app2/D(1) + * app3/D(1) + */ + atm.addContainer(host1, TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("A", "B")); + atm.addContainer(host1, TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("A")); + atm.addContainer(host1, TestUtils.getMockContainerId(2, 1), + ImmutableSet.of("A")); + atm.addContainer(host1, TestUtils.getMockContainerId(2, 2), + ImmutableSet.of("A")); + atm.addContainer(host1, TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("A")); + atm.addContainer(host1, TestUtils.getMockContainerId(3, 1), + ImmutableSet.of("A")); + + atm.addContainer(host2, TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("C")); + atm.addContainer(host2, TestUtils.getMockContainerId(2, 4), + ImmutableSet.of("A")); + atm.addContainer(host2, TestUtils.getMockContainerId(2, 5), + ImmutableSet.of("B")); + atm.addContainer(host2, TestUtils.getMockContainerId(2, 6), + ImmutableSet.of("B")); + atm.addContainer(host2, TestUtils.getMockContainerId(3, 2), + ImmutableSet.of("B")); + + atm.addContainer(host3, TestUtils.getMockContainerId(2, 7), + ImmutableSet.of("D")); + atm.addContainer(host3, TestUtils.getMockContainerId(3, 3), + ImmutableSet.of("D")); + + // Target applications, current app: app1 + // all apps: app1, app2, app3 + TargetApplications ta = new TargetApplications(app1, + ImmutableSet.of(app1, app2, app3)); + + //******************************** + // 1) self (app1) + //******************************** + AllocationTags tags = AllocationTags + .createSingleAppAllocationTags(app1, ImmutableSet.of("A", "C")); + Assert.assertEquals(2, atm.getNodeCardinalityByOp(host1, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min)); + Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host2, tags, Long::min)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min)); + + //******************************** + // 2) not-self (app2, app3) + //******************************** + /** + * Verify max/min cardinality of tag "A" on host1 from all applications + * other than app1. This returns the max/min cardinality of tag "A" of + * app2 or app3 on this node. + * + * Node1 (rack0) + * app1/A(1) + * app1/B(1) + * app2/A(3) + * app3/A(1) + * + * app2_app3/A(4) + * app2_app3/B(0) + * + * expecting to return max=3, min=1 + * + */ + tags = AllocationTags.createOtherAppAllocationTags(app1, + ImmutableSet.of(app1, app2, app3), ImmutableSet.of("A", "B")); + + Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min)); + Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::sum)); + + //******************************** + // 3) app-id/app2 (app2) + //******************************** + tags = AllocationTags + .createSingleAppAllocationTags(app2, ImmutableSet.of("A", "B")); + Assert.assertEquals(3, atm.getNodeCardinalityByOp(host1, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min)); + Assert.assertEquals(2, atm.getNodeCardinalityByOp(host2, tags, Long::max)); + Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min)); + Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::sum)); + + + //******************************** + // 4) all (app1, app2, app3) + //******************************** + tags = AllocationTags + .createGlobalAllocationTags(ImmutableSet.of("A")); + Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::sum)); + Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::sum)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum)); + + tags = AllocationTags + .createGlobalAllocationTags(ImmutableSet.of("A", "B")); + Assert.assertEquals(7, atm.getNodeCardinalityByOp(host1, tags, Long::sum)); + Assert.assertEquals(4, atm.getNodeCardinalityByOp(host2, tags, Long::sum)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum)); + Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::max)); + Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::max)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max)); + Assert.assertEquals(1, atm.getNodeCardinalityByOp(host1, tags, Long::min)); + Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min)); + Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java index 67a3901e64401..d1ef331b19512 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java @@ -16,10 +16,8 @@ * limitations under the License. */ import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.TargetApplications; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.junit.Assert; import org.junit.Test; @@ -29,29 +27,34 @@ public class TestAllocationTagsNamespace { @Test - public void testNamespaceParse() throws InvalidAllocationTagException { + public void testNamespaceParse() throws InvalidAllocationTagsQueryException { AllocationTagNamespace namespace; String namespaceStr = "self"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isIntraApp()); + Assert.assertEquals(AllocationTagNamespaceType.SELF, + namespace.getNamespaceType()); namespaceStr = "not-self"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isNotSelf()); + Assert.assertEquals(AllocationTagNamespaceType.NOT_SELF, + namespace.getNamespaceType()); namespaceStr = "all"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isGlobal()); + Assert.assertEquals(AllocationTagNamespaceType.ALL, + namespace.getNamespaceType()); namespaceStr = "app-label"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isAppLabel()); + Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL, + namespace.getNamespaceType()); ApplicationId applicationId = ApplicationId.newInstance(12345, 1); namespaceStr = "app-id/" + applicationId.toString(); namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isSingleInterApp()); + Assert.assertEquals(AllocationTagNamespaceType.APP_ID, + namespace.getNamespaceType()); // Invalid app-id namespace syntax, invalid app ID. try { @@ -59,7 +62,7 @@ public void testNamespaceParse() throws InvalidAllocationTagException { AllocationTagNamespace.parse(namespaceStr); Assert.fail("Parsing should fail as the given app ID is invalid"); } catch (Exception e) { - Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); Assert.assertTrue(e.getMessage().startsWith( "Invalid application ID for app-id")); } @@ -71,7 +74,7 @@ public void testNamespaceParse() throws InvalidAllocationTagException { Assert.fail("Parsing should fail as the given namespace" + " is missing application ID"); } catch (Exception e) { - Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); Assert.assertTrue(e.getMessage().startsWith( "Missing the application ID in the namespace string")); } @@ -82,14 +85,15 @@ public void testNamespaceParse() throws InvalidAllocationTagException { AllocationTagNamespace.parse(namespaceStr); Assert.fail("Parsing should fail as the giving type is not supported."); } catch (Exception e) { - Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); Assert.assertTrue(e.getMessage().startsWith( "Invalid namespace prefix")); } } @Test - public void testNamespaceEvaluation() throws InvalidAllocationTagException { + public void testNamespaceEvaluation() throws + InvalidAllocationTagsQueryException { AllocationTagNamespace namespace; TargetApplications targetApplications; ApplicationId app1 = ApplicationId.newInstance(10000, 1); @@ -131,10 +135,8 @@ public void testNamespaceEvaluation() throws InvalidAllocationTagException { namespaceStr = "all"; namespace = AllocationTagNamespace.parse(namespaceStr); - targetApplications = new TargetApplications(null, - ImmutableSet.of(app1, app2)); - namespace.evaluate(targetApplications); - Assert.assertEquals(2, namespace.getNamespaceScope().size()); + Assert.assertEquals(AllocationTagNamespaceType.ALL, + namespace.getNamespaceType()); namespaceStr = "app-id/" + app2.toString(); namespace = AllocationTagNamespace.parse(namespaceStr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 5ba89488af144..4814321932606 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -41,7 +41,6 @@ import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -512,6 +511,252 @@ public void testANDConstraintAssignment() createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); } + @Test + public void testGlobalAppConstraints() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100); + ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101); + ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/A(1), app2/A(1) + * n1: app3/A(3) + * n2: app1/A(2) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + tm.addContainer(n0r1.getNodeID(), + newContainerId(application2), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + AllocationTagNamespace namespaceAll = + new AllocationTagNamespace.All(); + + //*************************** + // 1) all, anti-affinity + //*************************** + // Anti-affinity with "A" from any application including itself. + PlacementConstraint constraint1 = PlacementConstraints.targetNotIn( + NODE, allocationTagWithNamespace(namespaceAll.toString(), "A")) + .build(); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + Set srcTags1 = ImmutableSet.of("A"); + constraintMap.put(srcTags1, constraint1); + pcm.registerApplication(application1, constraintMap); + + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + + //*************************** + // 2) all, max cardinality + //*************************** + PlacementConstraint constraint2 = PlacementConstraints + .maxCardinality(NODE, namespaceAll.toString(), 2, "A") + .build(); + constraintMap.clear(); + Set srcTags2 = ImmutableSet.of("foo"); + constraintMap.put(srcTags2, constraint2); + pcm.registerApplication(application2, constraintMap); + + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application2); + + //*************************** + // 3) all, min cardinality + //*************************** + PlacementConstraint constraint3 = PlacementConstraints + .minCardinality(NODE, namespaceAll.toString(), 3, "A") + .build(); + constraintMap.clear(); + Set srcTags3 = ImmutableSet.of("foo"); + constraintMap.put(srcTags3, constraint3); + pcm.registerApplication(application3, constraintMap); + + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application3, createSchedulingRequest(srcTags3), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application3, createSchedulingRequest(srcTags3), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application3, createSchedulingRequest(srcTags3), + schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application3, createSchedulingRequest(srcTags3), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application3); + } + + @Test + public void testNotSelfAppConstraints() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100); + ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101); + ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/A(1), app2/A(1) + * n1: app3/A(3) + * n2: app1/A(2) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + tm.addContainer(n0r1.getNodeID(), + newContainerId(application2), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n1r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("A")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("A")); + + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + AllocationTagNamespace notSelf = + new AllocationTagNamespace.NotSelf(); + + //*************************** + // 1) not-self, app1 + //*************************** + // Anti-affinity with "A" from app2 and app3, + // n0 and n1 both have tag "A" from either app2 or app3, so they are + // not qualified for the placement. + PlacementConstraint constraint1 = PlacementConstraints.targetNotIn( + NODE, allocationTagWithNamespace(notSelf.toString(), "A")) + .build(); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + Set srcTags1 = ImmutableSet.of("A"); + constraintMap.put(srcTags1, constraint1); + pcm.registerApplication(application1, constraintMap); + + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags1), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + + //*************************** + // 2) not-self, app1 + //*************************** + // Affinity with "A" from app2 and app3, + // N0 and n1 are qualified for the placement. + PlacementConstraint constraint2 = PlacementConstraints.targetIn( + NODE, allocationTagWithNamespace(notSelf.toString(), "A")) + .build(); + Map, PlacementConstraint> cm2 = new HashMap<>(); + Set srcTags2 = ImmutableSet.of("A"); + cm2.put(srcTags2, constraint2); + pcm.registerApplication(application1, cm2); + + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application1, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + } + @Test public void testInterAppConstraintsByAppID() throws InvalidAllocationTagsQueryException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java index 0b9657f15dedf..2ac5c3dc28c40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.junit.Assert; @@ -85,46 +86,62 @@ public void testTempContainerAllocations() // Expect tag mappings to be present including temp Tags Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper")), Long::sum)); Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("service")), Long::sum)); Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), + ImmutableSet.of("service")), Long::sum)); // Do a temp Tag cleanup on app2 ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2)); Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), + ImmutableSet.of("service")), Long::sum)); // Expect app1 to be unaffected Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper")), Long::sum)); // Do a cleanup on app1 as well ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1)); Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper")), Long::sum)); // Non temp-tags should be unaffected Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), + ImmutableSet.of("service")), Long::sum)); Assert.assertEquals(0, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(2), + ImmutableSet.of("service")), Long::sum)); // Expect app2 with no containers, and app1 with 2 containers across 2 nodes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java index 9be56ff0c4a4b..4c6afd4197646 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NodeId; @@ -366,8 +367,7 @@ public void testFunctionality() throws InvalidAllocationTagsQueryException { allocator.canAllocate(NodeType.NODE_LOCAL, TestUtils.getMockNode("host1", "/rack1", 123, 1024)); verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp( - eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)), - eq(ImmutableSet.of("mapper", "reducer")), + eq(NodeId.fromString("host1:123")), any(AllocationTags.class), any(LongBinaryOperator.class)); allocator = new SingleConstraintAppPlacementAllocator(); @@ -388,9 +388,8 @@ public void testFunctionality() throws InvalidAllocationTagsQueryException { allocator.canAllocate(NodeType.NODE_LOCAL, TestUtils.getMockNode("host1", "/rack1", 123, 1024)); verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp( - eq(NodeId.fromString("host1:123")), - eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet - .of("mapper", "reducer")), any(LongBinaryOperator.class)); + eq(NodeId.fromString("host1:123")), any(AllocationTags.class), + any(LongBinaryOperator.class)); SchedulerNode node1 = mock(SchedulerNode.class); when(node1.getPartition()).thenReturn("x");