
Conversation

@LuciferYang
Contributor

@LuciferYang LuciferYang commented Jan 5, 2025

What changes were proposed in this pull request?

This PR adds protection against IOException (IOE) when reading the BasicFileAttributes of a file in the deleteRecursivelyUsingJavaIO method: the IOE is caught and null is returned, and the subsequent logic silently handles the case where fileAttributes is null.
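
As a rough sketch of the resulting flow (simplified, not the actual Spark source: the real deleteRecursivelyUsingJavaIO also recurses into directories and applies filename filters, and the exact Files.readAttributes arguments here are an assumption):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.attribute.BasicFileAttributes;

final class DeleteRecursivelySketch {
  // Converts a failed attribute read (e.g. NoSuchFileException for a missing inode)
  // into null instead of letting the IOException propagate.
  static BasicFileAttributes readFileAttributes(File file) {
    try {
      return Files.readAttributes(
          file.toPath(), BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
    } catch (IOException e) {
      return null;
    }
  }

  // Greatly simplified caller: a null result means there is nothing to delete,
  // so the method returns silently, restoring the old behavior (before #49347)
  // for a non-existent path.
  static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
    BasicFileAttributes fileAttributes = readFileAttributes(file);
    if (fileAttributes == null) {
      return;
    }
    boolean isBrokenSymbolicLink = fileAttributes.isSymbolicLink() && !file.exists();
    if (!file.exists() && !isBrokenSymbolicLink) {
      return;
    }
    Files.deleteIfExists(file.toPath()); // the real code deletes directory contents first
  }
}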

Why are the changes needed?

When the inode itself does not exist, its BasicFileAttributes cannot be read and an IOException (IOE) is thrown, which has caused failures in the macOS daily test:

- JobArtifactSet uses resources from SparkContext *** FAILED ***
  java.nio.file.NoSuchFileException: /Users/runner/work/spark/spark/core/target/tmp/spark-6a6b2d5d-1371-4801-a6c4-59dc9d69c2f2/userFiles-e450317a-136c-49ff-8099-9e8282c766b5/testFile661537940680128228.zip
  at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
  at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
  at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:171)
  at java.base/java.nio.file.Files.readAttributes(Files.java:1853)
  at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:130)
  at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:123)
  at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
  at org.apache.spark.util.SparkFileUtils.deleteRecursively(SparkFileUtils.scala:121)
  ...

#49347 aimed to fix the cleanup of symbolic links by moving the read of BasicFileAttributes before the !file.exists check so that broken symbolic links could be detected. However, Spark has logic that first cleans up a potentially existing destination path before overwriting it, and the path being cleaned up may itself be a non-existent inode, for example:

val dest = new File(
  root,
  if (uri.getFragment != null) uri.getFragment else source.getName)
logInfo(
  log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" +
    log" (${MDC(LogKeys.BYTE_SIZE, source.length)} bytes)" +
    log" from ${MDC(LogKeys.SOURCE_PATH, source.getAbsolutePath)}" +
    log" to ${MDC(LogKeys.DESTINATION_PATH, dest.getAbsolutePath)}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)

Therefore, additional protection is needed for this scenario to maintain compatibility with the old behavior.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No

@LuciferYang LuciferYang marked this pull request as draft January 5, 2025 20:05
@github-actions github-actions bot added the INFRA label Jan 5, 2025
uses: ./.github/workflows/maven_test.yml
with:
  java: 21
  os: macos-15
Contributor Author

Testing with macOS; will revert after the check.

 */
private static BasicFileAttributes readFileAttributes(File file) {
  try {
    return Files.readAttributes(
Contributor Author

cc @dongjoon-hyun

  1. If the inode itself does not exist, it will not be possible to read the BasicFileAttributes.
  2. If it is a corrupted symbolic link, the BasicFileAttributes can be read, but file.exists() will return false.
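
A small standalone snippet illustrating the two scenarios (an illustration only, not code from this PR; it assumes a POSIX filesystem where symbolic links can be created):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class BrokenLinkDemo {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("attr-demo");

    // Scenario 1: the inode does not exist -> readAttributes throws NoSuchFileException.
    Path missing = dir.resolve("does-not-exist");
    try {
      Files.readAttributes(missing, BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
    } catch (NoSuchFileException e) {
      System.out.println("missing inode: " + e);
    }

    // Scenario 2: a broken symbolic link -> the attributes of the link itself can be
    // read with NOFOLLOW_LINKS, but File#exists() (which follows the link) returns false.
    Path link = dir.resolve("dangling-link");
    Files.createSymbolicLink(link, dir.resolve("gone"));
    BasicFileAttributes attrs =
        Files.readAttributes(link, BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
    System.out.println("broken link: isSymbolicLink=" + attrs.isSymbolicLink()
        + ", exists=" + link.toFile().exists());
  }
}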

It seems that scenario 1 was not covered before. I found that the code has logic that attempts to delete a potentially existing target before writing.

For example:

val dest = new File(
  root,
  if (uri.getFragment != null) uri.getFragment else source.getName)
logInfo(
  log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" +
    log" (${MDC(LogKeys.BYTE_SIZE, source.length)} bytes)" +
    log" from ${MDC(LogKeys.SOURCE_PATH, source.getAbsolutePath)}" +
    log" to ${MDC(LogKeys.DESTINATION_PATH, dest.getAbsolutePath)}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)

Let's test it on macOS first.

Contributor Author

@LuciferYang LuciferYang Jan 5, 2025

It's getting a bit late in my time zone, so I need to sleep for 4 hours first.

Let's check the new test results and decide on the next steps. Of course, if you think the risk is too high, we can also revert this patch first. @dongjoon-hyun Thanks ~

Member

We can wait :)

@dongjoon-hyun
Member

Thank you for taking a look at this. Please take a rest first and take your time, @LuciferYang!

@LuciferYang
Contributor Author

 Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED ***
  java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow.
org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-40, 1] metadata not propagated after timeout
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$waitUntilMetadataIsPropagated$1(KafkaTestUtils.scala:615)
	at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
	at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
	at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
	at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:614)
	at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:379)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
	at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:378)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:351)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:348)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:54)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:47)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:869)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:110)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:104)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:790)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:866)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:866)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:387)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:357)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
	at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:59)
	at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.execute(TriggerExecutor.scala:64)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:790)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
  at scala.Predef$.assert(Predef.scala:279)
  at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:200)
  at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:373)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
  at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
  at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
  at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
  at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
  at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
  ...

This seems to be a known flaky test.

@github-actions github-actions bot removed the INFRA label Jan 6, 2025
@dongjoon-hyun
Member

It seems that the PR builder failure looks reasonable. It means this PR is unable to reproduce the macOS daily CI failures. WDYT?

https://github.com/apache/spark/actions/runs/12622568770


@LuciferYang
Contributor Author

LuciferYang commented Jan 6, 2025

@dongjoon-hyun Sorry, I didn't understand what you meant.


The test was added earlier to confirm whether this PR can fix the macOS daily test.

@LuciferYang
Contributor Author

So, it's expected that this PR doesn't reproduce the previous issue, which indicates that it has fixed that problem.

@LuciferYang LuciferYang marked this pull request as ready for review January 6, 2025 06:04
@LuciferYang
Contributor Author

@HyukjinKwon this one is ready now

@LuciferYang
Contributor Author

(Same Query with Trigger.AvailableNow failure and stack trace as quoted in the earlier comment.)

This seems to be a known flaky test.

I manually tested this flaky test locally, and it passed.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Let's try this after merging. Thank you, @LuciferYang .

@dongjoon-hyun
Member

Merged to master~

@dongjoon-hyun
Member

After merging, the macOS CI is triggered here.

@LuciferYang
Contributor Author

LuciferYang commented Jan 7, 2025

After merging, the macOS CI is triggered here.

The test case Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches seems quite unstable. I checked back and found that a similar test case, Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches, failed frequently about two weeks ago. These two test cases are executed consecutively, and the error messages when they fail appear to be similar.

Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches:


Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches


Need to investigate further.

@LuciferYang
Contributor Author

LuciferYang commented Jan 7, 2025

Regarding the failed case mentioned above, here is an update on the progress of the investigation.

When the failure occurs, the call server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition) in the following code always matches case _ => false, which makes the assertion fail.

private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
  def isPropagated = server.dataPlaneRequestProcessor.metadataCache
      .getPartitionInfo(topic, partition) match {
    case Some(partitionState) =>
      zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
        FetchRequest.isValidBrokerId(partitionState.leader) &&
        !partitionState.replicas.isEmpty
    case _ =>
      false
  }
  eventually(timeout(1.minute)) {
    assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
  }
}

I'm not very familiar with this part of the code, so I need to investigate it further. Do you have any suggestions on this? @dongjoon-hyun

@dongjoon-hyun
Member

To @LuciferYang, you can stop investigating this. IIUC, the situation was the same before your PR.

The Kafka test suites have been unstable in the macOS environment.

Thank you for your precious time.

@LuciferYang
Contributor Author

To @LuciferYang, you can stop investigating this. IIUC, the situation was the same before your PR.

The Kafka test suites have been unstable in the macOS environment.

Thank you for your precious time.

OK ~
