Skip to content
Closed
2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ license: |

- In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.

- In Spark version 2.4 and earlier, when inserting into a table, Spark will cast the data type of input query to the data type of target table by coercion. Since Spark 3.0, by default only upcasting is allowed when inserting data into table. E.g. `int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or `string` -> `int` are not allowed. The old behaviour is preserved under a newly added configuration `spark.sql.legacy.insertUnsafeCasts` with a default value of `false`.

- Refreshing a cached table would trigger a table uncache operation and then a table cache (lazily) operation. In Spark version 2.4 and earlier, the cache name and storage level are not preserved before the uncache operation. Therefore, the cache name and storage level could be changed unexpectedly. Since Spark 3.0, cache name and storage level will be first preserved for cache recreation. It helps to maintain a consistent cache behavior upon table refreshing.

- Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2268,123 +2268,6 @@ class Analyzer(
}
}

/**
* Resolves columns of an output table from the data in a logical plan. This rule will:
*
* - Reorder columns when the write is by name
* - Insert safe casts when data types do not match
* - Insert aliases when column names do not match
* - Detect plans that are not compatible with the output table and throw AnalysisException
*/
object ResolveOutputRelation extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case append @ AppendData(table, query, isByName)
if table.resolved && query.resolved && !append.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
append.copy(query = projection)
} else {
append
}

case overwrite @ OverwriteByExpression(table, _, query, isByName)
if table.resolved && query.resolved && !overwrite.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
overwrite.copy(query = projection)
} else {
overwrite
}

case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
if table.resolved && query.resolved && !overwrite.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
overwrite.copy(query = projection)
} else {
overwrite
}
}

def resolveOutputColumns(
tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean): LogicalPlan = {

if (expected.size < query.output.size) {
throw new AnalysisException(
s"""Cannot write to '$tableName', too many data columns:
|Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
|Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin)
}

val errors = new mutable.ArrayBuffer[String]()
val resolved: Seq[NamedExpression] = if (byName) {
expected.flatMap { tableAttr =>
query.resolveQuoted(tableAttr.name, resolver) match {
case Some(queryExpr) =>
checkField(tableAttr, queryExpr, byName, err => errors += err)
case None =>
errors += s"Cannot find data for output column '${tableAttr.name}'"
None
}
}

} else {
if (expected.size > query.output.size) {
throw new AnalysisException(
s"""Cannot write to '$tableName', not enough data columns:
|Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
|Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}"""
.stripMargin)
}

query.output.zip(expected).flatMap {
case (queryExpr, tableAttr) =>
checkField(tableAttr, queryExpr, byName, err => errors += err)
}
}

if (errors.nonEmpty) {
throw new AnalysisException(
s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
}

Project(resolved, query)
}

private def checkField(
tableAttr: Attribute,
queryExpr: NamedExpression,
byName: Boolean,
addError: String => Unit): Option[NamedExpression] = {

// run the type check first to ensure type errors are present
val canWrite = DataType.canWrite(
queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError)

if (queryExpr.nullable && !tableAttr.nullable) {
addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'")
None

} else if (!canWrite) {
None

} else {
// always add an UpCast. it will be removed in the optimizer if it is unnecessary.
Some(Alias(
UpCast(queryExpr, tableAttr.dataType), tableAttr.name
)(
explicitMetadata = Option(tableAttr.metadata)
))
}
}
}

private def commonNaturalJoinProcessing(
left: LogicalPlan,
right: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression, UpCast}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, NullType}

/**
* Resolves columns of an output table from the data in a logical plan. This rule will:
*
* - Reorder columns when the write is by name
* - Insert safe casts when data types do not match
* - Insert aliases when column names do not match
* - Detect plans that are not compatible with the output table and throw AnalysisException
*/
object ResolveOutputRelation extends Rule[LogicalPlan] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this moved?

It is difficult to see whether anything changed in this class. If the move was not required, please move it back.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move it outside of the Analyzer class, so that we can call its methods.

As per @cloud-fan commented in https://github.com/apache/spark/pull/24721/files#r287800626

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were there any modifications other than moving this?

I think that the right way to expose those functions is to move them to a utility class, not to expose this rule itself.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with moving them to a utility class, but it's better to put analyzer/optimizer rules in its own file, instead of in the Analyzer object (can be done in another PR if we decide to create the util class in this PR)

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case append @ AppendData(table, query, isByName)
if table.resolved && query.resolved && !append.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
append.copy(query = projection)
} else {
append
}

case overwrite @ OverwriteByExpression(table, _, query, isByName)
if table.resolved && query.resolved && !overwrite.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
overwrite.copy(query = projection)
} else {
overwrite
}

case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
if table.resolved && query.resolved && !overwrite.outputResolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
overwrite.copy(query = projection)
} else {
overwrite
}
}

def resolveOutputColumns(
tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean): LogicalPlan = {

if (expected.size < query.output.size) {
throw new AnalysisException(
s"""Cannot write to '$tableName', too many data columns:
|Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
|Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin)
}

val resolver = SQLConf.get.resolver
val errors = new mutable.ArrayBuffer[String]()
val resolved: Seq[NamedExpression] = if (byName) {
expected.flatMap { tableAttr =>
query.resolveQuoted(tableAttr.name, resolver) match {
case Some(queryExpr) =>
checkField(tableAttr, queryExpr, byName, resolver, err => errors += err)
case None =>
errors += s"Cannot find data for output column '${tableAttr.name}'"
None
}
}

} else {
if (expected.size > query.output.size) {
throw new AnalysisException(
s"""Cannot write to '$tableName', not enough data columns:
|Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
|Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}"""
.stripMargin)
}

query.output.zip(expected).flatMap {
case (queryExpr, tableAttr) =>
checkField(tableAttr, queryExpr, byName, resolver, err => errors += err)
}
}

if (errors.nonEmpty) {
throw new AnalysisException(
s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
}

Project(resolved, query)
}

def checkField(
tableAttr: Attribute,
queryExpr: NamedExpression,
byName: Boolean,
resolver: Resolver,
addError: String => Unit): Option[NamedExpression] = {

// run the type check first to ensure type errors are present
lazy val canWrite = DataType.canWrite(
queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError)

if (queryExpr.nullable && !tableAttr.nullable) {
addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'")
None

} else if (queryExpr.dataType == NullType && tableAttr.nullable) {
Some(Alias(Cast(queryExpr, tableAttr.dataType, Option(SQLConf.get.sessionLocalTimeZone)),
tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))

} else if (!canWrite) {
None

} else {
// always add an UpCast. it will be removed in the optimizer if it is unnecessary.
Some(Alias(
UpCast(queryExpr, tableAttr.dataType), tableAttr.name
)(
explicitMetadata = Option(tableAttr.metadata)
))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@ object Cast {
case _ if from == to => true
case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true
case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true
case (f, t) if legalNumericPrecedence(f, t) => true
case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true

case (DateType, TimestampType) => true
case (NullType, _) => false
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan Jun 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we upcast null to other nullable types? I think it's pretty common to write INSERT INTO tbl VALUES (1, null)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we can't know the nullability of the to type. We should consider it is not nullable.
For the case you mentioned, it is handled in https://github.com/apache/spark/pull/24806/files#diff-86e655772e8f7cab055d2c2451b52275R134.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @cloud-fan that this should be allowed. Nullability is an additional check, but the types are compatible.

case (_, StringType) => true

// Spark supports casting between long and timestamp, please see `longToTimestamp` and
Expand All @@ -153,7 +155,7 @@ object Cast {
case _ => false
}

private def legalNumericPrecedence(from: DataType, to: DataType): Boolean = {
private def legalNumericPrecedence(from: NumericType, to: NumericType): Boolean = {
val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
fromPrecedence >= 0 && fromPrecedence < toPrecedence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,14 @@ object SQLConf {
.doc("When true, the upcast will be loose and allows string to atomic types.")
.booleanConf
.createWithDefault(false)

val LEGACY_INSERT_UNSAFE_CASTS = buildConf("spark.sql.legacy.insertUnsafeCasts")
.doc("When true, Spark will cast the data type of input query of table insertion to " +
"the data type of target table by coercion; otherwise, only upcasting is allowed, e.g. " +
"`int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or " +
"`string` -> `int` are not allowed.")
.booleanConf
.createWithDefault(false)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,13 @@ object DataType {

fieldCompatible

case (w: AtomicType, r: AtomicType) =>
if (!Cast.canUpCast(w, r)) {
addError(s"Cannot safely cast '$context': $w to $r")
case _ =>
if (!Cast.canUpCast(write, read)) {
addError(s"Cannot safely cast '$context': ${write.simpleString} to ${read.simpleString}")
false
} else {
true
}

case (w, r) if w.sameType(r) && !w.isInstanceOf[NullType] =>
true

case (w, r) =>
addError(s"Cannot write '$context': $w is incompatible with $r")
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType {
(precision - scale) >= (dt.precision - dt.scale) && scale >= dt.scale
case dt: IntegralType =>
isWiderThan(DecimalType.forType(dt))
// For DoubleType/FloatType, the value can be NaN, PositiveInfinity or NegativeInfinity.
case _ => false
}

Expand All @@ -87,7 +88,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType {
private[sql] def isTighterThan(other: DataType): Boolean = other match {
case dt: DecimalType =>
(precision - scale) <= (dt.precision - dt.scale) && scale <= dt.scale
case dt: IntegralType =>
case dt: NumericType =>
isTighterThan(DecimalType.forType(dt))
case _ => false
}
Expand Down
Loading