Skip to content

Commit 0e05caf

Browse files
prakharjain09vkorukanti
authored andcommitted
[Spark] Read support for log compactions
This PR adds read support for log compactions described here: #2072 Closes #2073 GitOrigin-RevId: 6f4a09c3fa09c303cdeb747c382cedcfda5a2a4c
1 parent 2d92266 commit 0e05caf

File tree

4 files changed

+605
-28
lines changed

4 files changed

+605
-28
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala

Lines changed: 130 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
1919
import java.io.FileNotFoundException
2020
import java.util.Objects
2121

22+
import scala.collection.mutable
2223
import scala.concurrent.{ExecutionContext, Future}
2324
import scala.util.control.NonFatal
2425

@@ -29,6 +30,7 @@ import org.apache.spark.sql.delta.actions.Metadata
2930
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3031
import org.apache.spark.sql.delta.util.DeltaThreadPool
3132
import org.apache.spark.sql.delta.util.FileNames._
33+
import org.apache.spark.sql.delta.util.JsonUtils
3234
import com.fasterxml.jackson.annotation.JsonIgnore
3335
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
3436

@@ -105,19 +107,24 @@ trait SnapshotManagement { self: DeltaLog =>
105107
* @return Some array of files found (possibly empty, if no usable commit files are present), or
106108
* None if the listing returned no files at all.
107109
*/
108-
protected final def listDeltaAndCheckpointFiles(
110+
protected final def listDeltaCompactedDeltaAndCheckpointFiles(
109111
startVersion: Long,
110-
versionToLoad: Option[Long]): Option[Array[FileStatus]] =
112+
versionToLoad: Option[Long],
113+
includeMinorCompactions: Boolean): Option[Array[FileStatus]] =
111114
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
112115
listFromOrNone(startVersion).map { _
113-
// Pick up all checkpoint and delta files
114-
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
115-
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
116-
// such files, hence we drop them so that we never pick up such checkpoints.
117-
.filterNot { file => isCheckpointFile(file) && file.getLen == 0 }
116+
.collect {
117+
case DeltaFile(f, fileVersion) =>
118+
(f, fileVersion)
119+
case CompactedDeltaFile(f, startVersion, endVersion)
120+
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
121+
(f, startVersion)
122+
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
123+
(f, fileVersion)
124+
}
118125
// take files until the version we want to load
119-
.takeWhile(f => versionToLoad.forall(getFileVersion(f) <= _))
120-
.toArray
126+
.takeWhile { case (_, fileVersion) => versionToLoad.forall(fileVersion <= _) }
127+
.map(_._1).toArray
121128
}
122129
}
123130

@@ -146,12 +153,18 @@ trait SnapshotManagement { self: DeltaLog =>
146153
// if that is -1, list from version 0L
147154
val lastCheckpointVersion = getCheckpointVersion(lastCheckpointInfo, oldCheckpointProviderOpt)
148155
val listingStartVersion = Math.max(0L, lastCheckpointVersion)
149-
val newFiles = listDeltaAndCheckpointFiles(listingStartVersion, versionToLoad)
156+
val includeMinorCompactions =
157+
spark.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS)
158+
val newFiles = listDeltaCompactedDeltaAndCheckpointFiles(
159+
startVersion = listingStartVersion,
160+
versionToLoad = versionToLoad,
161+
includeMinorCompactions = includeMinorCompactions)
150162
getLogSegmentForVersion(
151163
versionToLoad,
152164
newFiles,
153165
oldCheckpointProviderOpt = oldCheckpointProviderOpt,
154-
lastCheckpointInfo = lastCheckpointInfo)
166+
lastCheckpointInfo = lastCheckpointInfo
167+
)
155168
}
156169

