Skip to content

Commit

Permalink
[Spark]Add VacuumProtocolCheck ReaderWriter Table Feature (#2730)
Browse files Browse the repository at this point in the history
## Description

Add a new VacuumProtocolCheck ReaderWriter Table Feature so that Vacuum
command on older DBR client and OSS clients fail.
This is in follow-up to #2557
where protocol-check was added during the vacuum-write flow.

## How was this patch tested?
UTs

## Does this PR introduce _any_ user-facing changes?
No
  • Loading branch information
sumeet-db committed Mar 13, 2024
1 parent 60914cd commit 2e197f1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ object TableFeature {
// Row IDs are still under development and only available in testing.
RowTrackingFeature,
InCommitTimestampTableFeature,
TypeWideningTableFeature)
TypeWideningTableFeature,
VacuumProtocolCheckTableFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
Expand Down Expand Up @@ -656,6 +657,19 @@ object InCommitTimestampTableFeature
}
}

/**
* A ReaderWriter table feature for VACUUM. If this feature is enabled:
* A writer should follow one of the following:
* 1. Non-Support for Vacuum: Writers can explicitly state that they do not support VACUUM for
* any table, regardless of whether the Vacuum Protocol Check Table feature exists.
* 2. Implement Writer Protocol Check: Ensure that the VACUUM implementation includes a writer
* protocol check before any file deletions occur.
* Readers don't need to understand or change anything new; they just need to acknowledge the
* feature exists
*/
object VacuumProtocolCheckTableFeature
extends ReaderWriterFeature(name = "vacuumProtocolCheck-dev")

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,33 @@ class DeltaTableFeatureSuite
}
}

for(commandName <- Seq("ALTER", "CLONE", "REPLACE", "CREATE OR REPLACE")) {
test(s"Vacuum Protocol Check is disabled by default but can be enabled during $commandName") {
val table = "tbl"
withTable(table) {
spark.range(0).write.format("delta").saveAsTable(table)
val log = DeltaLog.forTable(spark, TableIdentifier(table))
val protocol = log.update().protocol
assert(!protocol.readerAndWriterFeatureNames.contains(VacuumProtocolCheckTableFeature.name))

val tblProperties1 = Seq(s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION")
sql(buildTablePropertyModifyingCommand(
commandName, targetTableName = table, sourceTableName = table, tblProperties1))
val newProtocol1 = log.update().protocol
assert(!newProtocol1.readerAndWriterFeatureNames.contains(
VacuumProtocolCheckTableFeature.name))

val tblProperties2 = Seq(s"'$FEATURE_PROP_PREFIX${VacuumProtocolCheckTableFeature.name}' " +
s"= 'supported', 'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION")
sql(buildTablePropertyModifyingCommand(
commandName, targetTableName = table, sourceTableName = table, tblProperties2))
val newProtocol2 = log.update().protocol
assert(newProtocol2.readerAndWriterFeatureNames.contains(
VacuumProtocolCheckTableFeature.name))
}
}
}

private def buildTablePropertyModifyingCommand(
commandName: String,
targetTableName: String,
Expand Down

0 comments on commit 2e197f1

Please sign in to comment.