Skip to content

Commit

Permalink
MAPREDUCE-3824. Distributed caches are not removed properly. Contributed by Thomas Graves.
Browse files Browse the repository at this point in the history

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1291091 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
mattf-apache committed Feb 19, 2012
1 parent e5851c6 commit d8adfd9
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Release 1.0.1 - 2012.02.19

HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

MAPREDUCE-3824. Distributed caches are not removed properly. (Thomas Graves
via mattf)

Release 1.0.0 - 2011.12.15

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ public void release() throws IOException {
public void setSizes(long[] sizes) throws IOException {
int i = 0;
for (CacheFile c: cacheFiles) {
if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
c.status != null) {
distributedCacheManager.setSize(c.status, sizes[i++]);
if (!c.isPublic && c.status != null) {
distributedCacheManager.setSize(c.status, sizes[i]);
}
i++;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -546,7 +547,7 @@ class CacheStatus {
//
// This field should be accessed under global cachedArchives lock.
//
private int refcount; // number of instances using this cache
private AtomicInteger refcount; // number of instances using this cache

//
// The following two fields should be accessed under
Expand Down Expand Up @@ -577,7 +578,7 @@ public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
String uniqueString, String user, String key) {
super();
this.localizedLoadPath = localLoadPath;
this.refcount = 0;
this.refcount = new AtomicInteger();
this.localizedBaseDir = baseDir;
this.size = 0;
this.subDir = subDir;
Expand All @@ -587,14 +588,16 @@ public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
}

/**
 * Increments the number of tasks currently using this cache entry.
 */
public synchronized void incRefCount() {
  refcount.incrementAndGet();
  LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
}

public void decRefCount() {
synchronized (cachedArchives) {
synchronized (this) {
refcount -= 1;
if(refcount <= 0) {
refcount.decrementAndGet() ;
LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
if(refcount.get() <= 0) {
String key = this.key;
cachedArchives.remove(key);
cachedArchives.put(key, this);
Expand All @@ -604,11 +607,12 @@ public void decRefCount() {
}

/**
 * @return the number of tasks currently referencing this cache entry
 */
public int getRefCount() {
  return refcount.get();
}

/**
 * @return true if at least one task still references this cache entry
 */
public synchronized boolean isUsed() {
  LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
  return refcount.get() > 0;
}

Path getBaseDir(){
Expand Down Expand Up @@ -641,7 +645,8 @@ public void purgeCache() {
try {
localFs.delete(f.getValue().localizedLoadPath, true);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
LOG.debug("Error cleaning up cache (" +
f.getValue().localizedLoadPath + ")", ie);
}
}
cachedArchives.clear();
Expand All @@ -657,6 +662,10 @@ public void purgeCache() {
return result;
}

/**
* Set the sizes for any archives, files, or directories in the private
* distributed cache.
*/
public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
TaskDistributedCacheManager mgr = jobArchives.get(jobId);
if (mgr != null) {
Expand Down Expand Up @@ -978,8 +987,13 @@ void checkAndCleanup() throws IOException {
HashMap<Path, CacheDir> toBeCleanedBaseDir =
new HashMap<Path, CacheDir>();
synchronized (properties) {
LOG.debug("checkAndCleanup: Allowed Cache Size test");
for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
CacheDir baseDirCounts = baseDir.getValue();
LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
",baseDirCounts.size=" + baseDirCounts.size +
",allowedCacheSubdirs=" + allowedCacheSubdirs +
",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
if (allowedCacheSize < baseDirCounts.size ||
allowedCacheSubdirs < baseDirCounts.subdirs) {
CacheDir tcc = new CacheDir();
Expand All @@ -991,6 +1005,7 @@ void checkAndCleanup() throws IOException {
}
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
LOG.debug("checkAndCleanup: Global Cache Size Check");
for(
Iterator<Map.Entry<String, CacheStatus>> it
= cachedArchives.entrySet().iterator();
Expand All @@ -999,11 +1014,16 @@ void checkAndCleanup() throws IOException {
String cacheId = entry.getKey();
CacheStatus cacheStatus = cachedArchives.get(cacheId);
CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());

if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
synchronized (cacheStatus) {
// if reference count is zero mark the cache for deletion
if (!cacheStatus.isUsed()) {
leftToClean.size -= cacheStatus.size;
boolean isUsed = cacheStatus.isUsed();
long cacheSize = cacheStatus.size;
LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed +
" size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
if (!isUsed) {
leftToClean.size -= cacheSize;
leftToClean.subdirs--;
// delete this cache entry from the global list
// and mark the localized file for deletion
Expand Down
11 changes: 8 additions & 3 deletions src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -339,21 +340,25 @@ private static long[] downloadPrivateCacheObjects(Configuration conf,
* @return the size of the archive objects
*/
public static long[] downloadPrivateCache(Configuration conf) throws IOException {
downloadPrivateCacheObjects(conf,
long[] fileSizes = downloadPrivateCacheObjects(conf,
DistributedCache.getCacheFiles(conf),
DistributedCache.getLocalCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
TrackerDistributedCacheManager.
getFileVisibilities(conf),
false);
return
downloadPrivateCacheObjects(conf,

long[] archiveSizes = downloadPrivateCacheObjects(conf,
DistributedCache.getCacheArchives(conf),
DistributedCache.getLocalCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
TrackerDistributedCacheManager.
getArchiveVisibilities(conf),
true);

// The order here matters - it has to match order of cache files
// in TaskDistributedCacheManager.
return ArrayUtils.addAll(fileSizes, archiveSizes);
}

public void localizeJobFiles(JobID jobid, JobConf jConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,

/**
* The job initializer needs to report the sizes of the archive
* objects in the private distributed cache.
* objects and directories in the private distributed cache.
* @param jobId the job to update
* @param sizes the array of sizes that were computed
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public class TestTrackerDistributedCacheManager extends TestCase {
protected Path firstCacheFilePublic;
protected Path secondCacheFile;
protected Path secondCacheFilePublic;
protected Path firstCacheDirPublic;
protected Path firstCacheDirPrivate;
protected Path firstCacheFileInDirPublic;
protected Path firstCacheFileInDirPrivate;
private FileSystem fs;

protected LocalDirAllocator localDirAllocator =
Expand Down Expand Up @@ -126,6 +130,15 @@ protected void setUp() throws IOException,InterruptedException {
createPublicTempFile(secondCacheFilePublic);
createPrivateTempFile(firstCacheFile);
createPrivateTempFile(secondCacheFile);

firstCacheDirPublic = new Path(TEST_ROOT_DIR, "firstcachedirPublic");
firstCacheDirPrivate = new Path(TEST_ROOT_DIR, "firstcachedirPrivate");
firstCacheFileInDirPublic = new Path(firstCacheDirPublic, "firstcacheFileinDirPublic.txt");
firstCacheFileInDirPrivate = new Path(firstCacheDirPrivate, "firstcacheFileinDirPrivate.txt");
createPublicTempDir(firstCacheDirPublic);
createPrivateTempDir(firstCacheDirPrivate);
createPublicTempFile(firstCacheFileInDirPublic);
createPrivateTempFile(firstCacheFileInDirPrivate);
}

protected void refreshConf(Configuration conf) throws IOException {
Expand Down Expand Up @@ -253,41 +266,79 @@ public void testReferenceCount() throws IOException, LoginException,
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);

// Task localizing for first job
JobID jobId = new JobID("jt", 1);
TaskDistributedCacheManager handle = manager
.newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
.newTaskDistributedCacheManager(jobId, conf1);
handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
JobLocalizer.downloadPrivateCache(conf1);
long[] sizes = JobLocalizer.downloadPrivateCache(conf1);
if (sizes != null) {
manager.setArchiveSizes(jobId, sizes);
}
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.getStatus()));
long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
assertTrue("filesize is not greater than 0", filesize > 0);
assertEquals(filesize, c.getStatus().size);
}

// Test specifying directories to go into distributed cache and make
// their sizes are calculated properly.
Job job2 = new Job(conf);
Configuration conf2 = job2.getConfiguration();
conf1.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);

TrackerDistributedCacheManager.determineTimestamps(conf2);
TrackerDistributedCacheManager.determineCacheVisibilities(conf2);

// Task localizing for second job
JobID job2Id = new JobID("jt", 2);
handle = manager.newTaskDistributedCacheManager(job2Id, conf2);
handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
long[] sizes2 = JobLocalizer.downloadPrivateCache(conf2);
for (int j=0; j > sizes2.length; j++) {
LOG.info("size is: " + sizes2[j]);
}
if (sizes2 != null) {
manager.setArchiveSizes(job2Id, sizes2);
}
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.getStatus()));
long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
assertTrue("filesize is not greater than 0", filesize > 0);
assertEquals(filesize, c.getStatus().size);
}

Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
createPrivateTempFile(thirdCacheFile);

// Configures another job with three regular files.
Job job2 = new Job(conf);
Configuration conf2 = job2.getConfiguration();
conf2.set("user.name", userName);
Job job3 = new Job(conf);
Configuration conf3 = job3.getConfiguration();
conf3.set("user.name", userName);
// add a file that would get failed to localize
DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf3);
// add a file that is already localized by different job
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf3);
// add a file that is never localized
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);

TrackerDistributedCacheManager.determineTimestamps(conf2);
TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
TrackerDistributedCacheManager.determineTimestamps(conf3);
TrackerDistributedCacheManager.determineCacheVisibilities(conf3);

// Task localizing for second job
// Task localizing for third job
// localization for the "firstCacheFile" will fail.
handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
handle = manager.newTaskDistributedCacheManager(new JobID("jt", 3), conf3);
Throwable th = null;
try {
handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
handle.setupCache(conf3, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
JobLocalizer.downloadPrivateCache(conf2);
JobLocalizer.downloadPrivateCache(conf3);
} catch (IOException e) {
th = e;
LOG.info("Exception during setup", e);
Expand Down Expand Up @@ -939,6 +990,13 @@ static void createTempFile(Path p) throws IOException {
createTempFile(p, TEST_FILE_SIZE);
}

/**
 * Creates the directory {@code p} (including any missing parents) on the
 * local file system.
 *
 * @param p path of the directory to create
 * @throws IOException if the directory is missing and cannot be created
 */
static void createTempDir(Path p) throws IOException {
  File dir = new File(p.toString());
  // mkdirs() returns false both on failure and when the directory already
  // exists, so only fail if the directory is really absent afterwards.
  if (!dir.mkdirs() && !dir.isDirectory()) {
    throw new IOException("could not create temp directory: " + p);
  }
  FileSystem.LOG.info("created temp directory: " + p);
}

static void createTempFile(Path p, int size) throws IOException {
File f = new File(p.toString());
FileOutputStream os = new FileOutputStream(f);
Expand All @@ -961,12 +1019,30 @@ static void createPrivateTempFile(Path p)
FileUtil.chmod(p.toString(), "0770",true);
}

/**
 * Creates the directory {@code p} and makes it world-accessible
 * (mode 0777), recursively, so it localizes as a public cache entry.
 */
static void createPublicTempDir(Path p)
    throws IOException, InterruptedException {
  createTempDir(p);
  final String publicMode = "0777";
  FileUtil.chmod(p.toString(), publicMode, true);
}

/**
 * Creates the directory {@code p} with group-only permissions
 * (mode 0770), recursively, so it localizes as a private cache entry.
 */
static void createPrivateTempDir(Path p)
    throws IOException, InterruptedException {
  createTempDir(p);
  final String privateMode = "0770";
  FileUtil.chmod(p.toString(), privateMode, true);
}

@Override
protected void tearDown() throws IOException {
  // Remove the individual fixtures created in setUp(); delete() results
  // are intentionally ignored — fullyDelete() below is the backstop.
  // Files inside the cache directories are listed before the directories
  // themselves so the directories are empty when their turn comes.
  Path[] fixtures = {
      firstCacheFile,
      secondCacheFile,
      firstCacheFilePublic,
      secondCacheFilePublic,
      firstCacheFileInDirPublic,
      firstCacheFileInDirPrivate,
      firstCacheDirPrivate,
      firstCacheDirPublic,
  };
  for (Path p : fixtures) {
    new File(p.toString()).delete();
  }
  // Wipe anything else left under the shared test root.
  FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
}

Expand Down

0 comments on commit d8adfd9

Please sign in to comment.