[SPARK-42521][SQL] Add NULLs for INSERTs with user-specified lists of fewer columns than the target table

### What changes were proposed in this pull request?

Add NULLs for INSERTs with user-specified lists of fewer columns than the target table.

This is done by updating the semantics of the `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES` SQLConf so that it applies only to INSERTs with explicit user-specified column lists, and by changing its default to true.
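
For illustration, here is a minimal, self-contained sketch of the new behavior. The table `t` and its columns are hypothetical and are not taken from this PR:

```scala
// Sketch only: illustrates INSERT with an explicit column list that names fewer
// columns than the target table. Table and column names are hypothetical.
import org.apache.spark.sql.SparkSession

object InsertWithShorterColumnListExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("insert-missing-columns-get-nulls")
      .getOrCreate()

    spark.sql("CREATE TABLE t (c1 INT, c2 INT, c3 STRING) USING parquet")

    // The explicit column list names fewer columns than the target table.
    // As of this change, c2 and c3 (which have no explicit DEFAULT) are filled
    // with NULL instead of the statement failing analysis.
    spark.sql("INSERT INTO t (c1) VALUES (42)")

    spark.sql("SELECT * FROM t").show()  // expected row: 42, NULL, NULL

    spark.stop()
  }
}
```

With the new default of the conf, this statement succeeds; disabling the conf is expected to restore the old failure, as noted in the migration guide below.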

### Why are the changes needed?

This behavior is consistent with other query engines.

### Does this PR introduce _any_ user-facing change?

Yes, per above.

### How was this patch tested?

Unit test coverage in `InsertSuite`.
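
As a rough illustration of the kind of coverage this adds (not the literal test from the PR; the suite name and table are made up, and it assumes Spark's standard `QueryTest`/`SharedSparkSession` test helpers):

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

// Sketch only: approximates the behavior exercised by the new InsertSuite coverage.
class InsertShorterColumnListSketchSuite extends QueryTest with SharedSparkSession {
  test("INSERT with an explicit column list shorter than the table fills NULLs") {
    withTable("t") {
      sql("CREATE TABLE t (c1 INT, c2 INT) USING parquet")
      sql("INSERT INTO t (c1) VALUES (1)")
      // c2 has no explicit DEFAULT, so it should receive NULL.
      checkAnswer(spark.table("t"), Row(1, null))
    }
  }
}
```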

Closes #40229 from dtenedor/defaults-insert-nulls.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit d2a527a)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
dtenedor authored and gengliangwang committed Mar 2, 2023
1 parent a1d5e89 commit d8fa508
Showing 6 changed files with 85 additions and 122 deletions.
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -24,6 +24,7 @@ license: |

## Upgrading from Spark SQL 3.3 to 3.4

- Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed returning errors reporting that the number of provided columns does not match the number of columns in the target table. Note that disabling `spark.sql.defaultColumn.useNullsForMissingDefaultValues` will restore the previous behavior.
- Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
- Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
- Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
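
As a side note on the migration entry above, the following sketch (not part of this commit; table and names are hypothetical) shows how the pre-3.4 behavior could be restored by disabling the conf:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: restoring the legacy behavior described in the migration note above.
object RestoreLegacyInsertBehaviorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("legacy-insert").getOrCreate()
    spark.sql("CREATE TABLE t (c1 INT, c2 INT) USING parquet")

    // Disable the new behavior; per the migration note, this restores pre-3.4 semantics.
    spark.conf.set("spark.sql.defaultColumn.useNullsForMissingDefaultValues", "false")

    // Expected to fail analysis again, since c2 receives no value and has no DEFAULT.
    try {
      spark.sql("INSERT INTO t (c1) VALUES (1)")
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        println(s"Failed as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```
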
@@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement =
regenerateUserSpecifiedCols(i, schema)
val expanded: LogicalPlan =
- addMissingDefaultValuesForInsertFromInlineTable(node, schema)
+ addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.size)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r: LogicalPlan =>
@@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
val project: Project = i.query.asInstanceOf[Project]
val expanded: Project =
- addMissingDefaultValuesForInsertFromProject(project, schema)
+ addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.size)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r =>
@@ -273,15 +273,16 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromInlineTable(
node: LogicalPlan,
- insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
- val numQueryOutputs: Int = node match {
- case table: UnresolvedInlineTable => table.rows(0).size
- case local: LocalRelation => local.data(0).numFields
- }
+ insertTableSchemaWithoutPartitionColumns: StructType,
+ numUserSpecifiedColumns: Int): LogicalPlan = {
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
- getDefaultExpressionsForInsert(numQueryOutputs, schema)
- val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
+ getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
+ val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
+ schema.fields.drop(numUserSpecifiedColumns).map(_.name)
+ } else {
+ schema.fields.map(_.name)
+ }
node match {
case _ if newDefaultExpressions.isEmpty => node
case table: UnresolvedInlineTable =>
@@ -306,11 +307,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromProject(
project: Project,
- insertTableSchemaWithoutPartitionColumns: StructType): Project = {
- val numQueryOutputs: Int = project.projectList.size
+ insertTableSchemaWithoutPartitionColumns: StructType,
+ numUserSpecifiedColumns: Int): Project = {
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
- getDefaultExpressionsForInsert(numQueryOutputs, schema)
+ getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
val newAliases: Seq[NamedExpression] =
newDefaultExpressions.zip(schema.fields).map {
case (expr, field) => Alias(expr, field.name)()
@@ -322,9 +323,13 @@
* This is a helper for the addMissingDefaultValuesForInsertFromInlineTable methods above.
*/
private def getDefaultExpressionsForInsert(
- numQueryOutputs: Int,
- schema: StructType): Seq[Expression] = {
- val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+ schema: StructType,
+ numUserSpecifiedColumns: Int): Seq[Expression] = {
+ val remainingFields: Seq[StructField] = if (numUserSpecifiedColumns > 0) {
+ schema.fields.drop(numUserSpecifiedColumns)
+ } else {
+ Seq.empty
+ }
val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
}
@@ -3115,13 +3115,12 @@ object SQLConf {
val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
.internal()
.doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
"explicit default values to behave as if they had specified DEFAULT NULL instead. " +
"For example, this allows most INSERT INTO statements to specify only a prefix of the " +
"columns in the target table, and the remaining columns will receive NULL values.")
.doc("When true, and DEFAULT columns are enabled, allow INSERT INTO commands with user-" +
"specified lists of fewer columns than the target table to behave as if they had " +
"specified DEFAULT for all remaining columns instead, in order.")
.version("3.4.0")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)

val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION =
buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition")
@@ -207,7 +207,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {

withTable("t1") {
createTable("t1", cols, Seq.fill(4)("int"))
val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 values(1)"))
assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
e1.getMessage.contains("expected 4 columns but found 1") ||
e1.getMessage.contains("not enough data columns") ||
@@ -217,7 +217,7 @@
withTable("t1") {
createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
val e1 = intercept[AnalysisException] {
sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
sql(s"INSERT INTO t1 partition(c3=3, c4=4) values(1)")
}
assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
e1.getMessage.contains("not enough data columns") ||
@@ -1093,8 +1093,8 @@ class DataSourceV2SQLSuiteV1Filter
verifyTable(t1, df)
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT INTO $t1(data) VALUES(4)")
}.getMessage.contains("Cannot find data for output column 'id'"))
sql(s"INSERT INTO $t1 VALUES(4)")
}.getMessage.contains("not enough data columns"))
// Duplicate columns
checkError(
exception = intercept[AnalysisException] {
@@ -1121,9 +1121,9 @@
verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data) VALUES(4)")
}.getMessage.contains("Cannot find data for output column 'id'"))
// Duplicate columns
sql(s"INSERT OVERWRITE $t1 VALUES(4)")
}.getMessage.contains("not enough data columns"))
// Duplicate columns
checkError(
exception = intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")
@@ -1150,9 +1150,9 @@
verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2"))
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data, id) VALUES('a', 4)")
}.getMessage.contains("Cannot find data for output column 'data2'"))
// Duplicate columns
sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)")
}.getMessage.contains("not enough data columns"))
// Duplicate columns
checkError(
exception = intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")