/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.io._
import java.nio.charset.StandardCharsets
import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ArrayImplicits._
/**
* A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
* as the metadata storage.
*
* When writing a new batch, [[HDFSMetadataLog]] will firstly write to a temp file and then rename
* it to the final batch file. If the rename step fails, there must be multiple writers and only
* one of them will succeed and the others will fail.
*
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
  extends MetadataLog[T] with Logging {

  // json4s configuration for the default serialize/deserialize below. NoTypeHints:
  // the concrete type T is pinned by the manifest, so no type info goes into the JSON.
  private implicit val formats: Formats = Serialization.formats(NoTypeHints)

  /** Needed to serialize type T into JSON when using Jackson */
  @scala.annotation.nowarn
  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  // Avoid serializing generic sequences, see SPARK-17372
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

  // Root directory of the log; each batch is stored in one file named by its batch id.
  val metadataPath = new Path(path)

  // File manager providing atomic create/rename semantics for the batch files.
  protected val fileManager =
    CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf())

  // Eagerly create the log directory so later list/open/write calls don't fail on it.
  if (!fileManager.exists(metadataPath)) {
    fileManager.mkdirs(metadataPath)
  }

  // Whether batchCache below is consulted and maintained (configurable via SQLConf).
  protected val metadataCacheEnabled: Boolean
    = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)

  /**
   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
   * when committing offsets, this cache will save some file system operations.
   */
  protected[sql] val batchCache = Collections.synchronizedMap(new JLinkedHashMap[Long, T](2) {
    // Evict the oldest entry once a third batch is inserted (insertion-ordered map).
    override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = size > 2
  })

  /**
   * A `PathFilter` to filter only batch files
   */
  protected val batchFilesFilter = new PathFilter {
    override def accept(path: Path): Boolean = isBatchFile(path)
  }

  /** Returns the path of the metadata file for the given batch id. */
  protected def batchIdToPath(batchId: Long): Path = {
    new Path(metadataPath, batchId.toString)
  }

  /** Inverse of [[batchIdToPath]]; assumes the file name parses as a Long. */
  protected def pathToBatchId(path: Path) = {
    path.getName.toLong
  }

  /** A path is a batch file iff its file name is a valid Long (the batch id). */
  protected def isBatchFile(path: Path) = {
    try {
      path.getName.toLong
      true
    } catch {
      case _: NumberFormatException => false
    }
  }

  /**
   * Serialize the metadata and write to the output stream. If this method is overridden in a
   * subclass, the overriding method should not close the given output stream, as it will be closed
   * in the caller.
   */
  protected def serialize(metadata: T, out: OutputStream): Unit = {
    Serialization.write(metadata, out)
  }

  /**
   * Read and deserialize the metadata from input stream. If this method is overridden in a
   * subclass, the overriding method should not close the given input stream, as it will be closed
   * in the caller.
   */
  protected def deserialize(in: InputStream): T = {
    // Batch files are written as JSON text; decode as UTF-8.
    val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
    Serialization.read[T](reader)
  }

  /**
   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
   * metadata has already been stored, this method will return `false`.
   */
  override def add(batchId: Long, metadata: T): Boolean = {
    require(metadata != null, "'null' metadata cannot written to a metadata log")
    val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) }
    // Only cache on a successful (i.e. first) write of this batch.
    if (metadataCacheEnabled && res) batchCache.put(batchId, metadata)
    res
  }

  /**
   * Return the metadata for the given batch id, consulting the cache first and falling back
   * to reading the batch file. Returns None if the batch file does not exist.
   */
  override def get(batchId: Long): Option[T] = {
    if (metadataCacheEnabled && batchCache.containsKey(batchId)) {
      val metadata = batchCache.get(batchId)
      // add() rejects null metadata, so a cached entry can never be null.
      assert(metadata != null)
      return Some(metadata)
    }
    try {
      applyFnToBatchByStream(batchId) { input => Some(deserialize(input)) }
    } catch {
      // Missing file simply means the batch was never written (or was purged).
      case fne: FileNotFoundException =>
        logDebug(fne.getMessage)
        None
    }
  }

  /**
   * Get the id of the previous batch from storage
   * @param batchId get the previous batch id of this batch with batchId
   * @return the greatest batch id on disk that is strictly less than `batchId`, if any
   */
  def getPrevBatchFromStorage(batchId: Long): Option[Long] = {
    val batchFiles = listBatchesOnDisk
    var prev: Option[Long] = None
    // Walk the sorted ids, remembering the last one seen before reaching batchId.
    for (file <- batchFiles.sorted) {
      if (file >= batchId) {
        return prev
      }
      prev = Some(file)
    }
    None
  }

  /**
   * Apply provided function to each entry in the specific batch metadata log.
   *
   * Unlike get which will materialize all entries into memory, this method streamlines the process
   * via READ-AND-PROCESS. This helps to avoid the memory issue on huge metadata log file.
   *
   * NOTE: This no longer fails early on corruption. The caller should handle the exception
   * properly and make sure the logic is not affected by failing in the middle.
   *
   * @param batchId the batch whose file should be opened and streamed to `fn`
   * @param skipExistingCheck skip the existence probe when the caller already knows the
   *                          file exists (saves one file system round trip)
   */
  def applyFnToBatchByStream[RET](
      batchId: Long, skipExistingCheck: Boolean = false)(fn: InputStream => RET): RET = {
    val batchMetadataFile = batchIdToPath(batchId)
    if (skipExistingCheck || fileManager.exists(batchMetadataFile)) {
      val input = fileManager.open(batchMetadataFile)
      try {
        fn(input)
      } catch {
        case ise: IllegalStateException =>
          // re-throw the exception with the log file path added
          throw new IllegalStateException(
            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
      } finally {
        IOUtils.closeQuietly(input)
      }
    } else {
      throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
    }
  }

  /**
   * Atomically write the content produced by `fn` to the given batch file. On any failure the
   * partially-written output is cancelled so no corrupt batch file becomes visible.
   */
  protected def write(batchMetadataFile: Path,
      fn: OutputStream => Unit): Unit = {
    // Only write metadata when the batch has not yet been written
    val output = fileManager.createAtomic(batchMetadataFile, overwriteIfPossible = false)
    try {
      fn(output)
      output.close()
    } catch {
      case e: FileAlreadyExistsException =>
        output.cancel()
        // If next batch file already exists, then another concurrently running query has
        // written it.
        throw QueryExecutionErrors.multiStreamingQueriesUsingPathConcurrentlyError(path, e)
      case e: Throwable =>
        output.cancel()
        throw e
    }
  }

  /**
   * Store the metadata for the specified batchId and return `true` if successful. This method
   * fills the content of metadata via executing function. If the function throws an exception,
   * writing will be automatically cancelled and this method will propagate the exception.
   *
   * If the batchId's metadata has already been stored, this method will return `false`.
   *
   * Writing the metadata is done by writing a batch to a temp file then rename it to the batch
   * file.
   *
   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
   * valid behavior, we still need to prevent it from destroying the files.
   */
  def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
    val batchMetadataFile = batchIdToPath(batchId)
    // Cheap cache check first; fall back to a file existence probe.
    if ((metadataCacheEnabled && batchCache.containsKey(batchId))
      || fileManager.exists(batchMetadataFile)) {
      false
    } else {
      write(batchMetadataFile, fn)
      true
    }
  }

  /**
   * Return the metadata of a batch that is known to exist, from the cache if present,
   * otherwise by reading the batch file (skipping the redundant existence check).
   */
  private def getExistingBatch(batchId: Long): T = {
    val metadata = batchCache.get(batchId)
    if (metadata == null) {
      applyFnToBatchByStream(batchId, skipExistingCheck = true) { input => deserialize(input) }
    } else {
      metadata
    }
  }

  /**
   * Return all (batchId, metadata) pairs in [startId, endId] (either bound optional),
   * verifying the batch ids form a contiguous range before reading any files.
   */
  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
    assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get)
    val batchIds = listBatches.filter { batchId =>
      (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
    }.sorted
    HDFSMetadataLog.verifyBatchIds(batchIds.toImmutableArraySeq, startId, endId)
    batchIds.map(batchId => (batchId, getExistingBatch(batchId)))
  }

  /** Return the latest batch id without reading the file. */
  def getLatestBatchId(): Option[Long] = listBatches.sorted.lastOption

  /** Return the latest batch id together with its metadata, if any batch exists. */
  override def getLatest(): Option[(Long, T)] = {
    listBatches.sorted.lastOption.map { batchId =>
      logInfo(s"Getting latest batch $batchId")
      (batchId, getExistingBatch(batchId))
    }
  }

  /**
   * Get an array of [FileStatus] referencing batch files.
   * The array is sorted by most recent batch file first to
   * oldest batch file.
   */
  def getOrderedBatchFiles(): Array[FileStatus] = {
    fileManager.list(metadataPath, batchFilesFilter)
      .sortBy(f => pathToBatchId(f.getPath))
      .reverse
  }

  // Highest batch id already removed by purge(); lets subsequent purges avoid re-listing
  // the directory when only a few new batches need deletion. -1 means nothing purged yet.
  private var lastPurgedBatchId: Long = -1L

  /**
   * Removes all the log entry earlier than thresholdBatchId (exclusive).
   */
  override def purge(thresholdBatchId: Long): Unit = {
    val possibleTargetBatchIds = (lastPurgedBatchId + 1 until thresholdBatchId)
    if (possibleTargetBatchIds.length <= 3) {
      // avoid using list if we only need to purge at most 3 elements
      possibleTargetBatchIds.foreach { batchId =>
        val path = batchIdToPath(batchId)
        fileManager.delete(path)
        if (metadataCacheEnabled) batchCache.remove(batchId)
        logTrace(s"Removed metadata log file: $path")
      }
    } else {
      // using list to retrieve all elements
      for (batchId <- listBatches if batchId < thresholdBatchId) {
        val path = batchIdToPath(batchId)
        fileManager.delete(path)
        if (metadataCacheEnabled) batchCache.remove(batchId)
        logTrace(s"Removed metadata log file: $path")
      }
    }
    lastPurgedBatchId = thresholdBatchId - 1
  }

  /**
   * Removes all log entries later than thresholdBatchId (exclusive).
   */
  def purgeAfter(thresholdBatchId: Long): Unit = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))
    for (batchId <- batchIds if batchId > thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      if (metadataCacheEnabled) batchCache.remove(batchId)
      logTrace(s"Removed metadata log file: $path")
    }
  }

  /** List the available batches on file system. */
  protected def listBatches: Array[Long] = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath)) ++
      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
      // eliminate the race condition.
      batchCache.synchronized {
        batchCache.keySet.asScala.toArray
      }
    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
    if (batchIds.isEmpty) {
      Array.empty
    } else {
      // Assume batch ids are continuous
      (batchIds.min to batchIds.max).toArray
    }
  }

  /**
   * List the batches persisted to storage
   * @return array of batches ids
   */
  def listBatchesOnDisk: Array[Long] = {
    fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath)).sorted
  }

  /** Parse and validate a version header line, delegating to [[MetadataVersionUtil]]. */
  private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int =
    MetadataVersionUtil.validateVersion(text, maxSupportedVersion)
}
object HDFSMetadataLog {

