Skip to content

Commit 2833a3e

Browse files
[Spark] DROP Support for Vacuum Protocol Check table feature (#2983)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description DROP Support for Vacuum Protocol Check table feature ## How was this patch tested? UTs ## Does this PR introduce _any_ user-facing changes? No
1 parent f4a4944 commit 2833a3e

File tree

5 files changed

+129
-0
lines changed

5 files changed

+129
-0
lines changed

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,13 @@
897897
],
898898
"sqlState" : "0AKDE"
899899
},
900+
"DELTA_FEATURE_DROP_DEPENDENT_FEATURE" : {
901+
"message" : [
902+
"Cannot drop table feature `<feature>` because some other features (<dependentFeatures>) in this table depends on `<feature>`.",
903+
"Consider dropping them first before dropping this feature."
904+
],
905+
"sqlState" : "0AKDE"
906+
},
900907
"DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT" : {
901908
"message" : [
902909
"Cannot drop <feature> from this table because it is not currently present in the table's protocol."

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,6 +2334,14 @@ trait DeltaErrorsBase
23342334
messageParameters = Array(feature))
23352335
}
23362336

2337+
def dropTableFeatureFailedBecauseOfDependentFeatures(
2338+
feature: String,
2339+
dependentFeatures: Seq[String]): DeltaTableFeatureException = {
2340+
new DeltaTableFeatureException(
2341+
errorClass = "DELTA_FEATURE_DROP_DEPENDENT_FEATURE",
2342+
messageParameters = Array(feature, dependentFeatures.mkString(", "), feature))
2343+
}
2344+
23372345
def dropTableFeatureConflictRevalidationFailed(
23382346
conflictingCommit: Option[CommitInfo] = None): DeltaTableFeatureException = {
23392347
val concurrentCommit = DeltaErrors.concurrentModificationExceptionMsg(

spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616

1717
package org.apache.spark.sql.delta
18+
1819
import java.util.concurrent.TimeUnit
1920

21+
import scala.util.control.NonFatal
22+
2023
import org.apache.spark.sql.delta.catalog.DeltaTableV2
2124
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
2225
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -187,6 +190,29 @@ case class InCommitTimestampsPreDowngradeCommand(table: DeltaTableV2)
187190
}
188191
}
189192

