From ec44c0f14240a6826cb9efd1582fe49489572795 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 14:19:05 +0100 Subject: [PATCH 1/6] [FLINK-1724] [tests] Respect number of task managers in TestingCluster Starting a task manager via TestingUtils does not respect the number of configured task managers and mis-configures the task managers to use local network communication (LocalConnectionManager instead of NettyConnectionManager). --- .../org/apache/flink/runtime/testingUtils/TestingCluster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index f12bc248a6eb5..4a72694155985 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -68,6 +68,6 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1) TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName, - singleActorSystem, true, classOf[TestingTaskManager]) + singleActorSystem, numTaskManagers == 1, classOf[TestingTaskManager]) } } From df5847f6985da9b53bad91ddd539c51025ec15d6 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 14:20:59 +0100 Subject: [PATCH 2/6] [FLINK-1724] [tests] Fix test kit actor configuration typo --- .../org/apache/flink/runtime/testingUtils/TestingUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 28e906aa8c68f..9bb3d0b2ea1a2 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -50,7 +50,7 @@ object TestingUtils { |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] |akka.loglevel = $logLevel |akka.stdout-loglevel = OFF - |akka.jvm-exit-on-fata-error = off + |akka.jvm-exit-on-fatal-error = off |akka.log-config-on-start = off """.stripMargin } From 6bfa10fb8f9e12c04b98f3c89398b67545105da8 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 14:23:53 +0100 Subject: [PATCH 3/6] [jobmanager] Add subtask index to state transition debug msg --- .../apache/flink/runtime/executiongraph/Execution.java | 4 ++-- .../flink/runtime/executiongraph/ExecutionVertex.java | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index eda5bdf8cdd14..8ecd7af38c23b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -880,8 +880,8 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ markTimestamp(targetState); if (LOG.isDebugEnabled()) { - LOG.debug("{} ({}) switched from {} to {}.",this.getVertex().getTaskName(), - getAttemptId(), currentState, targetState); + LOG.debug("{} ({}) switched from {} to {}.", + getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState); } // make sure that the state transition completes normally. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e5d9db8def14b..7c45d2a1b3cdf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -156,6 +156,14 @@ public String getTaskName() { return this.jobVertex.getJobVertex().getName(); } + public String getTaskNameWithSubtaskIndex() { + return String.format( + "%s (%d/%d)", + jobVertex.getJobVertex().getName(), + subTaskIndex + 1, + getTotalNumberOfParallelSubtasks()); + } + public int getTotalNumberOfParallelSubtasks() { return this.jobVertex.getParallelism(); } From 579a90a77db0f600ef0648f1c64cd11c34bb3107 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 14:24:54 +0100 Subject: [PATCH 4/6] [jobmanager] Fix typo in ResultPartition deployment descriptor --- .../deployment/ResultPartitionDeploymentDescriptor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 4a88f18e81876..2b0bbc1356795 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -80,8 +80,8 @@ public int getNumberOfSubpartitions() { @Override public String toString() { - return String.format("ResultPartitionDeploymentDescriptor [result id: %s," + - "partition id: %s,partition type: %s]", + return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " + + "partition id: %s, partition type: %s]", resultId, partitionId, partitionType); } From 4ee75e0cb76a6abe07e4585b61a6739bae898c40 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 17:26:42 +0100 Subject: [PATCH 5/6] [jobmanager] Fix missing queue scheduling mode --- .../org/apache/flink/runtime/executiongraph/Execution.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 8ecd7af38c23b..7fa4dbde11615 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -470,8 +470,11 @@ void scheduleOrUpdateConsumers(List> allConsumers) { @Override public Boolean call() throws Exception { try { + final ExecutionGraph consumerGraph = consumerVertex.getExecutionGraph(); + consumerVertex.scheduleForExecution( - consumerVertex.getExecutionGraph().getScheduler(), false); + consumerVertex.getExecutionGraph().getScheduler(), + consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed()); } catch (Throwable t) { fail(new IllegalStateException("Could not schedule consumer " + "vertex " + consumerVertex, t)); From 9f2553336628099cb99306658ee4a73c943aff00 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Mar 2015 17:29:58 +0100 Subject: [PATCH 6/6] [FLINK-1709] Add initial SlotCountExceedingParallelismTest --- .../SlotCountExceedingParallelismTest.java | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java new file mode 100644 index 0000000000000..fb44bced120a7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorRef; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.types.IntegerRecord; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.BitSet; + +public class SlotCountExceedingParallelismTest { + + // Test configuration + private final static int NUMBER_OF_TMS = 2; + private final static int NUMBER_OF_SLOTS_PER_TM = 2; + private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; + + private static TestingCluster flink; + private static ActorRef jobClient; + + @BeforeClass + public static void setUp() throws Exception { + flink = TestingUtils.startTestingCluster( + NUMBER_OF_SLOTS_PER_TM, + NUMBER_OF_TMS, + TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + + jobClient = JobClient.createJobClientFromConfig( + flink.configuration(), + true, + flink.jobManagerActorSystem()); + } + + @After + public void tearDown() throws Exception { + flink.stop(); + } + + @Test + public void testNoSlotSharingAndBlockingResult() throws Exception { + final String jobName = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)"; + + // Sender with higher parallelism than available slots + JobGraph jobGraph = createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM); + submitJobGraphAndWait(jobGraph); + + // Receiver with higher parallelism than available slots + jobGraph = createTestJobGraph(jobName, PARALLELISM, PARALLELISM * 2); + submitJobGraphAndWait(jobGraph); + + // Both sender and receiver with higher parallelism than available slots + jobGraph = createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM * 2); + submitJobGraphAndWait(jobGraph); + } + + // --------------------------------------------------------------------------------------------- + + private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException { + JobClient.submitJobAndWait( + jobGraph, + false, + jobClient, + TestingUtils.TESTING_DURATION()); + } + + private JobGraph createTestJobGraph( + String jobName, + int senderParallelism, + int receiverParallelism) { + + // The sender and receiver invokable logic ensure that each subtask gets the expected data + final AbstractJobVertex sender = new AbstractJobVertex("Sender"); + sender.setInvokableClass(RoundRobinSubtaskIndexSender.class); + sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism); + sender.setParallelism(senderParallelism); + + final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); + receiver.setInvokableClass(SubtaskIndexReceiver.class); + receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism); + receiver.setParallelism(receiverParallelism); + + receiver.connectNewDataSetAsInput( + sender, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(jobName, sender, receiver); + + // We need to allow queued scheduling, because there are not enough slots available + // to run all tasks at once. We queue tasks and then let them finish/consume the blocking + // result one after the other. + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; + } + + /** + * Sends the subtask index a configurable number of times in a round-robin fashion. + */ + public static class RoundRobinSubtaskIndexSender extends AbstractInvokable { + + public final static String CONFIG_KEY = "number-of-times-to-send"; + + private RecordWriter writer; + + private int numberOfTimesToSend; + + @Override + public void registerInputOutput() { + writer = new RecordWriter(getEnvironment().getWriter(0)); + numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0); + } + + @Override + public void invoke() throws Exception { + final IntegerRecord subtaskIndex = new IntegerRecord( + getEnvironment().getIndexInSubtaskGroup()); + + try { + for (int i = 0; i < numberOfTimesToSend; i++) { + writer.emit(subtaskIndex); + } + writer.flush(); + } + finally { + writer.clearBuffers(); + } + } + } + + /** + * Expects to receive the subtask index from a configurable number of sender tasks. + */ + public static class SubtaskIndexReceiver extends AbstractInvokable { + + private final static String CONFIG_KEY = "number-of-indexes-to-receive"; + + private RecordReader reader; + + private int numberOfSubtaskIndexesToReceive; + + /** Each set bit position corresponds to a received subtask index */ + private BitSet receivedSubtaskIndexes; + + @Override + public void registerInputOutput() { + reader = new RecordReader( + getEnvironment().getInputGate(0), + IntegerRecord.class); + + numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0); + receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive); + } + + @Override + public void invoke() throws Exception { + try { + IntegerRecord record; + + int numberOfReceivedSubtaskIndexes = 0; + + while ((record = reader.next()) != null) { + // Check that we don't receive more than expected + numberOfReceivedSubtaskIndexes++; + + if (numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) { + throw new IllegalStateException("Received more records than expected."); + } + + int subtaskIndex = record.getValue(); + + // Check that we only receive each subtask index once + if (receivedSubtaskIndexes.get(subtaskIndex)) { + throw new IllegalStateException("Received expected subtask index twice."); + } + else { + receivedSubtaskIndexes.set(subtaskIndex, true); + } + } + + // Check that we have received all expected subtask indexes + if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) { + throw new IllegalStateException("Finished receive, but did not receive " + + "all expected subtask indexes."); + } + } + finally { + reader.clearBuffers(); + } + } + } +}