Skip to content

Commit

Permalink
[FLINK-1636] [runtime] Add partition request backoff logic to LocalInputChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed May 27, 2015
1 parent 0ef2159 commit ceb890f
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 78 deletions.
Expand Up @@ -22,8 +22,13 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import scala.Tuple2;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* An input channel consumes a single {@link ResultSubpartitionView}.
Expand All @@ -43,10 +48,41 @@ public abstract class InputChannel {

protected final SingleInputGate inputGate;

protected InputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId) {
this.inputGate = inputGate;
// - Asynchronous error notification --------------------------------------

private final AtomicReference<Throwable> cause = new AtomicReference<Throwable>();

// - Partition request backoff --------------------------------------------

/** The initial backoff (in ms). */
private final int initialBackoff;

/** The maximum backoff (in ms). */
private final int maxBackoff;

/** The current backoff (in ms) */
private int currentBackoff;

protected InputChannel(
SingleInputGate inputGate,
int channelIndex,
ResultPartitionID partitionId,
Tuple2<Integer, Integer> initialAndMaxBackoff) {

checkArgument(channelIndex >= 0);

int initial = initialAndMaxBackoff._1();
int max = initialAndMaxBackoff._2();

checkArgument(initial >= 0 && initial <= max);

this.inputGate = checkNotNull(inputGate);
this.channelIndex = channelIndex;
this.partitionId = partitionId;
this.partitionId = checkNotNull(partitionId);

this.initialBackoff = initial;
this.maxBackoff = max;
this.currentBackoff = initial == 0 ? -1 : 0;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -109,4 +145,74 @@ protected void notifyAvailableBuffer() {
*/
abstract void releaseAllResources() throws IOException;

// ------------------------------------------------------------------------
// Error notification
// ------------------------------------------------------------------------

/**
 * Rethrows the error that was reported for this channel, if there is one.
 *
 * <p>A reported {@link IOException} is rethrown unchanged; any other throwable is wrapped
 * in a new {@link IOException} with the reported throwable as its cause.
 *
 * @throws IOException if an error has been set on this channel
 */
protected void checkError() throws IOException {
	final Throwable reported = cause.get();

	// Nothing reported so far
	if (reported == null) {
		return;
	}

	if (reported instanceof IOException) {
		throw (IOException) reported;
	}

	throw new IOException(reported);
}

/**
 * Atomically records an error for this channel. If this call is the one that set the
 * error, the input gate is notified about available data so that the channel gets queried.
 *
 * @param cause the error to record; must not be <code>null</code>
 */
protected void setError(Throwable cause) {
	final Throwable error = checkNotNull(cause);

	// An error has already been recorded: keep the first one and do not notify again.
	if (!this.cause.compareAndSet(null, error)) {
		return;
	}

	// Notify the input gate.
	notifyAvailableBuffer();
}

// ------------------------------------------------------------------------
// Partition request exponential backoff
// ------------------------------------------------------------------------

/**
 * Returns the current backoff in ms.
 *
 * @return the current backoff, or <code>0</code> if the stored value is zero or negative
 */
protected int getCurrentBackoff() {
	// Negative values are only an internal marker and are reported as 0.
	return Math.max(currentBackoff, 0);
}

/**
 * Increases the current backoff and returns whether the operation was successful.
 *
 * <p>The first successful call sets the backoff to the initial backoff. Every following
 * call doubles the backoff, capped at the maximum backoff. Once the maximum has been
 * reached (or if backoff is disabled, i.e. the current backoff is negative), the backoff
 * is left unchanged and <code>false</code> is returned.
 *
 * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>.
 */
protected boolean increaseBackoff() {
	// Backoff is disabled
	if (currentBackoff < 0) {
		return false;
	}

	// This is the first time backing off
	if (currentBackoff == 0) {
		currentBackoff = initialBackoff;

		return true;
	}

	// Continue backing off
	if (currentBackoff < maxBackoff) {
		// Decide by comparing against half the maximum instead of doubling first:
		// currentBackoff * 2 wraps around to a negative int for large values, which
		// Math.min would then pick and thereby silently flip this channel into the
		// "disabled" (negative) state.
		currentBackoff = currentBackoff > maxBackoff / 2
				? maxBackoff
				: currentBackoff * 2;

		return true;
	}

	// Reached maximum backoff
	return false;
}
}
Expand Up @@ -23,14 +23,18 @@
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -42,6 +46,8 @@ public class LocalInputChannel extends InputChannel implements NotificationListe

private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);

private final Object requestLock = new Object();

/** The local partition manager. */
private final ResultPartitionManager partitionManager;

Expand All @@ -62,7 +68,19 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher) {

super(inputGate, channelIndex, partitionId);
this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
new Tuple2<Integer, Integer>(0, 0));
}

LocalInputChannel(
SingleInputGate inputGate,
int channelIndex,
ResultPartitionID partitionId,
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher,
Tuple2<Integer, Integer> initialAndMaxBackoff) {

super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);

this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
Expand All @@ -74,23 +92,59 @@ public class LocalInputChannel extends InputChannel implements NotificationListe

@Override
void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (subpartitionView == null) {
LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
this, subpartitionIndex, partitionId);