193+
case class VacuumProtocolCheckPreDowngradeCommand(table: DeltaTableV2)
194+
extends PreDowngradeTableFeatureCommand
195+
with DeltaLogging {
196+
197+
/**
198+
* Returns true when it performs a cleaning action. When no action was required
199+
* it returns false.
200+
* For downgrading the [[VacuumProtocolCheckTableFeature]], we don't need remove any traces, we
201+
* just need to remove the feature from the [[Protocol]].
202+
*/
203+
override def removeFeatureTracesIfNeeded(): Boolean = {
204+
val dependentFeatures = VacuumProtocolCheckTableFeature.otherFeaturesRequiringThisFeature
205+
val dependentFeaturesInProtocol =
206+
dependentFeatures.filter(table.initialSnapshot.protocol.isFeatureSupported(_))
207+
if (dependentFeaturesInProtocol.nonEmpty) {
208+
val dependentFeatureNames = dependentFeaturesInProtocol.map(_.name)
209+
throw DeltaErrors.dropTableFeatureFailedBecauseOfDependentFeatures(
210+
VacuumProtocolCheckTableFeature.name, dependentFeatureNames.toSeq)
211+
}
212+
false
213+
}
214+
}
215+
190216
case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
191217
extends PreDowngradeTableFeatureCommand
192218
with DeltaLogging {

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,22 @@ object InCommitTimestampTableFeature
729729
*/
730730
object VacuumProtocolCheckTableFeature
731731
extends ReaderWriterFeature(name = "vacuumProtocolCheck")
732+
with RemovableFeature {
733+
734+
val otherFeaturesRequiringThisFeature = Set(ManagedCommitTableFeature)
735+
736+
override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = {
737+
VacuumProtocolCheckPreDowngradeCommand(table)
738+
}
739+
740+
// The delta snapshot doesn't have any trace of the [[VacuumProtocolCheckTableFeature]] feature.
741+
// Other than it being present in PROTOCOL, which will be handled by the table feature downgrade
742+
// command once this method returns true.
743+
override def validateRemoval(snapshot: Snapshot): Boolean = true
744+
745+
// None of the actions uses [[VacuumProtocolCheckTableFeature]]
746+
override def actionUsesFeature(action: Action): Boolean = false
747+
}
732748

733749
/**
734750
* Features below are for testing only, and are being registered to the system only in the testing

spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3534,6 +3534,78 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
35343534
testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true)
35353535
}
35363536

3537+
private def testRemoveVacuumProtocolCheckTableFeature(
3538+
enableFeatureInitially: Boolean,
3539+
additionalTableProperties: Seq[(String, String)] = Seq.empty,
3540+
downgradeFailsWithException: Option[String] = None,
3541+
featureExpectedAtTheEnd: Boolean = false): Unit = {
3542+
val featureName = VacuumProtocolCheckTableFeature.name
3543+
withTempDir { dir =>
3544+
val deltaLog = DeltaLog.forTable(spark, dir)
3545+
val finalAdditionalTableProperty = if (enableFeatureInitially) {
3546+
additionalTableProperties ++
3547+
Seq((s"$FEATURE_PROP_PREFIX${featureName}", "supported"))
3548+
} else {
3549+
additionalTableProperties
3550+
}
3551+
var additionalTablePropertyString =
3552+
finalAdditionalTableProperty.map { case (k, v) => s"'$k' = '$v'" }.mkString(", ")
3553+
if (additionalTablePropertyString.nonEmpty) {
3554+
additionalTablePropertyString = s", $additionalTablePropertyString"
3555+
}
3556+
sql(
3557+
s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta
3558+
|TBLPROPERTIES (
3559+
| delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION,
3560+
| delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION
3561+
| $additionalTablePropertyString
3562+
|)""".stripMargin)
3563+
3564+
val protocol = deltaLog.update().protocol
3565+
assert(protocol.minReaderVersion == TABLE_FEATURES_MIN_READER_VERSION)
3566+
assert(protocol.minWriterVersion == TABLE_FEATURES_MIN_WRITER_VERSION)
3567+
assert(protocol.readerFeatures.get.contains(featureName)
3568+
=== enableFeatureInitially)
3569+
downgradeFailsWithException match {
3570+
case Some(exceptionClass) =>
3571+
val e = intercept[DeltaTableFeatureException] {
3572+
AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), featureName)
3573+
.run(spark)
3574+
}
3575+
assert(e.getErrorClass == exceptionClass)
3576+
case None =>
3577+
AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), featureName)
3578+
.run(spark)
3579+
}
3580+
val latestProtocolReaderFeatures = deltaLog.update().protocol.readerFeatures.getOrElse(Set())
3581+
assert(
3582+
latestProtocolReaderFeatures.contains(VacuumProtocolCheckTableFeature.name) ===
3583+
featureExpectedAtTheEnd)
3584+
assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = featureExpectedAtTheEnd)
3585+
}
3586+
}
3587+
3588+
test("Remove VacuumProtocolCheckTableFeature when it was enabled") {
3589+
testRemoveVacuumProtocolCheckTableFeature(enableFeatureInitially = true)
3590+
}
3591+
3592+
test("Removing VacuumProtocolCheckTableFeature should fail when dependent feature " +
3593+
"Managed Commit is enabled") {
3594+
testRemoveVacuumProtocolCheckTableFeature(
3595+
enableFeatureInitially = true,
3596+
additionalTableProperties = Seq(
3597+
(s"$FEATURE_PROP_PREFIX${ManagedCommitTableFeature.name}", "supported")),
3598+
downgradeFailsWithException = Some("DELTA_FEATURE_DROP_DEPENDENT_FEATURE"),
3599+
featureExpectedAtTheEnd = true)
3600+
}
3601+
3602+
test("Removing VacuumProtocolCheckTableFeature should fail when it is not enabled") {
3603+
testRemoveVacuumProtocolCheckTableFeature(
3604+
enableFeatureInitially = false,
3605+
downgradeFailsWithException = Some("DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT")
3606+
)
3607+
}
3608+
35373609
private def validateICTRemovalMetrics(
35383610
usageLogs: Seq[UsageRecord],
35393611
expectEnablementProperty: Boolean,

0 commit comments

Comments
 (0)