[Spark] Implement incremental clustering using ZCUBE approach
## Description
Implement incremental Liquid clustering according to the design
[doc](https://docs.google.com/document/d/1FWR3odjOw4v4-hjFy_hVaNdxHVs4WuK1asfB6M6XEMw/edit?usp=sharing).

This implementation uses a ZCUBE-based approach to achieve incremental
clustering. Once a ZCUBE grows large enough, it is sealed; subsequent
clustering runs will not re-cluster its files, which reduces write
amplification.
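
A rough sketch of the sealing idea follows (the class, threshold, and helper
names are hypothetical, not the PR's actual code): a ZCUBE whose accumulated
size has reached a target is treated as sealed, and its files are excluded
from the next clustering pass.

```scala
// Hypothetical sketch of ZCUBE sealing; names and the threshold are illustrative.
case class ZCubeInfo(zCubeId: String, totalSizeInBytes: Long)

object IncrementalClusteringSketch {
  // Assumed target size after which a ZCUBE is considered sealed.
  val targetZCubeSizeInBytes: Long = 100L * 1024 * 1024 * 1024

  def isSealed(cube: ZCubeInfo): Boolean =
    cube.totalSizeInBytes >= targetZCubeSizeInBytes

  /** Keep only files that are unclustered or belong to an unsealed ZCUBE. */
  def filesToRecluster(
      files: Seq[(Option[String], Long)], // (ZCUBE_ID tag if present, file size)
      cubes: Map[String, ZCubeInfo]): Seq[(Option[String], Long)] = {
    files.filterNot { case (zCubeIdOpt, _) =>
      zCubeIdOpt.flatMap(cubes.get).exists(isSealed)
    }
  }
}
```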

Key changes:
Each clustered file is tagged with ZCUBE_ID to track which ZCUBE it
belongs to; the id is generated using a UUID. Another tag,
ZCUBE_ZORDER_BY, tracks the clustering columns. Each clustered
file also has clusteringProvider populated with liquid.
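
As a minimal illustration of the tagging described above (the helper below is
not the PR's code; the real logic lives in the new OptimizeTableStrategy, and
the comma-separated encoding of the clustering columns is an assumption):

```scala
import java.util.UUID

import org.apache.spark.sql.delta.actions.AddFile

// Illustrative only: tag a rewritten file with its ZCUBE id and clustering
// columns, and mark it as written by the liquid clustering provider.
def tagClusteredFile(
    addFile: AddFile,
    zCubeId: String = UUID.randomUUID().toString,
    clusteringColumns: Seq[String] = Nil): AddFile = {
  val zCubeTags = Map(
    "ZCUBE_ID" -> zCubeId,
    // Column-list encoding is assumed; the PR may serialize it differently.
    "ZCUBE_ZORDER_BY" -> clusteringColumns.mkString(","))
  addFile.copy(
    tags = Option(addFile.tags).getOrElse(Map.empty[String, String]) ++ zCubeTags,
    clusteringProvider = Some("liquid"),
    dataChange = false)
}
```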

## How was this patch tested?
New unit tests.

## Does this PR introduce _any_ user-facing changes?
No
dabao521 committed Mar 13, 2024
1 parent c046547 commit 4456a12
Showing 12 changed files with 1,131 additions and 39 deletions.
@@ -225,6 +225,15 @@ class OptimizeExecutor(
optimizeContext: DeltaOptimizeContext)
extends DeltaCommand with SQLMetricsReporting with Serializable {

/**
* In which mode the Optimize command is running. There are three valid modes:
* 1. Compaction
* 2. ZOrder
* 3. Clustering
*/
private val optimizeStrategy =
OptimizeTableStrategy(sparkSession, txn.snapshot, optimizeContext, zOrderByColumns)

/** Timestamp to use in [[FileAction]] */
private val operationTimestamp = new SystemClock().getTimeMillis()

@@ -242,15 +251,6 @@ class OptimizeExecutor(
}
}

private lazy val curve: String = {
if (zOrderByColumns.nonEmpty) {
"zorder"
} else {
assert(isClusteredTable)
"hilbert"
}
}

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
val minFileSize = optimizeContext.minFileSize.getOrElse(
@@ -269,7 +269,7 @@
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
val jobs = groupFilesIntoBins(partitionsToCompact)

val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
@@ -320,18 +320,7 @@ class OptimizeExecutor(
numDeletionVectorRowsRemoved = removedDVs.map(_.cardinality).sum))
}

if (isMultiDimClustering) {
val inputFileStats =
ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum)
optimizeStats.zOrderStats = Some(ZOrderStats(
strategyName = "all", // means process all files in a partition
inputCubeFiles = ZOrderFileStats(0, 0),
inputOtherFiles = inputFileStats,
inputNumCubes = 0,
mergedFiles = inputFileStats,
// There will one z-cube for each partition
numOutputCubes = optimizeStats.numPartitionsOptimized))
}
optimizeStrategy.updateOptimizeStats(optimizeStats, removedFiles, jobs)

return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics))
}
@@ -365,27 +354,31 @@ class OptimizeExecutor(
*
* @param partitionsToCompact List of files to compact group by partition.
* Partition is defined by the partition values (partCol -> partValue)
* @param maxTargetFileSize Max size (in bytes) of the compaction output file.
* @return Sequence of bins. Each bin contains one or more files from the same
* partition and targeted for one output file.
*/
private def groupFilesIntoBins(
partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = {
partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])])
: Seq[(Map[String, String], Seq[AddFile])] = {
val maxBinSize = optimizeStrategy.maxBinSize
partitionsToCompact.flatMap {
case (partition, files) =>
val bins = new ArrayBuffer[Seq[AddFile]]()

val currentBin = new ArrayBuffer[AddFile]()
var currentBinSize = 0L

files.sortBy(_.size).foreach { file =>
val preparedFiles = optimizeStrategy.prepareFilesPerPartition(files)
preparedFiles.foreach { file =>
// Generally, a bin is a group of existing files, whose total size does not exceed the
// desired maxFileSize. They will be coalesced into a single output file.
// However, if isMultiDimClustering = true, all files in a partition will be read by the
// same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize
// will be produced. See below.
if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) {
// desired maxBinSize. The output file size depends on the mode:
// 1. Compaction: Files in a bin will be coalesced into a single output file.
// 2. ZOrder: All files in a partition will be read by the
// same job, the data will be range-partitioned, and
// numFiles = totalFileSize / maxFileSize will be produced.
// 3. Clustering: Files in a bin belong to one ZCUBE; the data will be
// range-partitioned and numFiles = totalFileSize / maxFileSize will be produced.
if (file.size + currentBinSize > maxBinSize) {
bins += currentBin.toVector
currentBin.clear()
currentBin += file
@@ -431,7 +424,7 @@ class OptimizeExecutor(
input,
approxNumFiles,
clusteringColumns,
curve)
optimizeStrategy.curve)
} else {
val useRepartition = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_REPARTITION_ENABLED)
@@ -450,13 +443,9 @@ class OptimizeExecutor(
sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID),
description)

val binInfo = optimizeStrategy.initNewBin
val addFiles = txn.writeFiles(repartitionDF, None, isOptimize = true, Nil).collect {
case a: AddFile =>
(if (isClusteredTable) {
a.copy(clusteringProvider = Some(ClusteredTableUtils.clusteringProvider))
} else {
a
}).copy(dataChange = false)
case a: AddFile => optimizeStrategy.tagAddFile(a, binInfo)
case other =>
throw new IllegalStateException(
s"Unexpected action $other with type ${other.getClass}. File compaction job output" +
