diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 5dc7a89514974..3e2068fa75309 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -50,6 +50,8 @@ license: | - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, when inserting into a table, Spark will cast the data type of input query to the data type of target table by coercion. Since Spark 3.0, by default only upcasting is allowed when inserting data into table. E.g. `int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or `string` -> `int` are not allowed. The old behaviour is preserved under a newly added configuration `spark.sql.legacy.insertUnsafeCasts` with a default value of `false`. + - Refreshing a cached table would trigger a table uncache operation and then a table cache (lazily) operation. In Spark version 2.4 and earlier, the cache name and storage level are not preserved before the uncache operation. Therefore, the cache name and storage level could be changed unexpectedly. Since Spark 3.0, cache name and storage level will be first preserved for cache recreation. It helps to maintain a consistent cache behavior upon table refreshing. - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f24d6f168dacf..b333a6438c163 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2268,123 +2268,6 @@ class Analyzer( } } - /** - * Resolves columns of an output table from the data in a logical plan. This rule will: - * - * - Reorder columns when the write is by name - * - Insert safe casts when data types do not match - * - Insert aliases when column names do not match - * - Detect plans that are not compatible with the output table and throw AnalysisException - */ - object ResolveOutputRelation extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { - case append @ AppendData(table, query, isByName) - if table.resolved && query.resolved && !append.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - append.copy(query = projection) - } else { - append - } - - case overwrite @ OverwriteByExpression(table, _, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - overwrite.copy(query = projection) - } else { - overwrite - } - - case overwrite @ OverwritePartitionsDynamic(table, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - overwrite.copy(query = projection) - } else { - overwrite - } - } - - def resolveOutputColumns( - tableName: String, - expected: Seq[Attribute], - query: LogicalPlan, - byName: Boolean): LogicalPlan = { - - if (expected.size < query.output.size) { - throw new AnalysisException( - s"""Cannot write to '$tableName', too many data columns: - |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} - |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin) - } - - val errors = new mutable.ArrayBuffer[String]() - val resolved: Seq[NamedExpression] = if (byName) { - expected.flatMap { tableAttr => - query.resolveQuoted(tableAttr.name, resolver) match { - case Some(queryExpr) => - checkField(tableAttr, queryExpr, byName, err => errors += err) - case None => - errors += s"Cannot find data for output column '${tableAttr.name}'" - None - } - } - - } else { - if (expected.size > query.output.size) { - throw new AnalysisException( - s"""Cannot write to '$tableName', not enough data columns: - |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} - |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""" - .stripMargin) - } - - query.output.zip(expected).flatMap { - case (queryExpr, tableAttr) => - checkField(tableAttr, queryExpr, byName, err => errors += err) - } - } - - if (errors.nonEmpty) { - throw new AnalysisException( - s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") - } - - Project(resolved, query) - } - - private def checkField( - tableAttr: Attribute, - queryExpr: NamedExpression, - byName: Boolean, - addError: String => Unit): Option[NamedExpression] = { - - // run the type check first to ensure type errors are present - val canWrite = DataType.canWrite( - queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError) - - if (queryExpr.nullable && !tableAttr.nullable) { - addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'") - None - - } else if (!canWrite) { - None - - } else { - // always add an UpCast. it will be removed in the optimizer if it is unnecessary. - Some(Alias( - UpCast(queryExpr, tableAttr.dataType), tableAttr.name - )( - explicitMetadata = Option(tableAttr.metadata) - )) - } - } - } - private def commonNaturalJoinProcessing( left: LogicalPlan, right: LogicalPlan, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala new file mode 100644 index 0000000000000..ebda40bb0729d --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.collection.mutable + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression, UpCast} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, NullType} + +/** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ +object ResolveOutputRelation extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + append.copy(query = projection) + } else { + append + } + + case overwrite @ OverwriteByExpression(table, _, query, isByName) + if table.resolved && query.resolved && !overwrite.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } + + case overwrite @ OverwritePartitionsDynamic(table, query, isByName) + if table.resolved && query.resolved && !overwrite.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } + } + + def resolveOutputColumns( + tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} + |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin) + } + + val resolver = SQLConf.get.resolver + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { + expected.flatMap { tableAttr => + query.resolveQuoted(tableAttr.name, resolver) match { + case Some(queryExpr) => + checkField(tableAttr, queryExpr, byName, resolver, err => errors += err) + case None => + errors += s"Cannot find data for output column '${tableAttr.name}'" + None + } + } + + } else { + if (expected.size > query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} + |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""" + .stripMargin) + } + + query.output.zip(expected).flatMap { + case (queryExpr, tableAttr) => + checkField(tableAttr, queryExpr, byName, resolver, err => errors += err) + } + } + + if (errors.nonEmpty) { + throw new AnalysisException( + s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") + } + + Project(resolved, query) + } + + def checkField( + tableAttr: Attribute, + queryExpr: NamedExpression, + byName: Boolean, + resolver: Resolver, + addError: String => Unit): Option[NamedExpression] = { + + // run the type check first to ensure type errors are present + lazy val canWrite = DataType.canWrite( + queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError) + + if (queryExpr.nullable && !tableAttr.nullable) { + addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'") + None + + } else if (queryExpr.dataType == NullType && tableAttr.nullable) { + Some(Alias(Cast(queryExpr, tableAttr.dataType, Option(SQLConf.get.sessionLocalTimeZone)), + tableAttr.name)(explicitMetadata = Option(tableAttr.metadata))) + + } else if (!canWrite) { + None + + } else { + // always add an UpCast. it will be removed in the optimizer if it is unnecessary. + Some(Alias( + UpCast(queryExpr, tableAttr.dataType), tableAttr.name + )( + explicitMetadata = Option(tableAttr.metadata) + )) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index f8c1102953ab3..a309eefc2701c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -128,8 +128,10 @@ object Cast { case _ if from == to => true case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true - case (f, t) if legalNumericPrecedence(f, t) => true + case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true + case (DateType, TimestampType) => true + case (NullType, _) => false case (_, StringType) => true // Spark supports casting between long and timestamp, please see `longToTimestamp` and @@ -153,7 +155,7 @@ object Cast { case _ => false } - private def legalNumericPrecedence(from: DataType, to: DataType): Boolean = { + private def legalNumericPrecedence(from: NumericType, to: NumericType): Boolean = { val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) fromPrecedence >= 0 && fromPrecedence < toPrecedence diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 71c8302077015..ef1b407df0d57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1777,6 +1777,14 @@ object SQLConf { .doc("When true, the upcast will be loose and allows string to atomic types.") .booleanConf .createWithDefault(false) + + val LEGACY_INSERT_UNSAFE_CASTS = buildConf("spark.sql.legacy.insertUnsafeCasts") + .doc("When true, Spark will cast the data type of input query of table insertion to " + + "the data type of target table by coercion; otherwise, only upcasting is allowed, e.g. " + + "`int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or " + + "`string` -> `int` are not allowed.") + .booleanConf + .createWithDefault(false) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index a35e971d08823..42d9fc48e66a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -441,20 +441,13 @@ object DataType { fieldCompatible - case (w: AtomicType, r: AtomicType) => - if (!Cast.canUpCast(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + case _ => + if (!Cast.canUpCast(write, read)) { + addError(s"Cannot safely cast '$context': ${write.simpleString} to ${read.simpleString}") false } else { true } - - case (w, r) if w.sameType(r) && !w.isInstanceOf[NullType] => - true - - case (w, r) => - addError(s"Cannot write '$context': $w is incompatible with $r") - false } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 25eddaf06a780..83607a78036a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -77,6 +77,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { (precision - scale) >= (dt.precision - dt.scale) && scale >= dt.scale case dt: IntegralType => isWiderThan(DecimalType.forType(dt)) + // For DoubleType/FloatType, the value can be NaN, PositiveInfinity or NegativeInfinity. case _ => false } @@ -87,7 +88,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isTighterThan(other: DataType): Boolean = other match { case dt: DecimalType => (precision - scale) <= (dt.precision - dt.scale) && scale <= dt.scale - case dt: IntegralType => + case dt: NumericType => isTighterThan(DecimalType.forType(dt)) case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index fab1b776a3c72..d13804ee9f6f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -89,22 +89,24 @@ trait AnalysisTest extends PlanTest { inputPlan: LogicalPlan, expectedErrors: Seq[String], caseSensitive: Boolean = true): Unit = { - val analyzer = getAnalyzer(caseSensitive) - val e = intercept[AnalysisException] { - analyzer.checkAnalysis(analyzer.execute(inputPlan)) - } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + val analyzer = getAnalyzer(caseSensitive) + val e = intercept[AnalysisException] { + analyzer.checkAnalysis(analyzer.execute(inputPlan)) + } - if (!expectedErrors.map(_.toLowerCase(Locale.ROOT)).forall( - e.getMessage.toLowerCase(Locale.ROOT).contains)) { - fail( - s"""Exception message should contain the following substrings: - | - | ${expectedErrors.mkString("\n ")} - | - |Actual exception message: - | - | ${e.getMessage} - """.stripMargin) + if (!expectedErrors.map(_.toLowerCase(Locale.ROOT)).forall( + e.getMessage.toLowerCase(Locale.ROOT).contains)) { + fail( + s"""Exception message should contain the following substrings: + | + | ${expectedErrors.mkString("\n ")} + | + |Actual exception message: + | + | ${e.getMessage} + """.stripMargin) + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 58923f26c1ead..3c45eafe289cf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -250,7 +250,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byName: insert safe cast") { @@ -290,7 +290,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("y", DoubleType))).toAttributes) val query = TestRelation(StructType(Seq( - StructField("x", DoubleType), + StructField("x", FloatType), StructField("b", FloatType))).toAttributes) val parsedPlan = byName(xRequiredTable, query) @@ -298,7 +298,6 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot safely cast", "'x'", "DoubleType to FloatType", "Cannot write nullable values to non-null column", "'x'", "Cannot find data for output column", "'y'")) } @@ -396,7 +395,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byPosition: insert safe cast") { @@ -440,7 +439,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("y", DoubleType))).toAttributes) val query = TestRelation(StructType(Seq( - StructField("x", DoubleType), + StructField("x", FloatType), StructField("b", FloatType))).toAttributes) val parsedPlan = byPosition(xRequiredTable, query) @@ -448,8 +447,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot write nullable values to non-null column", "'x'", - "Cannot safely cast", "'x'", "DoubleType to FloatType")) + "Cannot write nullable values to non-null column", "'x'")) } test("bypass output column resolution") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala index da1b695919dec..d639e5cc6721f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala @@ -233,8 +233,8 @@ class EncoderResolutionSuite extends PlanTest { castFail[Long, Int] castFail[java.sql.Timestamp, java.sql.Date] + castFail[java.math.BigDecimal, Long] castFail[java.math.BigDecimal, Double] - castFail[Double, java.math.BigDecimal] castFail[java.math.BigDecimal, Int] castFail[String, Long] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index 6b5fc5f0d4434..8e9f8ab4a2a96 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -53,7 +53,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { test("Check NullType is incompatible with all other types") { allNonNullTypes.foreach { t => assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => - assert(err.contains(s"incompatible with $t")) + assert(err.contains(s"Cannot safely cast 'nulls': null to ${t.simpleString}")) } } } @@ -75,8 +75,8 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { s"Should not allow writing $w to $r because cast is not safe") { err => assert(err.contains("'t'"), "Should include the field name context") assert(err.contains("Cannot safely cast"), "Should identify unsafe cast") - assert(err.contains(s"$w"), "Should include write type") - assert(err.contains(s"$r"), "Should include read type") + assert(err.contains(s"${w.simpleString}"), "Should include write type") + assert(err.contains(s"${r.simpleString}"), "Should include read type") } } } @@ -325,7 +325,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) { errs => assert(errs(0).contains("'top.a.element'"), "Should identify bad type") assert(errs(0).contains("Cannot safely cast")) - assert(errs(0).contains("StringType to DoubleType")) + assert(errs(0).contains("string to double")) assert(errs(1).contains("'top.a'"), "Should identify bad type") assert(errs(1).contains("Cannot write nullable elements to array of non-nulls")) @@ -338,15 +338,15 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assert(errs(3).contains("Cannot write nullable elements to array of non-nulls")) assert(errs(4).contains("'top.bad_nested_type'"), "Should identify bad type") - assert(errs(4).contains("is incompatible with")) + assert(errs(4).contains("Cannot safely cast")) assert(errs(5).contains("'top.m.key'"), "Should identify bad type") assert(errs(5).contains("Cannot safely cast")) - assert(errs(5).contains("DoubleType to LongType")) + assert(errs(5).contains("double to bigint")) assert(errs(6).contains("'top.m.value'"), "Should identify bad type") assert(errs(6).contains("Cannot safely cast")) - assert(errs(6).contains("DoubleType to FloatType")) + assert(errs(6).contains("double to float")) assert(errs(7).contains("'top.m'"), "Should identify bad type") assert(errs(7).contains("Cannot write nullable values to map of non-nulls")) @@ -364,7 +364,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assert(errs(11).contains("'top.x'"), "Should identify bad type") assert(errs(11).contains("Cannot safely cast")) - assert(errs(11).contains("LongType to IntegerType")) + assert(errs(11).contains("bigint to int")) assert(errs(12).contains("'top'"), "Should identify bad type") assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name mismatch") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 534e2fd0757f9..b5f3bdd697846 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.mutable + import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -356,8 +358,28 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - val newQuery = DDLPreprocessingUtils.castAndRenameQueryOutput( - insert.query, expectedColumns, conf) + val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS)) { + DDLPreprocessingUtils.castAndRenameQueryOutput(insert.query, expectedColumns, conf) + } else { + val errors = new mutable.ArrayBuffer[String]() + val resolved = insert.query.output.zip(expectedColumns).flatMap { + case (queryExpr, tableAttr) => + ResolveOutputRelation.checkField( + tableAttr, + queryExpr, + byName = false, + resolver = conf.resolver, + addError = err => errors += err) + } + if (errors.nonEmpty) { + throw new AnalysisException( + s"Cannot write incompatible data to table '$tblName':\n- ${errors.mkString("\n- ")}") + } else { + assert(resolved.length == insert.query.output.length) + Project(resolved, insert.query) + } + } + if (normalizedPartSpec.nonEmpty) { if (normalizedPartSpec.size != partColNames.length) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index b2d065274b151..7d870ca49e4f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -175,6 +175,9 @@ abstract class BaseSessionStateBuilder( override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + // In the rule `PreprocessTableInsertion`, the fields of input query might be converted as + // unresolved `UpCast`s. The following rule `ResolveUpCast` would resolve those `UpCast`s. + ResolveUpCast +: DataSourceAnalysis(conf) +: customPostHocResolutionRules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index f12eeaa580642..eb229fb2d13c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -117,7 +117,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { withTempPath { dir => spark.sql( s""" - |CREATE TABLE $tableName(a int, b int) + |CREATE TABLE $tableName(a long, b long) |USING $provider |PARTITIONED BY(a) |LOCATION '${dir.toURI}' diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 4f1ae069d4b89..592c609b016b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -63,7 +63,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { spark.read.json(ds).createOrReplaceTempView("jt") sql( s""" - |CREATE TEMPORARY VIEW jsonTable (a int, b string) + |CREATE TEMPORARY VIEW jsonTable (a long, b string) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toURI.toString}' @@ -351,14 +351,14 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement immediately") { withTable("target", "target2") { - sql(s"CREATE TABLE target(a INT, b STRING) USING JSON") + sql(s"CREATE TABLE target(a LONG, b STRING) USING JSON") sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT a, b FROM tbl") checkAnswer( sql("SELECT a, b FROM target"), sql("SELECT a, b FROM jt") ) - sql(s"CREATE TABLE target2(a INT, b STRING) USING JSON") + sql(s"CREATE TABLE target2(a LONG, b STRING) USING JSON") val e = sql( """ |WITH tbl AS (SELECT * FROM jt) @@ -566,27 +566,51 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { - withTable("test_table") { - val schema = new StructType() - .add("i", LongType, false) - .add("s", StringType, false) - val newTable = CatalogTable( - identifier = TableIdentifier("test_table", None), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = Map.empty), - schema = schema, - provider = Some(classOf[SimpleInsertSource].getName)) - - spark.sessionState.catalog.createTable(newTable, false) - - sql("INSERT INTO TABLE test_table SELECT 1, 'a'") - sql("INSERT INTO TABLE test_table SELECT 2, null") + // This test needs to write null value to a non-nullable column. + withSQLConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS.key -> "true") { + withTable("test_table") { + val schema = new StructType() + .add("i", LongType, false) + .add("s", StringType, false) + val newTable = CatalogTable( + identifier = TableIdentifier("test_table", None), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + compressed = false, + properties = Map.empty), + schema = schema, + provider = Some(classOf[SimpleInsertSource].getName)) + + spark.sessionState.catalog.createTable(newTable, false) + + sql("INSERT INTO TABLE test_table SELECT 1, 'a'") + sql("INSERT INTO TABLE test_table SELECT 2, null") + } + } + } + + test("disallow unsafe type casting during table inserting") { + withTable("t") { + sql("CREATE TABLE t(i INT, j STRING) USING json") + + // int can be casted to string safely + sql("INSERT INTO t VALUES (1, 1)") + checkAnswer(spark.table("t"), Row(1, "1")) + + // long can't be casted to int safely + val e = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (2L, 'a')") + } + assert(e.message.contains("Cannot write incompatible data to table")) + + withSQLConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS.key -> "true") { + sql("INSERT INTO t VALUES (2L, 'a')") + checkAnswer(spark.table("t"), Row(1, "1") :: Row(2, "a") :: Nil) + } } } } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index cebaad5b4ad9b..8118b984e680b 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -59,6 +59,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Ensures that cross joins are enabled so that we can test them TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true) + // Force type case during table insertion + TestHive.setConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS, true) // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") @@ -74,6 +76,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone) + TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS) // For debugging dump some statistics about how much time was spent in various optimizer rules logWarning(RuleExecutor.dumpTimeSpent()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 0e7df8e921978..e7f938028bbff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -81,6 +81,9 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session RelationConversions(conf, catalog) +: PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + // In the rule `PreprocessTableInsertion`, the fields of input query might be converted as + // unresolved `UpCast`s. The following rule `ResolveUpCast` would resolve those `UpCast`s. + ResolveUpCast +: DataSourceAnalysis(conf) +: HiveAnalysis +: customPostHocResolutionRules diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index 80afc9d8f44bc..d63690e864d85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -40,7 +40,7 @@ class PartitionProviderCompatibilitySuite .save(dir.getAbsolutePath) spark.sql(s""" - |create table $tableName (fieldOne long, partCol int) + |create table $tableName (fieldOne long, partCol long) |using ${spark.sessionState.conf.defaultDataSourceName} |options (path "${dir.toURI}") |partitioned by (partCol)""".stripMargin) @@ -357,7 +357,7 @@ class PartitionProviderCompatibilitySuite val c = Utils.createTempDir(namePrefix = "c") try { spark.sql(s""" - |create table test (id long, P1 int, P2 int) + |create table test (id long, P1 long, P2 long) |using ${spark.sessionState.conf.defaultDataSourceName} |options (path "${base.toURI}") |partitioned by (P1, P2)""".stripMargin) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 328457948cb41..166dd7d5d437b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -857,7 +857,7 @@ class VersionsSuite extends SparkFunSuite with Logging { """.stripMargin ) - val errorMsg = "data type mismatch: cannot cast decimal(2,1) to binary" + val errorMsg = "Cannot safely cast 'f0': decimal(2,1) to binary" if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index df2f693e7147a..5b2d5414126cb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -374,15 +374,16 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)") sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)") - val analyzedPlan = sql( + // The `Cast` operator will be eliminated by optimization rule `ConstantFolding`. + val optimizedPlan = sql( """ |INSERT OVERWRITE table test_partition PARTITION (b=1, c) |SELECT 'a', 'c' from ptest - """.stripMargin).queryExecution.analyzed + """.stripMargin).queryExecution.optimizedPlan - assertResult(false, "Incorrect cast detected\n" + analyzedPlan) { + assertResult(false, "Incorrect cast detected\n" + optimizedPlan) { var hasCast = false - analyzedPlan.collect { + optimizedPlan.collect { case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c } } hasCast @@ -984,16 +985,16 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd } test("SPARK-3810: PreprocessTableInsertion static partitioning support") { - val analyzedPlan = { + val optimizedPlan = { loadTestTable("srcpart") sql("DROP TABLE IF EXISTS withparts") sql("CREATE TABLE withparts LIKE srcpart") sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src") - .queryExecution.analyzed + .queryExecution.optimizedPlan } - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { - analyzedPlan.collect { + assertResult(0, "Unnecessary project detected\n" + optimizedPlan) { + optimizedPlan.collect { case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size }.sum } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 25ff3544185af..5491fbff5c54e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -54,7 +54,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values(1)") checkAnswer(spark.table("hive_serde"), Row(1)) - spark.sql(s"INSERT INTO TABLE hive_serde values($value)") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES(CAST($value AS $dataType))") checkAnswer(spark.table("hive_serde"), Seq(Row(1), Row(value))) } } @@ -63,9 +63,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // TIMESTAMP withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 TIMESTAMP) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('2019-04-11 15:50:00')") checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12 15:50:00')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2019-04-12 15:50:00' AS TIMESTAMP))") checkAnswer( spark.table("hive_serde"), Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")), @@ -75,9 +75,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // DATE withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('2019-04-11')") checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2019-04-12' AS DATE))") checkAnswer( spark.table("hive_serde"), Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12")))) @@ -87,9 +87,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkStringTypes(fileFormat: String, dataType: String, value: String): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('s')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('s')") checkAnswer(spark.table("hive_serde"), Row("s")) - spark.sql(s"INSERT INTO TABLE hive_serde values('$value')") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES('$value')") checkAnswer(spark.table("hive_serde"), Seq(Row("s"), Row(value))) } } @@ -97,9 +97,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkCharTypes(fileFormat: String): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 CHAR(10)) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('s')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('s')") checkAnswer(spark.table("hive_serde"), Row("s" + " " * 9)) - spark.sql(s"INSERT INTO TABLE hive_serde values('s3')") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES('s3')") checkAnswer(spark.table("hive_serde"), Seq(Row("s" + " " * 9), Row("s3" + " " * 8))) } } @@ -108,18 +108,18 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // BOOLEAN withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BOOLEAN) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values(false)") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES(false)") checkAnswer(spark.table("hive_serde"), Row(false)) - spark.sql("INSERT INTO TABLE hive_serde values(true)") + spark.sql("INSERT INTO TABLE hive_serde VALUES(true)") checkAnswer(spark.table("hive_serde"), Seq(Row(false), Row(true))) } // BINARY withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('1')") checkAnswer(spark.table("hive_serde"), Row("1".getBytes)) - spark.sql("INSERT INTO TABLE hive_serde values('2')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2' AS BINARY))") checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 56c16c8d91b96..72c5f3f21fa2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1071,7 +1071,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("set hive.exec.dynamic.partition.mode=nonstrict") // date sql("drop table if exists dynparttest1") - sql("create table dynparttest1 (value int) partitioned by (pdate date)") + sql("create table dynparttest1 (value long) partitioned by (pdate date)") sql( """ |insert into table dynparttest1 partition(pdate) @@ -1083,7 +1083,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // decimal sql("drop table if exists dynparttest2") - sql("create table dynparttest2 (value int) partitioned by (pdec decimal(5, 1))") + sql("create table dynparttest2 (value long) partitioned by (pdec decimal(5, 1))") sql( """ |insert into table dynparttest2 partition(pdec) @@ -1580,8 +1580,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { .select(array($"id", $"id" + 1).as("arr"), $"id") .createOrReplaceTempView("source") withTable("dest1", "dest2") { - sql("CREATE TABLE dest1 (i INT)") - sql("CREATE TABLE dest2 (i INT)") + sql("CREATE TABLE dest1 (i LONG)") + sql("CREATE TABLE dest2 (i LONG)") sql( """ |FROM source @@ -2339,7 +2339,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { withTable("t") { - sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") + sql("CREATE TABLE t (col1 LONG, p1 LONG) USING PARQUET PARTITIONED BY (p1)") sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") if (enableOptimizeMetadataOnlyQuery) { // The result is wrong if we enable the configuration. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 94f35b0b3d523..9e4363661207d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -213,7 +213,8 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { withTable("spark_23340") { sql("CREATE TABLE spark_23340(a array, b array) STORED AS ORC") - sql("INSERT INTO spark_23340 VALUES (array(), array())") + sql("INSERT INTO spark_23340 VALUES " + + "(CAST(array() AS array), CAST(array() AS array))") checkAnswer(spark.table("spark_23340"), Seq(Row(Array.empty[Float], Array.empty[Double]))) } }