/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

/**
 * The status of a file written out by [[FileStreamSink]]. A file is visible only if it appears in
 * the sink log and its action is not "delete".
 *
 * @param path the file path.
 * @param size the file size in bytes.
 * @param isDir whether this path is a directory.
 * @param modificationTime the file's last modification time.
 * @param blockReplication the block replication factor.
 * @param blockSize the block size in bytes.
 * @param action the file action. Must be either "add" or "delete".
 */
case class SinkFileStatus(
    path: String,
    size: Long,
    isDir: Boolean,
    modificationTime: Long,
    blockReplication: Int,
    blockSize: Long,
    action: String) {

  def toFileStatus: FileStatus = {
    new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
  }
}

object SinkFileStatus {
  def apply(f: FileStatus): SinkFileStatus = {
    SinkFileStatus(
      path = f.getPath.toUri.toString,
      size = f.getLen,
      isDir = f.isDirectory,
      modificationTime = f.getModificationTime,
      blockReplication = f.getReplication,
      blockSize = f.getBlockSize,
      action = FileStreamSinkLog.ADD_ACTION)
  }
}
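
// A minimal sketch (not part of the original file) of the round trip between the
// two representations; `fs` and the path below are hypothetical:
//
//   val hadoopStatus: FileStatus = fs.getFileStatus(new Path("/out/part-00000"))
//   val sinkStatus = SinkFileStatus(hadoopStatus)    // action is always "add" here
//   val restored: FileStatus = sinkStatus.toFileStatus
//
// Every field except `action` maps back onto the FileStatus; `action` exists
// purely for the sink log.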

/**
 * A special log for [[FileStreamSink]]. It writes one log file for each batch. The first line of
 * each log file is the version number, and each subsequent line is the JSON serialization of a
 * [[SinkFileStatus]].
 *
 * As reading from many small files is usually pretty slow, [[FileStreamSinkLog]] compacts log
 * files into a single large file every `spark.sql.streaming.fileSink.log.compactInterval` batches.
 * A compaction reads all old log files, merges them with the new batch, and drops the entries
 * whose action is "delete" (see [[SinkFileStatus.action]]). When a reader calls `allFiles` to
 * list all files, only the visible files are returned (deleted files are dropped).
 */
class FileStreamSinkLog(
    metadataLogVersion: String,
    sparkSession: SparkSession,
    path: String)
  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

  private implicit val formats = Serialization.formats(NoTypeHints)

  protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay

  protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion

  protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
  require(compactInterval > 0,
    s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
      "to a positive value.")

  protected override def serializeData(data: SinkFileStatus): String = {
    write(data)
  }

  protected override def deserializeData(encodedString: String): SinkFileStatus = {
    read[SinkFileStatus](encodedString)
  }
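
  // A sketch of the on-wire format (field values invented): with NoTypeHints,
  // `write` renders a SinkFileStatus as a single compact JSON object, e.g.
  //
  //   {"path":"hdfs://host/out/part-00000","size":1024,"isDir":false,
  //    "modificationTime":1470000000000,"blockReplication":1,
  //    "blockSize":134217728,"action":"add"}
  //
  // (shown wrapped here; the log stores it on one line), and `read` parses such
  // a line back into an equal SinkFileStatus.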

  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
    val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
    if (deletedFiles.isEmpty) {
      logs
    } else {
      logs.filter(f => !deletedFiles.contains(f.path))
    }
  }
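
  // A worked example (invented paths): given entries
  //   add /out/a, add /out/b, delete /out/a
  // compactLogs keeps only the "add /out/b" entry. Both the "add" and the
  // "delete" entries for /out/a are dropped, because any path present in
  // `deletedFiles` filters out every entry carrying that path.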
}

object FileStreamSinkLog {
  val VERSION = "v1"
  val DELETE_ACTION = "delete"
  val ADD_ACTION = "add"
}
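
// A sketch of a single (non-compacted) batch log file on disk, tying together
// VERSION and the serialized entries above (paths and sizes are invented):
//
//   v1
//   {"path":"hdfs://host/out/part-00000","size":1024,"isDir":false,"modificationTime":1470000000000,"blockReplication":1,"blockSize":134217728,"action":"add"}
//   {"path":"hdfs://host/out/part-00001","size":2048,"isDir":false,"modificationTime":1470000001000,"blockReplication":1,"blockSize":134217728,"action":"add"}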