Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.usermanagement.AbstractCSUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand All @@ -38,7 +38,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
Expand Down Expand Up @@ -226,31 +225,31 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
* Algorithm for calculating idealAssigned is as follows:
* For each partition:
* Q.reassignable = Q.used - Q.selected;
*
*
* # By default set ideal assigned 0 for app.
* app.idealAssigned as 0
* # get user limit from scheduler.
* userLimitRes = Q.getUserLimit(userName)
*
*
* # initialize all values to 0
* Map<String, Resource> userToAllocated
*
*
* # Loop from highest priority to lowest priority app to calculate ideal
* for app in sorted-by(priority) {
* if Q.reassignable < 0:
* break;
*
*
* if (userToAllocated.get(app.user) < userLimitRes) {
* idealAssigned = min((userLimitRes - userToAllocated.get(app.user)),
* idealAssigned = min((userLimitRes - userToAllocated.get(app.user)),
* (app.used + app.pending - app.selected))
* app.idealAssigned = min(Q.reassignable, idealAssigned)
* userToAllocated.get(app.user) += app.idealAssigned;
* } else {
* } else {
* // skip this app because user-limit reached
* }
* Q.reassignable -= app.idealAssigned
* }
*
*
* @param clusterResource Cluster Resource
* @param tq TempQueue
* @param selectedCandidates Already Selected preemption candidates
Expand Down Expand Up @@ -418,10 +417,11 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
String userName = app.getUser();
TempUserPerPartition tmpUser = usersPerPartition.get(userName);
if (tmpUser == null) {
// User might have already been removed, but preemption still accounts for this app,
// therefore reinserting the user will not cause a memory leak
User user = tq.leafQueue.getOrCreateUser(userName);
ResourceUsage userResourceUsage = user.getResourceUsage();
AbstractCSUser user = tq.leafQueue.getUser(userName);
if (user == null) {
// TODO - Check why https://issues.apache.org/jira/browse/YARN-10996 expects user to be present here
continue;
}

// perUserAMUsed was populated with running apps, now we are looping
// through both running and pending apps.
Expand All @@ -430,9 +430,9 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
? Resources.none() : userSpecificAmUsed;

tmpUser = new TempUserPerPartition(user, tq.queueName,
Resources.clone(userResourceUsage.getUsed(partition)),
user.getUsedCloned(partition),
Resources.clone(amUsed),
Resources.clone(userResourceUsage.getReserved(partition)),
user.getReservedCloned(partition),
Resources.none());

Resource userLimitResource = Resources.clone(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.usermanagement.AbstractCSUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -197,12 +198,15 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
String partition, AbstractLeafQueue leafQueue,
Map<String, Resource> rollingResourceUsagePerUser) {
for (String user : leafQueue.getAllUsers()) {
for (String userName : leafQueue.getAllUsers()) {
// Initialize used resource of a given user for rolling computation.
rollingResourceUsagePerUser.put(user, Resources.clone(
leafQueue.getUser(user).getResourceUsage().getUsed(partition)));
LOG.debug("Rolling resource usage for user:{} is : {}", user,
rollingResourceUsagePerUser.get(user));
AbstractCSUser user = leafQueue.getUser(userName);
// user can be null because users can get removed after calling getAllUsers()
if (user != null) {
rollingResourceUsagePerUser.put(userName, user.getUsedCloned(partition));
LOG.debug("Rolling resource usage for user:{} is : {}", userName,
rollingResourceUsagePerUser.get(userName));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.usermanagement.AbstractCSUser;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

Expand All @@ -30,11 +30,11 @@
*/
public class TempUserPerPartition extends AbstractPreemptionEntity {

private final User user;
private final AbstractCSUser user;
private Resource userLimit;
private boolean donePreemptionQuotaForULDelta = false;

TempUserPerPartition(User user, String queueName, Resource usedPerPartition,
TempUserPerPartition(AbstractCSUser user, String queueName, Resource usedPerPartition,
Resource amUsedPerPartition, Resource reserved,
Resource pendingPerPartition) {
super(queueName, usedPerPartition, amUsedPerPartition, reserved,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,4 @@
* limitations under the License.
*/

@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public class QueueMetrics implements MetricsSource {

protected final MetricsRegistry registry;
protected final String queueName;
private QueueMetrics parent;
private volatile QueueMetrics parent;
private Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ public void decUsed(String label, Resource res) {
/**
 * Convenience overload that records {@code res} as the used resource under
 * the label constant {@code NL}.
 * NOTE(review): NL is presumably the default (no-label) partition; confirm
 * against its declaration.
 *
 * @param res resource value forwarded to {@code setUsed(NL, res)}
 */
public void setUsed(Resource res) {
  this.setUsed(NL, res);
}


/**
 * Builds a new {@code ResourceUsage} and populates it from another usage
 * object by calling {@code copyAllUsed(res)} on the new instance.
 *
 * @param res usage object handed to {@code copyAllUsed}
 * @return the newly constructed instance
 */
public static ResourceUsage clone(AbstractResourceUsage res) {
  final ResourceUsage copy = new ResourceUsage();
  copy.copyAllUsed(res);
  return copy;
}

public void copyAllUsed(AbstractResourceUsage other) {
writeLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// used-capacity/abs-used-capacity/capacity/abs-capacity,
// etc.
QueueCapacities queueCapacities;
CSQueueUsageTracker usageTracker;
final CSQueueUsageTracker usageTracker;

public enum CapacityConfigType {
NONE, PERCENTAGE, ABSOLUTE_RESOURCE
Expand Down Expand Up @@ -253,6 +253,10 @@ public String getQueueName() {
return this.queuePath.getLeafName();
}

/**
 * Returns the cluster resource as reported by this queue's context.
 *
 * @return the value of {@code queueContext.getClusterResource()}
 */
public Resource getClusterResource() {
  return queueContext.getClusterResource();
}

@Override
public CSQueue getParent() {
return parent;
Expand Down
Loading