From 646323a3637294859ade4b082c3af4b377ccdcc2 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 13 Oct 2014 20:07:59 +0200 Subject: [PATCH] [FLINK-1152] More robust resource release when tasks are canceled during deployment --- .../flink/runtime/filecache/FileCache.java | 26 ++-- .../runtime/io/network/ChannelManager.java | 3 + .../flink/runtime/taskmanager/Task.java | 5 +- .../runtime/taskmanager/TaskManager.java | 140 +++++++++--------- 4 files changed, 94 insertions(+), 80 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index c6832f758b20c..de8d59ca82e2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.filecache; import java.io.File; @@ -44,6 +43,8 @@ import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The FileCache is used to create the local files for the registered cache files when a task is deployed. @@ -52,9 +53,12 @@ */ public class FileCache { - private LocalFileSystem lfs = new LocalFileSystem(); - + private static final Logger LOG = LoggerFactory.getLogger(FileCache.class); + private static final Object lock = new Object(); + + + private LocalFileSystem lfs = new LocalFileSystem(); private Map, Integer> count = new HashMap, Integer>(); @@ -99,10 +103,11 @@ public Path getTempDir(JobID jobID, String childPath) { } public void shutdown() { - if (this.executorService != null) { - this.executorService.shutdown(); + ScheduledExecutorService es = this.executorService; + if (es != null) { + es.shutdown(); try { - this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS); + es.awaitTermination(5000L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Error shutting down the file cache", e); } @@ -133,6 +138,7 @@ public static void copy(Path sourcePath, Path targetPath, boolean executable) th IOUtils.copyBytes(fsInput, lfsOutput); new File(targetPath.toString()).setExecutable(executable); } catch (IOException ioe) { + LOG.error("could not copy file to local file cache.", ioe); } } } @@ -159,8 +165,8 @@ public Path call() { synchronized (lock) { copy(new Path(filePath), tmp, this.executable); } - } catch (IOException e1) { - throw new RuntimeException("Error copying a file from hdfs to the local fs", e1); + } catch (IOException e) { + LOG.error("Could not copy file to local file cache.", e); } return tmp; } @@ -192,8 +198,8 @@ public void run() { if (lfs.exists(tmp)) { lfs.delete(tmp, true); } - } catch (IOException e1) { - throw new RuntimeException("Error deleting the file", e1); + } catch (IOException e) { + LOG.error("Could not delete file from local file cache.", e); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java index 38d40ee67afb4..59084e98e6e8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java @@ -197,6 +197,9 @@ public void register(Task task) throws InsufficientResourcesException { */ public void unregister(ExecutionAttemptID executionId, Task task) { final Environment environment = task.getEnvironment(); + if (environment == null) { + return; + } // destroy and remove OUTPUT channels from registered channels and cache for (ChannelID id : environment.getOutputChannelIDs()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index d393e2ebc64ff..12c55bc7279d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -331,8 +331,9 @@ public boolean startExecution() { * the central memory manager */ public void unregisterMemoryManager(MemoryManager memoryManager) { - if (memoryManager != null) { - memoryManager.releaseAll(this.environment.getInvokable()); + RuntimeEnvironment env = this.environment; + if (memoryManager != null && env != null) { + memoryManager.releaseAll(env.getInvokable()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java index 44327e1d97d0f..1498a1c5cdf84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -570,87 +570,80 @@ public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) { final int taskIndex = tdd.getIndexInSubtaskGroup(); final int numSubtasks = tdd.getCurrentNumberOfSubtasks(); + Task task = null; boolean jarsRegistered = false; + boolean success = false; try { // Now register data with the library manager libraryCacheManager.register(jobID, tdd.getRequiredJarFiles()); - - // library and classloader issues first jarsRegistered = true; - + + // library and classloader issues first final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID); if (userCodeClassLoader == null) { throw new Exception("No user code ClassLoader available."); } - final Task task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this); + task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this); if (this.runningTasks.putIfAbsent(executionId, task) != null) { throw new Exception("TaskManager contains already a task with executionId " + executionId); } - // another try/finally-success block to ensure that the tasks are removed properly in case of an exception - boolean success = false; - try { - final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId); - final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy); - task.setEnvironment(env); - - // register the task with the network stack and profilers - this.channelManager.register(task); - - final Configuration jobConfig = tdd.getJobConfiguration(); - - boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true); - - // Register environment, input, and output gates for profiling - if (enableProfiling) { - task.registerProfiler(this.profiler, jobConfig); - } - - // now that the task is successfully created and registered, we can start copying the - // distributed cache temp files - Map> cpTasks = new HashMap>(); - for (Entry e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) { - FutureTask cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID); - cpTasks.put(e.getKey(), cp); - } - env.addCopyTasksForCacheFile(cpTasks); - - if (!task.startExecution()) { - throw new Exception("Cannot start task. Task was canceled or failed."); - } + final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId); + final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy); + task.setEnvironment(env); - // final check that we can go (we do this after the registration, so the the "happen's before" - // relationship ensures that either the shutdown removes this task, or we are aware of the shutdown - if (shutdownStarted.get()) { - throw new Exception("Task Manager is shut down."); - } - - success = true; - return new TaskOperationResult(executionId, true); + // register the task with the network stack and profilers + this.channelManager.register(task); + + final Configuration jobConfig = tdd.getJobConfiguration(); + + boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true); + + // Register environment, input, and output gates for profiling + if (enableProfiling) { + task.registerProfiler(this.profiler, jobConfig); } - finally { - if (!success) { - // remove task - this.runningTasks.remove(executionId); - - // delete distributed cache files - for (Entry e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) { - this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID); - } - } + + // now that the task is successfully created and registered, we can start copying the + // distributed cache temp files + Map> cpTasks = new HashMap>(); + for (Entry e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) { + FutureTask cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID); + cpTasks.put(e.getKey(), cp); } - } - catch (Throwable t) { - LOG.error("Could not instantiate task", t); + env.addCopyTasksForCacheFile(cpTasks); - if (jarsRegistered) { - libraryCacheManager.unregister(jobID); + if (!task.startExecution()) { + return new TaskOperationResult(executionId, false, "Task was canceled or failed."); + } + + // final check that we can go (we do this after the registration, so the the "happen's before" + // relationship ensures that either the shutdown removes this task, or we are aware of the shutdown + if (shutdownStarted.get()) { + throw new Exception("Task Manager is shut down."); } + success = true; + return new TaskOperationResult(executionId, true); + } + catch (Throwable t) { + LOG.error("Could not instantiate task", t); return new TaskOperationResult(executionId, false, ExceptionUtils.stringifyException(t)); } + finally { + if (!success) { + this.runningTasks.remove(executionId); + + if (task != null) { + removeAllTaskResources(task); + } + if (jarsRegistered) { + libraryCacheManager.unregister(jobID); + } + } + } } /** @@ -660,7 +653,6 @@ public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) { * the ID of the task to be unregistered */ private void unregisterTask(ExecutionAttemptID executionId) { - // Task de-registration must be atomic final Task task = this.runningTasks.remove(executionId); if (task == null) { @@ -670,22 +662,34 @@ private void unregisterTask(ExecutionAttemptID executionId) { return; } - // remove the local tmp file for unregistered tasks. - for (Entry e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) { - this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID()); - } - + removeAllTaskResources(task); + + // Unregister task from library cache manager + libraryCacheManager.unregister(task.getJobID()); + } + + private void removeAllTaskResources(Task task) { // Unregister task from the byte buffered channel manager - this.channelManager.unregister(executionId, task); + this.channelManager.unregister(task.getExecutionId(), task); // Unregister task from profiling task.unregisterProfiler(this.profiler); // Unregister task from memory manager task.unregisterMemoryManager(this.memoryManager); - - // Unregister task from library cache manager - libraryCacheManager.unregister(task.getJobID()); + + // remove the local tmp file for unregistered tasks. + try { + RuntimeEnvironment re = task.getEnvironment(); + if (re != null) { + for (Entry e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) { + this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID()); + } + } + } + catch (Throwable t) { + LOG.error("Error cleaning up local files from the distributed cache.", t); + } } public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) {