Skip to content

Commit

Permalink
[#1606] feat(client): Add client retry mechanism for NO_BUFFER when r…
Browse files Browse the repository at this point in the history
…eading data(memory/local/index) (#1616)

### What changes were proposed in this pull request?

1. Remove the server-side retry when reading data and switch to a client-side retry with backoff.
2. Reduce lock contention when reading data by using CAS (compare-and-swap).

### Why are the changes needed?

Fix #1606.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests were added, and the change was also tested in our environment.
  • Loading branch information
rickyma committed Apr 8, 2024
1 parent 1051d26 commit f9d71da
Show file tree
Hide file tree
Showing 22 changed files with 803 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
LOG.info("In reduce: " + reduceId + ", Rss MR client starts to fetch blocks from RSS server");
JobConf readerJobConf = getRemoteConf();
boolean expectedTaskIdsBitmapFilterEnable = serverInfoList.size() > 1;
int retryMax =
rssJobConf.getInt(
RssMRConfig.RSS_CLIENT_RETRY_MAX, RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
long retryIntervalMax =
rssJobConf.getLong(
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
Expand All @@ -232,6 +239,8 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
.hadoopConf(readerJobConf)
.idHelper(new MRIdHelper())
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
.retryMax(retryMax)
.retryIntervalMax(retryIntervalMax)
.rssConf(RssMRConfig.toRssConf(rssJobConf)));
RssFetcher fetcher =
new RssFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ public RssShuffleReader(
@Override
public Iterator<Product2<K, C>> read() {
LOG.info("Shuffle read started:" + getReadInfo());
int retryMax =
rssConf.getInteger(
RssClientConfig.RSS_CLIENT_RETRY_MAX,
RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
long retryIntervalMax =
rssConf.getLong(
RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
Expand All @@ -123,6 +131,8 @@ public Iterator<Product2<K, C>> read() {
.shuffleServerInfoList(shuffleServerInfoList)
.hadoopConf(hadoopConf)
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
.retryMax(retryMax)
.retryIntervalMax(retryIntervalMax)
.rssConf(rssConf));
RssShuffleDataIterator rssShuffleDataIterator =
new RssShuffleDataIterator<K, C>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
boolean expectedTaskIdsBitmapFilterEnable =
!(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE)
|| shuffleServerInfoList.size() > 1;
int retryMax =
rssConf.getInteger(
RssClientConfig.RSS_CLIENT_RETRY_MAX,
RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
long retryIntervalMax =
rssConf.getLong(
RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
Expand All @@ -262,6 +270,8 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
.hadoopConf(hadoopConf)
.shuffleDataDistributionType(dataDistributionType)
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
.retryMax(retryMax)
.retryIntervalMax(retryIntervalMax)
.rssConf(rssConf));
RssShuffleDataIterator<K, C> iterator =
new RssShuffleDataIterator<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ public static class ReadClientBuilder {
private int indexReadLimit;
private long readBufferSize;
private ClientType clientType;
private int retryMax;
private long retryIntervalMax;

public ReadClientBuilder appId(String appId) {
this.appId = appId;
Expand Down Expand Up @@ -310,6 +312,16 @@ public ReadClientBuilder clientType(ClientType clientType) {
return this;
}

/**
 * Stores the retry limit on this builder.
 *
 * @param retryMax value later exposed through {@code getRetryMax()}; presumably the maximum
 *     number of client-side read retries — confirm against the request handling code
 * @return this builder, to allow call chaining
 */
public ReadClientBuilder retryMax(int retryMax) {
this.retryMax = retryMax;
return this;
}

/**
 * Stores the maximum retry interval on this builder.
 *
 * @param retryIntervalMax value later exposed through {@code getRetryIntervalMax()}; the time
 *     unit is not visible here (presumably milliseconds — TODO confirm)
 * @return this builder, to allow call chaining
 */
public ReadClientBuilder retryIntervalMax(long retryIntervalMax) {
this.retryIntervalMax = retryIntervalMax;
return this;
}

/** Creates a builder without setting anything; values are supplied through the fluent setters. */
public ReadClientBuilder() {}

public String getAppId() {
Expand Down Expand Up @@ -388,6 +400,14 @@ public ClientType getClientType() {
return clientType;
}

/**
 * Returns the value stored by {@code retryMax(int)}.
 *
 * @return the configured retry limit
 */
public int getRetryMax() {
return retryMax;
}

/**
 * Returns the value stored by {@code retryIntervalMax(long)}.
 *
 * @return the configured maximum retry interval
 */
public long getRetryIntervalMax() {
return retryIntervalMax;
}

/**
 * Creates a read client from the current state of this builder.
 *
 * @return a new {@code ShuffleReadClientImpl} constructed with this builder
 */
public ShuffleReadClientImpl build() {
return new ShuffleReadClientImpl(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ private void init(ShuffleClientFactory.ReadClientBuilder builder) {
request.setExpectTaskIds(taskIdBitmap);
request.setClientConf(builder.getRssConf());
request.setClientType(builder.getClientType());
request.setRetryMax(builder.getRetryMax());
request.setRetryIntervalMax(builder.getRetryIntervalMax());
if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
request.useExpectedTaskIdsBitmapFilter();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.MockedShuffleServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class RpcClientRetryTest extends ShuffleReadWriteBase {

private static ShuffleServerInfo shuffleServerInfo0;
private static ShuffleServerInfo shuffleServerInfo1;
private static ShuffleServerInfo shuffleServerInfo2;
private static MockedShuffleWriteClientImpl shuffleWriteClientImpl;

/**
 * Builds the read-client settings that every read client in this test shares.
 *
 * <p>Shuffle 0 / partition 0 is targeted, with 10 partitions in total and one partition per
 * range; the index read limit is 100 and the read buffer size is 1000.
 *
 * @param storageType storage type whose {@code name()} is handed to the builder
 * @return a builder that callers complete with app id, bitmaps, servers and retry settings
 */
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) {
  final String storageTypeName = storageType.name();

  // Which data to read.
  ShuffleClientFactory.ReadClientBuilder builder =
      ShuffleClientFactory.newReadBuilder()
          .storageType(storageTypeName)
          .shuffleId(0)
          .partitionId(0);

  // Partition layout and read sizing; these setters are independent of each other.
  return builder
      .partitionNum(10)
      .partitionNumPerRange(1)
      .indexReadLimit(100)
      .readBufferSize(1000);
}

/**
 * Creates (but does not start) a mocked gRPC shuffle server with MEMORY_LOCALFILE storage.
 *
 * <p>The server gets two data directories below {@code tmpDir}, named {@code <id>_1} and
 * {@code <id>_2}. Buffer capacity is set to 600 with low/high memory watermarks of 5.0 and 15.0.
 *
 * @param id numeric prefix used for the data directory names
 * @param tmpDir parent directory of the server's data directories
 * @return the configured, not yet started, mocked shuffle server
 * @throws Exception if the server cannot be constructed
 */
public static MockedShuffleServer createMockedShuffleServer(int id, File tmpDir)
    throws Exception {
  // Comma-separated list of the two local storage directories.
  String storagePaths =
      String.join(
          ",",
          new File(tmpDir, id + "_1").getAbsolutePath(),
          new File(tmpDir, id + "_2").getAbsolutePath());

  ShuffleServerConf serverConf = getShuffleServerConf(ServerType.GRPC);
  serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
  serverConf.setString("rss.storage.basePath", storagePaths);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 5.0);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 15.0);
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 600L);
  return new MockedShuffleServer(serverConf);
}

/**
 * Brings up the test cluster once for the whole class: one coordinator and three mocked gRPC
 * shuffle servers (ids 0, 1 and 2), and records a {@link ShuffleServerInfo} for each server.
 *
 * @param tmpDir JUnit-managed temporary directory that holds the servers' data directories
 * @throws Exception if any server cannot be created or started
 */
@BeforeAll
public static void initCluster(@TempDir File tmpDir) throws Exception {
  createCoordinatorServer(getCoordinatorConf());

  for (int serverId = 0; serverId < 3; serverId++) {
    grpcShuffleServers.add(createMockedShuffleServer(serverId, tmpDir));
  }

  // The server descriptors are captured before anything is started, as in the original setup.
  shuffleServerInfo0 = toShuffleServerInfo(grpcShuffleServers.get(0));
  shuffleServerInfo1 = toShuffleServerInfo(grpcShuffleServers.get(1));
  shuffleServerInfo2 = toShuffleServerInfo(grpcShuffleServers.get(2));

  for (CoordinatorServer coordinatorServer : coordinators) {
    coordinatorServer.start();
  }
  for (ShuffleServer grpcServer : grpcShuffleServers) {
    grpcServer.start();
  }
}

/** Describes a shuffle server by an id of the form {@code 127.0.0.1-<grpcPort>}, its IP and port. */
private static ShuffleServerInfo toShuffleServerInfo(ShuffleServer server) {
  return new ShuffleServerInfo(
      String.format("127.0.0.1-%s", server.getGrpcPort()),
      server.getIp(),
      server.getGrpcPort());
}

/**
 * Stops every coordinator and then every gRPC shuffle server, and afterwards replaces both
 * shared lists with fresh empty lists.
 *
 * @throws Exception if stopping one of the servers fails
 */
public static void cleanCluster() throws Exception {
  for (CoordinatorServer runningCoordinator : coordinators) {
    runningCoordinator.stopServer();
  }
  for (ShuffleServer runningServer : grpcShuffleServers) {
    runningServer.stopServer();
  }

  // Drop the references to the stopped instances.
  coordinators = Lists.newArrayList();
  grpcShuffleServers = Lists.newArrayList();
}

/**
 * Releases everything the test class created: the write client (if one was created) and the
 * coordinator / shuffle server cluster.
 *
 * <p>The cluster shutdown runs in a {@code finally} block so that a failure while closing the
 * write client cannot leave coordinators and shuffle servers running.
 *
 * @throws Exception if closing the client or stopping the cluster fails
 */
@AfterAll
public static void cleanEnv() throws Exception {
  try {
    if (shuffleWriteClientImpl != null) {
      shuffleWriteClientImpl.close();
    }
  } finally {
    cleanCluster();
  }
}

/**
 * Supplies the storage types the retry test is executed with, one argument set per type.
 *
 * @return MEMORY_LOCALFILE followed by LOCALFILE
 */
private static Stream<Arguments> testRpcRetryLogicProvider() {
  // LOCALFILE: according to SERVER_BUFFER_CAPACITY & SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
  // data will be flushed to disk, so that run reads from disk only.
  return Stream.of(StorageType.MEMORY_LOCALFILE, StorageType.LOCALFILE)
      .map(storageType -> Arguments.of(storageType));
}

/**
 * Checks the client-side retry behaviour of the read path against servers that are told to fail
 * their first N read requests.
 *
 * <p>Three blocks are written to three servers, then two scenarios are run with a read client
 * configured with {@code retryMax(3)}:
 *
 * <ol>
 *   <li>4 failing requests per server: reading must fail;
 *   <li>1 failing request per server: reading must succeed thanks to the retry.
 * </ol>
 *
 * <p>The servers are static and shared between the parameterized invocations, so the failure
 * injection is always reset in a {@code finally} block; otherwise one failed assertion would
 * leave the servers in failure mode for the following invocation. The read clients are closed
 * in the same place.
 *
 * @param storageType storage type the read client is configured with
 */
@ParameterizedTest
@MethodSource("testRpcRetryLogicProvider")
public void testRpcRetryLogic(StorageType storageType) {
  String testAppId = "testRpcRetryLogic";
  registerShuffleServer(testAppId, 3, 2, 2, true);
  Map<Long, byte[]> expectedData = Maps.newHashMap();
  Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();

  List<ShuffleBlockInfo> blocks =
      createShuffleBlockList(
          0,
          0,
          0,
          3,
          25,
          blockIdBitmap,
          expectedData,
          Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));

  // Every block must have been written successfully before the read path is exercised.
  SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
  Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
  Roaring64NavigableMap successfulBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
  for (Long blockId : result.getSuccessBlockIds()) {
    successfulBlockIdBitmap.addLong(blockId);
  }
  for (Long blockId : result.getFailedBlockIds()) {
    failedBlockIdBitmap.addLong(blockId);
  }
  assertEquals(0, failedBlockIdBitmap.getLongCardinality());
  assertEquals(blockIdBitmap, successfulBlockIdBitmap);

  Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);

  // Scenario 1: the data cannot be read because the maximum number of retries is 3.
  ShuffleReadClientImpl readClient1 =
      newRetryingReadClient(storageType, testAppId, blockIdBitmap, taskIdBitmap);
  enableFirstNReadRequestsToFail(4);
  try {
    validateResult(readClient1, expectedData);
    // fail() throws an Error, so it is not swallowed by the catch below.
    fail("Reading should fail when more read requests fail than the client retries");
  } catch (Exception ignored) {
    // Expected: the client gave up after exhausting its retries.
  } finally {
    disableFirstNReadRequestsToFail();
    readClient1.close();
  }

  // Scenario 2: the data can be read because the reader will retry.
  ShuffleReadClientImpl readClient2 =
      newRetryingReadClient(storageType, testAppId, blockIdBitmap, taskIdBitmap);
  enableFirstNReadRequestsToFail(1);
  try {
    validateResult(readClient2, expectedData);
  } finally {
    disableFirstNReadRequestsToFail();
    readClient2.close();
  }
}

/** Builds a read client for all three servers with {@code retryMax(3)} and {@code retryIntervalMax(1)}. */
private ShuffleReadClientImpl newRetryingReadClient(
    StorageType storageType,
    String appId,
    Roaring64NavigableMap blockIdBitmap,
    Roaring64NavigableMap taskIdBitmap) {
  return baseReadBuilder(storageType)
      .appId(appId)
      .blockIdBitmap(blockIdBitmap)
      .taskIdBitmap(taskIdBitmap)
      .shuffleServerInfoList(
          Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
      .retryMax(3)
      .retryIntervalMax(1)
      .build();
}

/**
 * Calls {@code enableFirstNReadRequestToFail(failedCount)} on the mocked gRPC service of every
 * server in {@code grpcShuffleServers}.
 *
 * @param failedCount number of leading read requests each server is asked to fail
 */
private static void enableFirstNReadRequestsToFail(int failedCount) {
  for (ShuffleServer shuffleServer : grpcShuffleServers) {
    // Every server in this test was created as a mocked server, hence the cast.
    MockedGrpcServer mockedGrpcServer = (MockedGrpcServer) shuffleServer.getServer();
    mockedGrpcServer.getService().enableFirstNReadRequestToFail(failedCount);
  }
}

/**
 * Calls {@code resetFirstNReadRequestToFail()} on the mocked gRPC service of every server in
 * {@code grpcShuffleServers}, undoing {@code enableFirstNReadRequestsToFail}.
 */
private static void disableFirstNReadRequestsToFail() {
  for (ShuffleServer shuffleServer : grpcShuffleServers) {
    MockedGrpcServer mockedGrpcServer = (MockedGrpcServer) shuffleServer.getServer();
    mockedGrpcServer.getService().resetFirstNReadRequestToFail();
  }
}

/**
 * Write client used by this test; it only adds a two-argument {@code sendShuffleData}
 * convenience overload on top of {@link ShuffleWriteClientImpl}.
 */
static class MockedShuffleWriteClientImpl extends ShuffleWriteClientImpl {
MockedShuffleWriteClientImpl(ShuffleClientFactory.WriteClientBuilder builder) {
super(builder);
}

/**
 * Sends the blocks by delegating to the three-argument {@code sendShuffleData} of the parent.
 *
 * @param appId application id the blocks belong to
 * @param shuffleBlockInfoList blocks to send
 * @return the result reported by the parent implementation
 */
public SendShuffleDataResult sendShuffleData(
String appId, List<ShuffleBlockInfo> shuffleBlockInfoList) {
// The third argument is a supplier that always yields false
// (presumably "never cancel the request" — confirm against ShuffleWriteClientImpl).
return super.sendShuffleData(appId, shuffleBlockInfoList, () -> false);
}
}

/**
 * Creates the shared gRPC write client and registers shuffle 0 / partition range [0, 0] of the
 * given application on the first {@code replica} servers.
 *
 * <p>The test method is parameterized, so this runs once per invocation. A write client left
 * over from an earlier invocation is closed before it is replaced; previously it was simply
 * overwritten and never closed.
 *
 * @param testAppId application id to register
 * @param replica replica count; also the number of servers the shuffle is registered on (only
 *     three servers are available)
 * @param replicaWrite value passed to the builder's {@code replicaWrite}
 * @param replicaRead value passed to the builder's {@code replicaRead}
 * @param replicaSkip value passed to the builder's {@code replicaSkipEnabled}
 */
private void registerShuffleServer(
    String testAppId, int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {

  // Do not leak the client created by a previous parameterized invocation.
  if (shuffleWriteClientImpl != null) {
    shuffleWriteClientImpl.close();
  }

  shuffleWriteClientImpl =
      new MockedShuffleWriteClientImpl(
          ShuffleClientFactory.newWriteBuilder()
              .clientType(ClientType.GRPC.name())
              .retryMax(3)
              .retryIntervalMax(1000)
              .heartBeatThreadNum(1)
              .replica(replica)
              .replicaWrite(replicaWrite)
              .replicaRead(replicaRead)
              .replicaSkipEnabled(replicaSkip)
              .dataTransferPoolSize(1)
              .dataCommitPoolSize(1)
              .unregisterThreadPoolSize(10)
              .unregisterRequestTimeSec(10));

  List<ShuffleServerInfo> allServers =
      Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2);

  for (int i = 0; i < replica; i++) {
    shuffleWriteClientImpl.registerShuffle(
        allServers.get(i),
        testAppId,
        0,
        Lists.newArrayList(new PartitionRange(0, 0)),
        new RemoteStorageInfo(""),
        ShuffleDataDistributionType.NORMAL,
        1);
  }
}
}

0 comments on commit f9d71da

Please sign in to comment.