From 56512248f373631e83aa00323b0f38dd55559a0e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 27 May 2019 21:49:50 +0800 Subject: [PATCH 01/16] do not forcibly add cast when inserting table --- .../sql/catalyst/analysis/Analyzer.scala | 117 -------------- .../analysis/ResolveOutputRelation.scala | 146 ++++++++++++++++++ .../spark/sql/catalyst/expressions/Cast.scala | 1 + .../apache/spark/sql/internal/SQLConf.scala | 6 + .../sql/execution/datasources/rules.scala | 30 +++- .../internal/BaseSessionStateBuilder.scala | 1 + .../spark/sql/sources/InsertSuite.scala | 72 ++++++--- .../sql/hive/HiveSessionStateBuilder.scala | 1 + 8 files changed, 231 insertions(+), 143 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f24d6f168dacf..b333a6438c163 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2268,123 +2268,6 @@ class Analyzer( } } - /** - * Resolves columns of an output table from the data in a logical plan. 
This rule will: - * - * - Reorder columns when the write is by name - * - Insert safe casts when data types do not match - * - Insert aliases when column names do not match - * - Detect plans that are not compatible with the output table and throw AnalysisException - */ - object ResolveOutputRelation extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { - case append @ AppendData(table, query, isByName) - if table.resolved && query.resolved && !append.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - append.copy(query = projection) - } else { - append - } - - case overwrite @ OverwriteByExpression(table, _, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - overwrite.copy(query = projection) - } else { - overwrite - } - - case overwrite @ OverwritePartitionsDynamic(table, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => - val projection = resolveOutputColumns(table.name, table.output, query, isByName) - - if (projection != query) { - overwrite.copy(query = projection) - } else { - overwrite - } - } - - def resolveOutputColumns( - tableName: String, - expected: Seq[Attribute], - query: LogicalPlan, - byName: Boolean): LogicalPlan = { - - if (expected.size < query.output.size) { - throw new AnalysisException( - s"""Cannot write to '$tableName', too many data columns: - |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} - |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin) - } - - val errors = new mutable.ArrayBuffer[String]() - val resolved: Seq[NamedExpression] = if (byName) { - expected.flatMap { tableAttr => - query.resolveQuoted(tableAttr.name, resolver) match { - case Some(queryExpr) => - 
checkField(tableAttr, queryExpr, byName, err => errors += err) - case None => - errors += s"Cannot find data for output column '${tableAttr.name}'" - None - } - } - - } else { - if (expected.size > query.output.size) { - throw new AnalysisException( - s"""Cannot write to '$tableName', not enough data columns: - |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} - |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""" - .stripMargin) - } - - query.output.zip(expected).flatMap { - case (queryExpr, tableAttr) => - checkField(tableAttr, queryExpr, byName, err => errors += err) - } - } - - if (errors.nonEmpty) { - throw new AnalysisException( - s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") - } - - Project(resolved, query) - } - - private def checkField( - tableAttr: Attribute, - queryExpr: NamedExpression, - byName: Boolean, - addError: String => Unit): Option[NamedExpression] = { - - // run the type check first to ensure type errors are present - val canWrite = DataType.canWrite( - queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError) - - if (queryExpr.nullable && !tableAttr.nullable) { - addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'") - None - - } else if (!canWrite) { - None - - } else { - // always add an UpCast. it will be removed in the optimizer if it is unnecessary. 
- Some(Alias( - UpCast(queryExpr, tableAttr.dataType), tableAttr.name - )( - explicitMetadata = Option(tableAttr.metadata) - )) - } - } - } - private def commonNaturalJoinProcessing( left: LogicalPlan, right: LogicalPlan, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala new file mode 100644 index 0000000000000..e7d756b95bc2b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.collection.mutable + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression, UpCast} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType + +/** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ +object ResolveOutputRelation extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + append.copy(query = projection) + } else { + append + } + + case overwrite @ OverwriteByExpression(table, _, query, isByName) + if table.resolved && query.resolved && !overwrite.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } + + case overwrite @ OverwritePartitionsDynamic(table, query, isByName) + if table.resolved && query.resolved && !overwrite.outputResolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } + } + + def resolveOutputColumns( + tableName: String, + expected: 
Seq[Attribute], + query: LogicalPlan, + byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} + |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin) + } + + val resolver = SQLConf.get.resolver + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { + expected.flatMap { tableAttr => + query.resolveQuoted(tableAttr.name, resolver) match { + case Some(queryExpr) => + checkField(tableAttr, queryExpr, byName, resolver, err => errors += err) + case None => + errors += s"Cannot find data for output column '${tableAttr.name}'" + None + } + } + + } else { + if (expected.size > query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")} + |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""" + .stripMargin) + } + + query.output.zip(expected).flatMap { + case (queryExpr, tableAttr) => + checkField(tableAttr, queryExpr, byName, resolver, err => errors += err) + } + } + + if (errors.nonEmpty) { + throw new AnalysisException( + s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") + } + + Project(resolved, query) + } + + def checkField( + tableAttr: Attribute, + queryExpr: NamedExpression, + byName: Boolean, + resolver: Resolver, + addError: String => Unit): Option[NamedExpression] = { + + // run the type check first to ensure type errors are present + val canWrite = DataType.canWrite( + queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError) + + if (queryExpr.nullable && !tableAttr.nullable) { + addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'") + None + + } else if (!canWrite) { + None 
+ + } else { + // always add an UpCast. it will be removed in the optimizer if it is unnecessary. + Some(Alias( + UpCast(queryExpr, tableAttr.dataType), tableAttr.name + )( + explicitMetadata = Option(tableAttr.metadata) + )) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index f8c1102953ab3..18700150038ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -131,6 +131,7 @@ object Cast { case (f, t) if legalNumericPrecedence(f, t) => true case (DateType, TimestampType) => true case (_, StringType) => true + case (NullType, _) => true // Spark supports casting between long and timestamp, please see `longToTimestamp` and // `timestampToLong` for details. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 71c8302077015..dd0c8bdee1807 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1777,6 +1777,12 @@ object SQLConf { .doc("When true, the upcast will be loose and allows string to atomic types.") .booleanConf .createWithDefault(false) + + val LEGACY_INSERT_TABLE_FORCIBLE_CAST = buildConf("spark.sql.legacy.insertTable.forcibleCast") + .doc("When inserting data to a table, Spark will cast the data type of input query to " + + "the data type of target table forcibly if this config is true.") + .booleanConf + .createWithDefault(false) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 534e2fd0757f9..61aeef35fcec9 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.mutable + import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -356,8 +358,32 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - val newQuery = DDLPreprocessingUtils.castAndRenameQueryOutput( - insert.query, expectedColumns, conf) + val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST)) { + DDLPreprocessingUtils.castAndRenameQueryOutput(insert.query, expectedColumns, conf) + } else { + val errors = new mutable.ArrayBuffer[String]() + val resolved = insert.query.output.zip(expectedColumns).flatMap { + case (queryExpr, tableAttr) => + ResolveOutputRelation.checkField( + tableAttr, + queryExpr, + byName = false, + resolver = conf.resolver, + addError = err => errors += err) + } + if (errors.nonEmpty) { + throw new AnalysisException( + s"Cannot write incompatible data to table '$tblName':\n- ${errors.mkString("\n- ")}") + } else { + assert(resolved.length == insert.query.output.length) + if (resolved == insert.query.output) { + insert.query + } else { + Project(resolved, insert.query) + } + } + } + if (normalizedPartSpec.nonEmpty) { if (normalizedPartSpec.size != partColNames.length) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index b2d065274b151..7ef03449c802c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -175,6 +175,7 @@ abstract class BaseSessionStateBuilder( override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + ResolveUpCast +: DataSourceAnalysis(conf) +: customPostHocResolutionRules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 4f1ae069d4b89..3e56ef90efb4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -63,7 +63,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { spark.read.json(ds).createOrReplaceTempView("jt") sql( s""" - |CREATE TEMPORARY VIEW jsonTable (a int, b string) + |CREATE TEMPORARY VIEW jsonTable (a long, b string) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toURI.toString}' @@ -351,14 +351,14 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement immediately") { withTable("target", "target2") { - sql(s"CREATE TABLE target(a INT, b STRING) USING JSON") + sql(s"CREATE TABLE target(a LONG, b STRING) USING JSON") sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT a, b FROM tbl") checkAnswer( sql("SELECT a, b FROM target"), sql("SELECT a, b FROM jt") ) - sql(s"CREATE TABLE target2(a INT, b STRING) USING JSON") + sql(s"CREATE TABLE target2(a LONG, b STRING) USING JSON") val e = sql( """ |WITH tbl AS (SELECT * FROM jt) @@ -566,27 +566,51 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { - withTable("test_table") { - val schema = new StructType() - .add("i", LongType, false) - .add("s", StringType, false) - val newTable = 
CatalogTable( - identifier = TableIdentifier("test_table", None), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = Map.empty), - schema = schema, - provider = Some(classOf[SimpleInsertSource].getName)) - - spark.sessionState.catalog.createTable(newTable, false) - - sql("INSERT INTO TABLE test_table SELECT 1, 'a'") - sql("INSERT INTO TABLE test_table SELECT 2, null") + // This test needs to write a null value to a non-nullable column. + withSQLConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST.key -> "true") { + withTable("test_table") { + val schema = new StructType() + .add("i", LongType, false) + .add("s", StringType, false) + val newTable = CatalogTable( + identifier = TableIdentifier("test_table", None), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + compressed = false, + properties = Map.empty), + schema = schema, + provider = Some(classOf[SimpleInsertSource].getName)) + + spark.sessionState.catalog.createTable(newTable, false) + + sql("INSERT INTO TABLE test_table SELECT 1, 'a'") + sql("INSERT INTO TABLE test_table SELECT 2, null") + } + } + } + + test("disallow unsafe type casting during table inserting") { + withTable("t") { + sql("CREATE TABLE t(i INT, j STRING) USING json") + + // int can be cast to string safely + sql("INSERT INTO t VALUES (1, 1)") + checkAnswer(spark.table("t"), Row(1, "1")) + + // long can't be cast to int safely + val e = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (2L, 'a')") + } + assert(e.message.contains("Cannot write incompatible data to table")) + + withSQLConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST.key -> "true") { + sql("INSERT INTO t VALUES (2L, 'a')") + checkAnswer(spark.table("t"), Row(1, "1") :: Row(2, "a") :: Nil) + } } } } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 0e7df8e921978..4d7638e93eec3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -81,6 +81,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session RelationConversions(conf, catalog) +: PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + ResolveUpCast +: DataSourceAnalysis(conf) +: HiveAnalysis +: customPostHocResolutionRules From 425866578f8f18c861e64666e2b454376b6594fc Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 29 May 2019 11:47:43 +0800 Subject: [PATCH 02/16] fix tests --- .../spark/sql/catalyst/expressions/Cast.scala | 9 ++++++--- .../scala/org/apache/spark/sql/types/DataType.scala | 13 +++---------- .../org/apache/spark/sql/types/DecimalType.scala | 4 ++-- .../hive/PartitionProviderCompatibilitySuite.scala | 4 ++-- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 18700150038ec..907eedd0ee584 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -126,12 +126,15 @@ object Cast { */ def canUpCast(from: DataType, to: DataType): Boolean = (from, to) match { case _ if from == to => true + case (NullType, _) => true + case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true - case (f, t) if legalNumericPrecedence(f, t) => true + case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true + case 
(DateType, TimestampType) => true + case (_, StringType) => true - case (NullType, _) => true // Spark supports casting between long and timestamp, please see `longToTimestamp` and // `timestampToLong` for details. @@ -154,7 +157,7 @@ object Cast { case _ => false } - private def legalNumericPrecedence(from: DataType, to: DataType): Boolean = { + private def legalNumericPrecedence(from: NumericType, to: NumericType): Boolean = { val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) fromPrecedence >= 0 && fromPrecedence < toPrecedence diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index a35e971d08823..87646866bef95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -441,20 +441,13 @@ object DataType { fieldCompatible - case (w: AtomicType, r: AtomicType) => - if (!Cast.canUpCast(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + case _ => + if (!Cast.canUpCast(write, read)) { + addError(s"Cannot safely cast '$context': $write to $read") false } else { true } - - case (w, r) if w.sameType(r) && !w.isInstanceOf[NullType] => - true - - case (w, r) => - addError(s"Cannot write '$context': $w is incompatible with $r") - false } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 25eddaf06a780..119536706bd00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -75,7 +75,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isWiderThan(other: DataType): Boolean = other match { case dt: DecimalType 
=> (precision - scale) >= (dt.precision - dt.scale) && scale >= dt.scale - case dt: IntegralType => + case dt: NumericType => isWiderThan(DecimalType.forType(dt)) case _ => false } @@ -87,7 +87,7 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isTighterThan(other: DataType): Boolean = other match { case dt: DecimalType => (precision - scale) <= (dt.precision - dt.scale) && scale <= dt.scale - case dt: IntegralType => + case dt: NumericType => isTighterThan(DecimalType.forType(dt)) case _ => false } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index 80afc9d8f44bc..d63690e864d85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -40,7 +40,7 @@ class PartitionProviderCompatibilitySuite .save(dir.getAbsolutePath) spark.sql(s""" - |create table $tableName (fieldOne long, partCol int) + |create table $tableName (fieldOne long, partCol long) |using ${spark.sessionState.conf.defaultDataSourceName} |options (path "${dir.toURI}") |partitioned by (partCol)""".stripMargin) @@ -357,7 +357,7 @@ class PartitionProviderCompatibilitySuite val c = Utils.createTempDir(namePrefix = "c") try { spark.sql(s""" - |create table test (id long, P1 int, P2 int) + |create table test (id long, P1 long, P2 long) |using ${spark.sessionState.conf.defaultDataSourceName} |options (path "${base.toURI}") |partitioned by (P1, P2)""".stripMargin) From 62d9e701798b16c970e9e42999c00f81a0edc389 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 29 May 2019 15:10:06 +0800 Subject: [PATCH 03/16] fix tests --- .../org/apache/spark/sql/types/DataType.scala | 2 +- .../sql/execution/datasources/rules.scala | 6 +--- .../metric/SQLMetricsTestUtils.scala | 2 +- 
.../execution/HiveCompatibilitySuite.scala | 3 ++ .../spark/sql/hive/client/VersionsSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 16 +++++------ .../execution/HiveSerDeReadWriteSuite.scala | 28 +++++++++---------- .../sql/hive/execution/SQLQuerySuite.scala | 10 +++---- .../sql/hive/orc/HiveOrcQuerySuite.scala | 3 +- 9 files changed, 36 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 87646866bef95..42d9fc48e66a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -443,7 +443,7 @@ object DataType { case _ => if (!Cast.canUpCast(write, read)) { - addError(s"Cannot safely cast '$context': $write to $read") + addError(s"Cannot safely cast '$context': ${write.simpleString} to ${read.simpleString}") false } else { true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 61aeef35fcec9..18c436117e77e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -376,11 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"Cannot write incompatible data to table '$tblName':\n- ${errors.mkString("\n- ")}") } else { assert(resolved.length == insert.query.output.length) - if (resolved == insert.query.output) { - insert.query - } else { - Project(resolved, insert.query) - } + Project(resolved, insert.query) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index f12eeaa580642..eb229fb2d13c3 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -117,7 +117,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { withTempPath { dir => spark.sql( s""" - |CREATE TABLE $tableName(a int, b int) + |CREATE TABLE $tableName(a long, b long) |USING $provider |PARTITIONED BY(a) |LOCATION '${dir.toURI}' diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index cebaad5b4ad9b..0f394fde8bdf2 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -59,6 +59,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Ensures that cross joins are enabled so that we can test them TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true) + // Force type cast during table insertion + TestHive.setConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST, true) // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") @@ -74,6 +76,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone) + TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST) // For debugging dump some statistics about how much time was spent in various 
optimizer rules logWarning(RuleExecutor.dumpTimeSpent()) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 328457948cb41..166dd7d5d437b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -857,7 +857,7 @@ class VersionsSuite extends SparkFunSuite with Logging { """.stripMargin ) - val errorMsg = "data type mismatch: cannot cast decimal(2,1) to binary" + val errorMsg = "Cannot safely cast 'f0': decimal(2,1) to binary" if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index df2f693e7147a..679ed02ba948a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -374,15 +374,15 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)") sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)") - val analyzedPlan = sql( + val optimizedPlan = sql( """ |INSERT OVERWRITE table test_partition PARTITION (b=1, c) |SELECT 'a', 'c' from ptest - """.stripMargin).queryExecution.analyzed + """.stripMargin).queryExecution.optimizedPlan - assertResult(false, "Incorrect cast detected\n" + analyzedPlan) { + assertResult(false, "Incorrect cast detected\n" + optimizedPlan) { var hasCast = false - analyzedPlan.collect { + optimizedPlan.collect { case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c } } hasCast @@ -984,16 +984,16 @@ class HiveQuerySuite extends 
HiveComparisonTest with SQLTestUtils with BeforeAnd } test("SPARK-3810: PreprocessTableInsertion static partitioning support") { - val analyzedPlan = { + val optimizedPlan = { loadTestTable("srcpart") sql("DROP TABLE IF EXISTS withparts") sql("CREATE TABLE withparts LIKE srcpart") sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src") - .queryExecution.analyzed + .queryExecution.optimizedPlan } - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { - analyzedPlan.collect { + assertResult(0, "Unnecessary project detected\n" + optimizedPlan) { + optimizedPlan.collect { case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size }.sum } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 25ff3544185af..444b79e6bf17e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -52,9 +52,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkNumericTypes(fileFormat: String, dataType: String, value: Any): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values(1)") + hiveClient.runSqlHive(s"INSERT INTO TABLE hive_serde VALUES(1)") checkAnswer(spark.table("hive_serde"), Row(1)) - spark.sql(s"INSERT INTO TABLE hive_serde values($value)") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES(CAST($value AS $dataType))") checkAnswer(spark.table("hive_serde"), Seq(Row(1), Row(value))) } } @@ -63,9 +63,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // TIMESTAMP withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE 
hive_serde (c1 TIMESTAMP) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('2019-04-11 15:50:00')") checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12 15:50:00')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2019-04-12 15:50:00' AS TIMESTAMP))") checkAnswer( spark.table("hive_serde"), Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")), @@ -75,9 +75,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // DATE withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('2019-04-11')") checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2019-04-12' AS DATE))") checkAnswer( spark.table("hive_serde"), Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12")))) @@ -87,9 +87,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkStringTypes(fileFormat: String, dataType: String, value: String): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('s')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('s')") checkAnswer(spark.table("hive_serde"), Row("s")) - spark.sql(s"INSERT INTO TABLE hive_serde values('$value')") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES('$value')") checkAnswer(spark.table("hive_serde"), Seq(Row("s"), Row(value))) } } @@ -97,9 +97,9 @@ class HiveSerDeReadWriteSuite 
extends QueryTest with SQLTestUtils with TestHiveS private def checkCharTypes(fileFormat: String): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 CHAR(10)) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('s')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('s')") checkAnswer(spark.table("hive_serde"), Row("s" + " " * 9)) - spark.sql(s"INSERT INTO TABLE hive_serde values('s3')") + spark.sql(s"INSERT INTO TABLE hive_serde VALUES('s3')") checkAnswer(spark.table("hive_serde"), Seq(Row("s" + " " * 9), Row("s3" + " " * 8))) } } @@ -108,18 +108,18 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // BOOLEAN withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BOOLEAN) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values(false)") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES(false)") checkAnswer(spark.table("hive_serde"), Row(false)) - spark.sql("INSERT INTO TABLE hive_serde values(true)") + spark.sql("INSERT INTO TABLE hive_serde VALUES(true)") checkAnswer(spark.table("hive_serde"), Seq(Row(false), Row(true))) } // BINARY withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat") - hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde VALUES('1')") checkAnswer(spark.table("hive_serde"), Row("1".getBytes)) - spark.sql("INSERT INTO TABLE hive_serde values('2')") + spark.sql("INSERT INTO TABLE hive_serde VALUES(CAST('2' AS BINARY))") checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 56c16c8d91b96..72c5f3f21fa2a 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1071,7 +1071,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("set hive.exec.dynamic.partition.mode=nonstrict") // date sql("drop table if exists dynparttest1") - sql("create table dynparttest1 (value int) partitioned by (pdate date)") + sql("create table dynparttest1 (value long) partitioned by (pdate date)") sql( """ |insert into table dynparttest1 partition(pdate) @@ -1083,7 +1083,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // decimal sql("drop table if exists dynparttest2") - sql("create table dynparttest2 (value int) partitioned by (pdec decimal(5, 1))") + sql("create table dynparttest2 (value long) partitioned by (pdec decimal(5, 1))") sql( """ |insert into table dynparttest2 partition(pdec) @@ -1580,8 +1580,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { .select(array($"id", $"id" + 1).as("arr"), $"id") .createOrReplaceTempView("source") withTable("dest1", "dest2") { - sql("CREATE TABLE dest1 (i INT)") - sql("CREATE TABLE dest2 (i INT)") + sql("CREATE TABLE dest1 (i LONG)") + sql("CREATE TABLE dest2 (i LONG)") sql( """ |FROM source @@ -2339,7 +2339,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { withTable("t") { - sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") + sql("CREATE TABLE t (col1 LONG, p1 LONG) USING PARQUET PARTITIONED BY (p1)") sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") if (enableOptimizeMetadataOnlyQuery) { // The result is wrong if we enable the configuration. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 94f35b0b3d523..9e4363661207d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -213,7 +213,8 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { withTable("spark_23340") { sql("CREATE TABLE spark_23340(a array, b array) STORED AS ORC") - sql("INSERT INTO spark_23340 VALUES (array(), array())") + sql("INSERT INTO spark_23340 VALUES " + + "(CAST(array() AS array), CAST(array() AS array))") checkAnswer(spark.table("spark_23340"), Seq(Row(Array.empty[Float], Array.empty[Double]))) } } From c5b626cebedb39459cfe9445eb8dc595812bdff8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 29 May 2019 20:57:28 +0800 Subject: [PATCH 04/16] fix test --- .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 2 +- .../spark/sql/catalyst/encoders/EncoderResolutionSuite.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 907eedd0ee584..9d9043b4c0c12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -165,7 +165,7 @@ object Cast { def canNullSafeCastToDecimal(from: DataType, to: DecimalType): Boolean = from match { case from: BooleanType if to.isWiderThan(DecimalType.BooleanDecimal) => true - case from: NumericType if to.isWiderThan(from) => true + case from: IntegralType if to.isWiderThan(from) => true case from: DecimalType => // truncating or precision lose 
(to.precision - to.scale) > (from.precision - from.scale) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala index da1b695919dec..662ea2b1ddf92 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala @@ -230,10 +230,11 @@ class EncoderResolutionSuite extends PlanTest { castSuccess[Long, String] castSuccess[Int, java.math.BigDecimal] castSuccess[Long, java.math.BigDecimal] + castSuccess[java.math.BigDecimal, Double] castFail[Long, Int] castFail[java.sql.Timestamp, java.sql.Date] - castFail[java.math.BigDecimal, Double] + castFail[java.math.BigDecimal, Long] castFail[Double, java.math.BigDecimal] castFail[java.math.BigDecimal, Int] castFail[String, Long] From b91498f6f7af3feed7c188929335554c06b84f35 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 5 Jun 2019 22:54:43 +0800 Subject: [PATCH 05/16] change sqlconf; add migration doc; add comments --- docs/sql-migration-guide-upgrade.md | 2 ++ .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 +++++--- .../apache/spark/sql/execution/datasources/rules.scala | 2 +- .../spark/sql/internal/BaseSessionStateBuilder.scala | 2 ++ .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 4 ++-- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 4 ++-- 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 5dc7a89514974..33d7a103b7666 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -50,6 +50,8 @@ license: | - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode 
when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, when inserting into a table, Spark will cast the data type of input query to the data type of target table by coercion. Since Spark 3.0, by default only upcasting is allowed when inserting data into table. E.g. `int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or `string` -> `int` are not allowed. The old behaviour is preserved under a newly added configuration `spark.sql.legacy.insertTable.typeCoercion` with a default value of `false`. + - Refreshing a cached table would trigger a table uncache operation and then a table cache (lazily) operation. In Spark version 2.4 and earlier, the cache name and storage level are not preserved before the uncache operation. Therefore, the cache name and storage level could be changed unexpectedly. Since Spark 3.0, cache name and storage level will be first preserved for cache recreation. It helps to maintain a consistent cache behavior upon table refreshing. - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dd0c8bdee1807..d766e09e0c040 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1778,9 +1778,11 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_INSERT_TABLE_FORCIBLE_CAST = buildConf("spark.sql.legacy.insertTable.forcibleCast") - .doc("When inserting data to a table, Spark will cast the data type of input query to " + - "the data type of target table forcibly if this config is true.") + val LEGACY_INSERT_TABLE_TYPE_COERCION = buildConf("spark.sql.legacy.insertTable.typeCoercion") + .doc("When true, Spark will cast the data type of input query of table insertion to " + + "the data type of target table by coercion; otherwise, only upcasting is allowed, e.g. " + + "`int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or " + + "`string` -> `int` are not allowed.") .booleanConf .createWithDefault(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 18c436117e77e..f4a3be5237f83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -358,7 +358,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST)) { + val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION)) { DDLPreprocessingUtils.castAndRenameQueryOutput(insert.query, expectedColumns, conf) } else { val errors = new 
mutable.ArrayBuffer[String]() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 7ef03449c802c..7d870ca49e4f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -175,6 +175,8 @@ abstract class BaseSessionStateBuilder( override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + // In the rule `PreprocessTableInsertion`, the fields of input query might be converted as + // unresolved `UpCast`s. The following rule `ResolveUpCast` would resolve those `UpCast`s. ResolveUpCast +: DataSourceAnalysis(conf) +: customPostHocResolutionRules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 3e56ef90efb4d..482ddb18a9459 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -567,7 +567,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { // This test needs to write null value to a non-nullable column. 
- withSQLConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST.key -> "true") { + withSQLConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION.key -> "true") { withTable("test_table") { val schema = new StructType() .add("i", LongType, false) @@ -607,7 +607,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } assert(e.message.contains("Cannot write incompatible data to table")) - withSQLConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST.key -> "true") { + withSQLConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION.key -> "true") { sql("INSERT INTO t VALUES (2L, 'a')") checkAnswer(spark.table("t"), Row(1, "1") :: Row(2, "a") :: Nil) } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 0f394fde8bdf2..7999c67457924 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -60,7 +60,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Ensures that cross joins are enabled so that we can test them TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true) // Force type case during table insertion - TestHive.setConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST, true) + TestHive.setConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION, true) // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") @@ -76,7 +76,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, 
originalSessionLocalTimeZone) - TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_TABLE_FORCIBLE_CAST) + TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION) // For debugging dump some statistics about how much time was spent in various optimizer rules logWarning(RuleExecutor.dumpTimeSpent()) From babb288edabfaa79b23b4d5ca544dee2e891f155 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 00:02:18 +0800 Subject: [PATCH 06/16] extra spaces before if --- .../spark/sql/catalyst/analysis/ResolveOutputRelation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala index e7d756b95bc2b..c494467523c89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types.DataType object ResolveOutputRelation extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case append @ AppendData(table, query, isByName) - if table.resolved && query.resolved && !append.outputResolved => + if table.resolved && query.resolved && !append.outputResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { @@ -47,7 +47,7 @@ object ResolveOutputRelation extends Rule[LogicalPlan] { } case overwrite @ OverwriteByExpression(table, _, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => + if table.resolved && query.resolved && !overwrite.outputResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { @@ -57,7 +57,7 @@ 
object ResolveOutputRelation extends Rule[LogicalPlan] { } case overwrite @ OverwritePartitionsDynamic(table, query, isByName) - if table.resolved && query.resolved && !overwrite.outputResolved => + if table.resolved && query.resolved && !overwrite.outputResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { From 98dd42ea988a32012edfd7680cb2ae6417fb4c09 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 11:12:50 +0800 Subject: [PATCH 07/16] rename conf --- docs/sql-migration-guide-upgrade.md | 2 +- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 1 - .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../org/apache/spark/sql/execution/datasources/rules.scala | 2 +- .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 4 ++-- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 4 ++-- 6 files changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 33d7a103b7666..3e2068fa75309 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -50,7 +50,7 @@ license: | - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. - - In Spark version 2.4 and earlier, when inserting into a table, Spark will cast the data type of input query to the data type of target table by coercion. Since Spark 3.0, by default only upcasting is allowed when inserting data into table. E.g. `int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or `string` -> `int` are not allowed. 
The old behaviour is preserved under a newly added configuration `spark.sql.legacy.insertTable.typeCoercion` with a default value of `false`. + - In Spark version 2.4 and earlier, when inserting into a table, Spark will cast the data type of input query to the data type of target table by coercion. Since Spark 3.0, by default only upcasting is allowed when inserting data into table. E.g. `int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or `string` -> `int` are not allowed. The old behaviour is preserved under a newly added configuration `spark.sql.legacy.insertUnsafeCasts` with a default value of `false`. - Refreshing a cached table would trigger a table uncache operation and then a table cache (lazily) operation. In Spark version 2.4 and earlier, the cache name and storage level are not preserved before the uncache operation. Therefore, the cache name and storage level could be changed unexpectedly. Since Spark 3.0, cache name and storage level will be first preserved for cache recreation. It helps to maintain a consistent cache behavior upon table refreshing. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 9d9043b4c0c12..ef4587bfdf865 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -133,7 +133,6 @@ object Cast { case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true case (DateType, TimestampType) => true - case (_, StringType) => true // Spark supports casting between long and timestamp, please see `longToTimestamp` and diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d766e09e0c040..ef1b407df0d57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1778,7 +1778,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_INSERT_TABLE_TYPE_COERCION = buildConf("spark.sql.legacy.insertTable.typeCoercion") + val LEGACY_INSERT_UNSAFE_CASTS = buildConf("spark.sql.legacy.insertUnsafeCasts") .doc("When true, Spark will cast the data type of input query of table insertion to " + "the data type of target table by coercion; otherwise, only upcasting is allowed, e.g. 
" + "`int` -> `long` and `int` -> `string` are allowed, while `long` -> `int` or " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index f4a3be5237f83..b5f3bdd697846 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -358,7 +358,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION)) { + val newQuery = if (conf.getConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS)) { DDLPreprocessingUtils.castAndRenameQueryOutput(insert.query, expectedColumns, conf) } else { val errors = new mutable.ArrayBuffer[String]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 482ddb18a9459..592c609b016b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -567,7 +567,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { // This test needs to write null value to a non-nullable column. 
- withSQLConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION.key -> "true") { + withSQLConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS.key -> "true") { withTable("test_table") { val schema = new StructType() .add("i", LongType, false) @@ -607,7 +607,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } assert(e.message.contains("Cannot write incompatible data to table")) - withSQLConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION.key -> "true") { + withSQLConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS.key -> "true") { sql("INSERT INTO t VALUES (2L, 'a')") checkAnswer(spark.table("t"), Row(1, "1") :: Row(2, "a") :: Nil) } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 7999c67457924..8118b984e680b 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -60,7 +60,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Ensures that cross joins are enabled so that we can test them TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true) // Force type case during table insertion - TestHive.setConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION, true) + TestHive.setConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS, true) // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") @@ -76,7 +76,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone) - 
TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_TABLE_TYPE_COERCION) + TestHive.sparkSession.sessionState.conf.unsetConf(SQLConf.LEGACY_INSERT_UNSAFE_CASTS) // For debugging dump some statistics about how much time was spent in various optimizer rules logWarning(RuleExecutor.dumpTimeSpent()) From c520108e2e5b79d0508333675c9a9ef1bcd38e01 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 13:14:53 +0800 Subject: [PATCH 08/16] fix EncoderResolutionSuite.scala --- .../spark/sql/catalyst/encoders/EncoderResolutionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala index 662ea2b1ddf92..daff30727c150 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala @@ -230,12 +230,12 @@ class EncoderResolutionSuite extends PlanTest { castSuccess[Long, String] castSuccess[Int, java.math.BigDecimal] castSuccess[Long, java.math.BigDecimal] - castSuccess[java.math.BigDecimal, Double] + castSuccess[Double, java.math.BigDecimal] castFail[Long, Int] castFail[java.sql.Timestamp, java.sql.Date] castFail[java.math.BigDecimal, Long] - castFail[Double, java.math.BigDecimal] + castFail[java.math.BigDecimal, Double] castFail[java.math.BigDecimal, Int] castFail[String, Long] From 900e2d614c5a891c3c27cc361a7eb1a60b396e1d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 14:48:46 +0800 Subject: [PATCH 09/16] address comments --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 1 + .../spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 679ed02ba948a..5b2d5414126cb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -374,6 +374,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)") sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)") + // The `Cast` operator will be eliminated by optimization rule `ConstantFolding`. val optimizedPlan = sql( """ |INSERT OVERWRITE table test_partition PARTITION (b=1, c) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 444b79e6bf17e..4b2bde9268515 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -52,7 +52,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkNumericTypes(fileFormat: String, dataType: String, value: Any): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") - hiveClient.runSqlHive(s"INSERT INTO TABLE hive_serde VALUES(1)") + hiveClient.runSqlHive(s"INSERT INTO TABLE hive_serde values(1)") checkAnswer(spark.table("hive_serde"), Row(1)) spark.sql(s"INSERT INTO TABLE hive_serde VALUES(CAST($value AS $dataType))") checkAnswer(spark.table("hive_serde"), Seq(Row(1), Row(value))) From 85fa370e8e7c4779e23d0695c4b814c4a033fcf4 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 14:51:41 +0800 Subject: [PATCH 10/16] 
revise --- .../spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 4b2bde9268515..5491fbff5c54e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -52,7 +52,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS private def checkNumericTypes(fileFormat: String, dataType: String, value: Any): Unit = { withTable("hive_serde") { hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 $dataType) STORED AS $fileFormat") - hiveClient.runSqlHive(s"INSERT INTO TABLE hive_serde values(1)") + hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values(1)") checkAnswer(spark.table("hive_serde"), Row(1)) spark.sql(s"INSERT INTO TABLE hive_serde VALUES(CAST($value AS $dataType))") checkAnswer(spark.table("hive_serde"), Seq(Row(1), Row(value))) From 7f5c15987c2ede25e1c3c39a54a4e2581dbd3f65 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 17:46:37 +0800 Subject: [PATCH 11/16] fix --- .../sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 58923f26c1ead..2d1d82a864353 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -250,7 +250,7 @@ abstract class DataSourceV2AnalysisSuite extends 
AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byName: insert safe cast") { @@ -298,7 +298,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot safely cast", "'x'", "DoubleType to FloatType", + "Cannot safely cast", "'x'", "double to float", "Cannot write nullable values to non-null column", "'x'", "Cannot find data for output column", "'y'")) } @@ -396,7 +396,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byPosition: insert safe cast") { @@ -449,7 +449,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", "Cannot write nullable values to non-null column", "'x'", - "Cannot safely cast", "'x'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "double to float")) } test("bypass output column resolution") { From d9e7e4edf55d487c29de6153ed31fcf6a0921834 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Jun 2019 20:49:34 +0800 Subject: [PATCH 12/16] fix --- .../spark/sql/catalyst/expressions/Cast.scala | 4 +-- .../sql/catalyst/analysis/AnalysisTest.scala | 32 ++++++++++--------- .../DataTypeWriteCompatibilitySuite.scala | 16 +++++----- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index ef4587bfdf865..975b7d83debdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -126,7 +126,7 @@ object Cast { */ def canUpCast(from: DataType, to: DataType): Boolean = (from, to) match { case _ if from == to => true - case (NullType, _) => true + case (NullType, _) => false case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true @@ -164,7 +164,7 @@ object Cast { def canNullSafeCastToDecimal(from: DataType, to: DecimalType): Boolean = from match { case from: BooleanType if to.isWiderThan(DecimalType.BooleanDecimal) => true - case from: IntegralType if to.isWiderThan(from) => true + case from: NumericType if to.isWiderThan(from) => true case from: DecimalType => // truncating or precision lose (to.precision - to.scale) > (from.precision - from.scale) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index fab1b776a3c72..d13804ee9f6f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -89,22 +89,24 @@ trait AnalysisTest extends PlanTest { inputPlan: LogicalPlan, expectedErrors: Seq[String], caseSensitive: Boolean = true): Unit = { - val analyzer = getAnalyzer(caseSensitive) - val e = intercept[AnalysisException] { - analyzer.checkAnalysis(analyzer.execute(inputPlan)) - } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + val analyzer = getAnalyzer(caseSensitive) + val e = intercept[AnalysisException] { + analyzer.checkAnalysis(analyzer.execute(inputPlan)) + } - 
if (!expectedErrors.map(_.toLowerCase(Locale.ROOT)).forall( - e.getMessage.toLowerCase(Locale.ROOT).contains)) { - fail( - s"""Exception message should contain the following substrings: - | - | ${expectedErrors.mkString("\n ")} - | - |Actual exception message: - | - | ${e.getMessage} - """.stripMargin) + if (!expectedErrors.map(_.toLowerCase(Locale.ROOT)).forall( + e.getMessage.toLowerCase(Locale.ROOT).contains)) { + fail( + s"""Exception message should contain the following substrings: + | + | ${expectedErrors.mkString("\n ")} + | + |Actual exception message: + | + | ${e.getMessage} + """.stripMargin) + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index 6b5fc5f0d4434..8e9f8ab4a2a96 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -53,7 +53,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { test("Check NullType is incompatible with all other types") { allNonNullTypes.foreach { t => assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => - assert(err.contains(s"incompatible with $t")) + assert(err.contains(s"Cannot safely cast 'nulls': null to ${t.simpleString}")) } } } @@ -75,8 +75,8 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { s"Should not allow writing $w to $r because cast is not safe") { err => assert(err.contains("'t'"), "Should include the field name context") assert(err.contains("Cannot safely cast"), "Should identify unsafe cast") - assert(err.contains(s"$w"), "Should include write type") - assert(err.contains(s"$r"), "Should include read type") + assert(err.contains(s"${w.simpleString}"), "Should include write type") + assert(err.contains(s"${r.simpleString}"), 
"Should include read type") } } } @@ -325,7 +325,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) { errs => assert(errs(0).contains("'top.a.element'"), "Should identify bad type") assert(errs(0).contains("Cannot safely cast")) - assert(errs(0).contains("StringType to DoubleType")) + assert(errs(0).contains("string to double")) assert(errs(1).contains("'top.a'"), "Should identify bad type") assert(errs(1).contains("Cannot write nullable elements to array of non-nulls")) @@ -338,15 +338,15 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assert(errs(3).contains("Cannot write nullable elements to array of non-nulls")) assert(errs(4).contains("'top.bad_nested_type'"), "Should identify bad type") - assert(errs(4).contains("is incompatible with")) + assert(errs(4).contains("Cannot safely cast")) assert(errs(5).contains("'top.m.key'"), "Should identify bad type") assert(errs(5).contains("Cannot safely cast")) - assert(errs(5).contains("DoubleType to LongType")) + assert(errs(5).contains("double to bigint")) assert(errs(6).contains("'top.m.value'"), "Should identify bad type") assert(errs(6).contains("Cannot safely cast")) - assert(errs(6).contains("DoubleType to FloatType")) + assert(errs(6).contains("double to float")) assert(errs(7).contains("'top.m'"), "Should identify bad type") assert(errs(7).contains("Cannot write nullable values to map of non-nulls")) @@ -364,7 +364,7 @@ class DataTypeWriteCompatibilitySuite extends SparkFunSuite { assert(errs(11).contains("'top.x'"), "Should identify bad type") assert(errs(11).contains("Cannot safely cast")) - assert(errs(11).contains("LongType to IntegerType")) + assert(errs(11).contains("bigint to int")) assert(errs(12).contains("'top'"), "Should identify bad type") assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name mismatch") From 75b3bf4746cb367b758b4c4c2707d73252fd74d8 Mon Sep 17 00:00:00 2001 
From: Gengliang Wang Date: Mon, 10 Jun 2019 22:41:05 +0800 Subject: [PATCH 13/16] fix null --- .../catalyst/analysis/ResolveOutputRelation.scala | 12 ++++++++---- .../apache/spark/sql/catalyst/expressions/Cast.scala | 2 +- .../spark/sql/hive/HiveSessionStateBuilder.scala | 2 ++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala index c494467523c89..ebda40bb0729d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveOutputRelation.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression, UpCast} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression, UpCast} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{DataType, NullType} /** * Resolves columns of an output table from the data in a logical plan. 
This rule will: @@ -124,13 +124,17 @@ object ResolveOutputRelation extends Rule[LogicalPlan] { addError: String => Unit): Option[NamedExpression] = { // run the type check first to ensure type errors are present - val canWrite = DataType.canWrite( + lazy val canWrite = DataType.canWrite( queryExpr.dataType, tableAttr.dataType, byName, resolver, tableAttr.name, addError) if (queryExpr.nullable && !tableAttr.nullable) { addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'") None + } else if (queryExpr.dataType == NullType && tableAttr.nullable) { + Some(Alias(Cast(queryExpr, tableAttr.dataType, Option(SQLConf.get.sessionLocalTimeZone)), + tableAttr.name)(explicitMetadata = Option(tableAttr.metadata))) + } else if (!canWrite) { None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 975b7d83debdb..0320b1dcb2550 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -126,13 +126,13 @@ object Cast { */ def canUpCast(from: DataType, to: DataType): Boolean = (from, to) match { case _ if from == to => true - case (NullType, _) => false case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true case (DateType, TimestampType) => true + case (NullType, _) => false case (_, StringType) => true // Spark supports casting between long and timestamp, please see `longToTimestamp` and diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 4d7638e93eec3..e7f938028bbff 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -81,6 +81,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session RelationConversions(conf, catalog) +: PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: + // In the rule `PreprocessTableInsertion`, the fields of input query might be converted as + // unresolved `UpCast`s. The following rule `ResolveUpCast` would resolve those `UpCast`s. ResolveUpCast +: DataSourceAnalysis(conf) +: HiveAnalysis +: From a72d3a2fd9e1b73e255eb4fb81178b7635b04356 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 11 Jun 2019 16:35:47 +0800 Subject: [PATCH 14/16] fix decimal cast --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 1 - .../main/scala/org/apache/spark/sql/types/DecimalType.scala | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 0320b1dcb2550..a309eefc2701c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -126,7 +126,6 @@ object Cast { */ def canUpCast(from: DataType, to: DataType): Boolean = (from, to) match { case _ if from == to => true - case (from: NumericType, to: DecimalType) if to.isWiderThan(from) => true case (from: DecimalType, to: NumericType) if from.isTighterThan(to) => true case (f: NumericType, t: NumericType) if legalNumericPrecedence(f, t) => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 119536706bd00..827cab460cb8f 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -75,8 +75,9 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isWiderThan(other: DataType): Boolean = other match { case dt: DecimalType => (precision - scale) >= (dt.precision - dt.scale) && scale >= dt.scale - case dt: NumericType => + case dt: IntegralType => isWiderThan(DecimalType.forType(dt)) + // For DoubleType/FloatType, the value can be NaN, PositiveInfinity or NegativeInfinity. case _ => false } @@ -87,8 +88,9 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isTighterThan(other: DataType): Boolean = other match { case dt: DecimalType => (precision - scale) <= (dt.precision - dt.scale) && scale <= dt.scale - case dt: NumericType => + case dt: IntegralType => isTighterThan(DecimalType.forType(dt)) + // For DoubleType/FloatType, the value can be NaN, PositiveInfinity or NegativeInfinity. 
case _ => false } From f182938cd63125da75e61a02b5603fe411357e0d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 11 Jun 2019 21:22:55 +0800 Subject: [PATCH 15/16] fix test case --- .../sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 2d1d82a864353..3c45eafe289cf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -290,7 +290,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("y", DoubleType))).toAttributes) val query = TestRelation(StructType(Seq( - StructField("x", DoubleType), + StructField("x", FloatType), StructField("b", FloatType))).toAttributes) val parsedPlan = byName(xRequiredTable, query) @@ -298,7 +298,6 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot safely cast", "'x'", "double to float", "Cannot write nullable values to non-null column", "'x'", "Cannot find data for output column", "'y'")) } @@ -440,7 +439,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("y", DoubleType))).toAttributes) val query = TestRelation(StructType(Seq( - StructField("x", DoubleType), + StructField("x", FloatType), StructField("b", FloatType))).toAttributes) val parsedPlan = byPosition(xRequiredTable, query) @@ -448,8 +447,7 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", 
"'table-name'", - "Cannot write nullable values to non-null column", "'x'", - "Cannot safely cast", "'x'", "double to float")) + "Cannot write nullable values to non-null column", "'x'")) } test("bypass output column resolution") { From 3e2949131a8a7579149a7dd4153650461f5b5da2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 12 Jun 2019 15:04:04 +0800 Subject: [PATCH 16/16] fix --- .../main/scala/org/apache/spark/sql/types/DecimalType.scala | 3 +-- .../spark/sql/catalyst/encoders/EncoderResolutionSuite.scala | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 827cab460cb8f..83607a78036a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -88,9 +88,8 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { private[sql] def isTighterThan(other: DataType): Boolean = other match { case dt: DecimalType => (precision - scale) <= (dt.precision - dt.scale) && scale <= dt.scale - case dt: IntegralType => + case dt: NumericType => isTighterThan(DecimalType.forType(dt)) - // For DoubleType/FloatType, the value can be NaN, PositiveInfinity or NegativeInfinity. 
case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala index daff30727c150..d639e5cc6721f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala @@ -230,7 +230,6 @@ class EncoderResolutionSuite extends PlanTest { castSuccess[Long, String] castSuccess[Int, java.math.BigDecimal] castSuccess[Long, java.math.BigDecimal] - castSuccess[Double, java.math.BigDecimal] castFail[Long, Int] castFail[java.sql.Timestamp, java.sql.Date]