157170
/**
@@ -185,6 +198,7 @@ trait SnapshotManagement { self: DeltaLog =>
185198
selectedDeltas.headOption.foreach { headDelta =>
186199
val headDeltaVersion = deltaVersion(headDelta)
187200
val lastDeltaVersion = selectedDeltas.last match {
201+
case CompactedDeltaFile(_, _, endV) => endV
188202
case DeltaFile(_, v) => v
189203
}
190204

@@ -195,6 +209,7 @@ trait SnapshotManagement { self: DeltaLog =>
195209
unsafeVolatileMetadata) // metadata is best-effort only
196210
}
197211
val deltaVersions = selectedDeltas.flatMap {
212+
case CompactedDeltaFile(_, startV, endV) => (startV to endV)
198213
case DeltaFile(_, v) => Seq(v)
199214
}
200215
verifyDeltaVersions(spark, deltaVersions, Some(checkpointVersion + 1), versionToLoad)
@@ -216,13 +231,13 @@ trait SnapshotManagement { self: DeltaLog =>
216231
.getOrElse {
217232
// No files found even when listing from 0 => empty directory => table does not exist yet.
218233
if (lastCheckpointVersion < 0) return None
219-
// [SC-95011] FIXME(ryan.johnson): We always write the commit and checkpoint files
220-
// before updating _last_checkpoint. If the listing came up empty, then we either
221-
// encountered a list-after-put inconsistency in the underlying log store, or somebody
222-
// corrupted the table by deleting files. Either way, we can't safely continue.
234+
// We always write the commit and checkpoint files before updating _last_checkpoint.
235+
// If the listing came up empty, then we either encountered a list-after-put
236+
// inconsistency in the underlying log store, or somebody corrupted the table by
237+
// deleting files. Either way, we can't safely continue.
223238
//
224239
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
225-
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
240+
// recursive call to [[getLogSegmentForVersion]] below.
226241
Array.empty[FileStatus]
227242
}
228243

@@ -235,7 +250,8 @@ trait SnapshotManagement { self: DeltaLog =>
235250
// singleton, so try listing from the first version
236251
return getLogSegmentForVersion(versionToLoad = versionToLoad)
237252
}
238-
val (checkpoints, deltas) = newFiles.partition(isCheckpointFile)
253+
val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
254+
val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
239255
// Find the latest checkpoint in the listing that is not older than the versionToLoad
240256
val checkpointFiles = checkpoints.map(f => CheckpointInstance(f.getPath))
241257
val newCheckpoint = getLatestCompleteCheckpointFromList(checkpointFiles, versionToLoad)
@@ -246,9 +262,8 @@ trait SnapshotManagement { self: DeltaLog =>
246262
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
247263
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
248264
// Try to look up another valid checkpoint and create `LogSegment` from it.
249-
//
250-
// [SC-95011] FIXME(ryan.johnson): Something has gone very wrong if the checkpoint doesn't
251-
// exist at all. This code should only handle rejected incomplete checkpoints.
265+
// This case can arise if the user deleted the table (all commits and checkpoints) but
266+
// left the _last_checkpoint intact.
252267
recordDeltaEvent(this, "delta.checkpoint.error.partial")
253268
val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last))
254269
getLogSegmentWithMaxExclusiveCheckpointVersion(snapshotVersion, lastCheckpointVersion)
@@ -268,6 +283,10 @@ trait SnapshotManagement { self: DeltaLog =>
268283
deltaVersion(file) > newCheckpointVersion
269284
}
270285

286+
// Here we validate that we are able to create a valid LogSegment by just using commit deltas
287+
// and without considering minor-compacted deltas. We want to fail early if log is messed up
288+
// i.e. some commit deltas are missing (although compacted-deltas are present).
289+
validateDeltaVersions(deltasAfterCheckpoint, newCheckpointVersion, versionToLoad)
271290

272291
val newVersion =
273292
deltasAfterCheckpoint.lastOption.map(deltaVersion).getOrElse(newCheckpoint.get.version)
@@ -288,17 +307,96 @@ trait SnapshotManagement { self: DeltaLog =>
288307
}
289308
val lastCommitTimestamp = deltas.last.getModificationTime
290309

291-
validateDeltaVersions(deltasAfterCheckpoint, newCheckpointVersion, versionToLoad)
310+
val deltasAndCompactedDeltasForLogSegment = useCompactedDeltasForLogSegment(
311+
deltasAndCompactedDeltas,
312+
deltasAfterCheckpoint,
313+
latestCommitVersion = newVersion,
314+
checkpointVersionToUse = newCheckpointVersion)
315+
316+
validateDeltaVersions(
317+
deltasAndCompactedDeltasForLogSegment, newCheckpointVersion, versionToLoad)
292318

293319
Some(LogSegment(
294320
logPath,
295321
newVersion,
296-
deltasAfterCheckpoint,
322+
deltasAndCompactedDeltasForLogSegment,
297323
checkpointProviderOpt,
298324
lastCommitTimestamp))
299325
}
300326
}
301327

328+
/**
329+
* @param deltasAndCompactedDeltas - all deltas or compacted deltas which could be used
330+
* @param deltasAfterCheckpoint - deltas after the last checkpoint file
331+
* @param latestCommitVersion - commit version for which we are trying to create Snapshot for
332+
* @param checkpointVersionToUse - underlying checkpoint version to use in Snapshot, -1 if no
333+
* checkpoint is used.
334+
* @return Returns a list of deltas/compacted-deltas which can be used to construct the
335+
* [[LogSegment]] instead of `deltasAfterCheckpoint`.
336+
*/
337+
protected def useCompactedDeltasForLogSegment(
338+
deltasAndCompactedDeltas: Seq[FileStatus],
339+
deltasAfterCheckpoint: Array[FileStatus],
340+
latestCommitVersion: Long,
341+
checkpointVersionToUse: Long): Array[FileStatus] = {
342+
343+
val selectedDeltas = mutable.ArrayBuffer.empty[FileStatus]
344+
var highestVersionSeen = checkpointVersionToUse
345+
val commitRangeCovered = mutable.ArrayBuffer.empty[Long]
346+
// track if there is at least 1 compacted delta in `deltasAndCompactedDeltas`
347+
var hasCompactedDeltas = false
348+
for (file <- deltasAndCompactedDeltas) {
349+
val (startVersion, endVersion) = file match {
350+
case CompactedDeltaFile(_, startVersion, endVersion) =>
351+
hasCompactedDeltas = true
352+
(startVersion, endVersion)
353+
case DeltaFile(_, version) =>
354+
(version, version)
355+
}
356+
357+
// select the compacted delta if the startVersion doesn't straddle `highestVersionSeen` and
358+
// the endVersion doesn't cross the latestCommitVersion.
359+
if (highestVersionSeen < startVersion && endVersion <= latestCommitVersion) {
360+
commitRangeCovered.appendAll(startVersion to endVersion)
361+
selectedDeltas += file
362+
highestVersionSeen = endVersion
363+
}
364+
}
365+
// If there are no compacted deltas in the `deltasAndCompactedDeltas` list, return from this
366+
// method.
367+
if (!hasCompactedDeltas) return deltasAfterCheckpoint
368+
// Validation-1: Commits represented by `compactedDeltasToUse` should be unique and there must
369+
// not be any duplicates.
370+
val coveredCommits = commitRangeCovered.toSet
371+
val hasDuplicates = (commitRangeCovered.size != coveredCommits.size)
372+
373+
// Validation-2: All commits from (CheckpointVersion + 1) to latestCommitVersion should be
374+
// either represented by compacted delta or by the delta.
375+
val requiredCommits = (checkpointVersionToUse + 1) to latestCommitVersion
376+
val missingCommits = requiredCommits.toSet -- coveredCommits
377+
if (!hasDuplicates && missingCommits.isEmpty) return selectedDeltas.toArray
378+
379+
// If the above check failed, that means the compacted delta validation failed.
380+
// Just record that event and return just the deltas (deltasAfterCheckpoint).
381+
val eventData = Map(
382+
"deltasAndCompactedDeltas" -> deltasAndCompactedDeltas.map(_.getPath.getName),
383+
"deltasAfterCheckpoint" -> deltasAfterCheckpoint.map(_.getPath.getName),
384+
"latestCommitVersion" -> latestCommitVersion,
385+
"checkpointVersionToUse" -> checkpointVersionToUse,
386+
"hasDuplicates" -> hasDuplicates,
387+
"missingCommits" -> missingCommits
388+
)
389+
recordDeltaEvent(
390+
deltaLog = this,
391+
opType = "delta.getLogSegmentForVersion.compactedDeltaValidationFailed",
392+
data = eventData)
393+
if (Utils.isTesting) {
394+
assert(false, s"Validation around Compacted deltas failed while creating Snapshot. " +
395+
s"[${JsonUtils.toJson(eventData)}]")
396+
}
397+
deltasAfterCheckpoint
398+
}
399+
302400
/**
303401
* Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint`
304402
* file as a hint on where to start listing the transaction log directory. If the _delta_log
@@ -398,10 +496,11 @@ trait SnapshotManagement { self: DeltaLog =>
398496
if (upperBoundVersion > 0) findLastCompleteCheckpointBefore(upperBoundVersion) else None
399497
previousCp match {
400498
case Some(cp) =>
401-
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
499+
val filesSinceCheckpointVersion = listDeltaCompactedDeltaAndCheckpointFiles(
402500
startVersion = cp.version,
403-
versionToLoad = Some(snapshotVersion))
404-
.getOrElse(Array.empty)
501+
versionToLoad = Some(snapshotVersion),
502+
includeMinorCompactions = false
503+
).getOrElse(Array.empty)
405504
val (checkpoints, deltas) = filesSinceCheckpointVersion.partition(isCheckpointFile)
406505
if (deltas.isEmpty) {
407506
// We cannot find any delta files. Returns None as we cannot construct a `LogSegment` only
@@ -436,8 +535,13 @@ trait SnapshotManagement { self: DeltaLog =>
436535
Some(checkpointProvider),
437536
deltas.last.getModificationTime))
438537
case None =>
538+
val listFromResult =
539+
listDeltaCompactedDeltaAndCheckpointFiles(
540+
startVersion = 0,
541+
versionToLoad = Some(snapshotVersion),
542+
includeMinorCompactions = false)
439543
val (deltas, deltaVersions) =
440-
listDeltaAndCheckpointFiles(startVersion = 0, versionToLoad = Some(snapshotVersion))
544+
listFromResult
441545
.getOrElse(Array.empty)
442546
.flatMap(DeltaFile.unapply(_))
443547
.unzip

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,6 +1302,13 @@ trait DeltaSQLConfBase {
13021302
.booleanConf
13031303
.createWithDefault(true)
13041304

1305+
val DELTALOG_MINOR_COMPACTION_USE_FOR_READS =
1306+
buildConf("deltaLog.minorCompaction.useForReads")
1307+
.doc("If true, minor compacted delta log files will be used for creating Snapshots")
1308+
.internal()
1309+
.booleanConf
1310+
.createWithDefault(true)
1311+
13051312
val ICEBERG_MAX_COMMITS_TO_CONVERT = buildConf("iceberg.maxPendingCommits")
13061313
.doc("""
13071314
|The maximum number of pending Delta commits to convert to Iceberg incrementally.

spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
2424
object FileNames {
2525

2626
val deltaFileRegex = raw"(\d+)\.json".r
27+
val compactedDeltaFileRegex = raw"(\d+).(\d+).compacted.json".r
2728
val checksumFileRegex = raw"(\d+)\.crc".r
2829
val checkpointFileRegex = raw"(\d+)\.checkpoint((\.\d+\.\d+)?\.parquet|\.[^.]+\.(json|parquet))".r
2930

3031
val deltaFilePattern = deltaFileRegex.pattern
32+
val compactedDeltaFilePattern = compactedDeltaFileRegex.pattern
3133
val checksumFilePattern = checksumFileRegex.pattern
3234
val checkpointFilePattern = checkpointFileRegex.pattern
3335

@@ -40,6 +42,14 @@ object FileNames {
4042
/** Returns the path to the checksum file for the given version. */
4143
def checksumFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.crc")
4244

