From 2a424a6120b7f1be8f357522a3fe964e77bd4cce Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 14 Oct 2016 15:15:50 +0200 Subject: [PATCH 1/2] [FLINK-4829] protect user accumulators against concurrent updates --- .../flink/runtime/accumulators/AccumulatorRegistry.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java index 41af2a99f9d67..44714e7169f69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** @@ -44,7 +45,8 @@ public class AccumulatorRegistry { new HashMap>(); /* User-defined Accumulator values stored for the executing task. */ - private final Map> userAccumulators = new HashMap<>(); + private final Map> userAccumulators = + new ConcurrentHashMap<>(4); /* The reporter reference that is handed to the reporting tasks. */ private final ReadWriteReporter reporter; From f162142b1d274895e16712e42f0f32a43e187db9 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 17 Oct 2016 14:19:00 +0200 Subject: [PATCH 2/2] [FLINK-4829] snapshot accumulators on a best-effort basis Heartbeats should not fail when accumulators could not be snapshotted. Instead, we should simply skip the reporting of the failed accumulator. Eventually, the accumulator will be reported; at the latest, when the job finishes. --- .../flink/runtime/taskmanager/TaskManager.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f8f333e2a5ef4..1017ea0cba261 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} @@ -1335,9 +1336,15 @@ class TaskManager( runningTasks.asScala foreach { case (execID, task) => - val registry = task.getAccumulatorRegistry - val accumulators = registry.getSnapshot - accumulatorEvents.append(accumulators) + try { + val registry = task.getAccumulatorRegistry + val accumulators = registry.getSnapshot + accumulatorEvents.append(accumulators) + } catch { + case e: Exception => + log.warn("Failed to take accumulator snapshot for task {}.", + execID, ExceptionUtils.getRootCause(e)) + } } currentJobManager foreach {