Skip to content

Commit

Permalink
YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkanter committed May 6, 2015
1 parent 0d3188f commit b725078
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 36 deletions.
2 changes: 2 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -180,6 +180,8 @@ Release 2.8.0 - UNRELEASED
YARN-3396. Handle URISyntaxException in ResourceLocalizationService. YARN-3396. Handle URISyntaxException in ResourceLocalizationService.
(Brahma Reddy Battula via junping_du) (Brahma Reddy Battula via junping_du)


YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)

OPTIMIZATIONS OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -42,9 +42,12 @@
/** /**
* Manages a list of local storage directories. * Manages a list of local storage directories.
*/ */
class DirectoryCollection { public class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);


/**
* The enum defines disk failure type.
*/
public enum DiskErrorCause { public enum DiskErrorCause {
DISK_FULL, OTHER DISK_FULL, OTHER
} }
Expand All @@ -59,6 +62,13 @@ static class DiskErrorInformation {
} }
} }


/**
* The interface provides a callback when localDirs is changed.
*/
public interface DirsChangeListener {
void onDirsChanged();
}

/** /**
* Returns a merged list which contains all the elements of l1 and l2 * Returns a merged list which contains all the elements of l1 and l2
* @param l1 the first list to be included * @param l1 the first list to be included
Expand All @@ -84,6 +94,8 @@ static List<String> concat(List<String> l1, List<String> l2) {


private int goodDirsDiskUtilizationPercentage; private int goodDirsDiskUtilizationPercentage;


private Set<DirsChangeListener> dirsChangeListeners;

/** /**
* Create collection for the directories specified. No check for free space. * Create collection for the directories specified. No check for free space.
* *
Expand Down Expand Up @@ -154,6 +166,20 @@ public DirectoryCollection(String[] dirs,
: utilizationPercentageCutOff); : utilizationPercentageCutOff);
diskUtilizationSpaceCutoff = diskUtilizationSpaceCutoff =
utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;

dirsChangeListeners = new HashSet<DirsChangeListener>();
}

synchronized void registerDirsChangeListener(
DirsChangeListener listener) {
if (dirsChangeListeners.add(listener)) {
listener.onDirsChanged();
}
}

synchronized void deregisterDirsChangeListener(
DirsChangeListener listener) {
dirsChangeListeners.remove(listener);
} }


/** /**
Expand Down Expand Up @@ -280,6 +306,11 @@ synchronized boolean checkDirs() {
} }
} }
setGoodDirsDiskUtilizationPercentage(); setGoodDirsDiskUtilizationPercentage();
if (setChanged) {
for (DirsChangeListener listener : dirsChangeListeners) {
listener.onDirsChanged();
}
}
return setChanged; return setChanged;
} }


Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;


/** /**
Expand Down Expand Up @@ -192,6 +193,22 @@ protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
} }


public void registerLocalDirsChangeListener(DirsChangeListener listener) {
localDirs.registerDirsChangeListener(listener);
}

public void registerLogDirsChangeListener(DirsChangeListener listener) {
logDirs.registerDirsChangeListener(listener);
}

public void deregisterLocalDirsChangeListener(DirsChangeListener listener) {
localDirs.deregisterDirsChangeListener(listener);
}

public void deregisterLogDirsChangeListener(DirsChangeListener listener) {
logDirs.deregisterDirsChangeListener(listener);
}

/** /**
* @return the good/valid local directories based on disks' health * @return the good/valid local directories based on disks' health
*/ */
Expand Down
Expand Up @@ -92,6 +92,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
Expand Down Expand Up @@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService
private LocalResourcesTracker publicRsrc; private LocalResourcesTracker publicRsrc;


private LocalDirsHandlerService dirsHandler; private LocalDirsHandlerService dirsHandler;
private DirsChangeListener localDirsChangeListener;
private DirsChangeListener logDirsChangeListener;
private Context nmContext; private Context nmContext;


/** /**
Expand Down Expand Up @@ -254,6 +257,18 @@ public void serviceInit(Configuration conf) throws Exception {
localizerTracker = createLocalizerTracker(conf); localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker); addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker);
localDirsChangeListener = new DirsChangeListener() {
@Override
public void onDirsChanged() {
checkAndInitializeLocalDirs();
}
};
logDirsChangeListener = new DirsChangeListener() {
@Override
public void onDirsChanged() {
initializeLogDirs(lfs);
}
};
super.serviceInit(conf); super.serviceInit(conf);
} }


Expand Down Expand Up @@ -345,6 +360,8 @@ public void serviceStart() throws Exception {
server.getListenerAddress()); server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort()); LOG.info("Localizer started on port " + server.getPort());
super.serviceStart(); super.serviceStart();
dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
} }


LocalizerTracker createLocalizerTracker(Configuration conf) { LocalizerTracker createLocalizerTracker(Configuration conf) {
Expand Down Expand Up @@ -375,6 +392,8 @@ Server createServer() {


@Override @Override
public void serviceStop() throws Exception { public void serviceStop() throws Exception {
dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);
if (server != null) { if (server != null) {
server.stop(); server.stop();
} }
Expand Down Expand Up @@ -814,11 +833,6 @@ public void addResource(LocalizerResourceRequestEvent request) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
} }


// In case this is not a newly initialized nm state, ensure
// initialized local/log dirs similar to LocalizerRunner
getInitializedLocalDirs();
getInitializedLogDirs();

// explicitly synchronize pending here to avoid future task // explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated // completing and being dequeued before pending updated
synchronized (pending) { synchronized (pending) {
Expand Down Expand Up @@ -1120,8 +1134,6 @@ public void run() {
// 1) write credentials to private dir // 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath); writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait // 2) exec initApplication and wait
List<String> localDirs = getInitializedLocalDirs();
List<String> logDirs = getInitializedLogDirs();
if (dirsHandler.areDisksHealthy()) { if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(), context.getUser(),
Expand Down Expand Up @@ -1387,13 +1399,12 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
} }


/** /**
* Synchronized method to get a list of initialized local dirs. Method will * Check each local dir to ensure it has been setup correctly and will
* check each local dir to ensure it has been setup correctly and will attempt * attempt to fix any issues it finds.
* to fix any issues it finds. * @return void
*
* @return list of initialized local dirs
*/ */
synchronized private List<String> getInitializedLocalDirs() { @VisibleForTesting
void checkAndInitializeLocalDirs() {
List<String> dirs = dirsHandler.getLocalDirs(); List<String> dirs = dirsHandler.getLocalDirs();
List<String> checkFailedDirs = new ArrayList<String>(); List<String> checkFailedDirs = new ArrayList<String>();
for (String dir : dirs) { for (String dir : dirs) {
Expand All @@ -1415,7 +1426,6 @@ synchronized private List<String> getInitializedLocalDirs() {
throw new YarnRuntimeException(msg, e); throw new YarnRuntimeException(msg, e);
} }
} }
return dirs;
} }


