diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index b0aa71a7e1b3..4c06a0109e34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,33 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
 
     import PendingCommitFilesTrackingManifestFileCommitProtocol._
-    val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-      .filter(_.toString.endsWith(".parquet"))
-      .map(_.getFileName.toString)
-      .toSet
+    import java.nio.file._
+    // Collect the names of all parquet files under outputDir. Files.walk() throws
+    // (an uncaught) NoSuchFileException if a file is deleted while the stream is being
+    // consumed, so use walkFileTree with a visitor that tolerates files that disappear
+    // concurrently with the traversal.
+    val outputFileNames = scala.collection.mutable.Set.empty[String]
+    Files.walkFileTree(
+      outputDir.toPath,
+      new SimpleFileVisitor[Path] {
+        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+          val fileName = file.getFileName.toString
+          if (fileName.endsWith(".parquet")) outputFileNames += fileName
+          FileVisitResult.CONTINUE
+        }
+        override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+          exc match {
+            case _: NoSuchFileException =>
+              // The file was removed between listing and visiting — this is exactly the
+              // race this visitor exists to tolerate; skip it and keep walking.
+              FileVisitResult.CONTINUE
+            case _ =>
+              // Any other I/O failure is a genuine error. Propagate it (matching the
+              // SimpleFileVisitor default) rather than returning TERMINATE, which would
+              // end the walk normally with a silently incomplete result set.
+              throw exc
+          }
+        }
+      })
 
     val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
     // there would be possible to have race condition: