Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError #35076

Closed
wants to merge 1 commit into from

Conversation

pan3793
Copy link
Member

@pan3793 pan3793 commented Dec 31, 2021

What changes were proposed in this pull request?

When enable push-based shuffle, there is a chance that task hang at

59  Executor task launch worker for task 424.0 in stage 753.0 (TID 106778)
WAITING	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1660371198})
  sun.misc.Unsafe.park(Native Method)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
  java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756)
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
  org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
  scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
  org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_1$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_0$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
  org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
  scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
  org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  org.apache.spark.scheduler.Task.run(Task.scala:136)
  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
  org.apache.spark.executor.Executor$TaskRunner$$Lambda$518/852390142.apply(Unknown Source)
  org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)

And org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756) is

while (result == null) {
  ...
  result = results.take() // line 756
  ...
}

After some investigations, found that the last FetchResult put into result is PushMergedLocalMetaFetchResult, and there is a chance that bufs is empty, will cause no SuccessFetchResult be added to results, and thread hang if no other FetchResult is put into results.

while (result == null) {
  ...
  result = results.take()
  ...

  result match {
    case r @ SuccessFetchResult(blockId, mapIndex, address, size, buf, isNetworkReqDone) =>
      ...
      case PushMergedLocalMetaFetchResult(
        shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
        val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
        try {
          val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
            localDirs)
          // THERE IS A CHANCE THAT bufs.isEmpty!
          ...
          bufs.zipWithIndex.foreach { case (buf, chunkId) =>
            buf.retain()
            val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
              chunkId)
            pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
            results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
              pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
              isNetworkReqDone = false))
          }
        } catch {
          case e: Exception =>
            pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
              shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
        }
        result = null
    ...
  }
}

Why are the changes needed?

Fallback to fetch original blocks when noLocalMergedBlockDataError to avoid task hang.

Does this PR introduce any user-facing change?

Bug fix, to make push-based shuffle more stable.

How was this patch tested?

Pass 1T TPC-DS tests

@github-actions github-actions bot added the CORE label Dec 31, 2021
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@mridulm
Copy link
Contributor

mridulm commented Dec 31, 2021

+CC @otterc

@otterc
Copy link
Contributor

otterc commented Jan 4, 2022

After some investigations, found that the last FetchResult put into result is PushMergedLocalMetaFetchResult, and there is a chance that bufs is empty, will cause no SuccessFetchResult be added to results, and thread hang if no other FetchResult is put into results.

I don't think that bufs should ever be empty. If bufs is empty, it just means that there were no blocks that got merged and in that case there shouldn't even be push-merged block (local or remote). @pan3793 could you please let me know how to reproduce this bug? Is there a particular query that you are running with which you see this?

@pan3793
Copy link
Member Author

pan3793 commented Jan 4, 2022

No particular query, but easy to reproduce when run all 1T TPCDS queries.

@pan3793
Copy link
Member Author

pan3793 commented Jan 5, 2022

@otterc I agree with you that bufs should not be empty in design, and #34934 also does.

I also suspect there are some bugs or concurrence issues in code, and add some assertions, but unfortunately, nothing was found.

Besides those 2 issues, I also met the shuffle data corruption issues frequently.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
        ...

Both hardware(disk) issue and network issue may cause shuffle data corruption, and due to the lack of checksum mechanism of push-based shuffle, there is a chance we pass the corrupt data to xxSerializer layer, then cause exception and fail the task.

So I think except to the code bug, there still has opportunity to read the corrupt metadata from disk/network, even the possibility is lower than shuffle data because metadata usually smaller, and when it happens, fallback to fetch the original blocks should be safe.

With this patch and #34934, the data corruption is the only critical issue(I mean can fail the job) in our dozen rounds of 1T TPC-DS test, and I think add the checksum should solve that issue.

@pan3793
Copy link
Member Author

pan3793 commented Jan 5, 2022

Update:

After f6128a6, I run 3 rounds of 1T TPC-DS using master branch's code, don't see neither this issue nor the issue mentioned in #34934, but the following issue occurs frequently, it cause stage retry rather than fail the job, and usually the stage can success in retry.

