Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send commit concurrently in client side #59

Merged
merged 7 commits into from
Jul 17, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class RssMRConfig {
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
public static final String RSS_DATA_COMMIT_POOL_SIZE =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;

public static final String RSS_CLIENT_SEND_THREAD_NUM =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
int dataTransferPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
int dataCommitPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE,
RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
ShuffleWriteClient client = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
dataTransferPoolSize);
dataTransferPoolSize, dataCommitPoolSize);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public class RssSparkConfig {
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
public static final String RSS_DATA_COMMIT_POOL_SIZE =
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;

public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
public static final String RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataReplicaRead;
private final boolean dataReplicaSkipEnabled;
private final int dataTransferPoolSize;
private final int dataCommitPoolSize;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private RemoteStorageInfo remoteStorage;
Expand Down Expand Up @@ -168,10 +169,13 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
shuffleWriteClient = ShuffleClientFactory
this.dataCommitPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
this.shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize,
dataCommitPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataReplicaRead;
private final boolean dataReplicaSkipEnabled;
private final int dataTransferPoolSize;
private final int dataCommitPoolSize;
private ShuffleWriteClient shuffleWriteClient;
private final Map<String, Set<Long>> taskToSuccessBlockIds;
private final Map<String, Set<Long>> taskToFailedBlockIds;
Expand Down Expand Up @@ -169,11 +170,14 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {

this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
this.dataCommitPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);

shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize,
dataCommitPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
Expand Down Expand Up @@ -238,11 +242,14 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
this.dataCommitPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);

shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize,
dataCommitPoolSize);
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
if (loop != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public static ShuffleClientFactory getInstance() {

public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize) {
int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize,
int dataCommitPoolSize) {
return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize);
replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize, dataCommitPoolSize);
}

public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,20 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
private int replicaRead;
private boolean replicaSkipEnabled;
private int dataTranferPoolSize;
private int dataCommitPoolSize = -1;
private final ForkJoinPool dataTransferPool;

public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
int dataTranferPoolSize) {
public ShuffleWriteClientImpl(
String clientType,
int retryMax,
long retryIntervalMax,
int heartBeatThreadNum,
int replica,
int replicaWrite,
int replicaRead,
boolean replicaSkipEnabled,
int dataTranferPoolSize,
int dataCommitPoolSize) {
this.clientType = clientType;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
Expand All @@ -105,6 +114,7 @@ public ShuffleWriteClientImpl(String clientType, int retryMax, long retryInterva
this.replicaSkipEnabled = replicaSkipEnabled;
this.dataTranferPoolSize = dataTranferPoolSize;
this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
this.dataCommitPoolSize = dataCommitPoolSize;
}

private boolean sendShuffleDataAsync(
Expand Down Expand Up @@ -247,43 +257,62 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
return new SendShuffleDataResult(successBlockIds, failedBlockIds);
}

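/**
* Sends a commit request to every assigned shuffle server and waits until all
* shuffle data have been flushed to durable storage in those servers.
* The requests are issued concurrently on a pool created for this call, sized by
* dataCommitPoolSize, or by the number of servers when dataCommitPoolSize is -1.
* @param shuffleServerInfoSet the shuffle servers the commit request is sent to
* @param appId the application id carried in the commit and finish requests
* @param shuffleId the shuffle id carried in the commit and finish requests
* @param numMaps the map number of the stage; finishShuffle is sent to a server
*                once its reported commit count reaches this value
* @return true if the commit/finish call succeeded on every server in
*         shuffleServerInfoSet, false otherwise
*/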
@Override
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
ForkJoinPool forkJoinPool = new ForkJoinPool(
dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : dataCommitPoolSize
);
AtomicInteger successfulCommit = new AtomicInteger(0);
shuffleServerInfoSet.stream().forEach(ssi -> {
RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
long startTime = System.currentTimeMillis();
try {
RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
int commitCount = response.getCommitCount();
LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+ "] to ShuffleServer[" + ssi.getId() + "], cost "
+ (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+ commitCount + "], map number of stage is " + numMaps);
if (commitCount >= numMaps) {
RssFinishShuffleResponse rfsResponse =
getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+ "] with statusCode " + rfsResponse.getStatusCode();
try {
forkJoinPool.submit(() -> {
shuffleServerInfoSet.parallelStream().forEach(ssi -> {
RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
String errorMsg = "Failed to commit shuffle data to " + ssi + " for shuffleId[" + shuffleId + "]";
long startTime = System.currentTimeMillis();
try {
RssSendCommitResponse response = getShuffleServerClient(ssi).sendCommit(request);
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
int commitCount = response.getCommitCount();
LOG.info("Successfully sendCommit for appId[" + appId + "], shuffleId[" + shuffleId
+ "] to ShuffleServer[" + ssi.getId() + "], cost "
+ (System.currentTimeMillis() - startTime) + " ms, got committed maps["
+ commitCount + "], map number of stage is " + numMaps);
if (commitCount >= numMaps) {
RssFinishShuffleResponse rfsResponse =
getShuffleServerClient(ssi).finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
String msg = "Failed to finish shuffle to " + ssi + " for shuffleId[" + shuffleId
+ "] with statusCode " + rfsResponse.getStatusCode();
LOG.error(msg);
throw new Exception(msg);
} else {
LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
}
}
} else {
String msg = errorMsg + " with statusCode " + response.getStatusCode();
LOG.error(msg);
throw new Exception(msg);
} else {
LOG.info("Successfully finish shuffle to " + ssi + " for shuffleId[" + shuffleId + "]");
}
successfulCommit.incrementAndGet();
} catch (Exception e) {
LOG.error(errorMsg, e);
}
} else {
String msg = errorMsg + " with statusCode " + response.getStatusCode();
LOG.error(msg);
throw new Exception(msg);
}
successfulCommit.incrementAndGet();
} catch (Exception e) {
LOG.error(errorMsg, e);
}
});
});
}).join();
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use

finally {
   forkJoinPool.shutdownNow();
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My fault…..

forkJoinPool.shutdownNow();
}

// check if every commit/finish call is successful
return successfulCommit.get() == shuffleServerInfoSet.size();
}
Expand Down Expand Up @@ -508,6 +537,7 @@ public void sendAppHeartbeat(String appId, long timeoutMs) {
// Shuts down what this client holds: the heartbeat executor, each client in
// coordinatorClients, and the data transfer pool.
public void close() {
// NOTE(review): presumably an ExecutorService, in which case shutdownNow()
// does not wait for running heartbeat tasks to finish — confirm.
heartBeatExecutorService.shutdownNow();
coordinatorClients.forEach(CoordinatorClient::close);
// dataTransferPool is a ForkJoinPool created in the constructor, so it has to
// be stopped here or its worker threads outlive the client.
dataTransferPool.shutdownNow();
}

private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class RssClientConfig {
public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = true;
public static final String RSS_DATA_TRANSFER_POOL_SIZE = "rss.client.data.transfer.pool.size";
public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = Runtime.getRuntime().availableProcessors();
public static final String RSS_DATA_COMMIT_POOL_SIZE = "rss.client.data.commit.pool.size";
public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE = -1;
public static final String RSS_HEARTBEAT_INTERVAL = "rss.heartbeat.interval";
public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = 10 * 1000L;
public static final String RSS_HEARTBEAT_TIMEOUT = "rss.heartbeat.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ShuffleWriteClientImplTest {
@Test
public void testSendData() {
ShuffleWriteClientImpl shuffleWriteClient =
new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1);
new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1);
ShuffleServerClient mockShuffleServerClient = mock(ShuffleServerClient.class);
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static void setupServers() throws Exception {
@Test
public void testTags() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
1, 1, 1, true, 1);
1, 1, 1, true, 1, 1);
shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);

// Case1 : only set the single default shuffle version tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private void registerShuffleServer(String testAppId,
int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {

shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
replica, replicaWrite, replicaRead, replicaSkip, 1);
replica, replicaWrite, replicaRead, replicaSkip, 1, 1);

List<ShuffleServerInfo> allServers = Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void createClient() {
public void clearResourceTest() throws Exception {
final ShuffleWriteClient shuffleWriteClient =
ShuffleClientFactory.getInstance().createShuffleWriteClient(
"GRPC", 2, 10000L, 4, 1, 1, 1, true, 1);
"GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
shuffleWriteClient.registerShuffle(
new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void setupServers() throws Exception {
@BeforeEach
public void createClient() {
shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
1, 1, 1, true, 1);
1, 1, 1, true, 1, 1);
}

@AfterEach
Expand Down