[SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal

### What changes were proposed in this pull request?
1. Fixes bugs in `RemoteBlockPushResolver` where the number of chunks in the meta file and the index file become inconsistent because of exceptions while writing to either file. This Java class was introduced in #30062.
 - If writing to the index file fails, the position of the meta file is not reset, so the number of chunks in the meta file is inconsistent with the index file.
 - During exception handling while writing to the index/meta file, we only set the pointer back to the start position. If the files are closed right after this, the extra bytes already written to them are not removed.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters more IOExceptions than this threshold while updating the data/meta/index file of a shuffle partition, it responds to the client with the exception `IOExceptions exceeded the threshold` so that the client can stop pushing data for this shuffle partition.
3. When only the metadata update fails, the exception is not propagated back to the client. This increases the size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.
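
The threshold behavior in (2) can be pictured with a small per-partition counter. This is only a minimal sketch under stated assumptions, not the code in `RemoteBlockPushResolver`: the class `PartitionIoErrorTracker`, its method names, and the use of `IllegalStateException` are hypothetical; only the message prefix and the "greater than the threshold" rule come from the description above.

```java
// Hypothetical sketch; the real bookkeeping lives in RemoteBlockPushResolver.
class PartitionIoErrorTracker {
  // Message prefix taken from the PR description.
  static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
    "IOExceptions exceeded the threshold";

  private final int threshold;
  private int numIOExceptions = 0;

  PartitionIoErrorTracker(int threshold) {
    this.threshold = threshold;
  }

  /** Records one failed write to the data, index, or meta file of this partition. */
  void recordIOException() {
    numIOExceptions++;
  }

  /** Returns true once the number of failures is greater than the threshold. */
  boolean thresholdExceeded() {
    return numIOExceptions > threshold;
  }

  /**
   * Called before merging another block. Throws once the threshold is exceeded,
   * so the caller can send the message back to the client (exception type is an assumption).
   */
  void checkBeforeMerge(String partitionDescription) {
    if (thresholdExceeded()) {
      throw new IllegalStateException(
        IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX + " when merging " + partitionDescription);
    }
  }
}
```

With a threshold of 4, the first four failures are tolerated and the fifth one flips `thresholdExceeded()` to true.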

### Why are the changes needed?
This fix is needed for the bugs mentioned above.
1. Moved the write to the meta file after the write to the index file. This fixes the issue because if an exception occurs while writing to the meta file, the index file position is not updated. With this change, if an exception occurs while writing to the index file, neither file is effectively updated, and vice versa.
2. Truncating the data/index/meta files when the partition is finalized, which removes any extra bytes left behind by failed writes.
3. Once the IOExceptions have reached the threshold, future blocks will most likely hit the same issue, so it is better to let the clients know so that they can stop pushing blocks for that partition.
4. When only the meta update fails, the client retries pushing a block that was already successfully merged into the data file. This can be avoided by letting the chunk grow slightly.
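
Points 1 and 2 can be sketched as follows. This is an illustration under assumptions, not the patch itself: the class `ChunkMetadataWriter`, its methods, and the on-disk layout (an 8-byte offset per index entry) are hypothetical. It shows the idea of writing the index entry first, then the meta entry, advancing the tracked positions only after both writes succeed, and truncating both files to those positions at finalization.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the write ordering and truncation described above.
class ChunkMetadataWriter implements AutoCloseable {
  private final FileChannel indexChannel;
  private final FileChannel metaChannel;
  // Last known-good lengths; bytes beyond these are leftovers from failed writes.
  private long indexPos = 0;
  private long metaPos = 0;

  ChunkMetadataWriter(Path indexFile, Path metaFile) throws IOException {
    indexChannel = FileChannel.open(
      indexFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    metaChannel = FileChannel.open(
      metaFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
  }

  /** Writes the index entry first, then the meta entry, at the tracked positions. */
  void appendChunk(long chunkOffset, byte[] chunkMeta) throws IOException {
    ByteBuffer indexEntry = ByteBuffer.allocate(Long.BYTES).putLong(0, chunkOffset);
    writeFully(indexChannel, indexPos, indexEntry);
    writeFully(metaChannel, metaPos, ByteBuffer.wrap(chunkMeta));
    // Reached only if both writes succeeded. On failure neither position moves,
    // so the next attempt overwrites any partially written bytes.
    indexPos += Long.BYTES;
    metaPos += chunkMeta.length;
  }

  /** Drops any bytes written past the last successful chunk. */
  void finalizePartition() throws IOException {
    indexChannel.truncate(indexPos);
    metaChannel.truncate(metaPos);
  }

  private static void writeFully(FileChannel channel, long pos, ByteBuffer buf)
      throws IOException {
    while (buf.hasRemaining()) {
      pos += channel.write(buf, pos);
    }
  }

  @Override
  public void close() throws IOException {
    indexChannel.close();
    metaChannel.close();
  }
}
```

Because the positions are tracked separately from the channels, resetting after a failure needs no extra work, and the truncation in `finalizePartition()` is what actually removes stray bytes before the files are closed.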

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests for all the bugs and threshold.

Closes #30433 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0677c39)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
otterc authored and Mridul Muralidharan committed Dec 23, 2020
1 parent 0b746ea commit b174ac7
Showing 4 changed files with 629 additions and 71 deletions.
@@ -398,4 +398,14 @@ public long mergedIndexCacheSize() {
    return JavaUtils.byteStringAsBytes(
      conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
  }

  /**
   * The threshold for number of IOExceptions while merging shuffle blocks to a shuffle partition.
   * When the number of IOExceptions while writing to merged shuffle data/index/meta file exceeds
   * this threshold, the shuffle server will respond back to the client to stop pushing shuffle
   * blocks for this shuffle partition.
   */
  public int ioExceptionsThresholdDuringMerge() {
    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
  }
}
@@ -71,6 +71,15 @@ class BlockPushErrorHandler implements ErrorHandler {
    public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
      "Couldn't find an opportunity to write block";

    /**
     * String constant used for generating exception messages indicating the server encountered
     * IOExceptions multiple times, greater than the configured threshold, while trying to merge
     * shuffle blocks of the same shuffle partition. When the client receives this response,
     * it will stop pushing any more blocks for the same shuffle partition.
     */
    public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
      "IOExceptions exceeded the threshold";

    @Override
    public boolean shouldRetryError(Throwable t) {
      // If it is a connection time out or a connection closed exception, no need to retry.
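
The Javadoc on `IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX` says the client stops pushing blocks for the partition once it sees this response. The rest of the handler is not shown in this hunk, so the following is only a guess at how such a check could look on the client side. The class `PushErrorClassifier` and the method `shouldStopPushing` are hypothetical names, and matching on the message of the exception or any of its causes is an assumption.

```java
// Hypothetical client-side check; not the actual BlockPushErrorHandler logic.
class PushErrorClassifier {
  // Same text as the constant added in this commit.
  static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
    "IOExceptions exceeded the threshold";

  /**
   * Returns true if the failure, or any exception in its cause chain, carries the
   * threshold message, meaning further pushes for the partition are pointless.
   */
  static boolean shouldStopPushing(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      String message = cur.getMessage();
      if (message != null && message.contains(IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX)) {
        return true;
      }
    }
    return false;
  }
}
```

Walking the cause chain is a design choice in this sketch: transport layers often wrap the server's message in another exception, so checking only the outermost message could miss it.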
