From 11557c004450bcbbe680f1575f0e41d164424eae Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 10 Feb 2017 16:11:08 +0100 Subject: [PATCH 1/6] [docs] improve some documentation around network buffers --- .../SpanningRecordSerializer.java | 2 +- .../io/network/buffer/LocalBufferPool.java | 32 ++++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index cb5665b0077c9..6c541a9b72357 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -43,7 +43,7 @@ public class SpanningRecordSerializer implements R /** Intermediate data serialization */ private final DataOutputSerializer serializationBuffer; - /** Intermediate buffer for data serialization */ + /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}) */ private ByteBuffer dataBuffer; /** Intermediate buffer for length serialization */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 86e68704897d8..b4ea2723a8fca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -45,30 +45,46 @@ */ class LocalBufferPool implements BufferPool { + /** Global network buffer pool to get buffers from. */ private final NetworkBufferPool networkBufferPool; - // The minimum number of required segments for this pool + /** The minimum number of required segments for this pool */ private final int numberOfRequiredMemorySegments; - // The currently available memory segments. These are segments, which have been requested from - // the network buffer pool and are currently not handed out as Buffer instances. + /** + * The currently available memory segments. These are segments, which have been requested from + * the network buffer pool and are currently not handed out as Buffer instances. + */ private final Queue availableMemorySegments = new ArrayDeque(); - // Buffer availability listeners, which need to be notified when a Buffer becomes available. - // Listeners can only be registered at a time/state where no Buffer instance was available. + /** + * Buffer availability listeners, which need to be notified when a Buffer becomes available. + * Listeners can only be registered at a time/state where no Buffer instance was available. + */ private final Queue> registeredListeners = new ArrayDeque>(); - // The current size of this pool + /** The current size of this pool */ private int currentPoolSize; - // Number of all memory segments, which have been requested from the network buffer pool and are - // somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments). + /** + * Number of all memory segments, which have been requested from the network buffer pool and are + * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments). 
+ */ private int numberOfRequestedMemorySegments; private boolean isDestroyed; private BufferPoolOwner owner; + /** + * Local buffer pool based on the given networkBufferPool with a minimal number of + * network buffers being available. + * + * @param networkBufferPool + * global network buffer pool to get buffers from + * @param numberOfRequiredMemorySegments + * minimum number of network buffers + */ LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) { this.networkBufferPool = networkBufferPool; this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments; From cd999061d04ae803c79473241ac1f9b39c1f2731 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 10 Feb 2017 16:12:19 +0100 Subject: [PATCH 2/6] [hotfix][network] add some assertions documenting on which locks we rely --- .../flink/runtime/io/network/buffer/LocalBufferPool.java | 4 ++++ .../flink/runtime/io/network/buffer/NetworkBufferPool.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index b4ea2723a8fca..d6a4cf79e0e37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -281,11 +281,15 @@ public String toString() { // ------------------------------------------------------------------------ private void returnMemorySegment(MemorySegment segment) { + assert Thread.holdsLock(availableMemorySegments); + numberOfRequestedMemorySegments--; networkBufferPool.recycle(segment); } private void returnExcessMemorySegments() { + assert Thread.holdsLock(availableMemorySegments); + while (numberOfRequestedMemorySegments > currentPoolSize) { MemorySegment segment = availableMemorySegments.poll(); if (segment == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index dc23341c93909..e601ac512176f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -266,6 +266,8 @@ public void destroyAllBufferPools() { // Must be called from synchronized block private void redistributeBuffers() throws IOException { + assert Thread.holdsLock(factoryLock); + int numManagedBufferPools = managedBufferPools.size(); if (numManagedBufferPools == 0) { From 8f529bb3f42916c816c5091228569952917ad9b5 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 1 Mar 2017 14:33:44 +0100 Subject: [PATCH 3/6] [FLINK-4545] remove fixed-size BufferPool instances These were unused except for unit tests and will be replaced with bounded BufferPool instances. 
--- .../BackPressureStatsTrackerITCase.java | 2 +- .../io/network/NetworkEnvironment.java | 4 +- .../io/network/buffer/BufferPoolFactory.java | 4 +- .../io/network/buffer/NetworkBufferPool.java | 22 ++------ .../netty/PartitionRequestServerHandler.java | 2 +- .../io/network/MockNetworkEnvironment.java | 2 +- .../network/api/writer/RecordWriterTest.java | 2 +- .../network/buffer/BufferPoolFactoryTest.java | 31 +++-------- .../network/buffer/NetworkBufferPoolTest.java | 55 ++++++------------- .../consumer/LocalInputChannelTest.java | 4 +- .../io/BarrierBufferMassiveRandomTest.java | 4 +- 11 files changed, 41 insertions(+), 91 deletions(-) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 30a86a26c8749..194312900fa04 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -108,7 +108,7 @@ public void testBackPressuredProducer() throws Exception { // // 1) Consume all buffers at first (no buffers for the test task) // - testBufferPool = networkBufferPool.createBufferPool(1, false); + testBufferPool = networkBufferPool.createBufferPool(1); final List buffers = new ArrayList<>(); while (true) { Buffer buffer = testBufferPool.requestBuffer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 5cf2c263f3b8b..8e85ffea246de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -171,7 +171,7 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false); + bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions()); partition.registerBufferPool(bufferPool); resultPartitionManager.registerResultPartition(partition); @@ -198,7 +198,7 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false); + bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels()); gate.setBufferPool(bufferPool); } catch (Throwable t) { if (bufferPool != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java index 23321f41dce81..e953158913d92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java @@ -29,9 +29,9 @@ public interface BufferPoolFactory { * Tries to create a buffer pool, which is guaranteed to provide at least the number of required * buffers. * - *
The buffer pool is either of dynamic size or fixed. + *
The buffer pool is of dynamic size with at least numRequiredBuffers buffers. */ - BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException; + BufferPool createBufferPool(int numRequiredBuffers) throws IOException; /** * Destroy callback for updating factory book keeping. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index e601ac512176f..5345fbbe96af5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -58,9 +58,7 @@ public class NetworkBufferPool implements BufferPoolFactory { private final Object factoryLock = new Object(); - private final Set managedBufferPools = new HashSet(); - - public final Set allBufferPools = new HashSet(); + private final Set allBufferPools = new HashSet(); private int numTotalRequiredBuffers; @@ -182,7 +180,7 @@ public int countBuffers() { // ------------------------------------------------------------------------ @Override - public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException { + public BufferPool createBufferPool(int numRequiredBuffers) throws IOException { // It is necessary to use a separate lock from the one used for buffer // requests to ensure deadlock freedom for failure cases. synchronized (factoryLock) { @@ -209,12 +207,6 @@ public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) // non-fixed size buffers. LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers); - // The fixed size pools get their share of buffers and don't change - // it during their lifetime. - if (!isFixedSize) { - managedBufferPools.add(localBufferPool); - } - allBufferPools.add(localBufferPool); redistributeBuffers(); @@ -231,8 +223,6 @@ public void destroyBufferPool(BufferPool bufferPool) { synchronized (factoryLock) { if (allBufferPools.remove(bufferPool)) { - managedBufferPools.remove(bufferPool); - numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments(); try { @@ -246,7 +236,7 @@ public void destroyBufferPool(BufferPool bufferPool) { /** * Destroys all buffer pools that allocate their buffers from this - * buffer pool (created via {@link #createBufferPool(int, boolean)}). + * buffer pool (created via {@link #createBufferPool(int)}). 
*/ public void destroyAllBufferPools() { synchronized (factoryLock) { @@ -258,7 +248,7 @@ public void destroyAllBufferPools() { } // some sanity checks - if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) { + if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) { throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools"); } } @@ -268,7 +258,7 @@ public void destroyAllBufferPools() { private void redistributeBuffers() throws IOException { assert Thread.holdsLock(factoryLock); - int numManagedBufferPools = managedBufferPools.size(); + int numManagedBufferPools = allBufferPools.size(); if (numManagedBufferPools == 0) { return; // necessary to avoid div by zero when no managed pools @@ -285,7 +275,7 @@ private void redistributeBuffers() throws IOException { int bufferPoolIndex = 0; - for (LocalBufferPool bufferPool : managedBufferPools) { + for (LocalBufferPool bufferPool : allBufferPools) { int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0; bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java index 12b52ecb4eba3..36c1234ae1ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java @@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler Date: Fri, 10 Feb 2017 14:36:37 +0100 Subject: [PATCH 4/6] [FLINK-4545] remove (unused) persistent partition type --- .../network/partition/ResultPartitionType.java | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java index 65d49ed41f3df..43d3a52d3b2c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java @@ -20,14 +20,9 @@ public enum ResultPartitionType { - BLOCKING(true, false, false), + BLOCKING(false, false), - PIPELINED(false, true, true), - - PIPELINED_PERSISTENT(true, true, true); - - /** Does the partition live longer than the consuming task? */ - private final boolean isPersistent; + PIPELINED(true, true); /** Can the partition be consumed while being produced? */ private final boolean isPipelined; @@ -38,8 +33,7 @@ public enum ResultPartitionType { /** * Specifies the behaviour of an intermediate result partition at runtime. 
*/ - ResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) { - this.isPersistent = isPersistent; + ResultPartitionType(boolean isPipelined, boolean hasBackPressure) { this.isPipelined = isPipelined; this.hasBackPressure = hasBackPressure; } @@ -55,8 +49,4 @@ public boolean isBlocking() { public boolean isPipelined() { return isPipelined; } - - public boolean isPersistent() { - return isPersistent; - } } From 91cea2917e9453f9de5c02472d99d4fc0d090dda Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 6 Mar 2017 12:36:02 +0100 Subject: [PATCH 5/6] [FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without partition type This removes JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) and requires the developer to call JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) instead and think about the partition type to add. --- .../plantranslate/JobGraphGenerator.java | 4 +- .../flink/runtime/jobgraph/JobVertex.java | 4 -- .../ExecutionGraphConstructionTest.java | 38 ++++++------ .../ExecutionGraphDeploymentTest.java | 6 +- .../ExecutionGraphSignalsTest.java | 11 ++-- .../ExecutionVertexLocalityTest.java | 3 +- .../executiongraph/PointwisePatternTest.java | 15 ++--- .../executiongraph/VertexSlotSharingTest.java | 5 +- .../flink/runtime/jobgraph/JobGraphTest.java | 61 ++++++++++--------- .../runtime/jobgraph/JobTaskVertexTest.java | 5 +- .../jobgraph/jsonplan/JsonGeneratorTest.java | 4 +- .../LeaderChangeJobRecoveryTest.java | 3 +- .../LeaderChangeStateCleanupTest.java | 3 +- ...TaskCancelAsyncProducerConsumerITCase.java | 3 +- .../CoLocationConstraintITCase.scala | 11 ++-- .../runtime/jobmanager/JobManagerITCase.scala | 40 ++++++++---- .../runtime/jobmanager/RecoveryITCase.scala | 18 +++--- .../jobmanager/SlotSharingITCase.scala | 19 +++--- ...askManagerFailsWithSlotSharingITCase.scala | 12 ++-- .../runtime/NetworkStackThroughputITCase.java | 10 ++- .../ZooKeeperLeaderElectionITCase.java | 4 +- .../taskmanager/TaskManagerFailsITCase.scala | 7 ++- 22 files changed, 163 insertions(+), 123 deletions(-) diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index caeb43fa3410d..61e5327b71a20 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -1289,7 +1289,7 @@ private void finalizeBulkIteration(IterationDescriptor descr) { syncConfig.setNumberOfIterations(maxNumIterations); // connect the sync task - sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); + sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); // ----------------------------- create the iteration tail ------------------------------ @@ -1425,7 +1425,7 @@ private void finalizeWorksetIteration(IterationDescriptor descr) { syncConfig.setNumberOfIterations(maxNumIterations); // connect the sync task - sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); + sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); } // ----------------------------- create the iteration tails ----------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 9dcaeeb7c4f93..260bd74ba8a1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -400,10 +400,6 @@ public JobEdge connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPa return edge; } - public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) { - return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED); - } - public JobEdge connectNewDataSetAsInput( JobVertex input, DistributionPattern distPattern, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index fa483848b36ab..e1bad5657225c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -104,11 +104,11 @@ public void testCreateSimpleGraphBipartite() throws Exception { v4.setInvokableClass(AbstractInvokable.class); v5.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v4, v5)); @@ -153,7 +153,7 @@ public void testAttachViaDataSets() throws Exception { v3.setInvokableClass(AbstractInvokable.class); // this creates an intermediate result for v1 - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); // create results for v2 and v3 IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED); @@ -193,7 +193,7 @@ public void testAttachViaDataSets() throws Exception { v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL); v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL); List ordered2 = new ArrayList(Arrays.asList(v4, v5)); @@ -230,7 +230,7 @@ public void testAttachViaIds() throws Exception { v3.setInvokableClass(AbstractInvokable.class); // this creates an intermediate result for v1 - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); // create results for v2 and v3 IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED); @@ -269,7 +269,7 @@ public void testAttachViaIds() throws Exception { v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL); v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL); List ordered2 = new ArrayList(Arrays.asList(v4, v5)); @@ -558,11 +558,11 @@ public void testCannotConnectWrongOrder() throws Exception { v4.setInvokableClass(AbstractInvokable.class); v5.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v5, v4)); @@ -625,11 +625,11 @@ public void testSetupInputSplits() { v4.setInvokableClass(AbstractInvokable.class); v5.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); - v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); - v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v3.setInputSplitSource(source1); v5.setInputSplitSource(source2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index f119671c10a5f..eb52051e33e01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -85,9 +85,9 @@ public void testBuildDeploymentDescriptor() { v3.setInvokableClass(BatchTask.class); v4.setInvokableClass(BatchTask.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); - v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); - 
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutor(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index 64b9aa2503af9..27844c16f4734 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -119,15 +120,15 @@ public void prepare() throws Exception { v4.setParallelism(dop[3]); v5.setParallelism(dop[4]); - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); mockNumberOfInputs(1,0); - v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); mockNumberOfInputs(3,1); - v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); mockNumberOfInputs(3,2); - v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); mockNumberOfInputs(4,3); - v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); mockNumberOfInputs(4,2); List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v4, v5)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index cfd466575c807..eb85a3387b87d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -200,7 +201,7 @@ private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) 
throws target.setInvokableClass(NoOpInvokable.class); DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE; - target.connectNewDataSetAsInput(source, connectionPattern); + target.connectNewDataSetAsInput(source, connectionPattern, ResultPartitionType.PIPELINED); JobGraph testJob = new JobGraph(jobId, "test job", source, target); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 5629c0b050e0b..8ff0032ebcf4a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -62,7 +63,7 @@ public void testNToN() throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); @@ -109,7 +110,7 @@ public void test2NToN() throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); @@ -157,7 +158,7 @@ public void test3NToN() throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); @@ -206,7 +207,7 @@ public void testNTo2N() throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); @@ -253,7 +254,7 @@ public void testNTo7N() throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); @@ -320,7 +321,7 @@ private void testLowToHigh(int lowDop, int highDop) throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, 
v2)); @@ -378,7 +379,7 @@ private void testHighToLow(int highDop, int lowDop) throws Exception { v1.setInvokableClass(AbstractInvokable.class); v2.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); List ordered = new ArrayList(Arrays.asList(v1, v2)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index bf17485e45321..90e3368adf19e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; @@ -67,8 +68,8 @@ public void testAssignSlotSharingGroup() { v4.setInvokableClass(AbstractInvokable.class); v5.setInvokableClass(AbstractInvokable.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); - v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); SlotSharingGroup jg1 = new SlotSharingGroup(); v2.setSlotSharingGroup(jg1); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index 74f1adf04223f..9f06c6a991045 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.junit.Test; public class JobGraphTest { @@ -44,8 +45,8 @@ public void testSerialization() { JobVertex source1 = new JobVertex("source1"); JobVertex source2 = new JobVertex("source2"); JobVertex target = new JobVertex("target"); - target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE); - target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL); + target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); jg.addVertex(source1); jg.addVertex(source2); @@ -84,11 +85,11 @@ public void testTopologicalSort1() { JobVertex intermediate1 = new JobVertex("intermediate1"); JobVertex intermediate2 = new JobVertex("intermediate2"); - target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE); - target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE); - target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE); - intermediate2.connectNewDataSetAsInput(intermediate1, 
DistributionPattern.POINTWISE); - intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); + target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2); @@ -121,19 +122,19 @@ public void testTopologicalSort2() { JobVertex l13 = new JobVertex("layer 1 - 3"); JobVertex l2 = new JobVertex("layer 2"); - root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE); - root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); - root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE); + root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE); - l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE); + l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE); + l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE); - l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); + l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); + l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2); @@ -177,10 +178,10 @@ public void testTopologicalSort3() { JobVertex op2 = new JobVertex("op2"); JobVertex op3 = new JobVertex("op3"); - op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); - op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE); - op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); - op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE); + op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3); List sorted = graph.getVerticesSortedTopologicallyFromSources(); @@ -206,10 +207,10 @@ public void 
testTopoSortCyclicGraphNoSources() { JobVertex v3 = new JobVertex("3"); JobVertex v4 = new JobVertex("4"); - v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); - v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE); - v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); + v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4); try { @@ -236,12 +237,12 @@ public void testTopoSortCyclicGraphIntermediateCycle() { JobVertex v4 = new JobVertex("4"); JobVertex target = new JobVertex("target"); - v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); - v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); - v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE); - v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); - target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); + v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java index 48f06b001faa2..d94b93e9c0743 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.operators.util.TaskConfig; import org.junit.Test; @@ -41,7 +42,7 @@ public class JobTaskVertexTest { public void testConnectDirectly() { JobVertex source = new JobVertex("source"); JobVertex target = new JobVertex("target"); - target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); + target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); assertTrue(source.isInputVertex()); assertFalse(source.isOutputVertex()); @@ -62,7 +63,7 @@ public void testConnectMultipleTargets() { JobVertex source = new JobVertex("source"); JobVertex target1= new JobVertex("target1"); JobVertex target2 = new JobVertex("target2"); - target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); + target1.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.ALL_TO_ALL); assertTrue(source.isInputVertex()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index d1d5f03b2ebc4..62b9b4037fa23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -64,8 +64,8 @@ public void testGeneratorWithoutAnyAttachements() { join2.connectNewDataSetAsInput(join1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); join2.connectNewDataSetAsInput(source3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); - sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE); - sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL); + sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); JobGraph jg = new JobGraph("my job", source1, source2, source3, intermediate1, intermediate2, join1, join2, sink1, sink2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index fe330229bf588..49d0239cd5e6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -129,7 +130,7 @@ public JobGraph createBlockingJob(int parallelism) { sender.setParallelism(parallelism); receiver.setParallelism(parallelism); - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); sender.setSlotSharingGroup(slotSharingGroup); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index 19cc44422979c..7ae99746831dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -272,7 +273,7 @@ public JobGraph createBlockingJob(int parallelism) { sender.setParallelism(parallelism); receiver.setParallelism(parallelism); - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); sender.setSlotSharingGroup(slotSharingGroup); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 876e90868ed1b..5a14b409da1b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -91,7 +92,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { JobVertex consumer = new JobVertex("AsyncConsumer"); consumer.setParallelism(1); consumer.setInvokableClass(AsyncConsumer.class); - consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE); + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID()); producer.setSlotSharingGroup(slot); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala index 12e2d638ecb98..d4b4cbf927518 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala @@ -19,18 +19,18 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem -import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex} +import org.apache.flink.runtime.io.network.partition.ResultPartitionType +import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import 
org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + import scala.concurrent.duration._ @RunWith(classOf[JUnitRunner]) @@ -60,7 +60,8 @@ class CoLocationConstraintITCase(_system: ActorSystem) sender.setParallelism(num_tasks) receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED) val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID) sender.setSlotSharingGroup(sharingGroup) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 31e72ddacdaab..5374d01caff21 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint} import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture +import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings} import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode} import org.apache.flink.runtime.jobmanager.Tasks._ @@ -180,7 +181,8 @@ class JobManagerITCase(_system: ActorSystem) sender.setParallelism(num_tasks) receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED) val jobGraph = new JobGraph("Pointwise Job", sender, receiver) @@ -215,7 +217,8 @@ class JobManagerITCase(_system: ActorSystem) sender.setParallelism(num_tasks) receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED) val jobGraph = new JobGraph("Bipartite Job", sender, receiver) @@ -251,8 +254,10 @@ class JobManagerITCase(_system: ActorSystem) sender2.setParallelism(2 * num_tasks) receiver.setParallelism(3 * num_tasks) - receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) - receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) + receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED) + receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL, + ResultPartitionType.PIPELINED) val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) @@ -296,8 +301,10 @@ class JobManagerITCase(_system: ActorSystem) sender2.setParallelism(2 * num_tasks) receiver.setParallelism(3 * num_tasks) - receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) - receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) + receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED) + receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL, + ResultPartitionType.PIPELINED) val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) @@ -338,8 
+345,10 @@ class JobManagerITCase(_system: ActorSystem)
       forwarder.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)

-      forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
-      receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)
+      forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)

@@ -375,7 +384,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)

@@ -423,7 +433,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)

@@ -468,7 +479,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise job", sender, receiver)

@@ -508,7 +520,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise job", sender, receiver)

@@ -556,7 +569,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise job", sender, receiver)

diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index b96369fd36f67..f3ab40905782d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,22 +18,23 @@
 package org.apache.flink.runtime.jobmanager

-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import org.scalatest.junit.JUnitRunner

-import scala.concurrent.duration._
+import scala.concurrent.duration._
 import language.postfixOps

 @RunWith(classOf[JUnitRunner])
@@ -81,7 +82,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val executionConfig = new ExecutionConfig()
       executionConfig.setNumberOfExecutionRetries(1);

@@ -125,7 +127,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val sharingGroup = new SlotSharingGroup
       sender.setSlotSharingGroup(sharingGroup)

@@ -173,7 +176,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val sharingGroup = new SlotSharingGroup
       sender.setSlotSharingGroup(sharingGroup)

diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index f986e7374d7a1..4fffd68dde7e9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -19,18 +19,18 @@
 package org.apache.flink.runtime.jobmanager

 import akka.actor.ActorSystem
-import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
-import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import scala.concurrent.duration._

 @RunWith(classOf[JUnitRunner])
@@ -60,7 +60,8 @@ class SlotSharingITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
       sender.setSlotSharingGroup(sharingGroup)

@@ -107,8 +108,10 @@ class SlotSharingITCase(_system: ActorSystem)
       sender2.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)

-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)

diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index d0136f05dd9c8..9775d335e534c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,12 +18,13 @@
 package org.apache.flink.runtime.jobmanager

-import akka.actor.{Kill, ActorSystem, PoisonPill}
+import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import scala.concurrent.duration._

 @RunWith(classOf[JUnitRunner])
@@ -61,7 +63,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val sharingGroup = new SlotSharingGroup()
       sender.setSlotSharingGroup(sharingGroup)

@@ -110,7 +113,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)

-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val sharingGroup = new SlotSharingGroup()
       sender.setSlotSharingGroup(sharingGroup)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3d0e5ab46ab0a..b38df61f5f153 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -129,11 +130,14 @@ private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean
 		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);

 		if (useForwarder) {
-			forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
-			consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL);
+			forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+			consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
 		}
 		else {
-			consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
 		}

 		return jobGraph;

diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index eacdeb4a5b366..df4f370883ec0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -165,7 +166,8 @@ public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
 		sender.setParallelism(parallelism);
 		receiver.setParallelism(parallelism);

-		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+			ResultPartitionType.PIPELINED);

 		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
 		sender.setSlotSharingGroup(slotSharingGroup);

diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index e141cc229de99..424fc8bd29998 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -94,7 +95,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setInvokableClass(classOf[BlockingReceiver])
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID

@@ -146,7 +148,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setInvokableClass(classOf[BlockingReceiver])
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)

       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID

From 83d1404b106b558679e4c9ef16123fbc6b5eac72 Mon Sep 17 00:00:00 2001
From: Nico Kruber
Date: Mon, 6 Mar 2017 12:37:56 +0100
Subject: [PATCH 6/6] [FLINK-4545] remove unused IntermediateDataSet constructors

These constructors implied a default result partition type; we want the
developer to decide on the type explicitly instead.
---
 .../flink/runtime/jobgraph/IntermediateDataSet.java | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 2d9faa83b8368..f02aaa36454ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -47,14 +47,6 @@ public class IntermediateDataSet implements java.io.Serializable {
 	private final ResultPartitionType resultType;

 	// --------------------------------------------------------------------------------------------
-
-	public IntermediateDataSet(JobVertex producer) {
-		this(new IntermediateDataSetID(), producer);
-	}
-
-	public IntermediateDataSet(IntermediateDataSetID id, JobVertex producer) {
-		this(id, ResultPartitionType.PIPELINED, producer);
-	}

 	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
 		this.id = checkNotNull(id);
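
For illustration of what this removal asks of callers: only the fully explicit
constructor IntermediateDataSet(IntermediateDataSetID, ResultPartitionType,
JobVertex) remains, and JobVertex#connectNewDataSetAsInput always takes the
partition type, as in the test updates above. A minimal sketch (class and
vertex names are made up for this example, not part of the patches):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class ExplicitPartitionTypeExample {
        public static void main(String[] args) {
            // two hypothetical vertices of a job graph
            JobVertex sender = new JobVertex("sender");
            JobVertex receiver = new JobVertex("receiver");

            // No implicit PIPELINED default any more: the result partition
            // type must be chosen at the call site.
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
                ResultPartitionType.PIPELINED);
        }
    }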