Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.cache.DistributedCache;
import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.compiler.CompilerException;
Expand Down Expand Up @@ -206,8 +207,8 @@ public JobGraph compileJobGraph(OptimizedPlan program) {
}

// add registered cache file into job configuration
for (Entry<String, String> e: program.getOriginalPactPlan().getCachedFiles()) {
DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
}
JobGraph graph = this.jobGraph;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;

import java.util.ArrayList;
import java.util.Calendar;
Expand Down Expand Up @@ -66,7 +67,7 @@ public class Plan implements Visitable<Operator<?>> {
*/
protected int maxNumberMachines;

protected HashMap<String, String> cacheFile = new HashMap<String, String>();
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap();

// ------------------------------------------------------------------------

Expand Down Expand Up @@ -301,36 +302,36 @@ public void accept(Visitor<Operator<?>> visitor) {

/**
* Registers a cache file at the program level.
* @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS)
* @param entry containing all relevant information
* @param name user defined name of that file
* @throws java.io.IOException
*/
public void registerCachedFile(String filePath, String name) throws IOException {
public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
if (!this.cacheFile.containsKey(name)) {
try {
URI u = new URI(filePath);
URI u = new URI(entry.filePath);
if (!u.getPath().startsWith("/")) {
u = new URI(new File(filePath).getAbsolutePath());
u = new File(entry.filePath).toURI();
}
FileSystem fs = FileSystem.get(u);
if (fs.exists(new Path(u.getPath()))) {
this.cacheFile.put(name, u.toString());
this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
} else {
throw new RuntimeException("File " + u.toString() + " doesn't exist.");
throw new IOException("File " + u.toString() + " doesn't exist.");
}
} catch (URISyntaxException ex) {
throw new RuntimeException("Invalid path: " + filePath, ex);
throw new IOException("Invalid path: " + entry.filePath, ex);
}
} else {
throw new RuntimeException("cache file " + name + "already exists!");
throw new IOException("cache file " + name + "already exists!");
}
}

/**
* Returns the registered cache files.
* @return Set of (name, cache entry) pairs
*/
public Set<Entry<String,String>> getCachedFiles() {
public Set<Entry<String,DistributedCacheEntry>> getCachedFiles() {
return this.cacheFile.entrySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,49 @@
import eu.stratosphere.core.fs.Path;

/**
* DistributedCache provides static method to write the registered cache files into job configuration or decode
* DistributedCache provides static methods to write the registered cache files into job configuration or decode
* them from job configuration. It also provides user access to the file locally.
*/
public class DistributedCache {

/**
 * Describes one file registered with the distributed cache: its path and the flag
 * indicating whether the file should be executable.
 */
public static class DistributedCacheEntry {
	/** Path of the cached file, stored exactly as handed to the constructor. */
	public String filePath;
	/** Executable flag; the constructor always sets this to a non-null value. */
	public Boolean isExecutable;

	/**
	 * Creates a new cache entry.
	 *
	 * @param filePath path of the file
	 * @param isExecutable executable flag; {@code null} is treated as {@code false}
	 */
	public DistributedCacheEntry(String filePath, Boolean isExecutable) {
		this.filePath = filePath;
		// Normalize null to FALSE so that auto-unboxing the field
		// (e.g. "entry.isExecutable || ...") cannot throw a NullPointerException.
		this.isExecutable = Boolean.TRUE.equals(isExecutable);
	}
}

final static String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";

final static String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";

final static String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";

final static String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";

public final static String TMP_PREFIX = "tmp_";

private Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();

public static void addCachedFile(String name, String filePath, Configuration conf) {
public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
conf.setInteger(CACHE_FILE_NUM, num);
conf.setString(CACHE_FILE_NAME + num, name);
conf.setString(CACHE_FILE_PATH + num, filePath);
conf.setString(CACHE_FILE_PATH + num, e.filePath);
conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
}

public static Set<Entry<String,String>> getCachedFile(Configuration conf) {
Map<String, String> cacheFiles = new HashMap<String, String>();
int num = conf.getInteger(CACHE_FILE_NUM,0);
public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
Map<String, DistributedCacheEntry> cacheFiles = new HashMap<String, DistributedCacheEntry>();
int num = conf.getInteger(CACHE_FILE_NUM, 0);
for (int i = 1; i <= num; i++) {
String name = conf.getString(CACHE_FILE_NAME + i, "");
String filePath = conf.getString(CACHE_FILE_PATH + i, "");
cacheFiles.put(name, filePath);
Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
}
return cacheFiles.entrySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.java.io.CollectionInputFormat;
import eu.stratosphere.api.java.io.CsvReader;
Expand Down Expand Up @@ -87,7 +88,7 @@ public abstract class ExecutionEnvironment {

private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();

private final List<Tuple2<String, String>> cacheFile = new ArrayList<Tuple2<String, String>>();
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();

private int degreeOfParallelism = -1;

Expand Down Expand Up @@ -542,8 +543,8 @@ public JobExecutionResult execute() throws Exception {
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files, or files in a distributed file system. The runtime will copy the files
* temporarily to a local cache, if needed.
* may be local files (as long as all relevant workers have access to them), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
* <p>
* The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access
Expand All @@ -554,7 +555,26 @@ public JobExecutionResult execute() throws Exception {
* @param name The name under which the file is registered.
*/
public void registerCachedFile(String filePath, String name){
this.cacheFile.add(new Tuple2<String, String>(filePath, name));
registerCachedFile(filePath, name, false);
}

/**
 * Registers a file at the distributed cache under the given name. The file will be accessible
 * from any user-defined function in the (distributed) runtime under a local path. Files
 * may be local files (as long as all relevant workers have access to them), or files in a
 * distributed file system. The runtime will copy the files temporarily to a local cache, if needed.
 * <p>
 * The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
 * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access to the
 * {@link eu.stratosphere.api.common.cache.DistributedCache} via
 * {@link eu.stratosphere.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 * @param executable flag indicating whether the file should be executable
 */
public void registerCachedFile(String filePath, String name, boolean executable) {
	// The entry must carry the file's path; the name is only the key it is registered under.
	DistributedCacheEntry cacheEntry = new DistributedCacheEntry(filePath, executable);
	this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, cacheEntry));
}

/**
Expand All @@ -565,7 +585,7 @@ public void registerCachedFile(String filePath, String name){
* @throws IOException Thrown if checks for existence and sanity fail.
*/
protected void registerCachedFilesWithPlan(Plan p) throws IOException {
for (Tuple2<String, String> entry : cacheFile) {
for (Tuple2<String, DistributedCacheEntry> entry : cacheFile) {
p.registerCachedFile(entry.f0, entry.f1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.security.UserGroupInformation;

import eu.stratosphere.api.common.cache.DistributedCache;
import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
Expand Down Expand Up @@ -666,7 +667,7 @@ public List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescripto

// retrieve the registered cache files from job configuration and create the local tmp file.
Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
for (Entry<String, String> e: DistributedCache.getCachedFile(tdd.getJobConfiguration())) {
for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
cpTasks.put(e.getKey(), cp);
}
Expand Down Expand Up @@ -801,8 +802,9 @@ private void unregisterTask(final ExecutionVertexID id) {
}

// remove the local tmp file for unregistered tasks.
for (Entry<String, String> e: DistributedCache.getCachedFile(task.getEnvironment().getJobConfiguration())) {
this.fileCache.deleteTmpFile(e.getKey(), task.getJobID());
for (Entry<String, DistributedCacheEntry> e: DistributedCache
.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
}
// Unregister task from the byte buffered channel manager
this.channelManager.unregister(id, task);
Expand Down
Loading