Skip to content

Commit

Permalink
[SPARK-48286] Fix analysis of column with exists default expression -…
Browse files Browse the repository at this point in the history
… Add user facing error

### What changes were proposed in this pull request?

FIRST CHANGE

Pass correct parameter list to `org.apache.spark.sql.catalyst.util.ResolveDefaultColumns#analyze` when it is invoked from `org.apache.spark.sql.connector.catalog.CatalogV2Util#structFieldToV2Column`.

`org.apache.spark.sql.catalyst.util.ResolveDefaultColumns#analyze` method accepts 3 parameter

1) Field to analyze
2) Statement type - String
3) Metadata key - CURRENT_DEFAULT or EXISTS_DEFAULT

Method `org.apache.spark.sql.connector.catalog.CatalogV2Util#structFieldToV2Column`
pass `fieldToAnalyze` and `EXISTS_DEFAULT` as second parameter, so it is not metadata key, instead of that, it is statement type, so different expression is analyzed.

Pull requests where original change was introduced
apache#40049 - Initial commit
apache#44876 - Refactor that did not touch the issue
apache#44935 - Another refactor that did not touch the issue

SECOND CHANGE
Add user facing exception when default value is not foldable or resolved. Otherwise, user would see message "You hit a bug in Spark ...".
### Why are the changes needed?
It is needed to pass correct value to `Column` object

### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix, existence default value has now proper expression, but before this change, existence default value was actually current default value of column.

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46594 from urosstan-db/SPARK-48286-Analyze-exists-default-expression-instead-of-current-default-expression.

Lead-authored-by: Uros Stankovic <uros.stankovic@databricks.com>
Co-authored-by: Uros Stankovic <155642965+urosstan-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
2 people authored and cloud-fan committed Jun 6, 2024
1 parent 84fa052 commit 0f21df0
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
statementType, colName, defaultSQL)
}

// Analyze the parse result.
val plan = try {
val analyzer: Analyzer = DefaultColumnAnalyzer
Expand All @@ -298,6 +299,21 @@ object ResolveDefaultColumns extends QueryErrorsBase
val analyzed: Expression = plan.collectFirst {
case Project(Seq(a: Alias), OneRowRelation()) => a.child
}.get

if (!analyzed.foldable) {
throw QueryCompilationErrors.defaultValueNotConstantError(statementType, colName, defaultSQL)
}

// Another extra check, expressions should already be resolved if AnalysisException is not
// thrown in the code block above
if (!analyzed.resolved) {
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
statementType,
colName,
defaultSQL,
cause = null)
}

// Perform implicit coercion from the provided expression type to the required column type.
coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,15 @@ private[sql] object CatalogV2Util {
}

if (isDefaultColumn) {
val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
val e = analyze(
f,
statementType = "Column analysis",
metadataKey = EXISTS_DEFAULT_COLUMN_METADATA_KEY)

assert(e.resolved && e.foldable,
"The existence default value must be a simple SQL string that is resolved and foldable, " +
"but got: " + f.getExistenceDefaultValue().get)

val defaultValue = new ColumnDefaultValue(
f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType))
val cleanedMetadata = metadataWithKeysRemoved(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
Option(tables.get(ident)) match {
case Some(table) =>
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
val provider = Option(properties.get("provider"))

val schema = CatalogV2Util.applySchemaChanges(
table.schema,
changes,
provider,
"ALTER TABLE"
)

// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3484,6 +3484,30 @@ class DataSourceV2SQLSuiteV1Filter
}
}

test("SPARK-48286: Add new column with default value which is not foldable") {
val foldableExpressions = Seq("1", "2 + 1")
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> v2Source) {
withTable("tab") {
spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source")
val exception = intercept[AnalysisException] {
// Rand function is not foldable
spark.sql(s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT rand()")
}
assert(exception.getSqlState == "42623")
assert(exception.errorClass.get == "INVALID_DEFAULT_VALUE.NOT_CONSTANT")
assert(exception.messageParameters("colName") == "`col2`")
assert(exception.messageParameters("defaultValue") == "rand()")
assert(exception.messageParameters("statement") == "ALTER TABLE")
}
foldableExpressions.foreach(expr => {
withTable("tab") {
spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source")
spark.sql(s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT $expr")
}
})
}
}

private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
Expand Down

0 comments on commit 0f21df0

Please sign in to comment.