/
ManifestFileCommitProtocol.scala
156 lines (133 loc) · 6.01 KB
/
ManifestFileCommitProtocol.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.io.IOException
import java.util.UUID
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
 * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
 * structured streaming.
 *
 * Files written by tasks are reported back as [[SinkFileStatus]] entries, and the job commit
 * records them in a [[FileStreamSinkLog]] under the configured batch id.
 *
 * @param jobId the id of the job; not referenced by this implementation.
 * @param path path to write the final output to.
 */
class ManifestFileCommitProtocol(jobId: String, path: String)
  extends FileCommitProtocol with Serializable with Logging {

  // Track the list of files added by a task, only used on the executors.
  @transient private var addedFiles: ArrayBuffer[String] = _

  // Manifest log and batch id; both are assigned by `setupManifestOptions`.
  @transient private var fileLog: FileStreamSinkLog = _
  private var batchId: Long = _

  // Paths reported through `onTaskCommit`. They are deleted by `abortJob` and forgotten
  // (without deletion) by `commitJob`.
  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _

  /**
   * Sets up the manifest log output and the batch id for this job.
   * Must be called before any other function.
   *
   * @param fileLog the sink log that `commitJob` adds the batch's file statuses to.
   * @param batchId the id of the batch under which the files are recorded.
   */
  def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = {
    this.fileLog = fileLog
    this.batchId = batchId
  }

  /**
   * Initializes the (empty) list of files pending commit.
   *
   * @throws IllegalArgumentException if `setupManifestOptions` has not been called.
   */
  override def setupJob(jobContext: JobContext): Unit = {
    require(fileLog != null, "setupManifestOptions must be called before this function")
    pendingCommitFiles = new ArrayBuffer[Path]
  }

  /**
   * Records the file statuses carried by all task commit messages in the manifest log for
   * the current batch.
   *
   * @param taskCommits commit messages whose payload is a `Seq[SinkFileStatus]`.
   * @throws IllegalArgumentException if `setupManifestOptions` has not been called.
   * @throws IllegalStateException if the manifest log reports that the batch was not added.
   */
  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    require(fileLog != null, "setupManifestOptions must be called before this function")
    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray

    // We shouldn't remove the files once they're written to the metadata:
    // `fileLog.add(batchId, fileStatuses)` could fail AFTER writing files to the metadata,
    // and there could also be a race. So, for safety, we clear the pending list before
    // calling anything that may throw, which keeps a later `abortJob` from deleting them.
    // The case is uncommon and we do best effort instead of guarantee, so the simplicity of
    // the logic here is OK, and safe for dealing with unexpected situations.
    pendingCommitFiles.clear()

    if (fileLog.add(batchId, fileStatuses)) {
      logInfo(s"Committed batch $batchId")
    } else {
      throw new IllegalStateException(s"Race while writing batch $batchId")
    }
  }

  /**
   * Deletes, on a best-effort basis, the files reported by committed tasks of a failed job.
   * An `IOException` while handling one file is logged and the remaining files are still
   * processed.
   *
   * @throws IllegalArgumentException if `setupManifestOptions` has not been called.
   */
  override def abortJob(jobContext: JobContext): Unit = {
    require(fileLog != null, "setupManifestOptions must be called before this function")
    // Best effort cleanup of complete files from failed job.
    // Since the file has UUID in its filename, we are safe to try deleting them
    // as the file will not conflict with file with another attempt on the same task.
    if (pendingCommitFiles.nonEmpty) {
      pendingCommitFiles.foreach { path =>
        try {
          val fs = path.getFileSystem(jobContext.getConfiguration)
          // this is to make sure the file can be seen from driver as well
          if (fs.exists(path)) {
            fs.delete(path, false)
          }
        } catch {
          case e: IOException =>
            logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
        }
      }
      pendingCommitFiles.clear()
    }
  }

  /**
   * Remembers the paths of the files reported by a committed task so that `abortJob` can
   * remove them if the job fails.
   *
   * @param taskCommit commit message whose payload is a `Seq[SinkFileStatus]`.
   */
  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
    pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
      .map(_.toFileStatus.getPath)
  }

  /** Resets the list of files added by this task. */
  override def setupTask(taskContext: TaskAttemptContext): Unit = {
    addedFiles = new ArrayBuffer[String]
  }

  /**
   * Builds a new, uniquely named output file path under `path` (or under `path/dir` when
   * `dir` is defined) and records it as added by this task.
   *
   * @param dir optional sub-directory, relative to `path`.
   * @param ext suffix appended verbatim to the generated file name.
   * @return the full path of the new file, as a string.
   */
  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet
    // (here with ext = ".gz.parquet").
    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
    // the file name is fine and won't overflow.
    val split = taskContext.getTaskAttemptID.getTaskID.getId
    val uuid = UUID.randomUUID.toString
    val filename = f"part-$split%05d-$uuid$ext"

    val file = dir.map { d =>
      new Path(new Path(path, d), filename).toString
    }.getOrElse {
      new Path(path, filename).toString
    }

    addedFiles += file
    file
  }

  /** Not supported by this protocol: always throws the "absolute path unsupported" error. */
  override def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
    throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
  }

  /**
   * Looks up the file status of every file added by this task and returns them in the
   * commit message.
   *
   * @return a message carrying a `Seq[SinkFileStatus]`, empty when the task added no files.
   */
  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    if (addedFiles.nonEmpty) {
      // The file system is resolved once, from the first added file, and reused for all files.
      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[SinkFileStatus] =
        addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f)))).toSeq
      new TaskCommitMessage(statuses)
    } else {
      new TaskCommitMessage(Seq.empty[SinkFileStatus])
    }
  }

  /**
   * Deletes, on a best-effort basis, the files added by this task. An `IOException` while
   * deleting one file is logged and the remaining files are still processed, matching the
   * behavior of `abortJob`.
   */
  override def abortTask(taskContext: TaskAttemptContext): Unit = {
    // best effort cleanup of incomplete files
    if (addedFiles.nonEmpty) {
      // The file system is resolved once, from the first added file, and reused for all files.
      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
      addedFiles.foreach { file =>
        try {
          fs.delete(new Path(file), false)
        } catch {
          case e: IOException =>
            // Keep going: one failed delete must not leave the other files behind.
            logWarning(s"Fail to remove temporary file $file, continue removing next.", e)
        }
      }
    }
  }
}