FetchFailed(null, shuffleId=15, mapIndex=-1, mapId=-1, reduceId=838, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 15 partition 838
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1619)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$7(MapOutputTracker.scala:1555)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4(MapOutputTracker.scala:1554)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4$adapted(MapOutputTracker.scala:1535)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1535)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1230)
	at org.apache.spark.MapOutputTrackerWorker.getPushBasedShuffleMapSizesByExecutorId(MapOutputTracker.scala:1204)
	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:136)
	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@pan3793
Copy link
Member Author

pan3793 commented Jan 6, 2022

Update:
After f6128a6, this issue and the issue mentioned in #34934 still be there, when I turn spark.shuffle.push.minShuffleSizeToWait to zero(default is 500m), the issues happen frequently again.

@otterc
Copy link
Contributor

otterc commented Jan 6, 2022

@pan3793 I will try to reproduce this issue and debug. It will take me couple of days. Thanks for debugging and updating with your findings. I will let you know what I find.

@pan3793
Copy link
Member Author

pan3793 commented Jan 7, 2022

Hi @otterc I got more information for this issue.

Add assertion and debug log in RemoteBlockPushResolver(ESS side)

public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
    ...
    for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
        synchronized (partition) {
          try {
            // This can throw IOException which will marks this shuffle partition as not merged.
            partition.finalizePartition();
            bitmaps.add(partition.mapTracker);
            reduceIds.add(partition.reduceId);
            sizes.add(partition.getLastChunkOffset());
          } catch (IOException ioe) {
            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
              msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
          } finally {
            partition.closeAllFilesAndDeleteIfNeeded(false);
          }
        }
+       assert partition.dataFile.length() == partition.lastChunkOffset;
+       assert partition.indexFile.file.length() == partition.indexFile.getPos();
+       assert partition.metaFile.file.length() == partition.metaFile.getPos();
+       logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
+              msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
+              partition.indexFile.getPos() / 8 - 1,
+              partition.metaFile.getPos(),
+              partition.lastChunkOffset);
     }
      mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
        bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
        Longs.toArray(sizes));
    }
    ...
}

Add assertion and debug log in IndexShuffleBlockResolver(Reducer side)

override def getMergedBlockData(
    blockId: ShuffleMergedBlockId,
    dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
  val indexFile =
    getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
      blockId.reduceId, dirs)
  val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
  val size = indexFile.length.toInt
  val offsets = Utils.tryWithResource {
    new DataInputStream(Files.newInputStream(indexFile.toPath))
  } { dis =>
    val buffer = ByteBuffer.allocate(size)
    dis.readFully(buffer.array)
    buffer.asLongBuffer
  }
  // Number of chunks is number of indexes - 1
  val numChunks = size / 8 - 1
+ if (numChunks == 0) {
+   val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
+   val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
+   val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
+   logError(s"$blockId chunk_size is 0, " +
+      s"index_file is $indexFile, backup to $indexBackupPath" +
+      s"data_file is $dataFile, backup to $dataBackupPath" +
+      s"meta_file is $metaFile, backup to $metaBackupPath")
+   Files.copy(indexFile.toPath, indexBackupPath)
+   Files.copy(dataFile.toPath, dataBackupPath)
+   Files.copy(metaFile.toPath, metaBackupPath)
+   assert(false)
  }
  for (index <- 0 until numChunks) yield {
    new FileSegmentManagedBuffer(transportConf, dataFile,
      offsets.get(index),
      offsets.get(index + 1) - offsets.get(index))
  }
}

Then I run TPCDS several rounds and reproduce the exception.

Assertion failed in reduce task side.

01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
	at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)

ESS logs

2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157

Reduce task backup merged shuffle files

root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
-rw-r--r-- 1 root root 16036 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
-rw-r--r-- 1 root root     8 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
-rw-r--r-- 1 root root     0 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta

So, the ESS and reduce task running on same machine, and ESS closed the 'data', 'index', 'meta' files and reported the there size as chunk_size=1, meta_length=18, data_length=157, these metadata also return to driver and pass to reduce task, but when reduce task read the file from disk, the data is not match!

@mridulm
Copy link
Contributor