subpartitionView = partitionManager.createSubpartitionView(
partitionId, subpartitionIndex, inputGate.getBufferProvider());

// The lock is required to request only once in the presence of retriggered requests.
synchronized (requestLock) {
if (subpartitionView == null) {
throw new IOException("Error requesting subpartition.");
LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
this, subpartitionIndex, partitionId);

try {
subpartitionView = partitionManager.createSubpartitionView(
partitionId, subpartitionIndex, inputGate.getBufferProvider());
}
catch (PartitionNotFoundException notFound) {
if (increaseBackoff()) {
inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
return;
}
else {
throw notFound;
}
}

if (subpartitionView == null) {
throw new IOException("Error requesting subpartition.");
}

getNextLookAhead();
}
}
}

getNextLookAhead();
/**
 * Retriggers a subpartition request.
 *
 * <p>Schedules a task on the given timer that calls
 * <code>requestSubpartition(subpartitionIndex)</code> after the current backoff (in ms).
 * Anything thrown by that call is reported via <code>setError</code> instead of being
 * propagated out of the timer task.
 *
 * @param timer the timer to schedule the delayed request on
 * @param subpartitionIndex the index of the subpartition to request
 * @throws IllegalStateException if the subpartition view has already been set
 */
void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
	synchronized (requestLock) {
		checkState(subpartitionView == null, "Already requested partition.");

		final TimerTask delayedRequest = new TimerTask() {
			@Override
			public void run() {
				try {
					requestSubpartition(subpartitionIndex);
				}
				catch (Throwable t) {
					setError(t);
				}
			}
		};

		timer.schedule(delayedRequest, getCurrentBackoff());
	}
}

@Override
Buffer getNextBuffer() throws IOException, InterruptedException {
checkError();
checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");

// After subscribe notification
Expand Down Expand Up @@ -119,6 +173,7 @@ Buffer getNextBuffer() throws IOException, InterruptedException {

@Override
void sendTaskEvent(TaskEvent event) throws IOException {
checkError();
checkState(subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");

if (!taskEventDispatcher.publish(partitionId, event)) {
Expand Down
Expand Up @@ -34,9 +34,7 @@
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -56,12 +54,6 @@ public class RemoteInputChannel extends InputChannel {
/** The connection manager used to connect to the remote partition provider. */
private final ConnectionManager connectionManager;

/**
* An asynchronous error notification. Set by either the network I/O thread or the thread
* failing a partition request.
*/
private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

/**
* The received buffers. Received buffers are enqueued by the network I/O thread and the queue
* is consumed by the receiving task thread.
Expand All @@ -83,12 +75,6 @@ public class RemoteInputChannel extends InputChannel {
*/
private int expectedSequenceNumber = 0;

/** The current backoff time (in ms) for partition requests. */
private int nextRequestBackoffMs;

/** The maximum backoff time (in ms) after which a request fails */
private final int maxRequestBackoffMs;

RemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
Expand All @@ -108,15 +94,10 @@ public class RemoteInputChannel extends InputChannel {
ConnectionManager connectionManager,
Tuple2<Integer, Integer> initialAndMaxBackoff) {

super(inputGate, channelIndex, partitionId);
super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);

this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);

checkArgument(initialAndMaxBackoff._1() <= initialAndMaxBackoff._2());

this.nextRequestBackoffMs = initialAndMaxBackoff._1();
this.maxRequestBackoffMs = initialAndMaxBackoff._2();
}

// ------------------------------------------------------------------------
Expand All @@ -143,17 +124,9 @@ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedE
void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
checkState(partitionRequestClient != null, "Missing initial subpartition request.");

// Disabled
if (nextRequestBackoffMs == 0) {
failPartitionRequest();
}
else if (nextRequestBackoffMs <= maxRequestBackoffMs) {
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, nextRequestBackoffMs);

// Exponential backoff
nextRequestBackoffMs = nextRequestBackoffMs < maxRequestBackoffMs
? Math.min(nextRequestBackoffMs * 2, maxRequestBackoffMs)
: maxRequestBackoffMs + 1; // Fail the next request
if (increaseBackoff()) {
partitionRequestClient.requestSubpartition(
partitionId, subpartitionIndex, this, getCurrentBackoff());
}
else {
failPartitionRequest();
Expand Down Expand Up @@ -230,7 +203,7 @@ void releaseAllResources() throws IOException {
}

public void failPartitionRequest() {
onError(new PartitionNotFoundException(partitionId));
setError(new PartitionNotFoundException(partitionId));
}

@Override
Expand Down Expand Up @@ -305,26 +278,7 @@ public void onFailedPartitionRequest() {
}

public void onError(Throwable cause) {
if (error.compareAndSet(null, cause)) {
// Notify the input gate to trigger querying of this channel
notifyAvailableBuffer();
}
}

/**
* Checks whether this channel got notified about an error.
*/
private void checkError() throws IOException {
final Throwable t = error.get();

if (t != null) {
if (t instanceof IOException) {
throw (IOException) t;
}
else {
throw new IOException(t);
}
}
setError(cause);
}

public static class BufferReorderingException extends IOException {
Expand Down

0 comments on commit ceb890f

Please sign in to comment.