Use inventory reservoir as source for all files and dirs
- Currently, users have large tables with daily/hourly partitions spanning many years. Among all these partitions, only the recent ones are subject to change due to job reruns, corrections, and late-arriving events.

- When VACUUM is run on these tables, file listing is performed across all partitions and can run for several hours or even days. This duration grows as tables grow, and VACUUM becomes a major overhead for customers, especially those with hundreds or thousands of such Delta tables. The file system scan takes most of the time in a VACUUM operation on large tables, mostly due to the limited parallelism achievable and API throttling on object stores.

- This change provides a way for users to pass a reservoir of files generated externally (e.g. from inventory reports of cloud stores) as a Delta table or as a Spark SQL query with a predefined schema. When provided with such a reservoir DataFrame, the VACUUM operation skips the listing step and uses it as the source of all files in storage (see the sketch below).
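
For illustration, a minimal sketch of building such a reservoir as a Delta table. The bucket, paths, values, and table name are hypothetical; the column names follow the required inventory schema (`path`, `length`, `isDir`, `modificationTime`) introduced by this change:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a Delta-enabled Spark session.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// One row per object in storage: fully qualified URI, size in bytes,
// directory flag, and modification time in epoch milliseconds.
val inventory = Seq(
  ("s3://bucket/warehouse/test_db/table/date=2024-01-01/part-00000.parquet",
    1024L, false, 1706745600000L)
).toDF("path", "length", "isDir", "modificationTime")

// Persist it so it can be referenced in VACUUM ... USING INVENTORY.
inventory.write.format("delta").saveAsTable("reservoir_table")
```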

"Resolves #1691".

- Unit testing (`build/sbt 'testOnly org.apache.spark.sql.delta.DeltaVacuumSuite'`)

Yes, the PR adds an optional clause for passing an inventory:

`VACUUM table_name [USING INVENTORY <reservoir-delta-table>] [RETAIN num HOURS] [DRY RUN]`

`VACUUM table_name [USING INVENTORY (<reservoir-query>)] [RETAIN num HOURS] [DRY RUN]`

e.g. `VACUUM test_db.table USING INVENTORY (SELECT * FROM reservoir_table) RETAIN 168 HOURS DRY RUN`
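
A rough programmatic equivalent, assuming a Delta-enabled `spark` session, a hypothetical table path, and the `reservoir_table` from the sketch above; it passes the inventory through the new optional parameter of `VacuumCommand.gc`:

```scala
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.commands.VacuumCommand

// Hypothetical table location; dry run only lists what would be deleted.
val deltaLog = DeltaLog.forTable(spark, "/path/to/test_db/table")
val inventoryDF = spark.table("reservoir_table")

VacuumCommand.gc(
  spark,
  deltaLog,
  dryRun = true,
  retentionHours = Some(168.0),
  inventory = Some(inventoryDF)
).collect() // in dry-run mode, returns the paths that would be removed
```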

Closes #2257

Co-authored-by: Arun Ravi M V <arunravi.mv@grabtaxi.com>
Signed-off-by: Bart Samwel <bart.samwel@databricks.com>
GitOrigin-RevId: 2bc824e524c677dd5f3a7ed787762df60c3b6d86
Arun Ravi M V authored and tdas committed Feb 26, 2024
1 parent 4c8a442 commit 7d41fb7
Showing 8 changed files with 310 additions and 32 deletions.
13 changes: 12 additions & 1 deletion spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -73,6 +73,7 @@ singleStatement
// If you add keywords here that should not be reserved, add them to 'nonReserved' list.
statement
: VACUUM (path=STRING | table=qualifiedName)
(USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN))?
(RETAIN number HOURS)? (DRY RUN)? #vacuumTable
| (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail
| GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate
@@ -214,6 +215,14 @@ predicateToken
: .+?
;

// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later. Although this is the same as `exprToken` and `predicateToken`, we have to
// re-define it to work around an ANTLR issue (https://github.com/delta-io/delta/issues/1205).
// This can be removed once https://github.com/delta-io/delta/pull/1800 is merged.
subQuery
: .+?
;

// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later.
exprToken
@@ -223,7 +232,7 @@ exprToken
// Add keywords here so that people's queries don't break if they have a column name as one of
// these tokens
nonReserved
: VACUUM | RETAIN | HOURS | DRY | RUN
: VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
@@ -266,6 +275,7 @@ HISTORY: 'HISTORY';
HOURS: 'HOURS';
ICEBERG_COMPAT_VERSION: 'ICEBERG_COMPAT_VERSION';
IF: 'IF';
INVENTORY: 'INVENTORY';
LEFT_PAREN: '(';
LIMIT: 'LIMIT';
LOCATION: 'LOCATION';
@@ -296,6 +306,7 @@ TO: 'TO';
TRUE: 'TRUE';
UNIFORM: 'UNIFORM';
UPGRADE: 'UPGRADE';
USING: 'USING';
VACUUM: 'VACUUM';
VERSION: 'VERSION';
WHERE: 'WHERE';
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -1176,6 +1176,12 @@
],
"sqlState" : "22006"
},
"DELTA_INVALID_INVENTORY_SCHEMA" : {
"message" : [
"The schema for the specified INVENTORY does not contain all of the required fields. Required fields are: <expectedSchema>"
],
"sqlState" : "42000"
},
"DELTA_INVALID_ISOLATION_LEVEL" : {
"message" : [
"invalid isolation level '<isolationLevel>'"
10 changes: 6 additions & 4 deletions spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -319,10 +319,12 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
*/
override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
VacuumTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.number).map(_.getText.toDouble),
ctx.RUN != null)
path = Option(ctx.path).map(string),
table = Option(ctx.table).map(visitTableIdentifier),
inventoryTable = Option(ctx.inventoryTable).map(visitTableIdentifier),
inventoryQuery = Option(ctx.inventoryQuery).map(extractRawText),
horizonHours = Option(ctx.number).map(_.getText.toDouble),
dryRun = ctx.RUN != null)
}

/** Provides a list of unresolved attributes for multi dimensional clustering. */
@@ -20,6 +20,8 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils, UnresolvedDeltaPathOrIdentifier}
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.commands.VacuumCommand
@@ -30,12 +32,16 @@ import org.apache.spark.sql.types.StringType
/**
* The `vacuum` command implementation for Spark SQL. Example SQL:
* {{{
* VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN];
* VACUUM ('/path/to/dir' | delta.`/path/to/dir`)
* [USING INVENTORY (delta.`/path/to/dir`| ( sub_query ))]
* [RETAIN number HOURS] [DRY RUN];
* }}}
*/
case class VacuumTableCommand(
override val child: LogicalPlan,
horizonHours: Option[Double],
inventoryTable: Option[LogicalPlan],
inventoryQuery: Option[String],
dryRun: Boolean) extends RunnableCommand with UnaryNode with DeltaCommand {

override val output: Seq[Attribute] =
@@ -53,17 +59,25 @@ case class VacuumTableCommand(
"VACUUM",
DeltaTableIdentifier(path = Some(deltaTable.path.toString)))
}
VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect()
val inventory = inventoryTable.map(sparkSession.sessionState.analyzer.execute)
.map(p => Some(getDeltaTable(p, "VACUUM").toDf(sparkSession)))
.getOrElse(inventoryQuery.map(sparkSession.sql))
VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours,
inventory).collect()
}
}

object VacuumTableCommand {
def apply(
path: Option[String],
table: Option[TableIdentifier],
inventoryTable: Option[TableIdentifier],
inventoryQuery: Option[String],
horizonHours: Option[Double],
dryRun: Boolean): VacuumTableCommand = {
val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM")
VacuumTableCommand(child, horizonHours, dryRun)
val unresolvedInventoryTable = inventoryTable.map(rt =>
UnresolvedTable(rt.nameParts, "VACUUM", relationTypeMismatchHint = None))
VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun)
}
}
@@ -566,7 +566,12 @@ trait DeltaErrorsBase
errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAME",
messageParameters = Array(name))
}

def invalidInventorySchema(expectedSchema: String): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_INVALID_INVENTORY_SCHEMA",
messageParameters = Array(expectedSchema)
)
}
def invalidIsolationLevelException(s: String): Throwable = {
new DeltaIllegalArgumentException(
errorClass = "DELTA_INVALID_ISOLATION_LEVEL",
@@ -20,9 +20,7 @@ package org.apache.spark.sql.delta.commands
import java.net.URI
import java.util.Date
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -31,12 +29,12 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.{col, count, sum}
import org.apache.spark.sql.functions.{col, count, lit, replace, startswith, substr, sum}
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

/**
@@ -51,6 +49,21 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
object VacuumCommand extends VacuumCommandImpl with Serializable {

case class FileNameAndSize(path: String, length: Long)

/**
* path: fully qualified URI
* length: size in bytes
* isDir: boolean indicating if it is a directory
* modificationTime: file update time in milliseconds
*/
val INVENTORY_SCHEMA = StructType(
Seq(
StructField("path", StringType),
StructField("length", LongType),
StructField("isDir", BooleanType),
StructField("modificationTime", LongType)
))

/**
* Additional check on retention duration to prevent people from shooting themselves in the foot.
*/
@@ -125,19 +138,55 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
}.toDF("path")
}

def getFilesFromInventory(basePath: String,
partitionColumns: Seq[String],
inventory: DataFrame): Dataset[SerializableFileStatus] = {
implicit val fileNameAndSizeEncoder: Encoder[SerializableFileStatus] =
org.apache.spark.sql.Encoders.product[SerializableFileStatus]

// Keep only the required fields from the provided inventory DataFrame.
val inventorySchema = StructType(
inventory.schema.fields.filter(f => INVENTORY_SCHEMA.fields.map(_.name).contains(f.name))
)
if (inventorySchema != INVENTORY_SCHEMA) {
throw DeltaErrors.invalidInventorySchema(INVENTORY_SCHEMA.treeString)
}

inventory
.filter(startswith(col("path"), lit(s"$basePath/")))
.select(
substr(col("path"), lit(basePath.length + 2)).as("path"),
col("length"), col("isDir"), col("modificationTime")
)
.flatMap {
row =>
val path = row.getString(0)
if(!DeltaTableUtils.isHiddenDirectory(partitionColumns, path)) {
Seq(SerializableFileStatus(path,
row.getLong(1), row.getBoolean(2), row.getLong(3)))
} else {
None
}
}
}

/**
* Clears all untracked files and folders within this table. First lists all the files and
* directories in the table, and gets the relative paths with respect to the base of the
* table. Then it gets the list of all tracked files for this table, which may or may not
* be within the table base path, and gets the relative paths of all the tracked files with
* respect to the base of the table. Files outside of the table path will be ignored.
* Then we take a diff of the files and delete directories that were already empty, and all files
* that are within the table that are no longer tracked.
* Clears all untracked files and folders within this table. If an inventory is not provided,
* the command first lists all the files and directories in the table; if an inventory is
* provided, it is used to identify the files and directories within the table. The relative
* paths with respect to the base of the table are then computed. Next, the command gets the
* list of all tracked files for this table, which may or may not be within the table base path,
* and gets the relative paths of all the tracked files with respect to the base of the table.
* Files outside of the table path will be ignored. Then we take a diff of the files and delete
* directories that were already empty, and all files that are within the table that are no
* longer tracked.
*
* @param dryRun If set to true, no files will be deleted. Instead, we will list all files and
* directories that will be cleared.
* @param retentionHours An optional parameter to override the default Delta tombstone retention
* period
* @param inventory An optional dataframe of files and directories within the table generated
* from sources like blob store inventory report
* @return A Dataset containing the paths of the files/folders to delete in dryRun mode. Otherwise
* returns the base path of the table.
*/
@@ -146,6 +195,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
deltaLog: DeltaLog,
dryRun: Boolean = true,
retentionHours: Option[Double] = None,
inventory: Option[DataFrame] = None,
clock: Clock = new SystemClock): DataFrame = {
recordDeltaOperation(deltaLog, "delta.gc") {

@@ -189,16 +239,18 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism

val allFilesAndDirs = DeltaFileOperations.recursiveListDirs(
val allFilesAndDirsWithDuplicates = inventory match {
case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF)
case None => DeltaFileOperations.recursiveListDirs(
spark,
Seq(basePath),
hadoopConf,
hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
fileListingParallelism = Option(parallelism)
)
.groupByKey(_.path)
}
val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
.mapGroups { (k, v) =>
val duplicates = v.toSeq
// of all the duplicates we can return the newest file.
19 changes: 11 additions & 8 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -40,27 +40,30 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
// Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`.
val parser = new DeltaSqlParser(null)
assert(parser.parsePlan("vacuum 123_") ===
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, false))
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, None, None, false))
assert(parser.parsePlan("vacuum 1a.123_") ===
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), None, false))
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None),
None, None, None, false))
assert(parser.parsePlan("vacuum a.123A") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None),
None, None, None, false))
assert(parser.parsePlan("vacuum a.123E3_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None),
None, None, None, false))
assert(parser.parsePlan("vacuum a.123D_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None),
None, false))
None, None, None, false))
assert(parser.parsePlan("vacuum a.123BD_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None),
None, false))
None, None, None, false))

assert(parser.parsePlan("vacuum delta.`/tmp/table`") ===
VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None),
None, false))
None, None, None, false))

assert(parser.parsePlan("vacuum \"/tmp/table\"") ===
VacuumTableCommand(
UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, false))
UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false))
}

test("Restore command is parsed as expected") {
