From 0f21df0b29cc18f0e0c7b12543f3a037e4032e65 Mon Sep 17 00:00:00 2001
From: Uros Stankovic
Date: Thu, 6 Jun 2024 13:08:48 -0700
Subject: [PATCH] [SPARK-48286] Fix analysis of column with exists default expression - Add user-facing error

### What changes were proposed in this pull request?

FIRST CHANGE

Pass the correct parameter list to `org.apache.spark.sql.catalyst.util.ResolveDefaultColumns#analyze` when it is invoked from `org.apache.spark.sql.connector.catalog.CatalogV2Util#structFieldToV2Column`.

`org.apache.spark.sql.catalyst.util.ResolveDefaultColumns#analyze` accepts 3 parameters:
1) Field to analyze
2) Statement type - String
3) Metadata key - CURRENT_DEFAULT or EXISTS_DEFAULT

`org.apache.spark.sql.connector.catalog.CatalogV2Util#structFieldToV2Column` passed the field to analyze and `EXISTS_DEFAULT` as the second parameter, so `EXISTS_DEFAULT` was treated as the statement type rather than the metadata key, and the wrong expression was analyzed (the column's current default instead of its existence default). A minimal sketch of this argument-binding mistake appears after the diff.

Pull requests where the original change was introduced:
https://github.com/apache/spark/pull/40049 - Initial commit
https://github.com/apache/spark/pull/44876 - Refactor that did not touch the issue
https://github.com/apache/spark/pull/44935 - Another refactor that did not touch the issue

SECOND CHANGE

Add a user-facing exception when the default value is not foldable or not resolved. Previously, the user would see the internal error message "You hit a bug in Spark ...".

### Why are the changes needed?
The correct existence default value must be passed to the `Column` object.

### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix. The existence default value now carries the proper expression; before this change, the existence default value was actually the current default value of the column.

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46594 from urosstan-db/SPARK-48286-Analyze-exists-default-expression-instead-of-current-default-expression.

Lead-authored-by: Uros Stankovic
Co-authored-by: Uros Stankovic <155642965+urosstan-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan
---
 .../util/ResolveDefaultColumnsUtil.scala      | 16 +++++++++++++
 .../sql/connector/catalog/CatalogV2Util.scala |  7 +++++-
 ...SourceV2DataFrameSessionCatalogSuite.scala |  9 ++++++-
 .../sql/connector/DataSourceV2SQLSuite.scala  | 24 +++++++++++++++++++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index d73e2ca6bd9d4..ad104b6e0c765 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -284,6 +284,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
       throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
         statementType, colName, defaultSQL)
     }
+    // Analyze the parse result.
     val plan = try {
       val analyzer: Analyzer = DefaultColumnAnalyzer
@@ -298,6 +299,21 @@ object ResolveDefaultColumns extends QueryErrorsBase
     val analyzed: Expression = plan.collectFirst {
       case Project(Seq(a: Alias), OneRowRelation()) => a.child
     }.get
+
+    if (!analyzed.foldable) {
+      throw QueryCompilationErrors.defaultValueNotConstantError(statementType, colName, defaultSQL)
+    }
+
+    // Another extra check, expressions should already be resolved if AnalysisException is not
+    // thrown in the code block above
+    if (!analyzed.resolved) {
+      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+        statementType,
+        colName,
+        defaultSQL,
+        cause = null)
+    }
+
     // Perform implicit coercion from the provided expression type to the required column type.
     coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 5485f5255b6e7..f36310e8ad899 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -525,10 +525,15 @@ private[sql] object CatalogV2Util {
     }
     if (isDefaultColumn) {
-      val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      val e = analyze(
+        f,
+        statementType = "Column analysis",
+        metadataKey = EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+
       assert(e.resolved && e.foldable,
         "The existence default value must be a simple SQL string that is resolved and foldable, " +
           "but got: " + f.getExistenceDefaultValue().get)
+
       val defaultValue = new ColumnDefaultValue(
         f.getCurrentDefaultValue().get,
         LiteralValue(e.eval(), f.dataType))
       val cleanedMetadata = metadataWithKeysRemoved(

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 5d5ea6499c49d..7bbb6485c273f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -110,7 +110,14 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
     Option(tables.get(ident)) match {
       case Some(table) =>
         val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
-        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
+        val provider = Option(properties.get("provider"))
+
+        val schema = CatalogV2Util.applySchemaChanges(
+          table.schema,
+          changes,
+          provider,
+          "ALTER TABLE"
+        )
 
         // fail if the last column in the schema was dropped
         if (schema.fields.isEmpty) {

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 14b9feb2951a9..5b2472f774278 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3484,6 +3484,30 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-48286: Add new column with default value which is not foldable") {
+    val foldableExpressions = Seq("1", "2 + 1")
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> v2Source) {
withTable("tab") { + spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source") + val exception = intercept[AnalysisException] { + // Rand function is not foldable + spark.sql(s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT rand()") + } + assert(exception.getSqlState == "42623") + assert(exception.errorClass.get == "INVALID_DEFAULT_VALUE.NOT_CONSTANT") + assert(exception.messageParameters("colName") == "`col2`") + assert(exception.messageParameters("defaultValue") == "rand()") + assert(exception.messageParameters("statement") == "ALTER TABLE") + } + foldableExpressions.foreach(expr => { + withTable("tab") { + spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source") + spark.sql(s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT $expr") + } + }) + } + } + private def testNotSupportedV2Command( sqlCommand: String, sqlParams: String,