45+
/** Returns the path to the compacted delta file for the given version range. */
46+
def compactedDeltaFile(
47+
path: Path,
48+
fromVersion: Long,
49+
toVersion: Long): Path = {
50+
new Path(path, f"$fromVersion%020d.$toVersion%020d.compacted.json")
51+
}
52+
4353
/** Returns the version for the given delta path. */
4454
def deltaVersion(path: Path): Long = path.getName.split("\\.")(0).toLong
4555
def deltaVersion(file: FileStatus): Long = deltaVersion(file.getPath)
@@ -48,6 +58,12 @@ object FileNames {
4858
def checksumVersion(path: Path): Long = path.getName.stripSuffix(".crc").toLong
4959
def checksumVersion(file: FileStatus): Long = checksumVersion(file.getPath)
5060

61+
def compactedDeltaVersions(path: Path): (Long, Long) = {
62+
val parts = path.getName.split("\\.")
63+
(parts(0).toLong, parts(1).toLong)
64+
}
65+
def compactedDeltaVersions(file: FileStatus): (Long, Long) = compactedDeltaVersions(file.getPath)
66+
5167
/**
5268
* Returns the prefix of all delta log files for the given version.
5369
*
@@ -93,9 +109,23 @@ object FileNames {
93109
def isChecksumFile(path: Path): Boolean = checksumFilePattern.matcher(path.getName).matches()
94110
def isChecksumFile(file: FileStatus): Boolean = isChecksumFile(file.getPath)
95111

112+
def isCompactedDeltaFile(path: Path): Boolean =
113+
compactedDeltaFilePattern.matcher(path.getName).matches()
114+
def isCompactedDeltaFile(file: FileStatus): Boolean = isCompactedDeltaFile(file.getPath)
115+
96116
def checkpointVersion(path: Path): Long = path.getName.split("\\.")(0).toLong
97117
def checkpointVersion(file: FileStatus): Long = checkpointVersion(file.getPath)
98118

119+
object CompactedDeltaFile {
120+
def unapply(f: FileStatus): Option[(FileStatus, Long, Long)] =
121+
unapply(f.getPath).map { case (_, startVersion, endVersion) => (f, startVersion, endVersion) }
122+
def unapply(path: Path): Option[(Path, Long, Long)] = path.getName match {
123+
case compactedDeltaFileRegex(lo, hi) => Some(path, lo.toLong, hi.toLong)
124+
case _ => None
125+
}
126+
}
127+
128+
99129
/**
100130
* Get the version of the checkpoint, checksum or delta file. Returns None if an unexpected
101131
* file type is seen.
@@ -104,6 +134,7 @@ object FileNames {
104134
case DeltaFile(_, version) => Some(version)
105135
case ChecksumFile(_, version) => Some(version)
106136
case CheckpointFile(_, version) => Some(version)
137+
case CompactedDeltaFile(_, _, endVersion) => Some(endVersion)
107138
case _ => None
108139
}
109140

@@ -145,10 +176,9 @@ object FileNames {
145176
}
146177

147178
object FileType extends Enumeration {
148-
val DELTA, CHECKPOINT, CHECKSUM, OTHER = Value
179+
val DELTA, CHECKPOINT, CHECKSUM, COMPACTED_DELTA, OTHER = Value
149180
}
150181

151-
152182
/** File path for a new V2 Checkpoint Json file */
153183
def newV2CheckpointJsonFile(path: Path, version: Long): Path =
154184
new Path(path, f"$version%020d.checkpoint.${UUID.randomUUID.toString}.json")

0 commit comments

Comments
 (0)