
Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Oct 20, 2025

What changes were proposed in this pull request?

This PR aims to fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`.

Why are the changes needed?

`Files.walk` is flaky, failing as shown below when the directory is modified concurrently (a race condition). `walkFileTree` allows more robust error handling.

// there would be possible to have race condition:
// - some tasks complete while abortJob is being called
// we can't delete complete files for these tasks (it's OK since this is a best effort)
assert(outputFileNames.intersect(trackingFileNames).isEmpty,
"abortJob should clean up files reported as successful.")

[info] - cleanup complete but invalid output for aborted job *** FAILED *** (438 milliseconds)
     [info]   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: ***/spark-4c7ad10b-5848-45d7-ba43-dae4020ad011/output #output/part-00007-e582f3e3-87e3-40fa-8269-7fac9b545775-c000.snappy.parquet
     [info]   at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
     [info]   at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
     [info]   at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
     [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
     [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
     [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
     [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
     [info]   at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
     [info]   at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
     [info]   at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:480)
     [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
     [info]   at scala.collection.mutable.Growable.addAll(Growable.scala:61)
     [info]   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
     [info]   at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
     [info]   at scala.collection.immutable.Set$.from(Set.scala:362)
     [info]   at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
     [info]   at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
     [info]   at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
     [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:537)
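Reconstructed from the stack trace above (the helper name here is illustrative, not the suite's exact code), the failing listing is essentially a `Files.walk` stream drained into a Scala `Set`; an entry deleted between the directory scan and the per-entry attribute read surfaces as an `UncheckedIOException` out of `hasNext`:

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object WalkDemo {
  // Sketch of the fragile pattern: drain a Files.walk stream into a Set of
  // file names. If a concurrent delete removes an entry mid-traversal, the
  // iterator's hasNext throws UncheckedIOException(NoSuchFileException).
  def listWithWalk(root: Path): Set[String] = {
    val stream = Files.walk(root)
    try stream.iterator().asScala.map(_.getFileName.toString).toSet
    finally stream.close()
  }
}
```

On a quiescent directory this works fine; the exception appears only when a concurrent `abortJob` deletes files while the walk is in progress, which is exactly the race the test tolerates.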

Does this PR introduce any user-facing change?

No, this is a test case change.

How was this patch tested?

Pass the CIs.

Was this patch authored or co-authored using generative AI tooling?

No.

override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
  exc match {
    case _: NoSuchFileException =>
      FileVisitResult.CONTINUE
    case e =>
      throw e
  }
}
Member Author


It's okay to ignore the deleted files because they are supposed to be cleaned up in the success case.
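Putting that visitor to work, a minimal sketch (hypothetical helper and object names, assuming only the JDK NIO API) of a `walkFileTree`-based listing that tolerates concurrent deletes:

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable

object RobustWalk {
  // List the names of regular files under `root`, skipping any entry that
  // disappears while the walk is in progress.
  def listFileNames(root: Path): Set[String] = {
    val names = mutable.Set.empty[String]
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        names += file.getFileName.toString
        FileVisitResult.CONTINUE
      }
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = exc match {
        // Deleted mid-walk: safe to skip, mirroring the error handling above.
        case _: NoSuchFileException => FileVisitResult.CONTINUE
        case e => throw e
      }
    })
    names.toSet
  }
}
```

Unlike the stream returned by `Files.walk`, the visitor receives a per-entry `visitFileFailed` callback, so a vanished file can be skipped without aborting the whole traversal.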

@dongjoon-hyun
Member Author

dongjoon-hyun commented Oct 20, 2025

The updated test passed.

FileStreamSinkV1Suite
...
[info] - cleanup complete but invalid output for aborted job (260 milliseconds)
...
FileStreamSinkV2Suite
...
[info] - cleanup complete but invalid output for aborted job (269 milliseconds)
...

@dongjoon-hyun
Member Author

Thank you, @HyukjinKwon. Merged to master/4.0/3.5.

dongjoon-hyun added a commit that referenced this pull request Oct 21, 2025
…g `walkFileTree` instead of `walk`

Closes #52671 from dongjoon-hyun/SPARK-53961.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8430dbf)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun added a commit that referenced this pull request Oct 21, 2025
…g `walkFileTree` instead of `walk`


Closes #52671 from dongjoon-hyun/SPARK-53961.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8430dbf)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun dongjoon-hyun deleted the SPARK-53961 branch October 21, 2025 03:36
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…g `walkFileTree` instead of `walk`


Closes apache#52671 from dongjoon-hyun/SPARK-53961.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a26c3b7)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>