
[SPARK-37675][CORE][SHUFFLE] Return PushMergedRemoteMetaFailedFetchResult if no available push-merged block #34934

Closed
wants to merge 3 commits

Conversation

@pan3793 (Member) commented Dec 17, 2021

What changes were proposed in this pull request?

When push-based shuffle is enabled, the reduce task asks the ESS for merged-block metadata. If there is no available push-merged block on the ESS, a PushMergedRemoteMetaFailedFetchResult should be returned instead of a PushMergedRemoteMetaFetchResult.

Why are the changes needed?

Because push-based shuffle works on a best-effort basis, it is possible that no chunks of a push-merged block are available on the ESS. In the current implementation, this causes the reduce task to fail with ArithmeticException: / by zero.

Caused by: java.lang.ArithmeticException: / by zero
	at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:117)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:980)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)

After the change, a PushMergedRemoteMetaFailedFetchResult lets the reduce task fall back to fetching the original (unmerged) blocks.
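For illustration, here is a minimal, self-contained Scala sketch of the idea. The types are simplified stand-ins: the real change lives around PushBasedFetchHelper / ShuffleBlockFetcherIterator, the real result classes have different field lists, and the real code tracks chunks with RoaringBitmap rather than plain sets.

sealed trait FetchResult
// Simplified stand-ins for the real result classes; field lists are illustrative only.
case class PushMergedRemoteMetaFetchResult(
    shuffleId: Int, shuffleMergeId: Int, reduceId: Int,
    blockSize: Long, chunkBitmaps: Seq[Set[Int]]) extends FetchResult
case class PushMergedRemoteMetaFailedFetchResult(
    shuffleId: Int, shuffleMergeId: Int, reduceId: Int) extends FetchResult

// Hypothetical handler for the ESS merged-meta response.
def onMergedMetaReceived(
    shuffleId: Int, shuffleMergeId: Int, reduceId: Int,
    blockSize: Long, chunkBitmaps: Seq[Set[Int]]): FetchResult = {
  if (chunkBitmaps.isEmpty) {
    // No merged chunks are available: report a failed meta fetch so the reducer
    // falls back to the original blocks, instead of later dividing blockSize by zero chunks.
    PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId)
  } else {
    PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId, reduceId, blockSize, chunkBitmaps)
  }
}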

Does this PR introduce any user-facing change?

Yes, it's a bug fix. The change makes push-based shuffle stable: before this change, my 1 TB TPC-DS run failed several times with ArithmeticException: / by zero; after the change, it passed without any exception.

How was this patch tested?

Existing tests, plus a manual 1 TB TPC-DS run.

@github-actions github-actions bot added the CORE label Dec 17, 2021
@pan3793 pan3793 changed the title [SPARK-37675] Fix calculate approxChunkSize when bitmaps isEmpty [SPARK-37675][CORE][SHUFFLE] Fix calculate approxChunkSize when bitmaps isEmpty Dec 17, 2021
@AmplabJenkins commented

Can one of the admins verify this patch?

@pan3793 pan3793 changed the title [SPARK-37675][CORE][SHUFFLE] Fix calculate approxChunkSize when bitmaps isEmpty [WIP][SPARK-37675][CORE][SHUFFLE] Fix calculate approxChunkSize when bitmaps isEmpty Dec 17, 2021
@pan3793 (Member, Author) commented Dec 17, 2021

Found another issue with this change, investigating

@pan3793 pan3793 changed the title [WIP][SPARK-37675][CORE][SHUFFLE] Fix calculate approxChunkSize when bitmaps isEmpty [WIP][SPARK-37675][CORE][SHUFFLE] Return PushMergedRemoteMetaFailedFetchResult if no available push-merged block Dec 17, 2021
@mridulm (Contributor) commented Dec 18, 2021

+CC @otterc

@pan3793 pan3793 changed the title [WIP][SPARK-37675][CORE][SHUFFLE] Return PushMergedRemoteMetaFailedFetchResult if no available push-merged block [SPARK-37675][CORE][SHUFFLE] Return PushMergedRemoteMetaFailedFetchResult if no available push-merged block Dec 18, 2021
@pan3793 (Member, Author) commented Dec 18, 2021

@mridulm @otterc It's ready for review now, please take a look when you have time.

@otterc (Contributor) commented Dec 18, 2021

Will take a look on Monday. It's unusual that there isn't even a single chunk, because that means no blocks were merged. If no blocks were merged, there shouldn't be a push-merged block for the iterator to fetch at all, and in that case there is no need to fetch the meta. It seems there could be a bug somewhere else where an empty mergeStatus for a partition is getting created.
cc. @Ngone51 @mridulm

@pan3793 (Member, Author) commented Dec 19, 2021

After the change, my 1 TB TPC-DS run passed one round but failed in the second round. I'm not sure whether it's related or a separate issue. I'm a newbie in this area, so I'd appreciate any advice you can give.

21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
	at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@mridulm (Contributor) commented Dec 19, 2021

+CC @Ngone51 Looks like there is an additional interaction between shuffle corruption diagnosis and push-based shuffle.
The underlying issue is a fetch failure, which also needs to be investigated (but gets suppressed by the assertion failure).

Failure to fetch push-based shuffle chunks should have resulted in a fallback to fetching the original shuffle blocks; it needs to be investigated why it dropped into diagnosis directly (CC @otterc)

@Ngone51 (Member) commented Dec 20, 2021

+CC @Ngone51 Looks like there is an additional interaction between shuffle corruption diagnosis and push-based shuffle.

Yeah, we should skip diagnosis for push-based shuffle. I created a ticket for it: SPARK-37695

@Ngone51 (Member) commented Dec 20, 2021

Failure to fetch push-based shuffle chunks should have resulted in a fallback to fetching the original shuffle blocks; it needs to be investigated why it dropped into diagnosis directly (CC @otterc)

It looks like the fetch of the push-based chunks doesn't actually fail; rather, it fails when consuming the fetched chunk data. In that case we can only throw an exception, since the downstream RDDs could have already consumed partial data.
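To make that constraint concrete, here is a small illustrative Scala sketch (a hypothetical helper, not Spark's actual API): once downstream code has consumed part of a fetched chunk's stream, a mid-stream failure cannot be retried transparently, so the only safe option is to surface an exception.

import java.io.{IOException, InputStream}

def readRecords(in: InputStream, consume: Array[Byte] => Unit): Unit = {
  val buf = new Array[Byte](8192)
  var consumedSomething = false
  try {
    var n = in.read(buf)
    while (n != -1) {
      consume(buf.take(n)) // downstream has already seen this partial data
      consumedSomething = true
      n = in.read(buf)
    }
  } catch {
    case e: IOException if consumedSomething =>
      // Cannot silently fall back to re-fetching the original blocks here.
      throw new RuntimeException("corrupt shuffle chunk after partial consumption", e)
  }
}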

  iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
    reduceId, sizeMap((shuffleId, reduceId)), bitmaps, address))
} else {
  logInfo(s"No available push-merged block for ($shuffleId, $shuffleMergeId," +

A project member commented on the diff above:

Hmm... I thought we only issue the merged-status request when there are merged blocks, because we ensure this when fetching the merge status:

val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {

But why could bitmaps be empty here? cc @mridulm @otterc
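As a rough sketch of the invariant being discussed (simplified, hypothetical types, not the real MergeStatus class): the driver only points the reducer at a push-merged block when it believes the partition was merged, so an empty bitmap list in the ESS meta response suggests the server-side state no longer matches the driver's view.

case class MergeStatus(totalSize: Long)

// Driver side: only fetch the push-merged block when the partition was actually merged.
def shouldFetchMergedBlock(mergeStatus: Option[MergeStatus]): Boolean =
  mergeStatus.exists(_.totalSize > 0)

// Client side: if the guard above held but the meta response still has no chunks,
// something upstream (driver or ESS state) is inconsistent.
def checkMetaResponse(chunkBitmaps: Seq[Set[Int]]): Unit =
  require(chunkBitmaps.nonEmpty,
    "driver reported a merged partition but the ESS returned zero chunks")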

@pan3793 (Member, Author) commented Dec 20, 2021

It sounds like something is going wrong earlier than the code I touched. I will investigate in that direction, and use this change as a workaround to unblock my push-based shuffle testing until the underlying issue is addressed.

@pan3793 (Member, Author) commented Dec 30, 2021

#32287 (comment)
Not sure if it's related, but I did hit a hang here several times. @Ngone51

@pan3793 (Member, Author) commented Jan 12, 2022

@otterc I reproduced this issue today and sent an email to you with the logs and Spark confs you requested here. I highly suspect that SPARK-37675 and SPARK-37793 share the same root cause. Please let me know if there is anything else I can do.

@mridulm (Contributor) commented Jan 18, 2022

To clarify, are these logs from a version of Spark/shuffle service without modifications? Or were there any code changes made to them? Thx.

@pan3793 (Member, Author) commented Jan 18, 2022

To clarify, are these logs from a version of Spark/shuffle service without modifications? Or were there any code changes made to them? Thx.

Oops, I forgot to link the code: pan3793@c38459d

@otterc (Contributor) commented Jan 19, 2022

@otterc I reproduced this issue today and sent an email to you with the logs and Spark confs you requested here. I highly suspect that SPARK-37675 and SPARK-37793 share the same root cause. Please let me know if there is anything else I can do.

@pan3793 Would you be able to add these changes and rerun this test?

  1. Log the reduceId in the iterator for which the assertion fails. Changing the assertion to this will work:
    assert(numChunks > 0, s"zero chunks for $blockId")

  2. In RemoteBlockPushResolver.finalizeShuffleMerge, add this condition so that the partition's mapTracker and reduceId are only added to the results when at least one map output was merged:

         try {
           // This can throw IOException, which will mark this shuffle partition as not merged.
           partition.finalizePartition();
           if (partition.mapTracker.getCardinality() > 0) { // needs to be added
             bitmaps.add(partition.mapTracker);
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           }
         } catch (IOException ioe) {
           logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
             msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
         } finally {
           partition.closeAllFilesAndDeleteIfNeeded(false);
         }

Please let me know if you can rerun with these changes and share the logs with me.
Thank you!

@pan3793 (Member, Author) commented Jan 20, 2022

Thanks @otterc. I changed the code as you suggested (pan3793@1c14eb9) and have sent the logs to you via email; please take a look again.

@otterc (Contributor) commented Jan 20, 2022

Thanks @otterc. I changed the code as you suggested (pan3793@1c14eb9) and have sent the logs to you via email; please take a look again.

Thanks @pan3793. The new logs, however, don't have the statement on the shuffle server which logs the partition that is finalized. I see that you removed this:

 logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
   msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
   partition.indexFile.getPos() / 8 - 1,
   partition.metaFile.getPos(),
   partition.lastChunkOffset);

I think the partitions that the reducer is trying to read were not finalized by the shuffle server; this log statement would have shown that. I still don't know how the reducer is getting these merged blocks, but at least we could rule out the shuffle service.
There is also a possibility that push-based shuffle conflicts with AQE. Do you see the same issue when spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.fetchShuffleBlocksInBatch are disabled?
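For reference, a minimal sketch of one way to run such a test with those two AQE features disabled, assuming a SparkSession-based harness (the application name is just an example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tpcds-push-shuffle-repro") // hypothetical app name
  .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
  .config("spark.sql.adaptive.fetchShuffleBlocksInBatch", "false")
  .getOrCreate()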

@pan3793 (Member, Author) commented Jan 21, 2022

I think the partitions that the reducer is trying to read were not finalized by the shuffle server; this log statement would have shown that. I still don't know how the reducer is getting these merged blocks, but at least we could rule out the shuffle service.

Is it possible that the shuffle service overwrites the finalized merged blocks?

There is also a possibility that push-based shuffle conflicts with AQE. Do you see the same issue when spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.fetchShuffleBlocksInBatch are disabled?

I think fetchShuffleBlocksInBatch should not affect merged blocks, but anyway, let me try.

BTW, I will lose access to this test cluster after Jan 31; I hope we can find the root cause before then.

private def collectFetchRequests(
    address: BlockManagerId,
    blockInfos: Seq[(BlockId, Long, Int)],
    collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
  val iterator = blockInfos.iterator
  var curRequestSize = 0L
  var curBlocks = new ArrayBuffer[FetchBlockInfo]()
  while (iterator.hasNext) {
    val (blockId, size, mapIndex) = iterator.next()
    curBlocks += FetchBlockInfo(blockId, size, mapIndex)
    curRequestSize += size
    blockId match {
      // Either all blocks are push-merged blocks, shuffle chunks, or original blocks.
      // Based on these types, we decide to do batch fetch and create FetchRequests with
      // forMergedMetas set.
      case ShuffleBlockChunkId(_, _, _, _) =>
        if (curRequestSize >= targetRemoteRequestSize ||
            curBlocks.size >= maxBlocksInFlightPerAddress) {
          curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
            collectedRemoteRequests, enableBatchFetch = false)
          curRequestSize = curBlocks.map(_.size).sum
        }
      case ShuffleMergedBlockId(_, _, _) =>
        if (curBlocks.size >= maxBlocksInFlightPerAddress) {
          curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
            collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
        }
      case _ =>
        // For batch fetch, the actual block in flight should count for merged block.
        val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
        if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
          curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
            collectedRemoteRequests, doBatchFetch)
          curRequestSize = curBlocks.map(_.size).sum
        }
    }
  }
  // Add in the final request
  if (curBlocks.nonEmpty) {
    val (enableBatchFetch, forMergedMetas) = {
      curBlocks.head.blockId match {
        case ShuffleBlockChunkId(_, _, _, _) => (false, false)
        case ShuffleMergedBlockId(_, _, _) => (false, true)
        case _ => (doBatchFetch, false)
      }
    }
    createFetchRequests(curBlocks.toSeq, address, isLast = true, collectedRemoteRequests,
      enableBatchFetch = enableBatchFetch, forMergedMetas = forMergedMetas)
  }
}

@otterc (Contributor) commented Jan 22, 2022

@pan3793, @mridulm identified the issue on the server side. You are right that the service was overwriting the finalized merged blocks. The bug was introduced with SPARK-32923. Will update later about the bug.

@github-actions bot commented May 3, 2022

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 3, 2022
@github-actions github-actions bot closed this May 4, 2022