  /**
   * Verify if batchIds are continuous and between `startId` and `endId` (both inclusive and
   * startId assumed to be <= endId).
   *
   * @param batchIds the sorted ids to verify.
   * @param startId the start id. If it's set, batchIds should start with this id.
   * @param endId the end id. If it's set, batchIds should end with this id.
   * @throws IllegalStateException if any batch id expected in [startId, endId] is missing,
   *                               listing exactly the missing ids in the message.
   */
  def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
    // Verify that we can get all batches between `startId` and `endId`.
    if (startId.isDefined || endId.isDefined) {
      if (batchIds.isEmpty) {
        throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist")
      }
      if (startId.isDefined) {
        val minBatchId = batchIds.head
        assert(minBatchId >= startId.get)
        if (minBatchId != startId.get) {
          // Fix: use `until` so the (existing) minBatchId itself is not reported as missing.
          val missingBatchIds = startId.get until minBatchId
          throw new IllegalStateException(
            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
              s"(startId: $startId, endId: $endId)")
        }
      }
      if (endId.isDefined) {
        val maxBatchId = batchIds.last
        assert(maxBatchId <= endId.get)
        if (maxBatchId != endId.get) {
          // Fix: start after the (existing) maxBatchId so it is not reported as missing.
          val missingBatchIds = (maxBatchId + 1) to endId.get
          throw new IllegalStateException(
            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
              s"(startId: $startId, endId: $endId)")
        }
      }
    }
    // Even with matching endpoints, interior ids may be absent; report any holes.
    if (batchIds.nonEmpty) {
      val minBatchId = batchIds.head
      val maxBatchId = batchIds.last
      val missingBatchIds = (minBatchId to maxBatchId).toSet -- batchIds
      if (missingBatchIds.nonEmpty) {
        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) " +
          s"don't exist (startId: $startId, endId: $endId)")
      }
    }
  }
}