Skip to content

Commit

Permalink
[hotfix] Add the switch for keeping both the old mode and the new cre…
Browse files Browse the repository at this point in the history
…dit-based mode
  • Loading branch information
zhijiangW authored and StefanRRichter committed Feb 16, 2018
1 parent a9fdbc4 commit 0093bcb
Show file tree
Hide file tree
Showing 28 changed files with 1,564 additions and 496 deletions.
Expand Up @@ -306,6 +306,17 @@ public class TaskManagerOptions {
.defaultValue(false)
.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");

/**
* Config parameter defining whether to enable credit-based flow control or not.
*
* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
* credit-based flow control.
*/
@Deprecated
public static final ConfigOption<Boolean> NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
key("taskmanager.network.credit-based-flow-control.enabled")
.defaultValue(true);

// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;

import java.io.IOException;

/**
* Channel handler to read and write network messages on client side.
*/
public interface NetworkClientHandler extends ChannelHandler {

void addInputChannel(RemoteInputChannel inputChannel) throws IOException;

void removeInputChannel(RemoteInputChannel inputChannel);

void cancelRequestFor(InputChannelID inputChannelId);

/**
* The credit begins to announce after receiving the sender's backlog from buffer response.
* Than means it should only happen after some interactions with the channel to make sure
* the context will not be null.
*
* @param inputChannel The input channel with unannounced credits.
*/
void notifyCreditAvailable(final RemoteInputChannel inputChannel);
}
Expand Up @@ -85,6 +85,8 @@ public class NetworkEnvironment {
/** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */
private final int extraNetworkBuffersPerGate;

private final boolean enableCreditBased;

private boolean isShutdown;

public NetworkEnvironment(
Expand All @@ -99,7 +101,8 @@ public NetworkEnvironment(
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int extraNetworkBuffersPerGate) {
int extraNetworkBuffersPerGate,
boolean enableCreditBased) {

this.networkBufferPool = checkNotNull(networkBufferPool);
this.connectionManager = checkNotNull(connectionManager);
Expand All @@ -118,6 +121,8 @@ public NetworkEnvironment(
isShutdown = false;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;

this.enableCreditBased = enableCreditBased;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -223,14 +228,24 @@ public void setupPartition(ResultPartition partition) throws IOException {
@VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;

int maxNumberOfMemorySegments;
try {
int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
// Create a buffer pool for floating buffers and assign exclusive buffers to input channels directly
bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate,
maxNumberOfMemorySegments);
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
if (enableCreditBased) {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
extraNetworkBuffersPerGate : Integer.MAX_VALUE;

// Create a buffer pool for floating buffers and assign exclusive buffers to input channels directly
bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate,
maxNumberOfMemorySegments);
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
} else {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;

bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
maxNumberOfMemorySegments);
}
gate.setBufferPool(bufferPool);
} catch (Throwable t) {
if (bufferPool != null) {
Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;

import java.io.IOException;

/**
* Simple wrapper for the partition readerQueue iterator, which increments a
* sequence number for each returned buffer and remembers the receiver ID.
*/
public interface NetworkSequenceViewReader {

void requestSubpartitionView(
ResultPartitionProvider partitionProvider,
ResultPartitionID resultPartitionId,
int subPartitionIndex) throws IOException;

BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;

/**
* The credits from consumer are added in incremental way.
*
* @param creditDeltas The credit deltas
*/
void addCredit(int creditDeltas);

/**
* Checks whether this reader is available or not.
*
* @return True if the reader is available.
*/
boolean isAvailable();

boolean isRegisteredAsAvailable();

/**
* Updates the value to indicate whether the reader is enqueued in the pipeline or not.
*
* @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
*/
void setRegisteredAsAvailable(boolean isRegisteredAvailable);

void notifySubpartitionConsumed() throws IOException;

boolean isReleased();

void releaseAllResources() throws IOException;

Throwable getFailureCause();

InputChannelID getReceiverId();

int getSequenceNumber();
}

0 comments on commit 0093bcb

Please sign in to comment.