Skip to content

Commit

Permalink
YARN-8191. Fair scheduler: queue deletion without RM restart. (Gergo …
Browse files Browse the repository at this point in the history
…Repas via Haibo Chen)
  • Loading branch information
haibchen committed May 25, 2018
1 parent d9852eb commit 86bc642
Show file tree
Hide file tree
Showing 7 changed files with 596 additions and 81 deletions.
Expand Up @@ -87,23 +87,24 @@ public class AllocationFileLoaderService extends AbstractService {
private Path allocFile;
private FileSystem fs;

private Listener reloadListener;
private final Listener reloadListener;

@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;

private Thread reloadThread;
private volatile boolean running = true;

public AllocationFileLoaderService() {
this(SystemClock.getInstance());
public AllocationFileLoaderService(Listener reloadListener) {
this(reloadListener, SystemClock.getInstance());
}

private List<Permission> defaultPermissions;

public AllocationFileLoaderService(Clock clock) {
public AllocationFileLoaderService(Listener reloadListener, Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
this.reloadListener = reloadListener;
}

@Override
Expand All @@ -114,6 +115,7 @@ public void serviceInit(Configuration conf) throws Exception {
reloadThread = new Thread(() -> {
while (running) {
try {
reloadListener.onCheck();
long time = clock.getTime();
long lastModified =
fs.getFileStatus(allocFile).getModificationTime();
Expand Down Expand Up @@ -207,10 +209,6 @@ public Path getAllocationFile(Configuration conf)
return allocPath;
}

/**
 * Stores the given listener in the {@code reloadListener} field.
 * The method is {@code synchronized} on this service instance.
 *
 * NOTE(review): this diff view also shows {@code reloadListener} declared
 * {@code final} and assigned through the constructor; this setter looks like
 * the pre-change variant of the API -- confirm which form the file keeps.
 *
 * @param reloadListener the listener to store
 */
public synchronized void setReloadListener(Listener reloadListener) {
this.reloadListener = reloadListener;
}

/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
Expand Down Expand Up @@ -351,5 +349,7 @@ protected List<Permission> getDefaultPermissions() {

/**
 * Callback interface invoked by the allocation file loader service.
 */
public interface Listener {
/**
 * Receives an allocation configuration.
 * NOTE(review): presumably called after the allocation file has been
 * (re)loaded; the call site is not visible in this view -- confirm.
 *
 * @param info the allocation configuration handed to the listener
 * @throws IOException if the listener fails to process the configuration
 */
void onReload(AllocationConfiguration info) throws IOException;

/**
 * Invoked by the reload thread at the start of every loop iteration,
 * before the allocation file's modification time is examined.
 */
void onCheck();
}
}
Expand Up @@ -21,7 +21,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -34,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
Expand All @@ -56,6 +59,8 @@ public class FSLeafQueue extends FSQueue {
// apps that are runnable
private final List<FSAppAttempt> runnableApps = new ArrayList<>();
private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
// assignedApps keeps track of applications that have no appAttempts
private final Set<ApplicationId> assignedApps = new HashSet<>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
Expand Down Expand Up @@ -89,6 +94,9 @@ void addApp(FSAppAttempt app, boolean runnable) {
} else {
nonRunnableApps.add(app);
}
// when an appAttempt is created for an application, we'd like to move
// it over from assignedApps to either runnableApps or nonRunnableApps
assignedApps.remove(app.getApplicationId());
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -440,6 +448,15 @@ public int getNumPendingApps() {
return numPendingApps;
}

/**
 * Returns how many application ids are currently held in
 * {@code assignedApps}. The size is read while holding the read lock.
 *
 * @return the current size of {@code assignedApps}
 */
public int getNumAssignedApps() {
  final int assignedCount;
  readLock.lock();
  try {
    assignedCount = assignedApps.size();
  } finally {
    readLock.unlock();
  }
  return assignedCount;
}

/**
* TODO: Based on how frequently this is called, we might want to club
* counting pending and active apps in the same method.
Expand Down Expand Up @@ -609,4 +626,18 @@ protected void dumpStateInternal(StringBuilder sb) {
", LastTimeAtMinShare: " + lastTimeAtMinShare +
"}");
}

/**
 * Records that an application has been assigned to this queue.
 * The id is added to {@code assignedApps} under the write lock; this
 * book-keeping makes it possible to determine whether the queue is empty.
 *
 * @param applicationId the id of the application assigned to this queue
 */
public void addAssignedApp(ApplicationId applicationId) {
  writeLock.lock();
  try {
    assignedApps.add(applicationId);
  } finally {
    writeLock.unlock();
  }
}
}
Expand Up @@ -83,6 +83,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private long minSharePreemptionTimeout = Long.MAX_VALUE;
private float fairSharePreemptionThreshold = 0.5f;
private boolean preemptable = true;
private boolean isDynamic = true;

public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
Expand Down Expand Up @@ -585,4 +586,12 @@ public String dumpState() {
* @param sb the {@code StringBuilder} which holds queue states
*/
protected abstract void dumpStateInternal(StringBuilder sb);

/**
 * Returns the current value of this queue's {@code isDynamic} flag.
 * The flag is initialised to {@code true} in the field declaration.
 *
 * NOTE(review): presumably a dynamic queue is one that is not declared in
 * the allocation file -- confirm against the queue manager.
 *
 * @return the value of the {@code isDynamic} field
 */
public boolean isDynamic() {
return isDynamic;
}

/**
 * Sets this queue's {@code isDynamic} flag.
 *
 * @param dynamic the new value for the {@code isDynamic} field
 */
public void setDynamic(boolean dynamic) {
this.isDynamic = dynamic;
}
}
Expand Up @@ -99,6 +99,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
Expand Down Expand Up @@ -207,7 +208,8 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
allocsLoader = new AllocationFileLoaderService();
allocsLoader =
new AllocationFileLoaderService(new AllocationReloadListener());
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
Expand Down Expand Up @@ -516,6 +518,7 @@ protected void addApplication(ApplicationId applicationId,
new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
queue.addAssignedApp(applicationId);

LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queue.getName()
Expand Down Expand Up @@ -1435,7 +1438,6 @@ private void initScheduler(Configuration conf) throws IOException {
}

allocsLoader.init(conf);
allocsLoader.setReloadListener(new AllocationReloadListener());
// If we fail to load allocations file on initialize, we want to fail
// immediately. After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
Expand Down Expand Up @@ -1589,6 +1591,7 @@ public void onReload(AllocationConfiguration queueInfo)
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.

Set<String> removedStaticQueues = getRemovedStaticQueues(queueInfo);
writeLock.lock();
try {
if (queueInfo == null) {
Expand All @@ -1599,13 +1602,35 @@ public void onReload(AllocationConfiguration queueInfo)
setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize(getContext());
queueMgr.updateAllocationConfiguration(allocConf);
queueMgr.setQueuesToDynamic(removedStaticQueues);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
}
} finally {
writeLock.unlock();
}
}

/**
 * Computes the queue names that appear among the configured queues of the
 * current {@code allocConf} but not among those of the incoming
 * configuration.
 *
 * @param queueInfo the incoming allocation configuration; may be null
 * @return the names configured currently but missing from
 *         {@code queueInfo}; an empty set if either {@code queueInfo} or
 *         the current {@code allocConf} is null
 */
private Set<String> getRemovedStaticQueues(
    AllocationConfiguration queueInfo) {
  if (queueInfo == null || allocConf == null) {
    return Collections.emptySet();
  }
  Set<String> noLongerConfigured = new HashSet<>();
  // Start from every queue name configured today ...
  allocConf.getConfiguredQueues().values()
      .forEach(noLongerConfigured::addAll);
  // ... and drop every name that the incoming configuration still lists.
  queueInfo.getConfiguredQueues().values()
      .forEach(noLongerConfigured::removeAll);
  return noLongerConfigured;
}

/**
 * Delegates to the queue manager, calling
 * {@code removeEmptyDynamicQueues()} and then
 * {@code removePendingIncompatibleQueues()}, in that order.
 *
 * NOTE(review): presumably this is what deletes queues without a restart;
 * the queue manager implementation is not visible here -- confirm.
 */
@Override
public void onCheck() {
queueMgr.removeEmptyDynamicQueues();
queueMgr.removePendingIncompatibleQueues();
}
}

private void setQueueAcls(
Expand Down

0 comments on commit 86bc642

Please sign in to comment.