From 358259d25edc25251073e26f47e3960a69990098 Mon Sep 17 00:00:00 2001 From: Sachin Goel Date: Sat, 1 Aug 2015 19:25:42 +0530 Subject: [PATCH] [FLINK-2458] [FLINK-2449] [runtime] Access distributed cache entries from Iteration contexts & use of distributed cache from Collection Environments This closes #970 --- .../util/AbstractRuntimeUDFContext.java | 13 +-- .../functions/util/RuntimeUDFContext.java | 9 +- .../common/operators/CollectionExecutor.java | 86 ++++++++++++++++--- .../functions/util/RuntimeUDFContextTest.java | 12 +-- .../api/common/io/RichInputFormatTest.java | 5 +- .../api/common/io/RichOutputFormatTest.java | 5 +- .../operators/GenericDataSinkBaseTest.java | 7 +- .../operators/GenericDataSourceBaseTest.java | 7 +- .../base/FlatMapOperatorCollectionTest.java | 4 +- .../operators/base/JoinOperatorBaseTest.java | 8 +- .../operators/base/MapOperatorTest.java | 7 +- .../base/PartitionMapOperatorTest.java | 6 +- .../base/CoGroupOperatorCollectionTest.java | 5 +- .../base/GroupReduceOperatorTest.java | 6 +- .../operators/base/JoinOperatorBaseTest.java | 6 +- .../operators/base/ReduceOperatorTest.java | 6 +- .../task/AbstractIterativePactTask.java | 9 +- .../util/DistributedRuntimeUDFContext.java | 6 -- .../flink/tez/runtime/RegularProcessor.java | 3 + .../aggregators/AggregatorsITCase.java | 45 +++++++++- 20 files changed, 185 insertions(+), 70 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 13d79e71f3362..71be1e1e4fa52 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -57,17 +57,6 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { private final DistributedCache distributedCache; - - public AbstractRuntimeUDFContext(String name, - int numParallelSubtasks, int subtaskIndex, - ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, - Map> accumulators) - { - this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, - accumulators, Collections.>emptyMap()); - } - public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, @@ -79,7 +68,7 @@ public AbstractRuntimeUDFContext(String name, this.subtaskIndex = subtaskIndex; this.userCodeClassLoader = userCodeClassLoader; this.executionConfig = executionConfig; - this.distributedCache = new DistributedCache(cpTasks); + this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks)); this.accumulators = Preconditions.checkNotNull(accumulators); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index 1689138c215e4..c582768370c3b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -37,18 +37,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap initializedBroadcastVars = new HashMap(); private final HashMap> uninitializedBroadcastVars = new HashMap>(); - - - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators); - } - + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); } - @Override @SuppressWarnings("unchecked") diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index e8e001297ddff..b6d8128c647eb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -27,6 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -37,6 +41,7 @@ import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.AggregatorWithName; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; @@ -51,6 +56,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.types.Value; import org.apache.flink.util.Visitor; @@ -64,6 +71,8 @@ public class CollectionExecutor { private final Map, List> intermediateResults; private final Map> accumulators; + + private final Map> cachedFiles; private final Map previousAggregates; @@ -84,7 +93,7 @@ public CollectionExecutor(ExecutionConfig executionConfig) { this.accumulators = new HashMap>(); this.previousAggregates = new HashMap(); this.aggregators = new HashMap>(); - + this.cachedFiles = new HashMap>(); this.classLoader = getClass().getClassLoader(); } @@ -94,7 +103,7 @@ public CollectionExecutor(ExecutionConfig executionConfig) { public JobExecutionResult execute(Plan program) throws Exception { long startTime = System.currentTimeMillis(); - + initCache(program.getCachedFiles()); Collection> sinks = program.getDataSinks(); for (Operator sink : sinks) { execute(sink); @@ -104,7 +113,14 @@ public JobExecutionResult execute(Plan program) throws Exception { Map accumulatorResults = AccumulatorHelper.toResultMap(accumulators); return new JobExecutionResult(null, endTime - startTime, accumulatorResults); } - + + private void initCache(Set> files){ + for(Map.Entry file: files){ + Future doNothing = new CompletedFuture(new Path(file.getValue().filePath)); + cachedFiles.put(file.getKey(), doNothing); + } + }; + private List execute(Operator operator) throws Exception { return execute(operator, 0); } @@ -165,8 +181,8 @@ private void executeDataSink(GenericDataSinkBase sink, int superStep) th // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) : - new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators); } else { ctx = null; } @@ -181,8 +197,8 @@ private List executeDataSource(GenericDataSourceBase source, in // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) : - new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators); } else { ctx = null; } @@ -204,8 +220,10 @@ private List executeUnaryOperator(SingleInputOperator op // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass() + .getClassLoader(), executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, + executionConfig, cachedFiles, accumulators); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -243,8 +261,10 @@ private List executeBinaryOperator(DualInputOperator> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -500,8 +520,9 @@ else if (op instanceof GenericDataSourceBase) { private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext { public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, - ExecutionConfig executionConfig, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, accumulators); + ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, cpTasks, accumulators); } @Override @@ -521,4 +542,43 @@ public T getPreviousIterationAggregate(String name) { return (T) previousAggregates.get(name); } } + + private static final class CompletedFuture implements Future{ + + private final Path result; + + public CompletedFuture(Path entry) { + try{ + LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem(); + result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry); + } catch (Exception e){ + throw new RuntimeException("DistributedCache supports only local files for Collection Environments"); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public Path get() throws InterruptedException, ExecutionException { + return result; + } + + @Override + public Path get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 9189d5b90014e..5e8f891b3b7eb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -24,10 +24,12 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.core.fs.Path; import org.junit.Test; @@ -36,7 +38,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(),new HashMap>()); try { ctx.getBroadcastVariable("some name"); @@ -66,7 +68,7 @@ public void testBroadcastVariableNotFound() { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -100,7 +102,7 @@ public void testBroadcastVariableSimple() { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -125,7 +127,7 @@ public void testBroadcastVariableWithInitializer() { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -148,7 +150,7 @@ public void testResetBroadcastVariableWithInitializer() { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java index 126a51117f6cc..2a9cef345bc01 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java @@ -20,10 +20,12 @@ package org.apache.flink.api.common.io; import java.util.HashMap; +import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.core.fs.Path; import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; @@ -36,8 +38,7 @@ public class RichInputFormatTest { @Test public void testCheckRuntimeContextAccess() { final SerializedInputFormat inputFormat = new SerializedInputFormat(); - inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, - getClass().getClassLoader(), new ExecutionConfig(), new HashMap>())); + inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java index 8d410391fdda7..10d03ab21e811 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java @@ -20,10 +20,12 @@ package org.apache.flink.api.common.io; import java.util.HashMap; +import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.core.fs.Path; import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; @@ -36,8 +38,7 @@ public class RichOutputFormatTest { @Test public void testCheckRuntimeContextAccess() { final SerializedOutputFormat inputFormat = new SerializedOutputFormat(); - inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, - getClass().getClassLoader(), new ExecutionConfig(), new HashMap>())); + inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java index e30e5361fabf5..9a4630545eb7d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java @@ -26,10 +26,12 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichOutputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.fs.Path; import org.apache.flink.types.Nothing; import org.junit.Test; import java.util.HashMap; +import java.util.concurrent.Future; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @@ -87,15 +89,16 @@ public void testDataSourceWithRuntimeContext() { ExecutionConfig executionConfig = new ExecutionConfig(); final HashMap> accumulatorMap = new HashMap>(); + final HashMap> cpTasks = new HashMap<>(); executionConfig.disableObjectReuse(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); executionConfig.enableObjectReuse(); out.clear(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); } catch(Exception e){ e.printStackTrace(); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java index 64d33583ee7a1..50b8d805599d0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java @@ -25,10 +25,12 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichInputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.fs.Path; import org.junit.Test; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @@ -73,13 +75,14 @@ public void testDataSourceWithRuntimeContext() { in, new OperatorInformation(BasicTypeInfo.STRING_TYPE_INFO), "testSource"); final HashMap> accumulatorMap = new HashMap>(); + final HashMap> cpTasks = new HashMap<>(); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); in.reset(); executionConfig.enableObjectReuse(); - List resultRegular = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultRegular = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe); assertEquals(asList(TestIOData.RICH_NAMES), resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 734324b373007..745cf09c0d586 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; @SuppressWarnings("serial") public class FlatMapOperatorCollectionTest implements Serializable { @@ -74,7 +76,7 @@ private void testExecuteOnCollection(FlatMapFunction udf, List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap>()), executionConfig); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 98f75bc81a3a6..6d4ff33df227c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.junit.Test; @@ -36,6 +37,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @SuppressWarnings("serial") @@ -117,11 +119,13 @@ public void join(String first, String second, Collector out) throws Exc try { final HashMap> accumulatorMap = new HashMap>(); + final HashMap> cpTasks = new HashMap<>(); + ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 2c98a1701acc9..0dbe1b607214e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; @@ -36,6 +37,7 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.junit.Test; @SuppressWarnings("serial") @@ -105,11 +107,12 @@ public void close() throws Exception { List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); final HashMap> accumulatorMap = new HashMap>(); + final HashMap> cpTasks = new HashMap<>(); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 28c6d821f43af..47f30de889900 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -24,10 +24,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; @@ -80,9 +82,9 @@ public void close() throws Exception { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 71a2eb7bea091..025fcfbfb9a7a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.builder.Tuple2Builder; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -40,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; @SuppressWarnings("serial") public class CoGroupOperatorCollectionTest implements Serializable { @@ -71,7 +73,8 @@ public void testExecuteOnCollection() { ExecutionConfig executionConfig = new ExecutionConfig(); final HashMap> accumulators = new HashMap>(); - final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, accumulators); + final HashMap> cpTasks = new HashMap<>(); + final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, cpTasks, accumulators); { SumCoGroup udf1 = new SumCoGroup(); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 08f4acd11f0c5..9e1684da9f075 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.junit.Test; @@ -37,6 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; @@ -163,9 +165,9 @@ public void close() throws Exception { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 21fcfb3b261a3..7390af28e9b85 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; import org.junit.Test; @@ -38,6 +39,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; @SuppressWarnings({ "unchecked", "serial" }) public class JoinOperatorBaseTest implements Serializable { @@ -105,9 +107,9 @@ public void join(Tuple3 first, Tuple2 try { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); assertEquals(expected, new HashSet>(resultSafe)); assertEquals(expected, new HashSet>(resultRegular)); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 7cd9771eba701..29faf035c322f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.junit.Test; import java.util.ArrayList; @@ -35,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; @@ -140,9 +142,9 @@ public void close() throws Exception { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 67d8f56373bbb..efe74f9ef9469 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.core.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.aggregators.Aggregator; @@ -55,6 +56,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.Future; /** * The abstract base class for all tasks able to participate in an iteration. @@ -169,7 +171,8 @@ protected void closeLocalStrategiesAndCaches() { public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap); + env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), + env.getDistributedCacheEntries(), this.accumulatorMap); } // -------------------------------------------------------------------------------------------- @@ -359,9 +362,9 @@ private TypeSerializer getOutputSerializer() { private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext { public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, + ExecutionConfig executionConfig, Map> cpTasks, Map> accumulatorMap) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulatorMap); + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index 4b480baf25fdf..f74989ec28bfa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -41,12 +41,6 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap> broadcastVars = new HashMap>(); - - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators); - } - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java index 5bfa49be219ad..b117bab2e504a 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.operators.PactDriver; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.tez.util.EncodingUtils; @@ -40,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; public class RegularProcessor extends AbstractLogicalIOProcessor { @@ -70,6 +72,7 @@ public void initialize() throws Exception { getContext().getTaskIndex(), getClass().getClassLoader(), new ExecutionConfig(), + new HashMap>(), new HashMap>()); this.task = new TezTask(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java index 44544d37cbac5..8b98b29d9efc6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java @@ -18,8 +18,14 @@ package org.apache.flink.test.iterative.aggregators; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; import java.util.Random; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.After; import org.junit.Assert; @@ -44,6 +50,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.junit.Assert.assertEquals; + /** * Test the functionality of aggregators in bulk and delta iterative cases. */ @@ -54,6 +62,10 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { private static final int parallelism = 2; private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements"; + private static String testString = "Et tu, Brute?"; + private static String testName = "testing_caesar"; + private static String testPath; + public AggregatorsITCase(TestExecutionMode mode){ super(mode); } @@ -66,7 +78,9 @@ public AggregatorsITCase(TestExecutionMode mode){ @Before public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); + File tempFile = tempFolder.newFile(); + testPath = tempFile.toString(); + resultPath = tempFile.toURI().toString(); } @After @@ -74,6 +88,35 @@ public void after() throws Exception{ compareResultsByLinesInMemory(expected, resultPath); } + @Test + public void testDistributedCacheWithIterations() throws Exception{ + File tempFile = new File(testPath); + FileWriter writer = new FileWriter(tempFile); + writer.write(testString); + writer.close(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.registerCachedFile(resultPath, testName); + + IterativeDataSet solution = env.fromElements(1L).iterate(2); + solution.closeWith(env.generateSequence(1,2).filter(new RichFilterFunction() { + @Override + public void open(Configuration parameters) throws Exception{ + File file = getRuntimeContext().getDistributedCache().getFile(testName); + BufferedReader reader = new BufferedReader(new FileReader(file)); + String output = reader.readLine(); + reader.close(); + assertEquals(output, testString); + } + @Override + public boolean filter(Long value) throws Exception { + return false; + } + }).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat()); + env.execute(); + expected = testString; // this will be a useless verification now. + } + @Test public void testAggregatorWithoutParameterForIterate() throws Exception { /*