From 0d2e8b2964b58f5610772c6b5bf39a93b9b0fd95 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 9 Nov 2016 16:07:22 +0100 Subject: [PATCH 1/3] Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels" The reverted commit did not really fix anything, but hid the problem by brute force, sending many more schedule or update consumers messages. --- .../InputChannelDeploymentDescriptor.java | 17 +- ...rtialInputChannelDeploymentDescriptor.java | 2 +- .../ResultPartitionDeploymentDescriptor.java | 25 +-- .../executiongraph/ExecutionJobVertex.java | 10 +- .../executiongraph/IntermediateResult.java | 11 +- .../io/network/NetworkEnvironment.java | 3 - .../io/network/partition/ResultPartition.java | 21 +-- .../partition/ResultPartitionManager.java | 2 - .../runtime/jobgraph/IntermediateDataSet.java | 31 ---- .../flink/runtime/jobgraph/JobVertex.java | 12 +- .../flink/runtime/taskmanager/Task.java | 1 - ...sultPartitionDeploymentDescriptorTest.java | 4 +- .../ExecutionGraphDeploymentTest.java | 2 +- .../io/network/NetworkEnvironmentTest.java | 147 ------------------ .../consumer/LocalInputChannelTest.java | 3 - .../runtime/jobgraph/JobTaskVertexTest.java | 36 +---- .../runtime/taskmanager/TaskManagerTest.java | 20 +-- .../api/graph/StreamingJobGraphGenerator.java | 9 +- 18 files changed, 35 insertions(+), 321 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java index 0912055b48815..a72b92f38101c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.ConnectionID; @@ -88,7 +89,9 @@ public String toString() { * Creates an input channel deployment descriptor for each partition. */ public static InputChannelDeploymentDescriptor[] fromEdges( - ExecutionEdge[] edges, SimpleSlot consumerSlot) { + ExecutionEdge[] edges, + SimpleSlot consumerSlot, + boolean allowLazyDeployment) throws ExecutionGraphException { final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID(); final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length]; @@ -105,9 +108,11 @@ public static InputChannelDeploymentDescriptor[] fromEdges( // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && - (producerState == ExecutionState.RUNNING - || producerState == ExecutionState.FINISHED)) { - + (producerState == ExecutionState.RUNNING || + producerState == ExecutionState.FINISHED || + producerState == ExecutionState.SCHEDULED || + producerState == ExecutionState.DEPLOYING)) { + final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation(); final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); @@ -124,9 +129,11 @@ public static InputChannelDeploymentDescriptor[] fromEdges( partitionLocation = ResultPartitionLocation.createRemote(connectionId); } } - else { + else if (allowLazyDeployment) { // The producing task might not have registered the partition yet partitionLocation = ResultPartitionLocation.createUnknown(); + } else { + throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready."); } final ResultPartitionID consumedPartitionId = new ResultPartitionID( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java index 0eac39d2f7d1a..c925f75d4ec54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java @@ -21,10 +21,10 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index e72d4683f7488..2881dde363f86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; - /** - * Flag indicating whether to eagerly deploy consumers. - * - *

If true, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean eagerlyDeployConsumers; - public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, - int numberOfSubpartitions, - boolean eagerlyDeployConsumers) { + int numberOfSubpartitions) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -69,7 +60,6 @@ public ResultPartitionDeploymentDescriptor( checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public IntermediateDataSetID getResultId() { @@ -88,16 +78,6 @@ public int getNumberOfSubpartitions() { return numberOfSubpartitions; } - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - @Override public String toString() { return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -129,7 +109,6 @@ public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartiti } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions, - partition.getIntermediateResult().getEagerlyDeployConsumers()); + resultId, partitionId, partitionType, numberOfSubpartitions); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 47cfde1f639df..a62ed86126e22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; @@ -30,13 +32,11 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.api.common.Archiveable; import org.apache.flink.runtime.instance.SlotProvider; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; @@ -45,7 +45,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; - import scala.Option; import java.io.IOException; @@ -161,8 +160,7 @@ public ExecutionJobVertex( result.getId(), this, numTaskVertices, - result.getResultType(), - result.getEagerlyDeployConsumers()); + result.getResultType()); } // create all task vertices diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 9d57014550aa5..c2c19d153ce0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -46,14 +46,11 @@ public class IntermediateResult { private final ResultPartitionType resultType; - private final boolean eagerlyDeployConsumers; - public IntermediateResult( IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers, - ResultPartitionType resultType, - boolean eagerlyDeployConsumers) { + ResultPartitionType resultType) { this.id = checkNotNull(id); this.producer = checkNotNull(producer); @@ -71,8 +68,6 @@ public IntermediateResult( // The runtime type for this produced result this.resultType = checkNotNull(resultType); - - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public void setPartition(int partitionNumber, IntermediateResultPartition partition) { @@ -108,10 +103,6 @@ public ResultPartitionType getResultType() { return resultType; } - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - public int registerConsumer() { final int index = numConsumers; numConsumers++; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index b221ec71080cb..d0032d3366d91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -158,8 +157,6 @@ public void registerTask(Task task) throws IOException { throw new IllegalStateException("Unequal number of writers and partitions."); } - ResultPartitionConsumableNotifier jobManagerNotifier; - synchronized (lock) { if (isShutdown) { throw new IllegalStateException("NetworkEnvironment is shut down"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index f06cb43c5cddc..034b27ab8d4d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; @@ -89,14 +89,6 @@ public class ResultPartition implements BufferPoolOwner { /** Type of this partition. Defines the concrete subpartition implementation to use. */ private final ResultPartitionType partitionType; - /** - * Flag indicating whether to eagerly deploy consumers. - * - *

If true, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean doEagerDeployment; - /** The subpartitions of this partition. At least one. */ private final ResultSubpartition[] subpartitions; @@ -137,7 +129,6 @@ public ResultPartition( JobID jobId, ResultPartitionID partitionId, ResultPartitionType partitionType, - boolean doEagerDeployment, int numberOfSubpartitions, ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, @@ -149,7 +140,6 @@ public ResultPartition( this.jobId = checkNotNull(jobId); this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); - this.doEagerDeployment = doEagerDeployment; this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); @@ -365,15 +355,6 @@ public Throwable getFailureCause() { return cause; } - /** - * Deploys consumers if eager deployment is activated - */ - public void deployConsumers() { - if (doEagerDeployment) { - partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions); - } - } - /** * Releases buffers held by this result partition. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 6edae6fbf3856..9da3e14a5097a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -58,8 +58,6 @@ public void registerResultPartition(ResultPartition partition) throws IOExceptio throw new IllegalStateException("Result partition already registered."); } - partition.deployConsumers(); - LOG.debug("Registered {}.", partition); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java index c30c78e522150..2d9faa83b8368 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java @@ -45,14 +45,6 @@ public class IntermediateDataSet implements java.io.Serializable { // The type of partition to use at runtime private final ResultPartitionType resultType; - - /** - * Flag indicating whether to eagerly deploy consumers. - * - *

If true, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private boolean eagerlyDeployConsumers; // -------------------------------------------------------------------------------------------- @@ -87,29 +79,6 @@ public List getConsumers() { public ResultPartitionType getResultType() { return resultType; } - - /** - * Sets the flag indicating whether to eagerly deploy consumers (default: - * false). - * - * @param eagerlyDeployConsumers If true, the consumers are - * deployed as soon as the runtime result is - * registered at the result manager of the - * task manager. Default is false. - */ - public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) { - this.eagerlyDeployConsumers = eagerlyDeployConsumers; - } - - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 8ddc9f5c9b7ed..2bda9d85ed9c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -382,7 +382,7 @@ public JobEdge connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPa } public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) { - return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false); + return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED); } public JobEdge connectNewDataSetAsInput( @@ -390,17 +390,7 @@ public JobEdge connectNewDataSetAsInput( DistributionPattern distPattern, ResultPartitionType partitionType) { - return connectNewDataSetAsInput(input, distPattern, partitionType, false); - } - - public JobEdge connectNewDataSetAsInput( - JobVertex input, - DistributionPattern distPattern, - ResultPartitionType partitionType, - boolean eagerlyDeployConsumers) { - IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); - dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers); JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 827451e67d478..6907606de2740 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -341,7 +341,6 @@ public Task( jobId, partitionId, desc.getPartitionType(), - desc.getEagerlyDeployConsumers(), desc.getNumberOfSubpartitions(), networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index d2fcc7b55cb6f..4b1e546f86c85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -45,8 +45,7 @@ public void testSerialization() throws Exception { resultId, partitionId, partitionType, - numberOfSubpartitions, - eagerlyDeployConsumers); + numberOfSubpartitions); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -55,6 +54,5 @@ public void testSerialization() throws Exception { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 63da1ab710c9e..d4acd8c45796b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -308,7 +308,7 @@ public void testNoResourceAvailableFailure() throws Exception { v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java deleted file mode 100644 index 13da18e6aed56..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; -import scala.Some; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; - -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class NetworkEnvironmentTest { - /** - * Registers a task with an eager and non-eager partition at the network - * environment and verifies that there is exactly on schedule or update - * message to the job manager for the eager partition. - */ - @Test - @SuppressWarnings("unchecked") - public void testEagerlyDeployConsumers() throws Exception { - // Mock job manager => expected interactions will be verified - final ActorGateway jobManager = mock(ActorGateway.class); - when(jobManager.ask(anyObject(), any(FiniteDuration.class))) - .thenReturn(new Promise.DefaultPromise<>().future()); - - // Network environment setup - NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( - 20, - 1024, - MemoryType.HEAP, - IOManager.IOMode.SYNC, - Some.empty(), - 0, - 0); - - NetworkEnvironment env = new NetworkEnvironment( - new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize(), config.memoryType()), - new LocalConnectionManager(), - new ResultPartitionManager(), - new TaskEventDispatcher(), - new KvStateRegistry(), - null, - config.ioMode(), - config.partitionRequestInitialBackoff(), - config.partitinRequestMaxBackoff()); - - env.start(); - - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier( - TestingUtils.defaultExecutionContext(), - jobManager, - new FiniteDuration(30L, TimeUnit.SECONDS)); - - // Register mock task - JobID jobId = new JobID(); - Task mockTask = mock(Task.class); - - ResultPartition[] partitions = new ResultPartition[2]; - partitions[0] = createPartition(mockTask, "p1", jobId, true, env, resultPartitionConsumableNotifier); - partitions[1] = createPartition(mockTask, "p2", jobId, false, env, resultPartitionConsumableNotifier); - - ResultPartitionWriter[] writers = new ResultPartitionWriter[2]; - writers[0] = new ResultPartitionWriter(partitions[0]); - writers[1] = new ResultPartitionWriter(partitions[1]); - - when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]); - when(mockTask.getAllWriters()).thenReturn(writers); - when(mockTask.getProducedPartitions()).thenReturn(partitions); - - env.registerTask(mockTask); - - // Verify - ResultPartitionID eagerPartitionId = partitions[0].getPartitionId(); - - verify(jobManager, times(1)).ask( - eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)), - any(FiniteDuration.class)); - } - - /** - * Helper to create a mock result partition. - */ - private static ResultPartition createPartition( - Task owningTask, - String name, - JobID jobId, - boolean eagerlyDeployConsumers, - NetworkEnvironment env, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) { - - return new ResultPartition( - name, - owningTask, - jobId, - new ResultPartitionID(), - ResultPartitionType.PIPELINED, - eagerlyDeployConsumers, - 1, - env.getResultPartitionManager(), - resultPartitionConsumableNotifier, - mock(IOManager.class), - env.getDefaultIOMode()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 19bb67e94c829..2d3797dc70529 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Lists; - import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.execution.CancelTaskException; @@ -41,7 +40,6 @@ import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -122,7 +120,6 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { jobId, partitionIds[i], ResultPartitionType.PIPELINED, - false, parallelism, partitionManager, partitionConsumableNotifier, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java index c3ba909a3b7cf..48f06b001faa2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.jobgraph; -import java.io.IOException; - import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.InitializeOnMaster; @@ -29,10 +27,11 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.operators.util.TaskConfig; import org.junit.Test; +import java.io.IOException; + import static org.junit.Assert.*; @SuppressWarnings("serial") @@ -130,36 +129,7 @@ public void testInputFormatVertex() { fail(e.getMessage()); } } - - /** - * Verifies correct setting of eager deploy settings. - */ - @Test - public void testEagerlyDeployConsumers() throws Exception { - JobVertex producer = new JobVertex("producer"); - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true); - assertTrue(edge.getSource().getEagerlyDeployConsumers()); - } - } - + // -------------------------------------------------------------------------------------------- private static final class TestingOutputFormat extends DiscardingOutputFormat implements InitializeOnMaster { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index ad107b1446be5..15947f9f38be7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -18,11 +18,7 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.actor.Props; -import akka.actor.Status; +import akka.actor.*; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; @@ -35,11 +31,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionLocation; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.deployment.*; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; @@ -630,7 +622,7 @@ public void testRunJobWithForwardChannel() { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List irpdd = new ArrayList(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -775,7 +767,7 @@ public void testCancellingDependentAndStateUpdateFails() { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List irpdd = new ArrayList(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1427,9 +1419,7 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1, - false // don't deploy eagerly but with the first completed memory buffer - ); + 1); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 2065a1688d2e4..48be2e982da9a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -392,20 +392,17 @@ private void connect(Integer headOfChain, StreamEdge edge) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else if (partitioner instanceof RescalePartitioner){ jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.ALL_TO_ALL, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } // set strategy name so that web interface can show it. jobEdge.setShipStrategyName(partitioner.toString()); From 2742d5c1761ca02d871333e91a8ecbc6d0a52f6c Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 9 Nov 2016 18:25:06 +0100 Subject: [PATCH 2/3] [FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling --- .../ResultPartitionDeploymentDescriptor.java | 17 +- .../executiongraph/ExecutionVertex.java | 21 +- .../runtime/io/network/PartitionState.java | 18 +- .../io/network/partition/ResultPartition.java | 8 +- .../flink/runtime/jobgraph/ScheduleMode.java | 10 +- .../flink/runtime/taskmanager/Task.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../InputChannelDeploymentDescriptorTest.java | 206 ++++++++++++++++++ ...sultPartitionDeploymentDescriptorTest.java | 6 +- .../ExecutionVertexDeploymentTest.java | 106 ++++++--- .../partition/ResultPartitionTest.java | 92 ++++++++ .../consumer/LocalInputChannelTest.java | 3 +- .../runtime/jobgraph/ScheduleModeTest.java | 36 +++ .../runtime/taskmanager/TaskManagerTest.java | 19 +- .../flink/runtime/taskmanager/TaskTest.java | 5 +- 15 files changed, 491 insertions(+), 68 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 2881dde363f86..2ecde80c17c79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; + + /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ + private final boolean lazyScheduling; public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, - int numberOfSubpartitions) { + int numberOfSubpartitions, + boolean lazyScheduling) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -60,6 +64,7 @@ public ResultPartitionDeploymentDescriptor( checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; + this.lazyScheduling = lazyScheduling; } public IntermediateDataSetID getResultId() { @@ -78,6 +83,10 @@ public int getNumberOfSubpartitions() { return numberOfSubpartitions; } + public boolean allowLazyScheduling() { + return lazyScheduling; + } + @Override public String toString() { return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -87,7 +96,7 @@ public String toString() { // ------------------------------------------------------------------------ - public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) { + public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) { final IntermediateDataSetID resultId = partition.getIntermediateResult().getId(); final IntermediateResultPartitionID partitionId = partition.getPartitionId(); @@ -102,13 +111,13 @@ public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartiti if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) { if (partition.getConsumers().size() > 1) { - new IllegalStateException("Currently, only a single consumer group per partition is supported."); + throw new IllegalStateException("Currently, only a single consumer group per partition is supported."); } numberOfSubpartitions = partition.getConsumers().get(0).size(); } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions); + resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e7f000c4fd921..01e8660d6f8a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -568,21 +568,24 @@ TaskDeploymentDescriptor createDeploymentDescriptor( ExecutionAttemptID executionId, SimpleSlot targetSlot, TaskStateHandles taskStateHandles, - int attemptNumber) { - + int attemptNumber) throws ExecutionGraphException { + // Produced intermediate results - List producedPartitions = new ArrayList(resultPartitions.size()); + List producedPartitions = new ArrayList<>(resultPartitions.size()); + + // Consumed intermediate results + List consumedPartitions = new ArrayList<>(inputEdges.length); + + boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition)); + producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling)); } - - // Consumed intermediate results - List consumedPartitions = new ArrayList(); - + + for (ExecutionEdge[] edges : inputEdges) { InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor - .fromEdges(edges, targetSlot); + .fromEdges(edges, targetSlot, lazyScheduling); // If the produced partition has multiple consumers registered, we // need to request the one matching our sub task index. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java index 083412b6ecaf4..59357fcaa3d29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java @@ -23,18 +23,25 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; +import java.io.Serializable; + /** * Contains information about the state of a result partition. */ -public class PartitionState { +public class PartitionState implements Serializable { + + private static final long serialVersionUID = -4693651272083825031L; + private final IntermediateDataSetID intermediateDataSetID; private final IntermediateResultPartitionID intermediateResultPartitionID; private final ExecutionState executionState; public PartitionState( - IntermediateDataSetID intermediateDataSetID, - IntermediateResultPartitionID intermediateResultPartitionID, - ExecutionState executionState) { + IntermediateDataSetID intermediateDataSetID, + IntermediateResultPartitionID intermediateResultPartitionID, + @Nullable ExecutionState executionState) { + this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID); this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID); this.executionState = executionState; @@ -48,6 +55,9 @@ public IntermediateResultPartitionID getIntermediateResultPartitionID() { return intermediateResultPartitionID; } + /** + * Returns the execution state of the partition producer or null if it is not available. + */ public ExecutionState getExecutionState() { return executionState; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 034b27ab8d4d3..834318cded9b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -96,6 +96,8 @@ public class ResultPartition implements BufferPoolOwner { private final ResultPartitionConsumableNotifier partitionConsumableNotifier; + private final boolean sendScheduleOrUpdateConsumersMessage; + // - Runtime state -------------------------------------------------------- private final AtomicBoolean isReleased = new AtomicBoolean(); @@ -133,7 +135,8 @@ public ResultPartition( ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, IOManager ioManager, - IOMode defaultIoMode) { + IOMode defaultIoMode, + boolean sendScheduleOrUpdateConsumersMessage) { this.owningTaskName = checkNotNull(owningTaskName); this.taskActions = checkNotNull(taskActions); @@ -143,6 +146,7 @@ public ResultPartition( this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); + this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; // Create the subpartitions. switch (partitionType) { @@ -437,7 +441,7 @@ private void checkInProduceState() { * Notifies pipelined consumers of this result partition once. */ private void notifyPipelinedConsumers() { - if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) { + if (sendScheduleOrUpdateConsumersMessage && !hasNotifiedPipelinedConsumers && partitionType.isPipelined()) { partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions); hasNotifiedPipelinedConsumers = true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java index 9405067ec9c45..6a98e46290e7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java @@ -27,5 +27,13 @@ public enum ScheduleMode { LAZY_FROM_SOURCES, /** Schedules all tasks immediately. */ - EAGER + EAGER; + + /** + * Returns whether we are allowed to deploy consumers lazily. + */ + public boolean allowLazyDeployment() { + return this == LAZY_FROM_SOURCES; + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 6907606de2740..4f3dd5462ba46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -345,7 +345,8 @@ public Task( networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, ioManager, - networkEnvironment.getDefaultIOMode()); + networkEnvironment.getDefaultIOMode(), + desc.allowLazyScheduling()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); @@ -568,6 +569,7 @@ else if (current == ExecutionState.CANCELING) { // ---------------------------------------------------------------- LOG.info("Registering task at network: " + this); + network.registerTask(this); // next, kick off the background copying of files for the distributed cache @@ -1135,7 +1137,11 @@ public void onPartitionStateUpdate( final SingleInputGate inputGate = inputGatesById.get(resultId); if (inputGate != null) { - if (partitionState == ExecutionState.RUNNING) { + if (partitionState == ExecutionState.RUNNING || + partitionState == ExecutionState.FINISHED || + partitionState == ExecutionState.SCHEDULED || + partitionState == ExecutionState.DEPLOYING) { + // Retrigger the partition request inputGate.retriggerPartitionRequest(partitionId); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9af5355de7e53..b2e1002017688 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -948,7 +948,7 @@ class JobManager( if (execution != null) execution.getState else null case None => // Nothing to do. This is not an error, because the request is received when a sending - // task fails during a remote partition request. + // task fails or is not yet available during a remote partition request. log.debug(s"Cannot find execution graph for job $jobId.") null diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java new file mode 100644 index 0000000000000..e9e8901326b52 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.junit.Test; + +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class InputChannelDeploymentDescriptorTest { + + /** + * Tests the deployment descriptors for local, remote, and unknown partition + * locations (with lazy deployment allowed and all execution states for the + * producers). + */ + @Test + public void testMixedLocalRemoteUnknownDeployment() throws Exception { + boolean allowLazyDeployment = true; + + ResourceID consumerResourceId = ResourceID.generate(); + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mockSlot(consumerResourceId); + + // Local and remote channel are only allowed for certain execution + // states. + for (ExecutionState state : ExecutionState.values()) { + // Local partition + ExecutionVertex localProducer = mockExecutionVertex(state, consumerResourceId); + IntermediateResultPartition localPartition = mockPartition(localProducer); + ResultPartitionID localPartitionId = new ResultPartitionID(localPartition.getPartitionId(), localProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge localEdge = new ExecutionEdge(localPartition, consumer, 0); + + // Remote partition + ExecutionVertex remoteProducer = mockExecutionVertex(state, ResourceID.generate()); // new resource ID + IntermediateResultPartition remotePartition = mockPartition(remoteProducer); + ResultPartitionID remotePartitionId = new ResultPartitionID(remotePartition.getPartitionId(), remoteProducer.getCurrentExecutionAttempt().getAttemptId()); + ConnectionID remoteConnectionId = new ConnectionID(remoteProducer.getCurrentAssignedResource().getTaskManagerLocation(), 0); + ExecutionEdge remoteEdge = new ExecutionEdge(remotePartition, consumer, 1); + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(state, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2); + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge}, + consumerSlot, + allowLazyDeployment); + + assertEquals(3, desc.length); + + // These states are allowed + if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHED || + state == ExecutionState.SCHEDULED || state == ExecutionState.DEPLOYING) { + + // Create local or remote channels + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isLocal()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isRemote()); + assertEquals(remoteConnectionId, desc[1].getConsumedPartitionLocation().getConnectionId()); + } else { + // Unknown (lazy deployment allowed) + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[1].getConsumedPartitionLocation().getConnectionId()); + } + + assertEquals(unknownPartitionId, desc[2].getConsumedPartitionId()); + assertTrue(desc[2].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[2].getConsumedPartitionLocation().getConnectionId()); + } + } + + @Test + public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception { + ResourceID consumerResourceId = ResourceID.generate(); + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mockSlot(consumerResourceId); + + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2); + + // This should work if lazy deployment is allowed + boolean allowLazyDeployment = true; + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + assertEquals(1, desc.length); + + assertEquals(unknownPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + + try { + // Fail if lazy deployment is *not* allowed + allowLazyDeployment = false; + + InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + fail("Did not throw expected ExecutionGraphException"); + } catch (ExecutionGraphException ignored) { + } + } + + // ------------------------------------------------------------------------ + + private static SimpleSlot mockSlot(ResourceID resourceId) { + SimpleSlot slot = mock(SimpleSlot.class); + when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000)); + when(slot.getTaskManagerID()).thenReturn(resourceId); + + return slot; + } + + private static ExecutionVertex mockExecutionVertex(ExecutionState state, ResourceID resourceId) { + ExecutionVertex vertex = mock(ExecutionVertex.class); + + Execution exec = mock(Execution.class); + when(exec.getState()).thenReturn(state); + when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); + + if (resourceId != null) { + SimpleSlot slot = mockSlot(resourceId); + when(exec.getAssignedResource()).thenReturn(slot); + when(vertex.getCurrentAssignedResource()).thenReturn(slot); + } else { + when(exec.getAssignedResource()).thenReturn(null); // no resource + when(vertex.getCurrentAssignedResource()).thenReturn(null); + } + + when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); + + return vertex; + } + + private static IntermediateResultPartition mockPartition(ExecutionVertex producer) { + IntermediateResultPartition partition = mock(IntermediateResultPartition.class); + when(partition.isConsumable()).thenReturn(true); + + IntermediateResult result = mock(IntermediateResult.class); + when(result.getConnectionIndex()).thenReturn(0); + + when(partition.getIntermediateResult()).thenReturn(result); + when(partition.getPartitionId()).thenReturn(new IntermediateResultPartitionID()); + + when(partition.getProducer()).thenReturn(producer); + + return partition; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 4b1e546f86c85..4223b49d2d24a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ResultPartitionDeploymentDescriptorTest { @@ -38,14 +39,14 @@ public void testSerialization() throws Exception { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); ResultPartitionType partitionType = ResultPartitionType.PIPELINED; int numberOfSubpartitions = 24; - boolean eagerlyDeployConsumers = true; ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor( resultId, partitionId, partitionType, - numberOfSubpartitions); + numberOfSubpartitions, + true); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -54,5 +55,6 @@ public void testSerialization() throws Exception { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); + assertTrue(copy.allowLazyScheduling()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 54aeff919fd19..8bc39a7864bb5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -18,20 +18,37 @@ package org.apache.flink.runtime.executiongraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; - -import static org.junit.Assert.*; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; - import org.junit.Test; +import java.util.Collection; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ExecutionVertexDeploymentTest { @Test @@ -48,7 +65,7 @@ public void testDeployCall() { final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(slot); @@ -58,8 +75,7 @@ public void testDeployCall() { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -67,8 +83,7 @@ public void testDeployCall() { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -82,12 +97,12 @@ public void testDeployWithSynchronousAnswer() { final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final Instance instance = getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext()))); + new ActorTaskManagerGateway( + new SimpleActorGateway(TestingUtils.directExecutionContext()))); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -99,8 +114,7 @@ public void testDeployWithSynchronousAnswer() { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -109,8 +123,7 @@ public void testDeployWithSynchronousAnswer() { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -123,7 +136,7 @@ public void testDeployWithAsynchronousAnswer() { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -138,8 +151,7 @@ public void testDeployWithAsynchronousAnswer() { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -149,16 +161,14 @@ public void testDeployWithAsynchronousAnswer() { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -171,7 +181,7 @@ public void testDeployFailedSynchronous() { final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -189,8 +199,7 @@ public void testDeployFailedSynchronous() { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -202,7 +211,7 @@ public void testDeployFailedAsynchronously() { final JobVertexID jid = new JobVertexID(); final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -229,8 +238,7 @@ public void testDeployFailedAsynchronously() { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -247,7 +255,7 @@ public void testFailExternallyDuringDeploy() { final ExecutionJobVertex ejv = getExecutionVertex(jid, ec); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -270,8 +278,7 @@ public void testFailExternallyDuringDeploy() { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -288,7 +295,7 @@ public void testFailCallOvertakesDeploymentAnswer() { final ExecutionJobVertex ejv = getExecutionVertex(jid, context); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); @@ -334,10 +341,37 @@ public void testFailCallOvertakesDeploymentAnswer() { assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); assertTrue(queue.isEmpty()); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } + + /** + * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors. + */ + @Test + public void testTddProducedPartitionsLazyScheduling() throws Exception { + TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext(); + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context); + IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED); + ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + + Slot root = mock(Slot.class); + when(root.getSlotNumber()).thenReturn(1); + SimpleSlot slot = mock(SimpleSlot.class); + when(slot.getRoot()).thenReturn(root); + + for (ScheduleMode mode : ScheduleMode.values()) { + vertex.getExecutionGraph().setScheduleMode(mode); + + TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, 1); + + Collection producedPartitions = tdd.getProducedPartitions(); + + assertEquals(1, producedPartitions.size()); + ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next(); + assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage()); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java new file mode 100644 index 0000000000000..f6fddfa6d2ed9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ResultPartitionTest { + + /** + * Tests the schedule or update consumers message sending behaviour depending on the relevant flags. + */ + @Test + public void testSendScheduleOrUpdateConsumersMessage() throws Exception { + { + // Pipelined, send message => notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Pipelined, don't send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Blocking, send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Blocking, don't send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + } + + // ------------------------------------------------------------------------ + + private static ResultPartition createPartition( + ResultPartitionConsumableNotifier notifier, + ResultPartitionType type, + boolean sendScheduleOrUpdateConsumersMessage) { + return new ResultPartition( + "TestTask", + mock(TaskActions.class), + new JobID(), + new ResultPartitionID(), + type, + 1, + mock(ResultPartitionManager.class), + notifier, + mock(IOManager.class), + IOManager.IOMode.SYNC, + sendScheduleOrUpdateConsumersMessage); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 2d3797dc70529..4ca1d1fa7c49f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -124,7 +124,8 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { partitionManager, partitionConsumableNotifier, ioManager, - ASYNC); + ASYNC, + true); // Create a buffer pool for this partition partition.registerBufferPool( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java new file mode 100644 index 0000000000000..144ef12fa8802 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ScheduleModeTest { + + /** + * Test that schedule modes set the lazy deployment flag correctly. + */ + @Test + public void testAllowLazyDeployment() throws Exception { + assertTrue(ScheduleMode.LAZY_FROM_SOURCES.allowLazyDeployment()); + assertFalse(ScheduleMode.EAGER.allowLazyDeployment()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 15947f9f38be7..22f0c608f9438 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -18,7 +18,11 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.*; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.actor.Props; +import akka.actor.Status; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; @@ -31,7 +35,11 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.deployment.*; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; @@ -622,7 +630,7 @@ public void testRunJobWithForwardChannel() { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List irpdd = new ArrayList(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -767,7 +775,7 @@ public void testCancellingDependentAndStateUpdateFails() { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List irpdd = new ArrayList(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1419,7 +1427,8 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1); + 1, + true); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 1eebe122f66ab..5d26050cf6982 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -553,6 +553,9 @@ public void testOnPartitionStateUpdate() throws Exception { } expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING); + expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING); + expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING); + expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING); expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING); expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING); @@ -568,7 +571,7 @@ public void testOnPartitionStateUpdate() throws Exception { assertEquals(expected.get(state), newTaskState); } - verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } /** From ca797d986874a71648e8670df2b5c4d7455eeb7f Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 10 Nov 2016 11:15:47 +0100 Subject: [PATCH 3/3] [FLINK-5040] [taskmanager] Adjust partition request backoffs The back offs were hard coded before, which would have made it impossible to react to any potential problems with them. --- .../configuration/TaskManagerOptions.java | 24 ++++-- .../ResultPartitionDeploymentDescriptor.java | 8 +- .../partition/consumer/SingleInputGate.java | 10 ++- .../flink/runtime/taskmanager/Task.java | 2 +- .../NetworkEnvironmentConfiguration.scala | 14 ++-- .../runtime/taskmanager/TaskManager.scala | 9 +- ...sultPartitionDeploymentDescriptorTest.java | 2 +- .../consumer/SingleInputGateTest.java | 84 +++++++++++++++++++ ...kManagerComponentsStartupShutdownTest.java | 4 +- .../runtime/taskmanager/TaskManagerTest.java | 5 ++ 10 files changed, 140 insertions(+), 22 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index e5d36aae74e4c..6f6238b6e7332 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -34,6 +34,20 @@ public class TaskManagerOptions { // @TODO Migrate 'taskmanager.*' config options from ConfigConstants + // ------------------------------------------------------------------------ + // Network Options + // ------------------------------------------------------------------------ + + /** Minimum backoff for partition requests of input channels. */ + public static final ConfigOption NETWORK_REQUEST_BACKOFF_INITIAL = + key("taskmanager.net.request-backoff.initial") + .defaultValue(100); + + /** Maximum backoff for partition requests of input channels. */ + public static final ConfigOption NETWORK_REQUEST_BACKOFF_MAX = + key("taskmanager.net.request-backoff.max") + .defaultValue(10000); + // ------------------------------------------------------------------------ // Task Options // ------------------------------------------------------------------------ @@ -44,8 +58,8 @@ public class TaskManagerOptions { */ public static final ConfigOption TASK_CANCELLATION_INTERVAL = key("task.cancellation.interval") - .defaultValue(30000L) - .withDeprecatedKeys("task.cancellation-interval"); + .defaultValue(30000L) + .withDeprecatedKeys("task.cancellation-interval"); /** * Timeout in milliseconds after which a task cancellation times out and @@ -54,19 +68,19 @@ public class TaskManagerOptions { */ public static final ConfigOption TASK_CANCELLATION_TIMEOUT = key("task.cancellation.timeout") - .defaultValue(180000L); + .defaultValue(180000L); /** * The maximum number of bytes that a checkpoint alignment may buffer. * If the checkpoint alignment buffers more than the configured amount of * data, the checkpoint is aborted (skipped). - * + * *

The default value of {@code -1} indicates that there is no limit. */ public static final ConfigOption TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = key("task.checkpoint.alignment.max-size") .defaultValue(-1L); - + // ------------------------------------------------------------------------ /** Not intended to be instantiated */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 2ecde80c17c79..14c7d2ae5e1de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { private final int numberOfSubpartitions; /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ - private final boolean lazyScheduling; + private final boolean sendScheduleOrUpdateConsumersMessage; public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, @@ -64,7 +64,7 @@ public ResultPartitionDeploymentDescriptor( checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.lazyScheduling = lazyScheduling; + this.sendScheduleOrUpdateConsumersMessage = lazyScheduling; } public IntermediateDataSetID getResultId() { @@ -83,8 +83,8 @@ public int getNumberOfSubpartitions() { return numberOfSubpartitions; } - public boolean allowLazyScheduling() { - return lazyScheduling; + public boolean sendScheduleOrUpdateConsumersMessage() { + return sendScheduleOrUpdateConsumersMessage; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index af5fd89c0f2a2..8f57542d16002 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Maps; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.taskmanager.TaskActions; @@ -520,6 +521,13 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) { // ------------------------------------------------------------------------ + @VisibleForTesting + Map getInputChannels() { + return inputChannels; + } + + // ------------------------------------------------------------------------ + /** * Creates an input gate and all of its input channels. */ @@ -565,7 +573,7 @@ else if (partitionLocation.isRemote()) { partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), - networkEnvironment.getPartitionRequestInitialBackoff(), + networkEnvironment.getPartitionRequestMaxBackoff(), metrics ); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 4f3dd5462ba46..b960e68cb2b97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -346,7 +346,7 @@ public Task( resultPartitionConsumableNotifier, ioManager, networkEnvironment.getDefaultIOMode(), - desc.allowLazyScheduling()); + desc.sendScheduleOrUpdateConsumersMessage()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala index 14589a176f68e..6a5966535635d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala @@ -23,10 +23,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.network.netty.NettyConfig case class NetworkEnvironmentConfiguration( - numNetworkBuffers: Int, - networkBufferSize: Int, - memoryType: MemoryType, - ioMode: IOMode, - nettyConfig: Option[NettyConfig] = None, - partitionRequestInitialBackoff: Int = 500, - partitinRequestMaxBackoff: Int = 3000) + numNetworkBuffers: Int, + networkBufferSize: Int, + memoryType: MemoryType, + ioMode: IOMode, + partitionRequestInitialBackoff : Int, + partitionRequestMaxBackoff : Int, + nettyConfig: Option[NettyConfig] = None) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 4bb2da460aa2c..dd5d2183d3bbc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1982,7 +1982,7 @@ object TaskManager { kvStateServer, netConfig.ioMode, netConfig.partitionRequestInitialBackoff, - netConfig.partitinRequestMaxBackoff) + netConfig.partitionRequestMaxBackoff) network.start() @@ -2258,11 +2258,18 @@ object TaskManager { val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC + val initialRequestBackoff = configuration.getInteger( + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL) + val maxRequestBackoff = configuration.getInteger( + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX) + val networkConfig = NetworkEnvironmentConfiguration( numNetworkBuffers, pageSize, memType, ioMode, + initialRequestBackoff, + maxRequestBackoff, nettyConfig) // ----> timeouts, library caching, profiling diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 4223b49d2d24a..3ed8236d7a163 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -55,6 +55,6 @@ public void testSerialization() throws Exception { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertTrue(copy.allowLazyScheduling()); + assertTrue(copy.sendScheduleOrUpdateConsumersMessage()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 8f9ea9efd3763..0b7b10dd77948 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -21,11 +21,14 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -42,9 +45,12 @@ import org.junit.Test; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -271,6 +277,84 @@ public void run() { assertEquals(IllegalStateException.class, asyncException.get().getClass()); } + /** + * Tests request back off configuration is correctly forwarded to the channels. + */ + @Test + public void testRequestBackoffConfiguration() throws Exception { + ResultPartitionID[] partitionIds = new ResultPartitionID[] { + new ResultPartitionID(), + new ResultPartitionID(), + new ResultPartitionID() + }; + + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + partitionIds[0], + ResultPartitionLocation.createLocal()), + // Remote + new InputChannelDeploymentDescriptor( + partitionIds[1], + ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), + // Unknown + new InputChannelDeploymentDescriptor( + partitionIds[2], + ResultPartitionLocation.createUnknown())}; + + InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs); + + int initialBackoff = 137; + int maxBackoff = 1001; + + NetworkEnvironment netEnv = mock(NetworkEnvironment.class); + when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager()); + when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher()); + when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff); + when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff); + when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager()); + + SingleInputGate gate = SingleInputGate.create( + "TestTask", + new JobID(), + new ExecutionAttemptID(), + gateDesc, + netEnv, + mock(TaskActions.class), + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + + Map channelMap = gate.getInputChannels(); + + assertEquals(3, channelMap.size()); + InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId()); + assertEquals(LocalInputChannel.class, localChannel.getClass()); + + InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId()); + assertEquals(RemoteInputChannel.class, remoteChannel.getClass()); + + InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId()); + assertEquals(UnknownInputChannel.class, unknownChannel.getClass()); + + InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}; + for (InputChannel ch : channels) { + assertEquals(0, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(maxBackoff, ch.getCurrentBackoff()); + + assertFalse(ch.increaseBackoff()); + } + } + /** * Returns whether the stack trace represents a Thread in a blocking queue * poll call. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 0bcd1cee0a2b6..f9434e2731a1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -104,7 +104,7 @@ public void testComponentsStartupShutdown() { config); final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( - 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.empty(), 0, 0); + 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, Option.empty()); ResourceID taskManagerId = ResourceID.generate(); @@ -121,7 +121,7 @@ public void testComponentsStartupShutdown() { null, netConf.ioMode(), netConf.partitionRequestInitialBackoff(), - netConf.partitinRequestMaxBackoff()); + netConf.partitionRequestMaxBackoff()); network.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 22f0c608f9438..fd9ff05d3147d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; @@ -903,6 +904,8 @@ public void testRemotePartitionNotFound() throws Exception { final int dataPort = NetUtils.getAvailablePort(); Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); taskManager = TestingUtils.createTaskManager( system, @@ -998,6 +1001,8 @@ public void testLocalPartitionNotFound() throws Exception { jobManager = new AkkaActorGateway(jm, leaderSessionID); final Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); taskManager = TestingUtils.createTaskManager( system,