From 643c88129d0e1ed821f3df206c007a10c1885053 Mon Sep 17 00:00:00 2001 From: Tom van Bussel Date: Sat, 27 Apr 2024 21:50:33 +0200 Subject: [PATCH] [Spark] Support dropping the CHECK constraints table feature --- .../resources/error/delta-error-classes.json | 7 +++++ .../apache/spark/sql/delta/DeltaErrors.scala | 7 +++++ .../PreDowngradeTableFeatureCommand.scala | 10 +++++++ .../apache/spark/sql/delta/TableFeature.scala | 14 ++++++++- .../sql/delta/constraints/Constraints.scala | 7 +++++ .../delta/schema/CheckConstraintsSuite.scala | 30 +++++++++++++++++++ 6 files changed, 74 insertions(+), 1 deletion(-) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index b64ff8da033..2a73433c6b6 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -164,6 +164,13 @@ ], "sqlState" : "42703" }, + "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE" : { + "message" : [ + "Cannot drop the CHECK constraints table feature.", + "The following constraints must be dropped first: <constraints>." 
+ ], + "sqlState" : "0AKDE" + }, "DELTA_CANNOT_EVALUATE_EXPRESSION" : { "message" : [ "Cannot evaluate expression: <expression>" diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index dc4c90b1916..14a52da98ed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -335,6 +335,13 @@ trait DeltaErrorsBase messageParameters = Array.empty) } + def cannotDropCheckConstraintFeature(constraintNames: Seq[String]): AnalysisException = { + new DeltaAnalysisException( + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + messageParameters = Array(constraintNames.map(formatColumn).mkString(", ")) + ) + } + def incorrectLogStoreImplementationException( sparkConf: SparkConf, cause: Throwable): Throwable = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 4e4774aef51..33b2c8b2056 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} +import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} import org.apache.spark.sql.util.ScalaExtensions._ @@ -271,3 +272,12 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) true } } + +case class CheckConstraintsPreDowngradeTableFeatureCommand(table: 
DeltaTableV2) + extends PreDowngradeTableFeatureCommand { + override def removeFeatureTracesIfNeeded(): Boolean = { + val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata) + if (checkConstraintNames.isEmpty) return false + throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index f58666d946a..8deffd3335b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -440,12 +440,24 @@ object InvariantsTableFeature object CheckConstraintsTableFeature extends LegacyWriterFeature(name = "checkConstraints", minWriterVersion = 3) - with FeatureAutomaticallyEnabledByMetadata { + with FeatureAutomaticallyEnabledByMetadata + with RemovableFeature { override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = { Constraints.getCheckConstraints(metadata, spark).nonEmpty } + + override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = + CheckConstraintsPreDowngradeTableFeatureCommand(table) + + override def validateRemoval(snapshot: Snapshot): Boolean = + Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty + + override def actionUsesFeature(action: Action): Boolean = action match { + case m: Metadata => Constraints.getCheckConstraintNames(m).nonEmpty + case _ => false + } } object ChangeDataFeedTableFeature diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala index aae7524f3c2..666b578d79d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala @@ -49,6 +49,13 
@@ object Constraints { /** A SQL expression to check for when writing out data. */ case class Check(name: String, expression: Expression) extends Constraint + def getCheckConstraintNames(metadata: Metadata): Seq[String] = { + metadata.configuration.keys.collect { + case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") => + key.stripPrefix("delta.constraints.") + }.toSeq + } + /** * Extract CHECK constraints from the table properties. Note that some CHECK constraints may also * come from schema metadata; these constraints were never released in a public API but are diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala index 8799ed020cf..ea44564d649 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala @@ -422,4 +422,34 @@ class CheckConstraintsSuite extends QueryTest } } + test("drop table feature") { + withTable("table") { + sql("CREATE TABLE table (a INT, b INT) USING DELTA " + + "TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')") + sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)") + sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)") + + val error1 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error1, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c1`, `c2`") + ) + + sql("ALTER TABLE table DROP CONSTRAINT c1") + val error2 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error2, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c2`") + ) + + sql("ALTER TABLE table DROP CONSTRAINT c2") + sql("ALTER TABLE table DROP FEATURE checkConstraints") + 
} + } }