Skip to content

Commit

Permalink
[FLINK-22643][network] Reuse tpc connections between taskmanagers
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Jan 18, 2022
1 parent 90b48cb commit 3cbd0a1
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 9 deletions.
Expand Up @@ -26,6 +26,12 @@
<td>Boolean</td>
<td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
</tr>
<tr>
<td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The maximum number of tpc connections between taskmanagers for data communication.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Expand Up @@ -44,6 +44,12 @@
<td>Boolean</td>
<td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
</tr>
<tr>
<td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The maximum number of tpc connections between taskmanagers for data communication.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Expand Up @@ -152,6 +152,15 @@ public class NettyShuffleEnvironmentOptions {
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");

/** The maximum number of tpc connections between taskmanagers for data communication. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> MAX_NUM_TCP_CONNECTIONS =
key("taskmanager.network.max-num-tcp-connections")
.intType()
.defaultValue(1)
.withDescription(
"The maximum number of tpc connections between taskmanagers for data communication.");

/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input
* channel).
Expand Down
Expand Up @@ -115,7 +115,10 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
ConnectionManager connectionManager =
nettyConfig != null
? new NettyConnectionManager(
resultPartitionManager, taskEventPublisher, nettyConfig)
resultPartitionManager,
taskEventPublisher,
nettyConfig,
config.getMaxNumberOfConnections())
: new LocalConnectionManager();

NetworkBufferPool networkBufferPool =
Expand Down
Expand Up @@ -43,14 +43,16 @@ public class NettyConnectionManager implements ConnectionManager {
public NettyConnectionManager(
ResultPartitionProvider partitionProvider,
TaskEventPublisher taskEventPublisher,
NettyConfig nettyConfig) {
NettyConfig nettyConfig,
int maxNumberOfConnections) {

this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

this.partitionRequestClientFactory =
new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());
new PartitionRequestClientFactory(
client, nettyConfig.getNetworkRetries(), maxNumberOfConnections);

this.nettyProtocol =
new NettyProtocol(
Expand Down
Expand Up @@ -49,16 +49,24 @@ class PartitionRequestClientFactory {

private final int retryNumber;

private final int maxNumberOfConnections;

private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>>
clients = new ConcurrentHashMap<>();

PartitionRequestClientFactory(NettyClient nettyClient) {
this(nettyClient, 0);
this(nettyClient, 0, 1);
}

PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) {
this(nettyClient, retryNumber, 1);
}

PartitionRequestClientFactory(
NettyClient nettyClient, int retryNumber, int maxNumberOfConnections) {
this.nettyClient = nettyClient;
this.retryNumber = retryNumber;
this.maxNumberOfConnections = maxNumberOfConnections;
}

/**
Expand All @@ -67,6 +75,11 @@ class PartitionRequestClientFactory {
*/
NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
throws IOException, InterruptedException {
// We map the input ConnectionID to a new value to restrict the number of tcp connections
connectionId =
new ConnectionID(
connectionId.getAddress(),
connectionId.getConnectionIndex() % maxNumberOfConnections);
while (true) {
final CompletableFuture<NettyPartitionRequestClient> newClientFuture =
new CompletableFuture<>();
Expand Down
Expand Up @@ -87,6 +87,9 @@ public class NettyShuffleEnvironmentConfiguration {

private final int maxBuffersPerChannel;

/** The maximum number of tpc connections between taskmanagers for data communication. */
private final int maxNumberOfConnections;

public NettyShuffleEnvironmentConfiguration(
int numNetworkBuffers,
int networkBufferSize,
Expand All @@ -104,7 +107,8 @@ public NettyShuffleEnvironmentConfiguration(
int maxBuffersPerChannel,
long batchShuffleReadMemoryBytes,
int sortShuffleMinBuffers,
int sortShuffleMinParallelism) {
int sortShuffleMinParallelism,
int maxNumberOfConnections) {

this.numNetworkBuffers = numNetworkBuffers;
this.networkBufferSize = networkBufferSize;
Expand All @@ -123,6 +127,7 @@ public NettyShuffleEnvironmentConfiguration(
this.batchShuffleReadMemoryBytes = batchShuffleReadMemoryBytes;
this.sortShuffleMinBuffers = sortShuffleMinBuffers;
this.sortShuffleMinParallelism = sortShuffleMinParallelism;
this.maxNumberOfConnections = maxNumberOfConnections;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -199,6 +204,10 @@ public int getMaxBuffersPerChannel() {
return maxBuffersPerChannel;
}

public int getMaxNumberOfConnections() {
return maxNumberOfConnections;
}

// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -280,6 +289,11 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
String compressionCodec =
configuration.getString(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);

int maxNumConnections =
Math.max(
1,
configuration.getInteger(
NettyShuffleEnvironmentOptions.MAX_NUM_TCP_CONNECTIONS));
return new NettyShuffleEnvironmentConfiguration(
numberOfNetworkBuffers,
pageSize,
Expand All @@ -297,7 +311,8 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
maxBuffersPerChannel,
batchShuffleReadMemoryBytes,
sortShuffleMinBuffers,
sortShuffleMinParallelism);
sortShuffleMinParallelism,
maxNumConnections);
}

/**
Expand Down Expand Up @@ -434,6 +449,7 @@ public int hashCode() {
result = 31 * result + Objects.hashCode(batchShuffleReadMemoryBytes);
result = 31 * result + sortShuffleMinBuffers;
result = 31 * result + sortShuffleMinParallelism;
result = 31 * result + maxNumberOfConnections;
return result;
}

Expand Down Expand Up @@ -464,7 +480,8 @@ public boolean equals(Object obj) {
&& this.blockingShuffleCompressionEnabled
== that.blockingShuffleCompressionEnabled
&& this.maxBuffersPerChannel == that.maxBuffersPerChannel
&& Objects.equals(this.compressionCodec, that.compressionCodec);
&& Objects.equals(this.compressionCodec, that.compressionCodec)
&& this.maxNumberOfConnections == that.maxNumberOfConnections;
}
}

Expand Down Expand Up @@ -501,6 +518,8 @@ public String toString() {
+ sortShuffleMinBuffers
+ ", sortShuffleMinParallelism="
+ sortShuffleMinParallelism
+ ", maxNumberOfConnections="
+ maxNumberOfConnections
+ '}';
}
}
Expand Up @@ -77,6 +77,8 @@ public class NettyShuffleEnvironmentBuilder {

private Executor ioExecutor = Executors.directExecutor();

private int maxNumberOfConnections = 1;

public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
this.taskManagerLocation = taskManagerLocation;
return this;
Expand Down Expand Up @@ -170,6 +172,11 @@ public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) {
return this;
}

public NettyShuffleEnvironmentBuilder setMaxNumberOfConnections(int maxNumberOfConnections) {
this.maxNumberOfConnections = maxNumberOfConnections;
return this;
}

public NettyShuffleEnvironment build() {
return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
new NettyShuffleEnvironmentConfiguration(
Expand All @@ -189,7 +196,8 @@ public NettyShuffleEnvironment build() {
maxBuffersPerChannel,
batchShuffleReadMemoryBytes,
sortShuffleMinBuffers,
sortShuffleMinParallelism),
sortShuffleMinParallelism,
maxNumberOfConnections),
taskManagerLocation,
new TaskEventDispatcher(),
resultPartitionManager,
Expand Down
Expand Up @@ -163,6 +163,6 @@ public void testManualConfiguration() throws Exception {

private NettyConnectionManager createNettyConnectionManager(NettyConfig config) {
return new NettyConnectionManager(
new ResultPartitionManager(), new TaskEventDispatcher(), config);
new ResultPartitionManager(), new TaskEventDispatcher(), config, 1);
}
}
Expand Up @@ -34,13 +34,16 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -115,6 +118,36 @@ public void testExceptionsAreNotCached() throws Exception {
}
}

@Test
public void testReuseNettyPartitionRequestClient() throws Exception {
NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
try {
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1);
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 2);
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
} finally {
nettyServerAndClient.client().shutdown();
nettyServerAndClient.server().shutdown();
}
}

private void checkReuseNettyPartitionRequestClient(
NettyTestUtil.NettyServerAndClient nettyServerAndClient, int maxNumberOfConnections)
throws Exception {
final Set<NettyPartitionRequestClient> set = new HashSet<>();

final PartitionRequestClientFactory factory =
new PartitionRequestClientFactory(
nettyServerAndClient.client(), 0, maxNumberOfConnections);
for (int i = 0; i < Math.max(100, maxNumberOfConnections); i++) {
final ConnectionID connectionID =
nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE));
set.add(factory.createPartitionRequestClient(connectionID));
}
assertTrue(set.size() <= maxNumberOfConnections);
}

@Test
public void testNettyClientConnectRetry() throws Exception {
NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
Expand Down

0 comments on commit 3cbd0a1

Please sign in to comment.