Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values #37430

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement =
regenerateUserSpecifiedCols(i, schema)
val expanded: LogicalPlan =
addMissingDefaultValuesForInsertFromInlineTable(node, schema)
addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.nonEmpty)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r: LogicalPlan =>
Expand All @@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
val project: Project = i.query.asInstanceOf[Project]
val expanded: Project =
addMissingDefaultValuesForInsertFromProject(project, schema)
addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.nonEmpty)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r =>
Expand Down Expand Up @@ -265,14 +265,15 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromInlineTable(
node: LogicalPlan,
insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
insertTableSchemaWithoutPartitionColumns: StructType,
hasUserSpecifiedFields: Boolean): LogicalPlan = {
val numQueryOutputs: Int = node match {
case table: UnresolvedInlineTable => table.rows(0).size
case local: LocalRelation => local.data(0).numFields
}
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
getDefaultExpressionsForInsert(numQueryOutputs, schema)
getDefaultExpressionsForInsert(numQueryOutputs, schema, hasUserSpecifiedFields)
val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
node match {
case _ if newDefaultExpressions.isEmpty => node
Expand All @@ -298,11 +299,12 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromProject(
project: Project,
insertTableSchemaWithoutPartitionColumns: StructType): Project = {
insertTableSchemaWithoutPartitionColumns: StructType,
hasUserSpecifiedFields: Boolean): Project = {
val numQueryOutputs: Int = project.projectList.size
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
getDefaultExpressionsForInsert(numQueryOutputs, schema)
getDefaultExpressionsForInsert(numQueryOutputs, schema, hasUserSpecifiedFields)
val newAliases: Seq[NamedExpression] =
newDefaultExpressions.zip(schema.fields).map {
case (expr, field) => Alias(expr, field.name)()
Expand All @@ -315,20 +317,14 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def getDefaultExpressionsForInsert(
numQueryOutputs: Int,
schema: StructType): Seq[Expression] = {
val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
}

/**
* This is a helper for the getDefaultExpressionsForInsert methods above.
*/
private def getStructFieldsForDefaultExpressions(fields: Seq[StructField]): Seq[StructField] = {
if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
fields
schema: StructType,
hasUserSpecifiedFields: Boolean): Seq[Expression] = {
if (hasUserSpecifiedFields) {
val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
val numDefaultExpressionsToAdd = remainingFields.size
Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
} else {
fields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))
Seq.empty[Expression]
}
}

Expand Down Expand Up @@ -487,8 +483,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
schema.fields.filter {
field => !userSpecifiedColNames.contains(field.name)
}
Some(StructType(userSpecifiedFields ++
getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
Some(StructType(userSpecifiedFields ++ nonUserSpecifiedFields))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2924,17 +2924,6 @@ object SQLConf {
.stringConf
.createWithDefault("csv,json,orc,parquet")

val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
.internal()
.doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
"explicit default values to behave as if they had specified DEFAULT NULL instead. " +
"For example, this allows most INSERT INTO statements to specify only a prefix of the " +
"columns in the target table, and the remaining columns will receive NULL values.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)

val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
.doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
"reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
Expand Down Expand Up @@ -4523,9 +4512,6 @@ class SQLConf extends Serializable with Logging {

def defaultColumnAllowedProviders: String = getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)

def useNullsForMissingDefaultColumnValues: Boolean =
getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)

def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)

def strictIndexOperator: Boolean = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)
Expand Down