[FLINK-1968] [runtime] Clean up and improve the distributed cache.
 - Gives a proper exception when a non-cached file is accessed
 - Forwards I/O exceptions that happen during file transfer, rather than returning null when the transfer fails
 - Consistently keeps reference counts and copies files only when needed
 - Properly removes all files on shutdown
 - Uses a shutdown hook to remove files when the process is killed
StephanEwen committed May 11, 2015
1 parent acca10e commit 1c8d866
Showing 11 changed files with 461 additions and 224 deletions.
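The last bullet describes a standard JVM pattern: registering a shutdown hook so temporary files are deleted even if the process is killed before a clean shutdown. The following is a minimal sketch of that pattern only, with hypothetical names (FileCacheShutdownSketch, storageDirectory, deleteRecursively); it is not the code from this commit.

import java.io.File;

// Minimal sketch of the shutdown-hook pattern, not the commit's code.
class FileCacheShutdownSketch {

    private final File storageDirectory;   // hypothetical cache directory
    private final Thread shutdownHook;

    FileCacheShutdownSketch(File storageDirectory) {
        this.storageDirectory = storageDirectory;
        // the hook runs when the JVM exits, including on SIGTERM / System.exit()
        this.shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                deleteRecursively(FileCacheShutdownSketch.this.storageDirectory);
            }
        });
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    void shutdown() {
        // clean shutdown: delete the files now and unregister the hook
        deleteRecursively(storageDirectory);
        try {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        }
        catch (IllegalStateException e) {
            // the JVM is already shutting down and the hook is running - ignore
        }
    }

    static void deleteRecursively(File f) {
        File[] children = f.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        f.delete();
    }
}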
DistributedCache.java
@@ -21,11 +21,13 @@
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -37,6 +39,7 @@
 public class DistributedCache {
 
     public static class DistributedCacheEntry {
+
         public String filePath;
         public Boolean isExecutable;
 
@@ -45,19 +48,45 @@ public DistributedCacheEntry(String filePath, Boolean isExecutable){
             this.isExecutable=isExecutable;
         }
     }
 
-    final static String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";
-
-    final static String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";
-
-    final static String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
-
-    final static String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
-
-    public final static String TMP_PREFIX = "tmp_";
-
-    private Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+    // ------------------------------------------------------------------------
+
+    private final Map<String, Future<Path>> cacheCopyTasks;
+
+    public DistributedCache(Map<String, Future<Path>> cacheCopyTasks) {
+        this.cacheCopyTasks = cacheCopyTasks;
+    }
+
+    // ------------------------------------------------------------------------
+
+    public File getFile(String name) {
+        if (name == null) {
+            throw new NullPointerException("name must not be null");
+        }
+
+        Future<Path> future = cacheCopyTasks.get(name);
+        if (future == null) {
+            throw new IllegalArgumentException("File with name '" + name + "' is not available." +
+                    " Did you forget to register the file?");
+        }
+
+        try {
+            Path tmp = future.get();
+            return new File(tmp.toString());
+        }
+        catch (ExecutionException e) {
+            throw new RuntimeException("An error occurred while copying the file.", e.getCause());
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Error while getting the file registered under '" + name +
+                    "' from the distributed cache", e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities to read/write cache files from/to the configuration
+    // ------------------------------------------------------------------------
 
     public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
         int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
         conf.setInteger(CACHE_FILE_NUM, num);
@@ -67,29 +96,26 @@ public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, C
     }
 
     public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
-        Map<String, DistributedCacheEntry> cacheFiles = new HashMap<String, DistributedCacheEntry>();
         int num = conf.getInteger(CACHE_FILE_NUM, 0);
+        if (num == 0) {
+            return Collections.emptySet();
+        }
+
+        Map<String, DistributedCacheEntry> cacheFiles = new HashMap<String, DistributedCacheEntry>();
         for (int i = 1; i <= num; i++) {
-            String name = conf.getString(CACHE_FILE_NAME + i, "");
-            String filePath = conf.getString(CACHE_FILE_PATH + i, "");
+            String name = conf.getString(CACHE_FILE_NAME + i, null);
+            String filePath = conf.getString(CACHE_FILE_PATH + i, null);
             Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
             cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
         }
         return cacheFiles.entrySet();
     }
 
-    public void setCopyTasks(Map<String, FutureTask<Path>> cpTasks) {
-        this.cacheCopyTasks = cpTasks;
-    }
-
-    public File getFile(String name) {
-        Path tmp = null;
-        //The FutureTask.get() method will block until the file is ready.
-        try {
-            tmp = cacheCopyTasks.get(name).get();
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting file from distributed cache", e);
-        }
-        return new File(tmp.toString());
-    }
+    private static final String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";
+
+    private static final String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";
+
+    private static final String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
+
+    private static final String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
 }
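For reference, a hedged usage sketch of the reworked class follows. In the runtime the copy-task map is populated by the TaskManager; here a FutureTask run synchronously stands in for a finished copy, and the file path and class name (DistributedCacheUsageSketch) are made up for the example.

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.apache.flink.core.fs.Path;
// import for DistributedCache omitted - its package does not appear in this diff

public class DistributedCacheUsageSketch {

    public static void main(String[] args) {
        // the TaskManager normally fills this map with pending copy tasks;
        // we simulate one that has already finished
        Map<String, Future<Path>> copyTasks = new HashMap<String, Future<Path>>();
        FutureTask<Path> copied = new FutureTask<Path>(new Callable<Path>() {
            @Override
            public Path call() {
                return new Path("/tmp/flink-cache/words.txt");  // made-up path
            }
        });
        copied.run();  // complete the future synchronously for the example
        copyTasks.put("words", copied);

        DistributedCache cache = new DistributedCache(copyTasks);

        // registered name: getFile() blocks until the copy is finished
        File file = cache.getFile("words");
        System.out.println(file.getAbsolutePath());

        // unregistered name: now fails fast with a descriptive exception
        // instead of an opaque NullPointerException
        try {
            cache.getFile("does-not-exist");
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}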
AbstractRuntimeUDFContext.java
@@ -19,9 +19,10 @@
 package org.apache.flink.api.common.functions.util;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -51,20 +52,30 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
     private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 
-    private final DistributedCache distributedCache = new DistributedCache();
+    private final DistributedCache distributedCache;
 
-    public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
+    public AbstractRuntimeUDFContext(String name,
+                                        int numParallelSubtasks, int subtaskIndex,
+                                        ClassLoader userCodeClassLoader,
+                                        ExecutionConfig executionConfig)
+    {
+        this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
+                Collections.<String, Future<Path>>emptyMap());
+    }
+
+    public AbstractRuntimeUDFContext(String name,
+                                        int numParallelSubtasks, int subtaskIndex,
+                                        ClassLoader userCodeClassLoader,
+                                        ExecutionConfig executionConfig,
+                                        Map<String, Future<Path>> cpTasks)
+    {
         this.name = name;
         this.numParallelSubtasks = numParallelSubtasks;
         this.subtaskIndex = subtaskIndex;
         this.userCodeClassLoader = userCodeClassLoader;
         this.executionConfig = executionConfig;
-    }
-
-    public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) {
-        this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
-        this.distributedCache.setCopyTasks(cpTasks);
+        this.distributedCache = new DistributedCache(cpTasks);
     }
 
     @Override
RuntimeUDFContext.java
@@ -21,7 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
@@ -42,7 +42,7 @@ public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex,
         super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
     }
 
-    public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) {
+    public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
         super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
     }
18 changes: 8 additions & 10 deletions  flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -168,17 +168,15 @@ public int hashCode() {
      * @throws IOException
      *         thrown if a reference to the file system instance could not be obtained
      */
-    public static FileSystem getLocalFileSystem() throws IOException {
-
-        // this should really never fail.
-        URI localUri;
-
-        try {
-            localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
-        } catch (URISyntaxException e) {
-            throw new IOException("Cannot create URI for local file system");
+    public static FileSystem getLocalFileSystem() {
+        try {
+            URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
+            return get(localUri);
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Cannot create URI for local file system");
         }
-
-        return get(localUri);
     }
 
     /**
@@ -193,7 +191,7 @@ public static FileSystem getLocalFileSystem() throws IOException {
      *         thrown if a reference to the file system instance could not be obtained
      */
     public static FileSystem get(URI uri) throws IOException {
-        FileSystem fs = null;
+        FileSystem fs;
 
         synchronized (SYNCHRONIZATION_OBJECT) {
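A caller-visible side effect of the first hunk, shown in a small hedged sketch (the class name LocalFileSystemSketch is illustrative): getLocalFileSystem() no longer declares the checked IOException, so the try/catch that callers previously needed goes away.

import org.apache.flink.core.fs.FileSystem;

public class LocalFileSystemSketch {

    public static void main(String[] args) {
        // before: FileSystem.getLocalFileSystem() threw a checked IOException;
        // after: the call is unchecked, and a URI failure (which should never
        // happen for "file:///") surfaces as a RuntimeException instead
        FileSystem local = FileSystem.getLocalFileSystem();
        System.out.println(local.getUri());
    }
}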
Environment.java
@@ -18,58 +18,62 @@
 package org.apache.flink.runtime.execution;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 
 import java.util.Map;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.Future;
 
 /**
- * The user code of every task runs inside an <code>Environment</code> object.
- * The environment provides important services to the task. It keeps track of
- * setting up the communication channels and provides access to input splits,
- * memory manager, etc.
+ * The Environment gives the code executed in a task access to the task's properties
+ * (such as name, parallelism), the configurations, the data stream readers and writers,
+ * as well as the various components that are provided by the TaskManager, such as
+ * memory manager, I/O manager, ...
  */
 public interface Environment {
 
     /**
-     * Returns the ID of the job from the original job graph. It is used by the library
-     * cache manager to find the required libraries for executing the assigned Nephele task.
+     * Returns the ID of the job that the task belongs to.
      *
     * @return the ID of the job from the original job graph
     */
    JobID getJobID();
 
     /**
-     * Gets the ID of the jobVertex that this task corresponds to.
+     * Gets the ID of the JobVertex for which this task executes a parallel subtask.
      *
     * @return The JobVertexID of this task.
     */
    JobVertexID getJobVertexId();
 
     /**
-     * Returns the task configuration object which was attached to the original JobVertex.
+     * Gets the ID of the task execution attempt.
      *
-     * @return the task configuration object which was attached to the original JobVertex.
+     * @return The ID of the task execution attempt.
+     */
+    ExecutionAttemptID getExecutionId();
+
+    /**
+     * Returns the task-wide configuration object, originally attached to the job vertex.
+     *
+     * @return The task-wide configuration
     */
    Configuration getTaskConfiguration();
 
     /**
-     * Returns the job configuration object which was attached to the original {@link JobGraph}.
+     * Returns the job-wide configuration object that was attached to the JobGraph.
      *
-     * @return the job configuration object which was attached to the original {@link JobGraph}
+     * @return The job-wide configuration
     */
    Configuration getJobConfiguration();
 
@@ -132,7 +136,7 @@ public interface Environment {
     */
    ClassLoader getUserClassLoader();
 
-    Map<String, FutureTask<Path>> getCopyTask();
+    Map<String, Future<Path>> getDistributedCacheEntries();
 
    BroadcastVariableManager getBroadcastVariableManager();
 
@@ -155,13 +159,4 @@ public interface Environment {
 
    InputGate[] getAllInputGates();
-
-    /**
-     * Returns the proxy object for the accumulator protocol.
-     */
-    // THIS DOES NOT BELONG HERE, THIS TOTALLY BREAKS COMPONENTIZATION.
-    // THE EXECUTED TASKS HAVE BEEN KEPT INDEPENDENT OF ANY RPC OR ACTOR
-    // COMMUNICATION !!!
-    ActorRef getJobManager();
 }
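To show how the renamed accessor ties into the rest of the change, here is a hedged wiring sketch: a task can pass the environment's pending copy operations straight to the new DistributedCache constructor. The helper class and method (CacheWiringSketch, cacheFor) are illustrative, not part of the commit.

import java.util.Map;
import java.util.concurrent.Future;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
// import for DistributedCache omitted - its package does not appear in this diff

public class CacheWiringSketch {

    // illustrative helper: build the task's cache view from its environment
    public static DistributedCache cacheFor(Environment env) {
        Map<String, Future<Path>> entries = env.getDistributedCacheEntries();
        return new DistributedCache(entries);
    }
}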
