diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java index 53b4cc13ef01f..caff8deb125a4 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java @@ -30,6 +30,7 @@ import eu.stratosphere.api.common.aggregators.ConvergenceCriterion; import eu.stratosphere.api.common.aggregators.LongSumAggregator; import eu.stratosphere.api.common.cache.DistributedCache; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.typeutils.TypeSerializerFactory; import eu.stratosphere.compiler.CompilerException; @@ -206,8 +207,8 @@ public JobGraph compileJobGraph(OptimizedPlan program) { } // add registered cache file into job configuration - for (Entry e: program.getOriginalPactPlan().getCachedFiles()) { - DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration()); + for (Entry e : program.getOriginalPactPlan().getCachedFiles()) { + DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration()); } JobGraph graph = this.jobGraph; diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java index 6d4b35683fde5..27d8b66166458 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import 
eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import java.util.ArrayList; import java.util.Calendar; @@ -66,7 +67,7 @@ public class Plan implements Visitable> { */ protected int maxNumberMachines; - protected HashMap cacheFile = new HashMap(); + protected HashMap cacheFile = new HashMap(); // ------------------------------------------------------------------------ @@ -301,28 +302,28 @@ public void accept(Visitor> visitor) { /** * register cache files in program level - * @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS) + * @param entry containing all relevant information * @param name user defined name of that file * @throws java.io.IOException */ - public void registerCachedFile(String filePath, String name) throws IOException { + public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException { if (!this.cacheFile.containsKey(name)) { try { - URI u = new URI(filePath); + URI u = new URI(entry.filePath); if (!u.getPath().startsWith("/")) { - u = new URI(new File(filePath).getAbsolutePath()); + u = new File(entry.filePath).toURI(); } FileSystem fs = FileSystem.get(u); if (fs.exists(new Path(u.getPath()))) { - this.cacheFile.put(name, u.toString()); + this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable)); } else { - throw new RuntimeException("File " + u.toString() + " doesn't exist."); + throw new IOException("File " + u.toString() + " doesn't exist."); } } catch (URISyntaxException ex) { - throw new RuntimeException("Invalid path: " + filePath, ex); + throw new IOException("Invalid path: " + entry.filePath, ex); } } else { - throw new RuntimeException("cache file " + name + "already exists!"); + throw new IOException("cache file " + name + " already exists!"); } } @@ -330,7 +331,7 @@ public void registerCachedFile(String filePath, String name) throws IOException * return the registered caches files * @return Set of 
(name, filePath) pairs */ - public Set> getCachedFiles() { + public Set> getCachedFiles() { return this.cacheFile.entrySet(); } } diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java index bd07ec8a9e352..0ddb46d1ff907 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java @@ -25,35 +25,49 @@ import eu.stratosphere.core.fs.Path; /** - * DistributedCache provides static method to write the registered cache files into job configuration or decode + * DistributedCache provides static methods to write the registered cache files into job configuration or decode * them from job configuration. It also provides user access to the file locally. */ public class DistributedCache { + + public static class DistributedCacheEntry { + public String filePath; + public Boolean isExecutable; + + public DistributedCacheEntry(String filePath, Boolean isExecutable){ + this.filePath=filePath; + this.isExecutable=isExecutable; + } + } final static String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM"; final static String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_"; final static String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_"; + + final static String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_"; public final static String TMP_PREFIX = "tmp_"; private Map> cacheCopyTasks = new HashMap>(); - public static void addCachedFile(String name, String filePath, Configuration conf) { + public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) { int num = conf.getInteger(CACHE_FILE_NUM,0) + 1; conf.setInteger(CACHE_FILE_NUM, num); conf.setString(CACHE_FILE_NAME + num, name); - conf.setString(CACHE_FILE_PATH + num, filePath); + conf.setString(CACHE_FILE_PATH + num, e.filePath); + 
conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute()); } - public static Set> getCachedFile(Configuration conf) { - Map cacheFiles = new HashMap(); - int num = conf.getInteger(CACHE_FILE_NUM,0); + public static Set> readFileInfoFromConfig(Configuration conf) { + Map cacheFiles = new HashMap(); + int num = conf.getInteger(CACHE_FILE_NUM, 0); for (int i = 1; i <= num; i++) { String name = conf.getString(CACHE_FILE_NAME + i, ""); String filePath = conf.getString(CACHE_FILE_PATH + i, ""); - cacheFiles.put(name, filePath); + Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false); + cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable)); } return cacheFiles.entrySet(); } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java index 9b316ccd12646..9ece3c2792a09 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java @@ -29,6 +29,7 @@ import eu.stratosphere.api.common.InvalidProgramException; import eu.stratosphere.api.common.JobExecutionResult; import eu.stratosphere.api.common.Plan; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import eu.stratosphere.api.common.io.InputFormat; import eu.stratosphere.api.java.io.CollectionInputFormat; import eu.stratosphere.api.java.io.CsvReader; @@ -87,7 +88,7 @@ public abstract class ExecutionEnvironment { private final List> sinks = new ArrayList>(); - private final List> cacheFile = new ArrayList>(); + private final List> cacheFile = new ArrayList(); private int degreeOfParallelism = -1; @@ -542,8 +543,8 @@ public JobExecutionResult execute() throws Exception { /** * Registers a file at the distributed cache under the given name. 
The file will be accessible * from any user-defined function in the (distributed) runtime under a local path. Files - * may be local files, or files in a distributed file system. The runtime will copy the files - * temporarily to a local cache, if needed. + * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. + * The runtime will copy the files temporarily to a local cache, if needed. *

* The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access @@ -554,7 +555,26 @@ public JobExecutionResult execute() throws Exception { * @param name The name under which the file is registered. */ public void registerCachedFile(String filePath, String name){ - this.cacheFile.add(new Tuple2(filePath, name)); + registerCachedFile(filePath, name, false); + } + + /** + * Registers a file at the distributed cache under the given name. The file will be accessible + * from any user-defined function in the (distributed) runtime under a local path. Files + * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. + * The runtime will copy the files temporarily to a local cache, if needed. + *

+ * The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via + * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access + * {@link eu.stratosphere.api.common.cache.DistributedCache} via + * {@link eu.stratosphere.api.common.functions.RuntimeContext#getDistributedCache()}. + * + * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path") + * @param name The name under which the file is registered. + * @param executable flag indicating whether the file should be executable + */ + public void registerCachedFile(String filePath, String name, boolean executable){ + this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable))); } /** @@ -565,7 +585,7 @@ public void registerCachedFile(String filePath, String name){ * @throws IOException Thrown if checks for existence and sanity fail. */ protected void registerCachedFilesWithPlan(Plan p) throws IOException { - for (Tuple2 entry : cacheFile) { + for (Tuple2 entry : cacheFile) { p.registerCachedFile(entry.f0, entry.f1); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 3b478cfa35b6c..6b0acc1b35b2c 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -50,6 +50,7 @@ import org.apache.hadoop.security.UserGroupInformation; import eu.stratosphere.api.common.cache.DistributedCache; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.GlobalConfiguration; @@ -666,7 +667,7 @@ public List submitTasks(final List> cpTasks = new 
HashMap>(); - for (Entry e: DistributedCache.getCachedFile(tdd.getJobConfiguration())) { + for (Entry e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) { FutureTask cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID); cpTasks.put(e.getKey(), cp); } @@ -801,8 +802,9 @@ private void unregisterTask(final ExecutionVertexID id) { } // remove the local tmp file for unregistered tasks. - for (Entry e: DistributedCache.getCachedFile(task.getEnvironment().getJobConfiguration())) { - this.fileCache.deleteTmpFile(e.getKey(), task.getJobID()); + for (Entry e: DistributedCache + .readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) { + this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID()); } // Unregister task from the byte buffered channel manager this.channelManager.unregister(id, task); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java index 26eb9feb9f5fd..24250abecc714 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java @@ -13,6 +13,8 @@ package eu.stratosphere.pact.runtime.cache; +import eu.stratosphere.api.common.cache.DistributedCache; +import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -25,11 +27,12 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import eu.stratosphere.api.common.cache.DistributedCache; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.fs.FSDataInputStream; import eu.stratosphere.core.fs.FSDataOutputStream; +import 
eu.stratosphere.core.fs.FileStatus; import eu.stratosphere.core.fs.FileSystem; import eu.stratosphere.core.fs.Path; import eu.stratosphere.core.fs.local.LocalFileSystem; @@ -38,48 +41,56 @@ import eu.stratosphere.nephele.util.IOUtils; /** - * FileCache is used to create the local tmp file for the registered cache file when a task is deployed. Also when the - * task is unregistered, it will remove the local tmp file. Given that another task from the same job may be registered - * shortly after, there exists a 5 second delay before clearing the local tmp file. + * The FileCache is used to create the local files for the registered cache files when a task is deployed. + * The files will be removed when the task is unregistered after a 5 second delay. + * A given file x will be placed in "/tmp_/". */ public class FileCache { private LocalFileSystem lfs = new LocalFileSystem(); + private static final Object lock = new Object(); + private Map, Integer> count = new HashMap, Integer>(); private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE); /** * If the file doesn't exists locally, it will copy the file to the temp directory. + * @param name file identifier + * @param entry entry containing all relevant information + * @param jobID + * @return copy task */ - public FutureTask createTmpFile(String name, String filePath, JobID jobID) { - + public FutureTask createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) { synchronized (count) { - Pair key = new ImmutablePair(jobID,name); + Pair key = new ImmutablePair(jobID, name); if (count.containsKey(key)) { count.put(key, count.get(key) + 1); } else { count.put(key, 1); } } - CopyProcess cp = new CopyProcess(name, filePath, jobID); + CopyProcess cp = new CopyProcess(name, entry, jobID); FutureTask copyTask = new FutureTask(cp); executorService.submit(copyTask); return copyTask; } /** - * Leave a 5 seconds delay to clear the local file. 
+ * Deletes the local file after a 5 second delay. + * @param name file identifier + * @param entry entry containing all relevant information + * @param jobID */ - public void deleteTmpFile(String name, JobID jobID) { - DeleteProcess dp = new DeleteProcess(name, jobID, count.get(new ImmutablePair(jobID,name))); + public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) { + DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair(jobID,name))); executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS); } - public Path getTempDir(JobID jobID, String name) { + public Path getTempDir(JobID jobID, String childPath) { return new Path(GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "_" + name); + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "/" + childPath); } public void shutdown() { @@ -93,6 +104,30 @@ public void shutdown() { } } + public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException { + FileSystem sFS = sourcePath.getFileSystem(); + FileSystem tFS = targetPath.getFileSystem(); + if (!tFS.exists(targetPath)) { + if (sFS.getFileStatus(sourcePath).isDir()) { + tFS.mkdirs(targetPath); + FileStatus[] contents = sFS.listStatus(sourcePath); + for (FileStatus content : contents) { + String distPath = content.getPath().toString(); + if (content.isDir() && distPath.endsWith("/")) { + distPath = distPath.substring(0, distPath.length() - 1); + } + String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/")); + copy(content.getPath(), new Path(localPath), executable); + } + } else { + FSDataOutputStream lfsOutput = tFS.create(targetPath, false); + FSDataInputStream fsInput = sFS.open(sourcePath); + IOUtils.copyBytes(fsInput, lfsOutput); + new 
File(targetPath.toString()).setExecutable(executable); + } + } + } + /** * Asynchronous file copy process */ @@ -100,21 +135,20 @@ private class CopyProcess implements Callable { private JobID jobID; private String name; private String filePath; + private Boolean executable; - public CopyProcess(String name, String filePath, JobID jobID) { + public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) { this.name = name; - this.filePath = filePath; + this.filePath = e.filePath; + this.executable = e.isExecutable; this.jobID = jobID; } + @Override public Path call() { - Path tmp = getTempDir(jobID, name); + Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1)); try { - if (!lfs.exists(tmp)) { - FSDataOutputStream lfsOutput = lfs.create(tmp, false); - Path distributedPath = new Path(filePath); - FileSystem fs = distributedPath.getFileSystem(); - FSDataInputStream fsInput = fs.open(distributedPath); - IOUtils.copyBytes(fsInput, lfsOutput); + synchronized (lock) { + copy(new Path(filePath), tmp, this.executable); } } catch (IOException e1) { throw new RuntimeException("Error copying a file from hdfs to the local fs", e1); @@ -122,27 +156,30 @@ public Path call() { return tmp; } } + /** * If no task is using this file after 5 seconds, clear it. 
*/ private class DeleteProcess implements Runnable { private String name; + private String filePath; private JobID jobID; private int oldCount; - public DeleteProcess(String name, JobID jobID, int c) { + public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) { this.name = name; + this.filePath = e.filePath; this.jobID = jobID; this.oldCount = c; } - + @Override public void run() { synchronized (count) { if (count.get(new ImmutablePair(jobID, name)) != oldCount) { return; } } - Path tmp = getTempDir(jobID, name); + Path tmp = getTempDir(jobID, ""); try { if (lfs.exists(tmp)) { lfs.delete(tmp, true); diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java index 40b649a552424..2ea9ddfe5ff2b 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java @@ -24,8 +24,8 @@ import com.google.common.base.Charsets; import com.google.common.io.Files; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; -import eu.stratosphere.core.fs.Path; import eu.stratosphere.core.fs.local.LocalFileSystem; import eu.stratosphere.nephele.jobgraph.JobID; @@ -67,13 +67,13 @@ public void createTmpCacheFile() { public void testFileReuseForNextTask() { JobID jobID = new JobID(); String filePath = f.toURI().toString(); - fileCache.createTmpFile("test_file", filePath, jobID); + fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException("Interrupted error", e); } - fileCache.deleteTmpFile("test_file", jobID); + fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID); try 
{ Thread.sleep(1000); } catch (InterruptedException e) { @@ -81,17 +81,17 @@ public void testFileReuseForNextTask() { } //new task comes after 1 second try { - Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "test_file"))); + Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "cacheFile"))); } catch (IOException e) { throw new RuntimeException("Interrupted error", e); } - fileCache.createTmpFile("test_file", filePath, jobID); + fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException("Interrupted error", e); } - fileCache.deleteTmpFile("test_file", jobID); + fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID); try { Thread.sleep(7000); } catch (InterruptedException e) { @@ -99,7 +99,7 @@ public void testFileReuseForNextTask() { } //no task comes in 7 seconds try { - Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "test_file"))); + Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "cacheFile"))); } catch (IOException e) { throw new RuntimeException("Interrupted error", e); } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java index 9fdcc424d882a..6e46f82e1d892 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java @@ -14,6 +14,7 @@ package 
eu.stratosphere.test.distributedCache; import eu.stratosphere.api.common.Plan; +import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; import eu.stratosphere.api.java.record.operators.FileDataSink; import eu.stratosphere.api.java.record.operators.FileDataSource; import eu.stratosphere.api.java.record.functions.MapFunction; @@ -134,7 +135,7 @@ protected void preSubmit() throws Exception { protected Plan getTestJob() { Plan plan = getPlan(1 , textPath, resultPath); try { - plan.registerCachedFile(cachePath, "cache_test"); + plan.registerCachedFile("cache_test", new DistributedCacheEntry(cachePath, false)); } catch (IOException ex) { throw new RuntimeException(ex); }