Skip to content

Commit

Permalink
YARN-7411. Inter-Queue preemption's computeFixpointAllocation need to…
Browse files Browse the repository at this point in the history
… handle absolute resources while computing normalizedGuarantee. (Sunil G via wangda)

Change-Id: I41b1d7558c20fc4eb2050d40134175a2ef6330cb
  • Loading branch information
wangdatan committed Dec 8, 2017
1 parent aa3f627 commit 034b312
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 20 deletions.
Expand Up @@ -26,7 +26,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto;
Expand Down Expand Up @@ -152,17 +151,6 @@ private void initResources() {
.newInstance(ResourceInformation.VCORES);
this.setMemorySize(p.getMemory());
this.setVirtualCores(p.getVirtualCores());

// Update missing resource information on respective index.
updateResourceInformationMap(types);
}

private void updateResourceInformationMap(ResourceInformation[] types) {
for (int i = 0; i < types.length; i++) {
if (resources[i] == null) {
resources[i] = ResourceInformation.newInstance(types[i]);
}
}
}

private static ResourceInformation newDefaultInformation(
Expand Down
Expand Up @@ -111,6 +111,14 @@ public Resource multiplyAndNormalizeUp(Resource r, double by,
stepFactor.getMemorySize()));
}

@Override
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
Resource stepFactor) {
return Resources.createResource(
roundUp((long) (r.getMemorySize() * by[0] + 0.5),
stepFactor.getMemorySize()));
}

@Override
public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) {
Expand Down
Expand Up @@ -495,6 +495,27 @@ private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
return ret;
}

@Override
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
Resource stepFactor) {
Resource ret = Resource.newInstance(r);
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation rResourceInformation = r.getResourceInformation(i);
ResourceInformation stepFactorResourceInformation = stepFactor
.getResourceInformation(i);

long rValue = rResourceInformation.getValue();
long stepFactorValue = UnitsConversionUtil.convert(
stepFactorResourceInformation.getUnits(),
rResourceInformation.getUnits(),
stepFactorResourceInformation.getValue());
ret.setResourceValue(i, ResourceCalculator
.roundUp((long) Math.ceil(rValue * by[i]), stepFactorValue));
}
return ret;
}

@Override
public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
Expand Down
Expand Up @@ -125,7 +125,19 @@ public abstract long computeAvailableContainers(
*/
public abstract Resource multiplyAndNormalizeUp(
Resource r, double by, Resource stepFactor);


/**
* Multiply resource <code>r</code> by factor <code>by</code>
* and normalize up using step-factor <code>stepFactor</code>.
*
* @param r resource to be multiplied
* @param by multiplier array for all resource types
* @param stepFactor factor by which to normalize up
* @return resulting normalized resource
*/
public abstract Resource multiplyAndNormalizeUp(
Resource r, double[] by, Resource stepFactor);

/**
* Multiply resource <code>r</code> by factor <code>by</code>
* and normalize down using step-factor <code>stepFactor</code>.
Expand Down
Expand Up @@ -347,6 +347,11 @@ public static Resource multiplyAndAddTo(
return lhs;
}

public static Resource multiplyAndNormalizeUp(ResourceCalculator calculator,
Resource lhs, double[] by, Resource factor) {
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
}

public static Resource multiplyAndNormalizeUp(
ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
Expand Down
Expand Up @@ -19,8 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.ArrayList;
Expand Down Expand Up @@ -198,18 +201,33 @@ protected void computeFixpointAllocation(Resource totGuarant,
private void resetCapacity(Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();

if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = 1.0f / queues.size();
for (int i = 0; i < maxLength; i++) {
q.normalizedGuarantee[i] = 1.0f / queues.size();
}
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
}
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.getGuaranteed(), activeCap);
for (int i = 0; i < maxLength; i++) {
ResourceInformation nResourceInformation = q.getGuaranteed()
.getResourceInformation(i);
ResourceInformation dResourceInformation = activeCap
.getResourceInformation(i);

long nValue = nResourceInformation.getValue();
long dValue = UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
dResourceInformation.getValue());
if (dValue != 0) {
q.normalizedGuarantee[i] = (float) nValue / dValue;
}
}
}
}
}
Expand Down
Expand Up @@ -22,9 +22,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -46,7 +48,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
Resource untouchableExtra;
Resource preemptableExtra;

double normalizedGuarantee;
double[] normalizedGuarantee;

private Resource effMinRes;
private Resource effMaxRes;
Expand Down Expand Up @@ -88,7 +90,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
pendingDeductReserved = Resources.createResource(0);
}

this.normalizedGuarantee = Float.NaN;
this.normalizedGuarantee = new double[ResourceUtils
.getNumberOfKnownResourceTypes()];
this.children = new ArrayList<>();
this.apps = new ArrayList<>();
this.untouchableExtra = Resource.newInstance(0, 0);
Expand Down Expand Up @@ -240,8 +243,9 @@ public String toString() {
sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
.append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
.append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
.append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
.append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(Arrays.toString(normalizedGuarantee))
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
.append(" UNTOUCHABLE: ").append(untouchableExtra)
.append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -533,6 +535,18 @@ private Resource parseResourceFromString(String p) {
} else {
res = Resources.createResource(Integer.valueOf(resource[0]),
Integer.valueOf(resource[1]));
if (resource.length > 2) {
// Using the same order of resources from ResourceUtils, set resource
// informations.
ResourceInformation[] storedResourceInfo = ResourceUtils
.getResourceTypesArray();
for (int i = 2; i < resource.length; i++) {
res.setResourceInformation(storedResourceInfo[i].getName(),
ResourceInformation.newInstance(storedResourceInfo[i].getName(),
storedResourceInfo[i].getUnits(),
Integer.valueOf(resource[i])));
}
}
}
return res;
}
Expand Down
Expand Up @@ -18,11 +18,17 @@

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -613,4 +619,74 @@ public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}

@Test
public void testNormalizeGuaranteeWithMultipleResource() throws IOException {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
String RESOURCE_1 = "res1";

// Initialize mandatory resources
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));

ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* / \ / \
* a1 a2 b1 b2
* </pre>
*
* a1 and b2 are using most of resources.
* a2 and b1 needs more resources. Both are under served.
* hence demand will consider both queue's need while trying to
* do preemption.
*/
String labelsConfig =
"=100,true;";
String nodesConfig =
"n1=;"; // n1 is default partition
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
"-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a
"--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1
"--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2
"-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b
"--b1(=[25:5:4 100:20:10 0 20:10:4]);" + // b1
"--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2
String appsConfig=
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a1\t" // app1 in a1
+ "(1,8:9:1,n1,,10,false);" +
"b2\t" // app2 in b2
+ "(1,2:1,n1,,10,false)"; // 80 of y

buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();

verify(mDisp, times(7)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));

riMap.remove(RESOURCE_1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
}

0 comments on commit 034b312

Please sign in to comment.