Skip to content

Commit

Permalink
[FLINK-5040] [taskmanager] Adjust partition request backoffs
Browse files Browse the repository at this point in the history
The backoffs were previously hard-coded, which made it
impossible to adjust them if they caused any problems.

This closes #2784.
  • Loading branch information
uce committed Nov 11, 2016
1 parent 2742d5c commit 5d5637b
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 22 deletions.
Expand Up @@ -34,6 +34,20 @@ public class TaskManagerOptions {

// @TODO Migrate 'taskmanager.*' config options from ConfigConstants

// ------------------------------------------------------------------------
// Network Options
// ------------------------------------------------------------------------

/**
 * Minimum backoff for partition requests of input channels.
 *
 * <p>Configured via the key {@code taskmanager.net.request-backoff.initial};
 * the default value is {@code 100}.
 *
 * <p>NOTE(review): the unit is presumably milliseconds -- confirm against the
 * code that consumes this option.
 */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.net.request-backoff.initial")
.defaultValue(100);

/**
 * Maximum backoff for partition requests of input channels.
 *
 * <p>Configured via the key {@code taskmanager.net.request-backoff.max};
 * the default value is {@code 10000}.
 *
 * <p>NOTE(review): the unit is presumably milliseconds -- confirm against the
 * code that consumes this option.
 */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.net.request-backoff.max")
.defaultValue(10000);

// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
Expand All @@ -44,8 +58,8 @@ public class TaskManagerOptions {
*/
public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
key("task.cancellation.interval")
.defaultValue(30000L)
.withDeprecatedKeys("task.cancellation-interval");
.defaultValue(30000L)
.withDeprecatedKeys("task.cancellation-interval");

/**
* Timeout in milliseconds after which a task cancellation times out and
Expand All @@ -54,19 +68,19 @@ public class TaskManagerOptions {
*/
public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
key("task.cancellation.timeout")
.defaultValue(180000L);
.defaultValue(180000L);

/**
* The maximum number of bytes that a checkpoint alignment may buffer.
* If the checkpoint alignment buffers more than the configured amount of
* data, the checkpoint is aborted (skipped).
*
*
* <p>The default value of {@code -1} indicates that there is no limit.
*/
public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
key("task.checkpoint.alignment.max-size")
.defaultValue(-1L);

// ------------------------------------------------------------------------

/** Not intended to be instantiated */
Expand Down
Expand Up @@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
private final int numberOfSubpartitions;

/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean lazyScheduling;
private final boolean sendScheduleOrUpdateConsumersMessage;

public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
Expand All @@ -64,7 +64,7 @@ public ResultPartitionDeploymentDescriptor(

checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
this.lazyScheduling = lazyScheduling;
this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
}

public IntermediateDataSetID getResultId() {
Expand All @@ -83,8 +83,8 @@ public int getNumberOfSubpartitions() {
return numberOfSubpartitions;
}

public boolean allowLazyScheduling() {
return lazyScheduling;
public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import com.google.common.collect.Maps;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
Expand Down Expand Up @@ -520,6 +521,13 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) {

// ------------------------------------------------------------------------

/**
 * Returns the input channels of this gate, keyed by intermediate result
 * partition ID.
 *
 * <p>The internal map itself is returned (no copy is made). The accessor is
 * package-private and only meant to be used by tests.
 */
@VisibleForTesting
Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
return inputChannels;
}

// ------------------------------------------------------------------------

/**
* Creates an input gate and all of its input channels.
*/
Expand Down Expand Up @@ -565,7 +573,7 @@ else if (partitionLocation.isRemote()) {
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
}
Expand Down
Expand Up @@ -346,7 +346,7 @@ public Task(
resultPartitionConsumableNotifier,
ioManager,
networkEnvironment.getDefaultIOMode(),
desc.allowLazyScheduling());
desc.sendScheduleOrUpdateConsumersMessage());

writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);

Expand Down
Expand Up @@ -23,10 +23,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.network.netty.NettyConfig

case class NetworkEnvironmentConfiguration(
numNetworkBuffers: Int,
networkBufferSize: Int,
memoryType: MemoryType,
ioMode: IOMode,
nettyConfig: Option[NettyConfig] = None,
partitionRequestInitialBackoff: Int = 500,
partitinRequestMaxBackoff: Int = 3000)
numNetworkBuffers: Int,
networkBufferSize: Int,
memoryType: MemoryType,
ioMode: IOMode,
partitionRequestInitialBackoff : Int,
partitionRequestMaxBackoff : Int,
nettyConfig: Option[NettyConfig] = None)
Expand Up @@ -1982,7 +1982,7 @@ object TaskManager {
kvStateServer,
netConfig.ioMode,
netConfig.partitionRequestInitialBackoff,
netConfig.partitinRequestMaxBackoff)
netConfig.partitionRequestMaxBackoff)

