diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 08e30e7fc35..25b5c895f27 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -170,6 +170,13 @@
     ],
     "sqlState" : "42703"
   },
+  "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE" : {
+    "message" : [
+      "Cannot drop the CHECK constraints table feature.",
+      "The following constraints must be dropped first: <constraints>."
+    ],
+    "sqlState" : "0AKDE"
+  },
   "DELTA_CANNOT_EVALUATE_EXPRESSION" : {
     "message" : [
       "Cannot evaluate expression: <expression>"
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 5245f8f91a8..14ea0b7a957 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -335,6 +335,13 @@ trait DeltaErrorsBase
       messageParameters = Array.empty)
   }
 
+  def cannotDropCheckConstraintFeature(constraintNames: Seq[String]): AnalysisException = {
+    new DeltaAnalysisException(
+      errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
+      messageParameters = Array(constraintNames.map(formatColumn).mkString(", "))
+    )
+  }
+
   def incorrectLogStoreImplementationException(
       sparkConf: SparkConf,
       cause: Throwable): Throwable = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
index 96a17b72b82..e5f96f1c21c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
 import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
 import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import org.apache.spark.sql.delta.constraints.Constraints
 import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
@@ -378,3 +379,22 @@ case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2)
     true
   }
 }
+
+case class CheckConstraintsPreDowngradeTableFeatureCommand(table: DeltaTableV2)
+  extends PreDowngradeTableFeatureCommand {
+
+  /**
+   * Throws an exception if the table has CHECK constraints, and returns false otherwise (as no
+   * action was required).
+   *
+   * We intentionally error out instead of removing the CHECK constraints here, as dropping a
+   * table feature should never alter the logical representation of a table (only its physical
+   * representation). Instead, we ask the user to explicitly drop the constraints before the table
+   * feature can be dropped.
+   */
+  override def removeFeatureTracesIfNeeded(): Boolean = {
+    val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata)
+    if (checkConstraintNames.isEmpty) return false
+    throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames)
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index 25513c2e657..e930455d514 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -457,12 +457,24 @@ object InvariantsTableFeature
 
 object CheckConstraintsTableFeature
   extends LegacyWriterFeature(name = "checkConstraints", minWriterVersion = 3)
-  with FeatureAutomaticallyEnabledByMetadata {
+  with FeatureAutomaticallyEnabledByMetadata
+  with RemovableFeature {
   override def metadataRequiresFeatureToBeEnabled(
       metadata: Metadata,
       spark: SparkSession): Boolean = {
     Constraints.getCheckConstraints(metadata, spark).nonEmpty
   }
+
+  override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
+    CheckConstraintsPreDowngradeTableFeatureCommand(table)
+
+  override def validateRemoval(snapshot: Snapshot): Boolean =
+    Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty
+
+  override def actionUsesFeature(action: Action): Boolean = {
+    // This method is never called, as it is only used for ReaderWriterFeatures.
+    throw new UnsupportedOperationException()
+  }
 }
 
 object ChangeDataFeedTableFeature
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
index 96c066b2537..11920e2220c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala
@@ -49,6 +49,13 @@
   /** A SQL expression to check for when writing out data. */
   case class Check(name: String, expression: Expression) extends Constraint
 
+  def getCheckConstraintNames(metadata: Metadata): Seq[String] = {
+    metadata.configuration.keys.collect {
+      case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
+        key.stripPrefix("delta.constraints.")
+    }.toSeq
+  }
+
   /**
    * Extract CHECK constraints from the table properties. Note that some CHECK constraints may also
   * come from schema metadata; these constraints were never released in a public API but are
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala
index 16293c05804..954286bbe00 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala
@@ -18,6 +18,9 @@ package org.apache.spark.sql.delta.schema
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.DeltaLog
+
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.constraints.CharVarcharConstraint
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -459,4 +462,39 @@ class CheckConstraintsSuite extends QueryTest
       )
     }
   }
+
+  test("drop table feature") {
+    withTable("table") {
+      sql("CREATE TABLE table (a INT, b INT) USING DELTA " +
+        "TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')")
+      sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)")
+      sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)")
+
+      val error1 = intercept[AnalysisException] {
+        sql("ALTER TABLE table DROP FEATURE checkConstraints")
+      }
+      checkError(
+        error1,
+        errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
+        parameters = Map("constraints" -> "`c1`, `c2`")
+      )
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table"))
+      assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))
+
+      sql("ALTER TABLE table DROP CONSTRAINT c1")
+      val error2 = intercept[AnalysisException] {
+        sql("ALTER TABLE table DROP FEATURE checkConstraints")
+      }
+      checkError(
+        error2,
+        errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
+        parameters = Map("constraints" -> "`c2`")
+      )
+      assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))
+
+      sql("ALTER TABLE table DROP CONSTRAINT c2")
+      sql("ALTER TABLE table DROP FEATURE checkConstraints")
+      assert(!deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))
+    }
+  }
 }
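
Note (illustration, not part of the diff above): the new getCheckConstraintNames helper works because Delta persists each CHECK constraint as a table property keyed "delta.constraints.<name>", so stripping that prefix is enough to recover the names that the pre-downgrade check reports back to the user. A minimal, self-contained Scala sketch of that lookup; the property keys and values below are hypothetical examples, not taken from the diff.

  // Sketch only: mirrors the prefix filtering in Constraints.getCheckConstraintNames,
  // but over a plain Map instead of a Delta Metadata object.
  object CheckConstraintNamesSketch extends App {
    import java.util.Locale

    // CHECK constraints live in the table configuration as "delta.constraints.<name>" -> "<expr>".
    val configuration = Map(
      "delta.constraints.c1" -> "a > 0", // e.g. from ALTER TABLE t ADD CONSTRAINT c1 CHECK (a > 0)
      "delta.constraints.c2" -> "b > 0",
      "delta.appendOnly" -> "false"      // unrelated property, ignored by the prefix filter
    )

    val checkConstraintNames = configuration.keys.collect {
      case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
        key.stripPrefix("delta.constraints.")
    }.toSeq

    // Prints the recovered names, e.g. List(c1, c2); a non-empty result is what makes
    // the pre-downgrade command refuse to drop the checkConstraints table feature.
    println(checkConstraintNames)
  }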