Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public int getNumberOfSubpartitions() {

@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [result id: %s," +
"partition id: %s,partition type: %s]",
return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
+ "partition id: %s, partition type: %s]",
resultId, partitionId, partitionType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,11 @@ void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
@Override
public Boolean call() throws Exception {
try {
final ExecutionGraph consumerGraph = consumerVertex.getExecutionGraph();

consumerVertex.scheduleForExecution(
consumerVertex.getExecutionGraph().getScheduler(), false);
consumerVertex.getExecutionGraph().getScheduler(),
consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
} catch (Throwable t) {
fail(new IllegalStateException("Could not schedule consumer " +
"vertex " + consumerVertex, t));
Expand Down Expand Up @@ -880,8 +883,8 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ
markTimestamp(targetState);

if (LOG.isDebugEnabled()) {
LOG.debug("{} ({}) switched from {} to {}.",this.getVertex().getTaskName(),
getAttemptId(), currentState, targetState);
LOG.debug("{} ({}) switched from {} to {}.",
getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState);
}

// make sure that the state transition completes normally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ public String getTaskName() {
return this.jobVertex.getJobVertex().getName();
}

/**
 * Returns the task name extended with the subtask's position, e.g. {@code "MyTask (3/8)"}.
 *
 * <p>Subtask indices are rendered 1-based, i.e. {@code subTaskIndex + 1} out of the
 * vertex parallelism.
 */
public String getTaskNameWithSubtaskIndex() {
	final String taskName = jobVertex.getJobVertex().getName();
	final int position = subTaskIndex + 1;
	final int parallelism = getTotalNumberOfParallelSubtasks();

	return String.format("%s (%d/%d)", taskName, position, parallelism);
}

/** Returns the parallelism of the job vertex this subtask belongs to. */
public int getTotalNumberOfParallelSubtasks() {
	return jobVertex.getParallelism();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.BitSet;

/**
 * Tests that a job whose parallelism exceeds the number of available task slots still
 * finishes, provided that queued scheduling is allowed and the intermediate result
 * between the two stages is BLOCKING.
 *
 * <p>With a blocking result, the receivers only start after all senders have finished,
 * so the slots freed by completed sender subtasks can be reused by the queued subtasks.
 */
public class SlotCountExceedingParallelismTest {

	// Test configuration: the cluster provides NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM
	// slots in total; the tests use up to twice that as task parallelism.
	private final static int NUMBER_OF_TMS = 2;
	private final static int NUMBER_OF_SLOTS_PER_TM = 2;
	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;

	private static TestingCluster flink;
	private static ActorRef jobClient;

	@BeforeClass
	public static void setUp() throws Exception {
		flink = TestingUtils.startTestingCluster(
				NUMBER_OF_SLOTS_PER_TM,
				NUMBER_OF_TMS,
				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

		jobClient = JobClient.createJobClientFromConfig(
				flink.configuration(),
				true,
				flink.jobManagerActorSystem());
	}

	/**
	 * Stops the test cluster once after all tests have run.
	 *
	 * <p>The cluster is started once per class in {@link #setUp()} ({@code @BeforeClass}),
	 * so it must also be torn down once per class. The previous per-test {@code @After}
	 * tear-down would stop the cluster after the first test and break any test method
	 * added to this class later.
	 */
	@AfterClass
	public static void tearDown() throws Exception {
		if (flink != null) {
			flink.stop();
		}
	}

	@Test
	public void testNoSlotSharingAndBlockingResult() throws Exception {
		final String jobName = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";

		// Sender with higher parallelism than available slots
		JobGraph jobGraph = createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM);
		submitJobGraphAndWait(jobGraph);

		// Receiver with higher parallelism than available slots
		jobGraph = createTestJobGraph(jobName, PARALLELISM, PARALLELISM * 2);
		submitJobGraphAndWait(jobGraph);

		// Both sender and receiver with higher parallelism than available slots
		jobGraph = createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM * 2);
		submitJobGraphAndWait(jobGraph);
	}

	// ---------------------------------------------------------------------------------------------

	/** Submits the job to the test cluster and blocks until it has finished. */
	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
		JobClient.submitJobAndWait(
				jobGraph,
				false,
				jobClient,
				TestingUtils.TESTING_DURATION());
	}

	/**
	 * Creates a two-vertex job: sender -> receiver, connected ALL_TO_ALL via a BLOCKING
	 * result, with queued scheduling enabled.
	 *
	 * @param jobName name of the created job graph
	 * @param senderParallelism parallelism of the sender vertex
	 * @param receiverParallelism parallelism of the receiver vertex
	 * @return the configured job graph
	 */
	private JobGraph createTestJobGraph(
			String jobName,
			int senderParallelism,
			int receiverParallelism) {

		// The sender and receiver invokable logic ensure that each subtask gets the expected data
		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
		sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
		sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
		sender.setParallelism(senderParallelism);

		final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
		receiver.setInvokableClass(SubtaskIndexReceiver.class);
		receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
		receiver.setParallelism(receiverParallelism);

		receiver.connectNewDataSetAsInput(
				sender,
				DistributionPattern.ALL_TO_ALL,
				ResultPartitionType.BLOCKING);

		final JobGraph jobGraph = new JobGraph(jobName, sender, receiver);

		// We need to allow queued scheduling, because there are not enough slots available
		// to run all tasks at once. We queue tasks and then let them finish/consume the blocking
		// result one after the other.
		jobGraph.setAllowQueuedScheduling(true);

		return jobGraph;
	}

	/**
	 * Sends the subtask index a configurable number of times in a round-robin fashion.
	 */
	public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {

		/** Task configuration key for the number of records each sender subtask emits. */
		public final static String CONFIG_KEY = "number-of-times-to-send";

		private RecordWriter<IntegerRecord> writer;

		private int numberOfTimesToSend;

		@Override
		public void registerInputOutput() {
			writer = new RecordWriter<IntegerRecord>(getEnvironment().getWriter(0));
			numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
		}

		@Override
		public void invoke() throws Exception {
			final IntegerRecord subtaskIndex = new IntegerRecord(
					getEnvironment().getIndexInSubtaskGroup());

			try {
				for (int i = 0; i < numberOfTimesToSend; i++) {
					writer.emit(subtaskIndex);
				}
				writer.flush();
			}
			finally {
				// Always release network buffers, even if emitting failed.
				writer.clearBuffers();
			}
		}
	}

	/**
	 * Expects to receive the subtask index from a configurable number of sender tasks.
	 */
	public static class SubtaskIndexReceiver extends AbstractInvokable {

		// Public for symmetry with RoundRobinSubtaskIndexSender.CONFIG_KEY: the outer
		// class reads this constant when wiring the job graph. (It was private, which
		// only worked via a compiler-generated synthetic accessor.)
		public final static String CONFIG_KEY = "number-of-indexes-to-receive";

		private RecordReader<IntegerRecord> reader;

		private int numberOfSubtaskIndexesToReceive;

		/** Each set bit position corresponds to a received subtask index */
		private BitSet receivedSubtaskIndexes;

		@Override
		public void registerInputOutput() {
			reader = new RecordReader<IntegerRecord>(
					getEnvironment().getInputGate(0),
					IntegerRecord.class);

			numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
			receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
		}

		@Override
		public void invoke() throws Exception {
			try {
				IntegerRecord record;

				int numberOfReceivedSubtaskIndexes = 0;

				while ((record = reader.next()) != null) {
					// Check that we don't receive more than expected
					numberOfReceivedSubtaskIndexes++;

					if (numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) {
						throw new IllegalStateException("Received more records than expected.");
					}

					int subtaskIndex = record.getValue();

					// Check that we only receive each subtask index once
					if (receivedSubtaskIndexes.get(subtaskIndex)) {
						throw new IllegalStateException("Received expected subtask index twice.");
					}
					else {
						receivedSubtaskIndexes.set(subtaskIndex, true);
					}
				}

				// Check that we have received all expected subtask indexes
				if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) {
					throw new IllegalStateException("Finished receive, but did not receive "
							+ "all expected subtask indexes.");
				}
			}
			finally {
				// Always release network buffers, even if a check above threw.
				reader.clearBuffers();
			}
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)

TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName,
singleActorSystem, true, classOf[TestingTaskManager])
singleActorSystem, numTaskManagers == 1, classOf[TestingTaskManager])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object TestingUtils {
|akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
|akka.loglevel = $logLevel
|akka.stdout-loglevel = OFF
|akka.jvm-exit-on-fata-error = off
|akka.jvm-exit-on-fatal-error = off
|akka.log-config-on-start = off
""".stripMargin
}
Expand Down