Skip to content

Commit

Permalink
MAPREDUCE-4493. Distibuted Cache Compatability Issues (Robert Evans v…
Browse files Browse the repository at this point in the history
…ia tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1367713 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
tgravescs committed Jul 31, 2012
1 parent 75be1e0 commit 735b50e
Show file tree
Hide file tree
Showing 20 changed files with 63 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ Deprecated Properties
*---+---+
|mapred.compress.map.output | mapreduce.map.output.compress
*---+---+
|mapred.create.symlink | mapreduce.job.cache.symlink.create
|mapred.create.symlink | NONE - symlinking is always on
*---+---+
|mapreduce.job.cache.symlink.create | NONE - symlinking is always on
*---+---+
|mapred.data.field.separator | mapreduce.fieldsel.data.field.separator
*---+---+
Expand Down
3 changes: 3 additions & 0 deletions hadoop-mapreduce-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,9 @@ Release 0.23.3 - UNRELEASED

MAPREDUCE-4496. AM logs link is missing user name (Jason Lowe via bobby)

MAPREDUCE-4493. Distibuted Cache Compatability Issues (Robert Evans
via tgraves)

Release 0.23.2 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class LocalDistributedCacheManager {
* @throws IOException
*/
public void setup(JobConf conf) throws IOException {
boolean mkLinks = DistributedCache.getSymlink(conf);
File workDir = new File(System.getProperty("user.dir"));

// Generate YARN local resources objects corresponding to the distributed
Expand Down Expand Up @@ -145,11 +144,9 @@ public void setup(JobConf conf) throws IOException {
throw new IOException(e);
}
String pathString = path.toUri().toString();
if(mkLinks) {
String link = entry.getKey();
String target = new File(path.toUri()).getPath();
symlink(workDir, target, link);
}
String link = entry.getKey();
String target = new File(path.toUri()).getPath();
symlink(workDir, target, link);

if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public Object answer(InvocationOnMock args) throws Throwable {
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try {
manager.setup(conf);
Expand Down Expand Up @@ -197,7 +196,6 @@ public Object answer(InvocationOnMock args) throws Throwable {

conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try {
manager.setup(conf);
Expand Down Expand Up @@ -268,7 +266,6 @@ public Object answer(InvocationOnMock args) throws Throwable {
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try {
manager.setup(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ private void testWithConf(Configuration conf) throws IOException,
job.addFileToClassPath(second);
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures

job.submit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions.
* Optionally users can also direct it to symlink the distributed cache file(s)
* into the working directory of the task.</p>
* In older versions of Hadoop Map/Reduce, users could optionally ask for symlinks
* to be created in the working directory of the child task. In the current
* version, symlinks are always created. If the URL does not have a fragment,
* the name of the file or directory will be used. If multiple files or
* directories map to the same link name, the last one added will be used. All
* others will not even be downloaded.</p>
*
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application
Expand Down Expand Up @@ -91,8 +95,7 @@
*
* public void configure(JobConf job) {
* // Get the cached archives/files
* localArchives = DistributedCache.getLocalCacheArchives(job);
* localFiles = DistributedCache.getLocalCacheFiles(job);
* File f = new File("./map.zip/some/file/in/zip.txt");
* }
*
* public void map(K key, V value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ private static void setupPipesJob(JobConf conf) throws IOException {
// add default debug script only when executable is expressed as
// <path>#<executable>
if (exec.contains("#")) {
DistributedCache.createSymlink(conf);
// set default gdb commands for map and reduce task
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,9 +1049,10 @@ public void addArchiveToClassPath(Path archive)
}

/**
* This method allows you to create symlinks in the current working directory
* of the task to all the cache files/archives
* Originally intended to enable symlinks, but currently symlinks cannot be
* disabled.
*/
@Deprecated
public void createSymlink() {
ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
public String getUser();

/**
* This method checks to see if symlinks are to be create for the
* localized cache files in the current working directory
* @return true if symlinks are to be created- else return false
* Originally intended to check if symlinks should be used, but currently
* symlinks cannot be disabled.
* @return true
*/
@Deprecated
public boolean getSymlink();

/**
Expand All @@ -251,14 +252,22 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
* Return the path array of the localized caches
* @return A path array of localized caches
* @throws IOException
* @deprecated the array returned only includes the items that were
* downloaded. There is no way to map this to what is returned by
* {@link #getCacheArchives()}.
*/
@Deprecated
public Path[] getLocalCacheArchives() throws IOException;

/**
* Return the path array of the localized files
* @return A path array of localized files
* @throws IOException
* @deprecated the array returned only includes the items that were
* downloaded. There is no way to map this to what is returned by
* {@link #getCacheFiles()}.
*/
@Deprecated
public Path[] getLocalCacheFiles() throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
DistributedCache.createSymlink(conf);
}
}

Expand Down Expand Up @@ -225,7 +224,6 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
DistributedCache.createSymlink(conf);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public interface MRJobConfig {

public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";

/**
* @deprecated Symlinks are always on and cannot be disabled.
*/
@Deprecated
public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";

public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions.
* Optionally users can also direct it to symlink the distributed cache file(s)
* into the working directory of the task.</p>
* In older versions of Hadoop Map/Reduce, users could optionally ask for symlinks
* to be created in the working directory of the child task. In the current
* version, symlinks are always created. If the URL does not have a fragment,
* the name of the file or directory will be used. If multiple files or
* directories map to the same link name, the last one added will be used. All
* others will not even be downloaded.</p>
*
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application
Expand Down Expand Up @@ -98,8 +102,7 @@
*
* public void configure(JobConf job) {
* // Get the cached archives/files
* localArchives = DistributedCache.getLocalCacheArchives(job);
* localFiles = DistributedCache.getLocalCacheFiles(job);
* File f = new File("./map.zip/some/file/in/zip.txt");
* }
*
* public void map(K key, V value,
Expand Down Expand Up @@ -375,32 +378,26 @@ public static Path[] getArchiveClassPaths(Configuration conf) {
}

/**
* This method allows you to create symlinks in the current working directory
* of the task to all the cache files/archives.
* Intended to be used by user code.
* Originally intended to enable symlinks, but currently symlinks cannot be
* disabled. This is a NO-OP.
* @param conf the jobconf
* @deprecated Use {@link Job#createSymlink()} instead
* @deprecated This is a NO-OP.
*/
@Deprecated
public static void createSymlink(Configuration conf){
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
//NOOP
}

/**
* This method checks to see if symlinks are to be create for the
* localized cache files in the current working directory
* Used by internal DistributedCache code.
* Originally intended to check if symlinks should be used, but currently
* symlinks cannot be disabled.
* @param conf the jobconf
* @return true if symlinks are to be created- else return false
* @deprecated Use {@link JobContext#getSymlink()} instead
* @return true
* @deprecated symlinks are always created.
*/
@Deprecated
public static boolean getSymlink(Configuration conf){
String result = conf.get(MRJobConfig.CACHE_SYMLINK);
if ("yes".equals(result)){
return true;
}
return false;
return true;
}

private static boolean[] parseBooleans(String[] strs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ private static void addDeprecatedKeys() {
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
Configuration.addDeprecation("mapred.cache.archives.timestamps",
new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
Configuration.addDeprecation("mapred.create.symlink",
new String[] {MRJobConfig.CACHE_SYMLINK});
Configuration.addDeprecation("mapred.working.dir",
new String[] {MRJobConfig.WORKING_DIR});
Configuration.addDeprecation("user.name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,10 @@ static void setupCache(String cacheDir, FileSystem fs)
fs.copyFromLocalFile(tarPath1, cachePath);
fs.copyFromLocalFile(tarPath2, cachePath);
}

public static TestResult launchMRCache(String indir,
String outdir, String cacheDir,
JobConf conf, String input)
throws IOException {
setupCache(cacheDir, FileSystem.get(conf));
return launchMRCache(indir,outdir, cacheDir, conf, input, false);
}

public static TestResult launchMRCache(String indir,
String outdir, String cacheDir,
JobConf conf, String input,
boolean withSymlink)
JobConf conf, String input)
throws IOException {
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
Expand Down Expand Up @@ -256,24 +247,13 @@ public static TestResult launchMRCache(String indir,
conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false);
URI[] uris = new URI[6];
if (!withSymlink) {
conf.setMapperClass(MRCaching.MapClass.class);
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
} else {
DistributedCache.createSymlink(conf);
conf.setMapperClass(MRCaching.MapClass2.class);
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
}
conf.setMapperClass(MRCaching.MapClass2.class);
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
DistributedCache.addCacheFile(uris[0], conf);

// Save expected file sizes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public void testWithDFS() throws IOException {
"/cachedir",
mr.createJobConf(),
"The quick brown fox\nhas many silly\n"
+ "red fox sox\n", false);
+ "red fox sox\n");
assertTrue("Archives not matching", ret.isOutputOk);
// launch MR cache with symlinks
ret = MRCaching.launchMRCache("/testing/wc/input",
"/testing/wc/output",
"/cachedir",
mr.createJobConf(),
"The quick brown fox\nhas many silly\n"
+ "red fox sox\n", true);
+ "red fox sox\n");
assertTrue("Archives not matching", ret.isOutputOk);
} finally {
if (fileSys != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public void testRandomWriter() throws IOException, InterruptedException,
Path outputDir =
new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
Expand Down Expand Up @@ -462,7 +463,6 @@ public void testDistributedCache() throws Exception {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures

job.submit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ private Job runSpecTest(boolean mapspec, boolean redspec)

// Creates the Job Configuration
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.createSymlink();
job.setMaxMapAttempts(2);

job.submit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public int run(String[] args) throws Exception {
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
}

System.out.println("Running on " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ public int run(String[] args) throws Exception {
LOG.error(e.getMessage());
return -1;
}
job.addCacheFile(partitionUri);
job.createSymlink();
job.addCacheFile(partitionUri);
long end = System.currentTimeMillis();
System.out.println("Spent " + (end - start) + "ms computing partitions.");
job.setPartitionerClass(TotalOrderPartitioner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,6 @@ protected void setJobConf() throws IOException {
if (!b)
fail(LINK_URI);
}
DistributedCache.createSymlink(jobConf_);
// set the jobconf for the caching parameters
if (cacheArchives != null)
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
Expand Down

0 comments on commit 735b50e

Please sign in to comment.