network.start()

Expand Down Expand Up @@ -2258,11 +2258,18 @@ object TaskManager {

val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC

val initialRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL)
val maxRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX)

val networkConfig = NetworkEnvironmentConfiguration(
numNetworkBuffers,
pageSize,
memType,
ioMode,
initialRequestBackoff,
maxRequestBackoff,
nettyConfig)

// ----> timeouts, library caching, profiling
Expand Down
Expand Up @@ -55,6 +55,6 @@ public void testSerialization() throws Exception {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
assertTrue(copy.allowLazyScheduling());
assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
}
}
Expand Up @@ -21,11 +21,14 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand All @@ -42,9 +45,12 @@
import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -271,6 +277,84 @@ public void run() {
assertEquals(IllegalStateException.class, asyncException.get().getClass());
}

/**
 * Tests that the request backoff configuration is correctly forwarded to the
 * channels.
 *
 * <p>A gate with one local, one remote, and one unknown input channel is created
 * from a mocked {@link NetworkEnvironment} that reports specific initial and
 * maximum backoff values. Each channel is then expected to start at backoff 0,
 * go to the initial backoff, double on every further increase, and finally be
 * capped at the maximum backoff.
 */
@Test
public void testRequestBackoffConfiguration() throws Exception {
// One partition per channel type under test (local, remote, unknown).
ResultPartitionID[] partitionIds = new ResultPartitionID[] {
new ResultPartitionID(),
new ResultPartitionID(),
new ResultPartitionID()
};

InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
// Local
new InputChannelDeploymentDescriptor(
partitionIds[0],
ResultPartitionLocation.createLocal()),
// Remote
new InputChannelDeploymentDescriptor(
partitionIds[1],
ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
// Unknown
new InputChannelDeploymentDescriptor(
partitionIds[2],
ResultPartitionLocation.createUnknown())};

InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);

// Distinctive values so that a mix-up of initial and max backoff is detected.
// With these values the fourth increase (137 * 8 = 1096) exceeds the maximum
// and is expected to be capped at 1001 (see the assertions below).
int initialBackoff = 137;
int maxBackoff = 1001;

// Mocked environment that only provides what the gate creation queries here.
NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager());
when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff);
when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff);
when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());

SingleInputGate gate = SingleInputGate.create(
"TestTask",
new JobID(),
new ExecutionAttemptID(),
gateDesc,
netEnv,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());

Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();

// Verify that each descriptor resulted in the expected channel type.
assertEquals(3, channelMap.size());
InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
assertEquals(LocalInputChannel.class, localChannel.getClass());

InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
assertEquals(RemoteInputChannel.class, remoteChannel.getClass());

InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
assertEquals(UnknownInputChannel.class, unknownChannel.getClass());

// All channel types are expected to show the same backoff progression.
InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
for (InputChannel ch : channels) {
// No backoff before the first increase.
assertEquals(0, ch.getCurrentBackoff());

// First increase: the configured initial backoff.
assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff, ch.getCurrentBackoff());

// Subsequent increases double the backoff.
assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff * 2, ch.getCurrentBackoff());

assertTrue(ch.increaseBackoff());
assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());

// The next doubling would exceed the maximum, so the maximum is expected.
assertTrue(ch.increaseBackoff());
assertEquals(maxBackoff, ch.getCurrentBackoff());

// Once the maximum has been reached, no further increase is possible.
assertFalse(ch.increaseBackoff());
}
}

/**
* Returns whether the stack trace represents a Thread in a blocking queue
* poll call.
Expand Down
Expand Up @@ -104,7 +104,7 @@ public void testComponentsStartupShutdown() {
config);

final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), 0, 0);
32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, Option.<NettyConfig>empty());

ResourceID taskManagerId = ResourceID.generate();

Expand All @@ -121,7 +121,7 @@ public void testComponentsStartupShutdown() {
null,
netConf.ioMode(),
netConf.partitionRequestInitialBackoff(),
netConf.partitinRequestMaxBackoff());
netConf.partitionRequestMaxBackoff());

network.start();

Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
Expand Down Expand Up @@ -903,6 +904,8 @@ public void testRemotePartitionNotFound() throws Exception {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

taskManager = TestingUtils.createTaskManager(
system,
Expand Down Expand Up @@ -998,6 +1001,8 @@ public void testLocalPartitionNotFound() throws Exception {
jobManager = new AkkaActorGateway(jm, leaderSessionID);

final Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

taskManager = TestingUtils.createTaskManager(
system,
Expand Down

0 comments on commit 5d5637b

Please sign in to comment.