[FLINK-2292][FLINK-1573] add live per-task accumulators
This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.
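
On the receiving side, the per-task snapshots have to be folded into one job-wide map. A minimal sketch of that aggregation step using the existing AccumulatorHelper; the class, method, and the snapshotsFromHeartbeats argument are illustrative names, not part of this patch:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.accumulators.Accumulator;
    import org.apache.flink.api.common.accumulators.AccumulatorHelper;

    public class AccumulatorAggregationSketch {

        // Folds per-task accumulator snapshots into one job-wide view.
        // Accumulators registered under the same name are combined via merge().
        public static Map<String, Accumulator<?, ?>> aggregate(
                List<Map<String, Accumulator<?, ?>>> snapshotsFromHeartbeats) {
            Map<String, Accumulator<?, ?>> jobTotals = new HashMap<String, Accumulator<?, ?>>();
            for (Map<String, Accumulator<?, ?>> taskSnapshot : snapshotsFromHeartbeats) {
                AccumulatorHelper.mergeInto(jobTotals, taskSnapshot);
            }
            return jobTotals;
        }
    }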

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (Flink metrics defined in the invokables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.
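
For reference, an external accumulator is registered and updated through the RuntimeContext exactly as before; a minimal sketch of the user-facing pattern (class and accumulator names are illustrative):

    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class LineCounter extends RichFlatMapFunction<String, String> {

        private final IntCounter numLines = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // Registered per task; with this change the partial count becomes
            // visible at the job manager while the job is still running.
            getRuntimeContext().addAccumulator("num-lines", this.numLines);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            this.numLines.add(1);
            out.collect(value);
        }
    }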

This closes #896.
mxm committed Jul 15, 2015
1 parent d592ee6 commit 8261ed5
Showing 84 changed files with 1,362 additions and 599 deletions.
@@ -40,7 +40,6 @@
  * client
  */
 public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
-
 	/**
 	 * @param value
 	 *            The value to add to the accumulator object
@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -100,10 +100,12 @@ public interface RuntimeContext {
 	<V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
 
 	/**
-	 * For system internal usage only. Use getAccumulator(...) to obtain a
-	 * accumulator. Use this as read-only.
+	 * Returns a map of all registered accumulators for this task.
+	 * The returned map must not be modified.
+	 * @deprecated Use getAccumulator(..) to obtain the value of an accumulator.
 	 */
-	HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+	@Deprecated
+	Map<String, Accumulator<?, ?>> getAllAccumulators();
 
 	/**
 	 * Convenience function to create a counter object for integers.
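
A short usage sketch of the changed interface: per-name access remains the supported path, while the bulk view is deprecated and must be treated as read-only (helper class and names are illustrative):

    import java.util.Map;

    import org.apache.flink.api.common.accumulators.Accumulator;
    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RuntimeContext;

    public class AccumulatorAccessSketch {

        static void useAccumulators(RuntimeContext ctx) {
            // Supported: typed, per-name registration and access.
            ctx.addAccumulator("records", new IntCounter());
            Accumulator<Integer, Integer> records = ctx.getAccumulator("records");
            records.add(1);

            // Deprecated: the bulk view is now typed as Map rather than HashMap
            // and must not be modified.
            Map<String, Accumulator<?, ?>> all = ctx.getAllAccumulators();
            System.out.println("registered: " + all.keySet());
        }
    }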
@@ -21,10 +21,10 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -53,32 +53,34 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	private final ExecutionConfig executionConfig;
 
-	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
+	private final Map<String, Accumulator<?, ?>> accumulators;
 
 	private final DistributedCache distributedCache;
 
 
 	public AbstractRuntimeUDFContext(String name,
 									int numParallelSubtasks, int subtaskIndex,
 									ClassLoader userCodeClassLoader,
-									ExecutionConfig executionConfig)
+									ExecutionConfig executionConfig,
+									Map<String, Accumulator<?,?>> accumulators)
 	{
 		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
-				Collections.<String, Future<Path>>emptyMap());
+				accumulators, Collections.<String, Future<Path>>emptyMap());
 	}
 
 	public AbstractRuntimeUDFContext(String name,
 									int numParallelSubtasks, int subtaskIndex,
 									ClassLoader userCodeClassLoader,
 									ExecutionConfig executionConfig,
-									Map<String, Future<Path>> cpTasks)
-	{
+									Map<String, Accumulator<?,?>> accumulators,
+									Map<String, Future<Path>> cpTasks) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.executionConfig = executionConfig;
 		this.distributedCache = new DistributedCache(cpTasks);
+		this.accumulators = Preconditions.checkNotNull(accumulators);
 	}
 
 	@Override
@@ -137,8 +139,8 @@ public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name)
 	}
 
 	@Override
-	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
-		return this.accumulators;
+	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+		return Collections.unmodifiableMap(this.accumulators);
 	}
 
 	@Override
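
Because the bulk view is now wrapped in Collections.unmodifiableMap, code that used to mutate the returned map fails fast; a tiny sketch of the tightened contract (assumes any RuntimeContext ctx):

    Map<String, Accumulator<?, ?>> all = ctx.getAllAccumulators();
    int size = all.size();                 // reads are fine
    all.put("rogue", new IntCounter());    // now throws UnsupportedOperationException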
@@ -24,6 +24,7 @@
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -38,12 +39,14 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
 
 
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+							ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
 	}
 
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+							ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
 	}
 
 
@@ -184,8 +184,8 @@ private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> op
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) :
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
 
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -197,9 +197,6 @@
 
 		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig);
 
-		if (ctx != null) {
-			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
-		}
 		return result;
 	}
 
@@ -226,8 +223,8 @@ private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?,
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators) :
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
 
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -239,9 +236,6 @@
 
 		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig);
 
-		if (ctx != null) {
-			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
-		}
 		return result;
 	}
 
@@ -485,8 +479,9 @@ else if (op instanceof GenericDataSourceBase) {
 
 	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) {
-			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
+										ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, accumulators);
 		}
 
 		@Override
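
Sketched, the executor-side pattern that replaces the deleted mergeInto calls: the executor owns a single map, hands it to every context it creates, and UDF updates accumulate in that map directly (class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.accumulators.Accumulator;
    import org.apache.flink.api.common.functions.util.RuntimeUDFContext;

    public class SharedAccumulatorMapSketch {

        public static void main(String[] args) {
            // One map per (collection) execution ...
            Map<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
            ExecutionConfig config = new ExecutionConfig();

            // ... shared by every context, so accumulator updates made by user
            // functions land in it directly and no post-execution merge is needed.
            RuntimeUDFContext ctx = new RuntimeUDFContext("operator", 1, 0,
                    SharedAccumulatorMapSketch.class.getClassLoader(), config, accumulators);

            System.out.println(ctx.getAllAccumulators().size());
        }
    }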
@@ -22,9 +22,11 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.junit.Test;
 
@@ -34,7 +36,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableNotFound() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 
 			try {
 				ctx.getBroadcastVariable("some name");
@@ -64,7 +66,7 @@ public void testBroadcastVariableNotFound() {
 	@Test
 	public void testBroadcastVariableSimple() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 
 			ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
 			ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -98,7 +100,7 @@ public void testBroadcastVariableSimple() {
 	@Test
 	public void testBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 
@@ -123,7 +125,7 @@ public void testBroadcastVariableWithInitializer() {
 	@Test
 	public void testResetBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 
@@ -146,7 +148,7 @@ public void testResetBroadcastVariableWithInitializer() {
 	@Test
 	public void testBroadcastVariableWithInitializerAndMismatch() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -33,6 +34,7 @@
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 @SuppressWarnings("serial")
@@ -72,7 +74,7 @@ private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<S
 		}
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig);
+				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 		Assert.assertEquals(input.size(), result.size());
 		Assert.assertEquals(input, result);
@@ -21,6 +21,7 @@
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -33,6 +34,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -114,11 +116,12 @@ public void join(String first, String second, Collector<Integer> out) throws Exc
 
 
 		try {
+			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 
 			assertEquals(expected, resultSafe);
 			assertEquals(expected, resultRegular);
@@ -22,10 +22,12 @@
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -102,11 +104,12 @@ public void close() throws Exception {
 				parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
 
 		List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
+		final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+		List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 		executionConfig.enableObjectReuse();
-		List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+		List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 
 		assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 		assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
@@ -22,10 +22,12 @@
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -78,9 +80,9 @@ public void close() throws Exception {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+		List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 		executionConfig.enableObjectReuse();
-		List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+		List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 		assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 		assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
@@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception {
 
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
 
18 changes: 10 additions & 8 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -408,14 +408,16 @@ public List<T> collect() throws Exception {
 		JobExecutionResult res = getExecutionEnvironment().execute();
 
 		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-		try {
-			return SerializedListAccumulator.deserializeList(accResult, serializer);
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Cannot find type class of collected data type.", e);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		if (accResult != null) {
+			try {
+				return SerializedListAccumulator.deserializeList(accResult, serializer);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot find type class of collected data type.", e);
+			} catch (IOException e) {
+				throw new RuntimeException("Serialization error while deserializing collected data", e);
+			}
+		} else {
+			throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
 		}
 	}
 
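
For callers, collect() transports its result through a SerializedListAccumulator, which is why the null guard matters; a typical call site, sketched (assumes a local execution environment):

    import java.util.List;

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class CollectSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Executes the program and deserializes the accumulator result; a missing
            // result now surfaces as a descriptive RuntimeException rather than an NPE.
            List<Integer> values = env.fromElements(1, 2, 3).collect();
            System.out.println(values);
        }
    }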
