From 01dd8131e534c872fe2717994a2b9f9c49387af5 Mon Sep 17 00:00:00 2001 From: Tom van Bussel Date: Sat, 27 Apr 2024 21:50:33 +0200 Subject: [PATCH 1/2] [Spark] Support dropping the CHECK constraints table feature --- .../resources/error/delta-error-classes.json | 7 +++++ .../apache/spark/sql/delta/DeltaErrors.scala | 7 +++++ .../PreDowngradeTableFeatureCommand.scala | 10 +++++++ .../apache/spark/sql/delta/TableFeature.scala | 14 ++++++++- .../sql/delta/constraints/Constraints.scala | 7 +++++ .../delta/schema/CheckConstraintsSuite.scala | 30 +++++++++++++++++++ 6 files changed, 74 insertions(+), 1 deletion(-) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index b64ff8da033..dd3b95192fb 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -164,6 +164,13 @@ ], "sqlState" : "42703" }, + "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE" : { + "message" : [ + "Cannot drop the CHECK constraints table feature.", + "The following constraints must be dropped first: ." 
+ ], + "sqlState" : "0AKDE" + }, "DELTA_CANNOT_EVALUATE_EXPRESSION" : { "message" : [ "Cannot evaluate expression: " diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index dc4c90b1916..14a52da98ed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -335,6 +335,13 @@ trait DeltaErrorsBase messageParameters = Array.empty) } + def cannotDropCheckConstraintFeature(constraintNames: Seq[String]): AnalysisException = { + new DeltaAnalysisException( + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + messageParameters = Array(constraintNames.map(formatColumn).mkString(", ")) + ) + } + def incorrectLogStoreImplementationException( sparkConf: SparkConf, cause: Throwable): Throwable = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 4e4774aef51..33b2c8b2056 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} +import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} import org.apache.spark.sql.util.ScalaExtensions._ @@ -271,3 +272,12 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) true } } + +case class CheckConstraintsPreDowngradeTableFeatureCommand(table: 
DeltaTableV2) + extends PreDowngradeTableFeatureCommand { + override def removeFeatureTracesIfNeeded(): Boolean = { + val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata) + if (checkConstraintNames.isEmpty) return false + throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index f58666d946a..8deffd3335b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -440,12 +440,24 @@ object InvariantsTableFeature object CheckConstraintsTableFeature extends LegacyWriterFeature(name = "checkConstraints", minWriterVersion = 3) - with FeatureAutomaticallyEnabledByMetadata { + with FeatureAutomaticallyEnabledByMetadata + with RemovableFeature { override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = { Constraints.getCheckConstraints(metadata, spark).nonEmpty } + + override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = + CheckConstraintsPreDowngradeTableFeatureCommand(table) + + override def validateRemoval(snapshot: Snapshot): Boolean = + Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty + + override def actionUsesFeature(action: Action): Boolean = action match { + case m: Metadata => Constraints.getCheckConstraintNames(m).nonEmpty + case _ => false + } } object ChangeDataFeedTableFeature diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala index aae7524f3c2..666b578d79d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala @@ -49,6 +49,13 
@@ object Constraints { /** A SQL expression to check for when writing out data. */ case class Check(name: String, expression: Expression) extends Constraint + def getCheckConstraintNames(metadata: Metadata): Seq[String] = { + metadata.configuration.keys.collect { + case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") => + key.stripPrefix("delta.constraints.") + }.toSeq + } + /** * Extract CHECK constraints from the table properties. Note that some CHECK constraints may also * come from schema metadata; these constraints were never released in a public API but are diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala index 8799ed020cf..ea44564d649 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala @@ -422,4 +422,34 @@ class CheckConstraintsSuite extends QueryTest } } + test("drop table feature") { + withTable("table") { + sql("CREATE TABLE table (a INT, b INT) USING DELTA " + + "TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')") + sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)") + sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)") + + val error1 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error1, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c1`, `c2`") + ) + + sql("ALTER TABLE table DROP CONSTRAINT c1") + val error2 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error2, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c2`") + ) + + sql("ALTER TABLE table DROP CONSTRAINT c2") + sql("ALTER TABLE table DROP FEATURE checkConstraints") + 
} + } } From 223571d504edac6c56bbaa0ef4610c84b60f36cc Mon Sep 17 00:00:00 2001 From: Tom van Bussel Date: Thu, 13 Jun 2024 13:49:53 +0200 Subject: [PATCH 2/2] Address feedback --- .../sql/delta/PreDowngradeTableFeatureCommand.scala | 10 ++++++++++ .../org/apache/spark/sql/delta/TableFeature.scala | 6 +++--- .../spark/sql/delta/schema/CheckConstraintsSuite.scala | 7 +++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index f05321a80a8..e5f96f1c21c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -382,6 +382,16 @@ case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2) case class CheckConstraintsPreDowngradeTableFeatureCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand { + + /** + * Throws an exception if the table has CHECK constraints, and returns false otherwise (as no + * action was required). + * + * We intentionally error out instead of removing the CHECK constraints here, as dropping a + * table feature should never alter the logical representation of a table (only its physical + * representation). Instead, we ask the user to explicitly drop the constraints before the table + * feature can be dropped. 
+ */ override def removeFeatureTracesIfNeeded(): Boolean = { val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata) if (checkConstraintNames.isEmpty) return false diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 4810ad2aa08..e930455d514 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -471,9 +471,9 @@ object CheckConstraintsTableFeature override def validateRemoval(snapshot: Snapshot): Boolean = Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty - override def actionUsesFeature(action: Action): Boolean = action match { - case m: Metadata => Constraints.getCheckConstraintNames(m).nonEmpty - case _ => false + override def actionUsesFeature(action: Action): Boolean = { + // This method is never called, as it is only used for ReaderWriterFeatures. 
+ throw new UnsupportedOperationException() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala index d2dda17fcea..954286bbe00 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala @@ -18,6 +18,9 @@ package org.apache.spark.sql.delta.schema import scala.collection.JavaConverters._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.delta.DeltaLog + // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.constraints.CharVarcharConstraint import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -475,6 +478,8 @@ class CheckConstraintsSuite extends QueryTest errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", parameters = Map("constraints" -> "`c1`, `c2`") ) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table")) + assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) sql("ALTER TABLE table DROP CONSTRAINT c1") val error2 = intercept[AnalysisException] { @@ -485,9 +490,11 @@ class CheckConstraintsSuite extends QueryTest errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", parameters = Map("constraints" -> "`c2`") ) + assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) sql("ALTER TABLE table DROP CONSTRAINT c2") sql("ALTER TABLE table DROP FEATURE checkConstraints") + assert(!deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) } } }