mridulm commented Jan 8, 2022

One possibility I was thinking of was if we calling finalizePartition before any chunks have been fully written out for a reducer.

Can you test with this change ?

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d0eb4aed65..0d3a3c7448 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -575,17 +575,23 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       List<Long> sizes = new ArrayList<>(shuffleMergePartitions.size());
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
         synchronized (partition) {
+          boolean shouldDelete = false;
           try {
             // This can throw IOException which will marks this shuffle partition as not merged.
             partition.finalizePartition();
-            bitmaps.add(partition.mapTracker);
-            reduceIds.add(partition.reduceId);
-            sizes.add(partition.getLastChunkOffset());
+            long size = partition.getLastChunkOffset();
+            if (size > 0) {
+              bitmaps.add(partition.mapTracker);
+              reduceIds.add(partition.reduceId);
+              sizes.add(size);
+            } else {
+              shouldDelete = true;
+            }
           } catch (IOException ioe) {
             logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
               msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            partition.closeAllFilesAndDeleteIfNeeded(shouldDelete);
           }
         }
       }

+CC @otterc

@pan3793
Copy link
Member Author

pan3793 commented Jan 9, 2022

Sorry, I don't get your point @mridulm

The ESS log indicate that partition.getLastChunkOffset() is 157, how does your change to solve the issue you pointed?

2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157

@pan3793
Copy link
Member Author

pan3793 commented Jan 9, 2022

After reading and debugging the push-based shuffle code, I don't know if I understand it correctly, and have some questions, will appreciate it if you can give me some feedbacks @mridulm @otterc

  1. On ESS side, there may be multiple streams request to write one shuffle partition, I saw some variables declared without transient volatile, does netty ensure to handle them in the same thread?
  2. The ESS writes 3 files for a merged partition, data, index, meta, and maintains each committed file position in-memory variables. When data arrives, locks partitionInfo, and writes files ordered by data, index, meta from the committed position, if all writing success, update the committed file position, if any IOException occurs, the committed file position will keep previous values, then release the partitionInfo lock. Thus, the committed status should always be consistent. Finally, truncate files in committed positions before reporting merged status to DAGScheduler. So if ESS reported a merged status to DAGScheduler, the final files should always be consistent with each other and the merged status. And we can trust the committed data of file in anytime. Do I understand it correctly?
  3. For performance, ESS does not call flush of each file writing, if write does not throw IOE, ESS treats the writing is succeeded, and finally call partition.closeAllFilesAndDeleteIfNeeded(false) in #finalizeShuffleMerge, but #closeAllFilesAndDeleteIfNeeded will swallow any IOE which may cause the file inconsistent with the merged status?
  4. Does file.e.getChannel().truncate(file.getPos()) always success if no IOE throw? I saw it will return null in some conditions(NOT familiar with file system)
  5. A basic question about the OS file system. If process A writes and closes a file without any IOE, and gets the file length is len, does OS ensure another process B always reads the latest file content and gets the same len?

@mridulm
Copy link
Contributor

mridulm commented Jan 9, 2022

I had written up my comment before your log messages @pan3793.
What I had detailed is a potential bug - whether that is impacting this specific issue needs to be seen, though looks unlikely.

The discrepancy between data file size and data length seems to indicate some other issue here - though I cant make much out from the details provided unfortunately: particularly the file sizes are extremely confusing observation.
I will let @otterc reproduce.

@mridulm
Copy link
Contributor

mridulm commented Jan 9, 2022

To respond to your queries @pan3793 (pls feel free to elaborate @otterc):

  1. On ESS side, there may be multiple streams request to write one shuffle partition, I saw some variables declared without transient, does netty ensure to handle them in the same thread?

transient applies to serialization/deserialization - did you mean volatile in this context instead ?
Having said that, all state is modified with the AppShufflePartitionInfo locked - so would be within that critical section.

  1. The ESS writes 3 files for a merged partition, data, index, meta, and maintains each committed file position in-memory variables. When data arrives, locks partitionInfo, and writes files ordered by data, index, meta from the committed position, if all writing success, update the committed file position, if any IOException occurs, the committed file position will keep previous values, then release the partitionInfo lock. Thus, the committed status should always be consistent. Finally, truncate files in committed positions before reporting merged status to DAGScheduler. So if ESS reported a merged status to DAGScheduler, the final files should always be consistent with each other and the merged status. And we can trust the committed data of file in anytime. Do I understand it correctly?

