Skip to content

Commit

Permalink
YARN-3635. Refactored current queue mapping implementation in Capacit…
Browse files Browse the repository at this point in the history
…yScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Sep 15, 2015
1 parent d777757 commit 5468baa
Show file tree
Hide file tree
Showing 13 changed files with 584 additions and 279 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -455,6 +455,9 @@ Release 2.8.0 - UNRELEASED
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
container allocation logic. (Wangda Tan via jianhe) container allocation logic. (Wangda Tan via jianhe)


YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
to use a generic PlacementManager framework. (Wangda Tan via jianhe)

BUG FIXES BUG FIXES


YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
Expand Down Expand Up @@ -99,9 +100,10 @@ public class RMActiveServiceContext {
private long schedulerRecoveryWaitTime = 0; private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true; private boolean printLog = true;
private boolean isSchedulerReady = false; private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;


public RMActiveServiceContext() { public RMActiveServiceContext() {

queuePlacementManager = new PlacementManager();
} }


@Private @Private
Expand Down Expand Up @@ -424,4 +426,16 @@ public void setSystemClock(Clock clock) {
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() { public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials; return systemCredentials;
} }

@Private
@Unstable
public PlacementManager getQueuePlacementManager() {
return queuePlacementManager;
}

@Private
@Unstable
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
} }
Expand Up @@ -326,6 +326,15 @@ protected void recoverApplication(ApplicationStateData appState,
private RMAppImpl createAndPopulateNewRMApp( private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException { String user, boolean isRecovery) throws YarnException {
// Do queue mapping
if (!isRecovery) {
if (rmContext.getQueuePlacementManager() != null) {
// We only do queue mapping when it's a new application
rmContext.getQueuePlacementManager().placeApplication(
submissionContext, user);
}
}

ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery); validateAndCreateResourceRequest(submissionContext, isRecovery);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
Expand Down Expand Up @@ -124,4 +125,8 @@ void setRMApplicationHistoryWriter(
boolean isSchedulerReadyForAllocatingContainers(); boolean isSchedulerReadyForAllocatingContainers();


Configuration getYarnConfiguration(); Configuration getYarnConfiguration();

PlacementManager getQueuePlacementManager();

void setQueuePlacementManager(PlacementManager placementMgr);
} }
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
Expand Down Expand Up @@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext {
* individual fields. * individual fields.
*/ */
public RMContextImpl() { public RMContextImpl() {

} }


@VisibleForTesting @VisibleForTesting
Expand Down Expand Up @@ -438,4 +438,14 @@ public Configuration getYarnConfiguration() {
public void setYarnConfiguration(Configuration yarnConfiguration) { public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration=yarnConfiguration; this.yarnConfiguration=yarnConfiguration;
} }

@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
}

@Override
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.activeServiceContext.setQueuePlacementManager(placementMgr);
}
} }
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.placement;

import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.google.common.annotations.VisibleForTesting;

public class PlacementManager {
private static final Log LOG = LogFactory.getLog(PlacementManager.class);

List<PlacementRule> rules;
ReadLock readLock;
WriteLock writeLock;

public PlacementManager() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}

public void updateRules(List<PlacementRule> rules) {
try {
writeLock.lock();
this.rules = rules;
} finally {
writeLock.unlock();
}
}

public void placeApplication(ApplicationSubmissionContext asc, String user)
throws YarnException {
try {
readLock.lock();
if (null == rules || rules.isEmpty()) {
return;
}

String newQueueName = null;
for (PlacementRule rule : rules) {
newQueueName = rule.getQueueForApp(asc, user);
if (newQueueName != null) {
break;
}
}

// Failed to get where to place application
if (null == newQueueName && null == asc.getQueue()) {
String msg = "Failed to get where to place application="
+ asc.getApplicationId();
LOG.error(msg);
throw new YarnException(msg);
}

// Set it to ApplicationSubmissionContext
if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
+ newQueueName + ", original queue=" + asc.getQueue());
asc.setQueue(newQueueName);
}
} finally {
readLock.unlock();
}
}

@VisibleForTesting
public List<PlacementRule> getPlacementRules() {
return rules;
}
}
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.placement;

import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;

public abstract class PlacementRule {
public String getName() {
return this.getClass().getName();
}

public void initialize(Map<String, String> parameters, RMContext rmContext)
throws YarnException {
}

/**
* Get queue for a given application
*
* @param asc application submission context
* @param user userName
*
* @throws YarnException
* if any error happens
*
* @return <p>
* non-null value means it is determined
* </p>
* <p>
* null value means it is undetermined, so next {@link PlacementRule}
* in the {@link PlacementManager} will take care
* </p>
*/
public abstract String getQueueForApp(ApplicationSubmissionContext asc,
String user) throws YarnException;
}

0 comments on commit 5468baa

Please sign in to comment.