@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,25 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
 
     import PendingCommitFilesTrackingManifestFileCommitProtocol._
-    val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-      .filter(_.toString.endsWith(".parquet"))
-      .map(_.getFileName.toString)
-      .toSet
+    import java.nio.file.{Path, _}
+    val outputFileNames = scala.collection.mutable.Set.empty[String]
+    Files.walkFileTree(
+      outputDir.toPath,
+      new SimpleFileVisitor[Path] {
+        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+          val fileName = file.getFileName.toString
+          if (fileName.endsWith(".parquet")) outputFileNames += fileName
+          FileVisitResult.CONTINUE
+        }
+        override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+          exc match {
+            case _: NoSuchFileException =>
+              FileVisitResult.CONTINUE
Member Author: It's okay to ignore the deleted files because those are supposed to be cleaned up in a success case.
+            case _ =>
+              FileVisitResult.TERMINATE
+          }
+        }
+      })
     val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
     // there would be possible to have race condition:
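For context, here is a minimal standalone sketch (not part of the PR) of the Files.walkFileTree pattern the change adopts: a SimpleFileVisitor whose visitFileFailed skips NoSuchFileException, so files removed by a concurrent cleanup do not abort the traversal, whereas the previous Files.walk stream could surface such a deletion as an UncheckedIOException while the stream is being consumed. The object name, helper name, and output path below are made up for illustration.

import java.io.IOException
import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, Paths, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable

// Standalone sketch (not PR code): collect "*.parquet" file names under `dir`,
// tolerating files that are deleted by another thread while the walk is running.
object WalkWhileDeleting {
  def collectParquetFileNames(dir: Path): Set[String] = {
    val names = mutable.Set.empty[String]
    Files.walkFileTree(dir, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        val fileName = file.getFileName.toString
        if (fileName.endsWith(".parquet")) names += fileName
        FileVisitResult.CONTINUE
      }
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
        exc match {
          // A file that vanished between listing and reading its attributes is expected
          // when a concurrent cleanup is in progress, so skip it rather than fail the walk.
          case _: NoSuchFileException => FileVisitResult.CONTINUE
          case _ => FileVisitResult.TERMINATE
        }
      }
    })
    names.toSet
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical path for illustration only.
    val outputDir = Paths.get("/tmp/file-stream-sink-output")
    println(collectParquetFileNames(outputDir))
  }
}

Note that only NoSuchFileException is tolerated; any other I/O failure still terminates the walk, so unexpected errors are not silently swallowed.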