Yes. In addition, any exceptions during write/etc would trigger a failure, and would reset back to previous 'good' state.

  1. For performance, ESS does not call flush of each file writing, if write does not throw IOE, ESS treats the writing is succeeded, and finally call partition.closeAllFilesAndDeleteIfNeeded(false) in #finalizeShuffleMerge, but #closeAllFilesAndDeleteIfNeeded will swallow any IOE which may cause the file inconsistent with the merged status?

The IOException being thrown in that method is when we are unable to close the stream - in this case, it is a close of the fd.
While possible in theory, usually it would point to other more severe issues outside of what spark can deal with.
But you are right, it does log and ignore failures if close fails.

  1. Does file.e.getChannel().truncate(file.getPos()) always success if no IOE throw? I saw it will return null in some conditions(NOT familiar with file system)

The truncate is actually a best case effort to clean up excess disk space usage.
If there is an ongoing write, and we are finalizing - the excess data from write is not relevant and wont be consumed - and so truncate.
It also makes things more clear when debugging (the file sizes should match the metadata we know).

  1. A basic question about the OS file system. If process A writes and closes a file without any IOE, and gets the file length is len, does OS ensure another process B always reads the latest file content and gets the same len?

Yes, unless there is some other interleaving modifications to that file (or some OS/fs/driver bugs, but I am discounting them for the time being !).

@pan3793
Copy link
Member Author

pan3793 commented Jan 10, 2022

@mridulm thank you for your response!

did you mean volatile in this context instead?
Having said that, all state is modified with the AppShufflePartitionInfo locked - so would be within that critical section.

Oops, sorry, I mean volatile. Another question here, the synchronized(appInfo) ensures the visibility of appInfo itself, but what about the member variables without volatile of appInfo?

The IOException being thrown in that method is when we are unable to close the stream - in this case, it is a close of the fd.
While possible in theory, usually it would point to other more severe issues outside of what spark can deal with.
But you are right, it does log and ignore failures if close fails.

Then I think if #closeAllFilesAndDeleteIfNeeded throws IOE, we need to mark this partition merge failed.

The truncate is actually a best case effort to clean up excess disk space usage.

Then it has a chance that the actual file size is greater than metadata, i.e. because of a hardware issue, the disk becomes read-only when doing truncate(in this case it will throw IOE?). But since we can always trust the content before the 'good' position, for the case that file length greater than the 'good' position still can be treated as good merged data?

@otterc
Copy link
Contributor

otterc commented Jan 11, 2022

Does file.e.getChannel().truncate(file.getPos()) always success if no IOE throw? I saw it will return null in some conditions(NOT familiar with file system)

We do depend on truncate to ensure that the data in the merged file is consistent with the metadata. If there is some partial shuffle block that has been written to the merged file but it has not been committed yet (metadata not updated), then truncate will remove that data. If there is a failure during truncation then The FileChannel.truncate throws exceptions, otherwise it returns the FileChannel. The javadoc for FileChannel.truncate indicates the same as well as the implementation FileChannelImp.truncate().

@pan3793
Copy link
Member Author

pan3793 commented Jan 11, 2022

@otterc Thanks for reply. I misunderstanded the code at first, if any truncate failed, the metadata of that partition will NOT be added to the mergeStatuses, then mergeStatuses should always exactly be consistent with the files on disk.

@otterc
Copy link
Contributor

otterc commented Jan 11, 2022

@pan3793 I have been trying to reproduce this issue by running different tpch queries but wasn't able to reproduce the issue. I am running the code in the master branch. Is it possible for you to share the entire application log and the shuffle service logs with me? Some other things that can be useful for me to reproduce this problem

  • Values of all the other spark configurations?
  • Do you see any other exceptions in the shuffle service logs before this happens?
  • Do you see any stage re-attempts?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 22, 2022
@github-actions github-actions bot closed this Apr 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants