[SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

## What changes were proposed in this pull request?

SPARK-27210 enabled ManifestFileCommitProtocol to clean up incomplete output files at the task level when a task is aborted.

This patch extends the cleanup to the job level: ManifestFileCommitProtocol now also cleans up complete but invalid output files when the job is aborted. Please note that this works on a 'best-effort' basis rather than as a guarantee, the same as in HadoopMapReduceCommitProtocol.
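For orientation, the sketch below shows the driver-side order in which the FileCommitProtocol hooks are expected to be invoked and where the new tracking and cleanup fit in. This is a simplified, hypothetical driver loop (the real orchestration lives elsewhere in Spark; `writeJob` and `runTasks` are placeholders for illustration only), not the actual implementation:

```scala
import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Hypothetical, simplified driver loop; `runTasks` stands in for the actual task execution.
def writeJob(
    committer: FileCommitProtocol,
    job: JobContext,
    runTasks: () => Seq[TaskCommitMessage]): Unit = {
  committer.setupJob(job)                    // pendingCommitFiles is initialized here
  try {
    val commits = runTasks()                 // successful tasks report their committed files
    commits.foreach(committer.onTaskCommit)  // driver records the committed files as pending
    committer.commitJob(job, commits)        // clears the pending list, then writes the manifest
  } catch {
    case t: Throwable =>
      committer.abortJob(job)                // best-effort deletion of the tracked files
      throw t
  }
}
```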

## How was this patch tested?

Added UT.

Closes #24186 from HeartSaVioR/SPARK-27254.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2 people authored and zsxwing committed Sep 27, 2019
1 parent 233c214 commit d72f398
Showing 2 changed files with 109 additions and 2 deletions.
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.IOException
import java.util.UUID

import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
@transient private var fileLog: FileStreamSinkLog = _
private var batchId: Long = _

@transient private var pendingCommitFiles: ArrayBuffer[Path] = _

/**
* Sets up the manifest log output and the batch id for this job.
* Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

override def setupJob(jobContext: JobContext): Unit = {
require(fileLog != null, "setupManifestOptions must be called before this function")
// Do nothing
pendingCommitFiles = new ArrayBuffer[Path]
}

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
require(fileLog != null, "setupManifestOptions must be called before this function")
val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray

// We shouldn't remove the files once they're written to the metadata:
// `fileLog.add(batchId, fileStatuses)` could fail AFTER the files have been written
// to the metadata, and there could also be a race,
// so to be safe we clear the list before calling anything that may throw an exception.
// The case is uncommon and we only do best effort rather than guarantee, so the simplicity
// of the logic here is acceptable, and safe for dealing with unexpected situations.
pendingCommitFiles.clear()

if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
} else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

override def abortJob(jobContext: JobContext): Unit = {
require(fileLog != null, "setupManifestOptions must be called before this function")
// Do nothing
// Best-effort cleanup of completed files from the failed job.
// Since each file has a UUID in its filename, it is safe to try deleting them,
// as a file will not conflict with a file from another attempt of the same task.
if (pendingCommitFiles.nonEmpty) {
pendingCommitFiles.foreach { path =>
try {
val fs = path.getFileSystem(jobContext.getConfiguration)
// make sure the file is visible from the driver as well
if (fs.exists(path)) {
fs.delete(path, false)
}
} catch {
case e: IOException =>
logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
}
}
pendingCommitFiles.clear()
}
}

override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
.map(_.toFileStatus.getPath)
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
@@ -22,10 +22,13 @@ import java.nio.file.Files
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.SparkConf
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.execution.DataSourceScanExec
@@ -473,6 +476,77 @@ abstract class FileStreamSinkSuite extends StreamTest {
assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
}
}

testQuietly("cleanup complete but invalid output for aborted job") {
withSQLConf(("spark.sql.streaming.commitProtocolClass",
classOf[PendingCommitFilesTrackingManifestFileCommitProtocol].getCanonicalName)) {
withTempDir { tempDir =>
val checkpointDir = new File(tempDir, "chk")
val outputDir = new File(tempDir, "output @#output")
val inputData = MemoryStream[Int]
inputData.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val q = inputData.toDS()
.repartition(10)
.map { value =>
// we intend the task to fail after some tasks succeed
if (value == 5) {
// add some delay so that other tasks can commit before this task fails
Thread.sleep(100)
value / 0
} else {
value
}
}
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("parquet")
.start(outputDir.getCanonicalPath)

intercept[StreamingQueryException] {
try {
q.processAllAvailable()
} finally {
q.stop()
}
}

import PendingCommitFilesTrackingManifestFileCommitProtocol._
val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
.filter(_.toString.endsWith(".parquet"))
.map(_.getFileName.toString)
.toSet
val trackingFileNames = tracking.map(new Path(_).getName).toSet

// a race condition is possible: some tasks may complete while abortJob is being called,
// and we can't delete the completed files for those tasks
// (which is OK, since this is best effort)
assert(outputFileNames.intersect(trackingFileNames).isEmpty,
"abortJob should clean up files reported as successful.")
}
}
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {
val tracking: ArrayBuffer[String] = new ArrayBuffer[String]()

def cleanPendingCommitFiles(): Unit = tracking.clear()
def addPendingCommitFiles(paths: Seq[String]): Unit = tracking ++= paths
}

class PendingCommitFilesTrackingManifestFileCommitProtocol(jobId: String, path: String)
extends ManifestFileCommitProtocol(jobId, path) {
import PendingCommitFilesTrackingManifestFileCommitProtocol._

override def setupJob(jobContext: JobContext): Unit = {
super.setupJob(jobContext)
cleanPendingCommitFiles()
}

override def onTaskCommit(taskCommit: FileCommitProtocol.TaskCommitMessage): Unit = {
super.onTaskCommit(taskCommit)
addPendingCommitFiles(taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.path))
}
}

class FileStreamSinkV1Suite extends FileStreamSinkSuite {