private boolean checkLocalDir(String localDir) { private boolean checkLocalDir(String localDir) {
Expand Down Expand Up @@ -1463,17 +1473,4 @@ private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir)
localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
return localDirPathFsPermissionsMap; return localDirPathFsPermissionsMap;
} }

/**
* Synchronized method to get a list of initialized log dirs. Method will
* check each local dir to ensure it has been setup correctly and will attempt
* to fix any issues it finds.
*
* @return list of initialized log dirs
*/
synchronized private List<String> getInitializedLogDirs() {
List<String> dirs = dirsHandler.getLogDirs();
initializeLogDirs(lfs);
return dirs;
}
} }
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
Expand Down Expand Up @@ -258,4 +259,50 @@ public void testConstructors() {
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta); Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff()); Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
} }

@Test
public void testDirsChangeListener() {
DirsChangeListenerTest listener1 = new DirsChangeListenerTest();
DirsChangeListenerTest listener2 = new DirsChangeListenerTest();
DirsChangeListenerTest listener3 = new DirsChangeListenerTest();

String dirA = new File(testDir, "dirA").getPath();
String[] dirs = { dirA };
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 0);
Assert.assertEquals(listener2.num, 0);
Assert.assertEquals(listener3.num, 0);
dc.registerDirsChangeListener(listener1);
dc.registerDirsChangeListener(listener2);
dc.registerDirsChangeListener(listener3);
Assert.assertEquals(listener1.num, 1);
Assert.assertEquals(listener2.num, 1);
Assert.assertEquals(listener3.num, 1);

dc.deregisterDirsChangeListener(listener3);
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 2);
Assert.assertEquals(listener2.num, 2);
Assert.assertEquals(listener3.num, 1);

dc.deregisterDirsChangeListener(listener2);
dc.setDiskUtilizationPercentageCutoff(100.0F);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(listener1.num, 3);
Assert.assertEquals(listener2.num, 2);
Assert.assertEquals(listener3.num, 1);
}

static class DirsChangeListenerTest implements DirsChangeListener {
public int num = 0;
public DirsChangeListenerTest() {
}
@Override
public void onDirsChanged() {
num++;
}
}
} }
Expand Up @@ -1098,7 +1098,6 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
isA(Configuration.class)); isA(Configuration.class));


spyService.init(conf); spyService.init(conf);
spyService.start();


final FsPermission defaultPerm = new FsPermission((short)0755); final FsPermission defaultPerm = new FsPermission((short)0755);


Expand All @@ -1110,6 +1109,8 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
.mkdir(eq(publicCache),eq(defaultPerm), eq(true)); .mkdir(eq(publicCache),eq(defaultPerm), eq(true));
} }


spyService.start();

final String user = "user0"; final String user = "user0";
// init application // init application
final Application app = mock(Application.class); final Application app = mock(Application.class);
Expand All @@ -1131,21 +1132,32 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
r.setSeed(seed); r.setSeed(seed);


// Queue up public resource localization // Queue up public resource localization
final LocalResource pubResource = getPublicMockedResource(r); final LocalResource pubResource1 = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); final LocalResourceRequest pubReq1 =
new LocalResourceRequest(pubResource1);

LocalResource pubResource2 = null;
do {
pubResource2 = getPublicMockedResource(r);
} while (pubResource2 == null || pubResource2.equals(pubResource1));
// above call to make sure we don't get identical resources.
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);

Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq1);
pubRsrcs.add(pubReq2);


Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req = Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility, new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>(); Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PUBLIC, req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);
Collections.singletonList(pubReq));

Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);


spyService.handle(new ContainerLocalizationRequestEvent(c, req)); spyService.handle(new ContainerLocalizationRequestEvent(c, req));
dispatcher.await(); dispatcher.await();


verify(spyService, times(1)).checkAndInitializeLocalDirs();

// verify directory creation // verify directory creation
for (Path p : localDirs) { for (Path p : localDirs) {
p = new Path((new URI(p.toString())).getPath()); p = new Path((new URI(p.toString())).getPath());
Expand Down

0 comments on commit b725078

Please sign in to comment.