Skip to content

Commit

Permalink
[Spark] Support dropping the CHECK constraints table feature
Browse files Browse the repository at this point in the history
  • Loading branch information
tomvanbussel committed Apr 29, 2024
1 parent 4b835e9 commit 643c881
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 1 deletion.
7 changes: 7 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@
],
"sqlState" : "42703"
},
"DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE": {
"message" : [
"Cannot drop the CHECK constraints table feature.",
"The following constraints must be dropped first: <constraints>."
],
"sqlState" : "0AKDE"
},
"DELTA_CANNOT_EVALUATE_EXPRESSION" : {
"message" : [
"Cannot evaluate expression: <expression>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ trait DeltaErrorsBase
messageParameters = Array.empty)
}

def cannotDropCheckConstraintFeature(constraintNames: Seq[String]): AnalysisException = {
new DeltaAnalysisException(
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
messageParameters = Array(constraintNames.map(formatColumn).mkString(", "))
)
}

def incorrectLogStoreImplementationException(
sparkConf: SparkConf,
cause: Throwable): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._
Expand Down Expand Up @@ -271,3 +272,12 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
true
}
}

case class CheckConstraintsPreDowngradeTableFeatureCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand {
override def removeFeatureTracesIfNeeded(): Boolean = {
val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata)
if (checkConstraintNames.isEmpty) return false
throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,24 @@ object InvariantsTableFeature

object CheckConstraintsTableFeature
extends LegacyWriterFeature(name = "checkConstraints", minWriterVersion = 3)
with FeatureAutomaticallyEnabledByMetadata {
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
Constraints.getCheckConstraints(metadata, spark).nonEmpty
}

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
CheckConstraintsPreDowngradeTableFeatureCommand(table)

override def validateRemoval(snapshot: Snapshot): Boolean =
Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty

override def actionUsesFeature(action: Action): Boolean = action match {
case m: Metadata => Constraints.getCheckConstraintNames(m).nonEmpty
case _ => false
}
}

object ChangeDataFeedTableFeature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ object Constraints {
/** A SQL expression to check for when writing out data. */
case class Check(name: String, expression: Expression) extends Constraint

def getCheckConstraintNames(metadata: Metadata): Seq[String] = {
metadata.configuration.keys.collect {
case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
key.stripPrefix("delta.constraints.")
}.toSeq
}

/**
* Extract CHECK constraints from the table properties. Note that some CHECK constraints may also
* come from schema metadata; these constraints were never released in a public API but are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,34 @@ class CheckConstraintsSuite extends QueryTest
}
}

test("drop table feature") {
withTable("table") {
sql("CREATE TABLE table (a INT, b INT) USING DELTA " +
"TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')")
sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)")
sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)")

val error1 = intercept[AnalysisException] {
sql("ALTER TABLE table DROP FEATURE checkConstraints")
}
checkError(
error1,
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
parameters = Map("constraints" -> "`c1`, `c2`")
)

sql("ALTER TABLE table DROP CONSTRAINT c1")
val error2 = intercept[AnalysisException] {
sql("ALTER TABLE table DROP FEATURE checkConstraints")
}
checkError(
error2,
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
parameters = Map("constraints" -> "`c2`")
)

sql("ALTER TABLE table DROP CONSTRAINT c2")
sql("ALTER TABLE table DROP FEATURE checkConstraints")
}
}
}

0 comments on commit 643c881

Please sign in to comment.