From 054f3e65ed1d035f7605502b398c948a6c97ef97 Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Mon, 7 Oct 2024 18:32:48 +0900
Subject: [PATCH 1/7] [SPARK-49891][SQL] Assign proper error class for
 _LEGACY_ERROR_TEMP_1136

---
 .../src/main/resources/error/error-conditions.json  | 11 ++++++-----
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala |  2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala   |  2 +-
 .../apache/spark/sql/FileBasedDataSourceSuite.scala |  2 +-
 4 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index e3bffea0b62eb..6c36d0efd5369 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -400,6 +400,12 @@
     ],
     "sqlState" : "58030"
   },
+  "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE" : {
+    "message" : [
+      "Cannot save interval data type into external storage."
+    ],
+    "sqlState" : "42846"
+  },
   "CANNOT_UPDATE_FIELD" : {
     "message" : [
       "Cannot update <table> field <fieldName> type:"
@@ -6091,11 +6097,6 @@
       "<className> is not a valid Spark SQL Data Source."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1136" : {
-    "message" : [
-      "Cannot save interval data type into external storage."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1137" : {
     "message" : [
       "Unable to resolve <name> given [<outputStr>]."
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index be887bd5237b0..cfaf7d74ba6b0 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1673,7 +1673,7 @@ abstract class AvroSuite
       exception = intercept[AnalysisException] {
         sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
       },
-      condition = "_LEGACY_ERROR_TEMP_1136",
+      condition = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
       parameters = Map.empty
     )
     checkError(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 22cc001c0c78e..cd786c7cb0493 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1685,7 +1685,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
 
   def cannotSaveIntervalIntoExternalStorageError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1136",
+      errorClass = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
       messageParameters = Map.empty)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index e44bd5de4f4c4..195947b08a6f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -512,7 +512,7 @@ class FileBasedDataSourceSuite extends QueryTest
             exception = intercept[AnalysisException] {
               sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
             },
-            condition = "_LEGACY_ERROR_TEMP_1136",
+            condition = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
             parameters = Map.empty
           )
         }

From 311fca4fca8f740770f9d94964732cc8855db81c Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Tue, 8 Oct 2024 12:03:55 +0900
Subject: [PATCH 2/7] Use UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE

---
 .../resources/error/error-conditions.json           |  6 ------
 .../org/apache/spark/sql/avro/AvroSuite.scala        |  8 +++++--
 .../sql/errors/QueryCompilationErrors.scala          | 14 ++++++++---
 .../execution/datasources/DataSource.scala           | 23 +++++++++++--------
 .../spark/sql/FileBasedDataSourceSuite.scala         | 13 +++++++++--
 5 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 6c36d0efd5369..6a70db5aa9935 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -400,12 +400,6 @@
     ],
     "sqlState" : "58030"
   },
-  "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE" : {
-    "message" : [
-      "Cannot save interval data type into external storage."
-    ],
-    "sqlState" : "42846"
-  },
   "CANNOT_UPDATE_FIELD" : {
     "message" : [
       "Cannot update <table> field <fieldName> type:"
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index cfaf7d74ba6b0..d38a99889c2bb 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1673,8 +1673,12 @@ abstract class AvroSuite
       exception = intercept[AnalysisException] {
         sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
       },
-      condition = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
-      parameters = Map.empty
+      condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
+      parameters = Map(
+        "format" -> "Avro",
+        "columnName" -> "`INTERVAL '1 days'`",
+        "columnType" -> "\"INTERVAL\""
+      )
     )
     checkError(
       exception = intercept[AnalysisException] {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cd786c7cb0493..50bdd420ecaf1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1683,10 +1683,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("className" -> className))
   }
 
-  def cannotSaveIntervalIntoExternalStorageError(): Throwable = {
+  def cannotSaveIntervalIntoExternalStorageError(
+      format: String,
+      columnName: String,
+      columnType: String): Throwable = {
     new AnalysisException(
-      errorClass = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
-      messageParameters = Map.empty)
+      errorClass = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
+      messageParameters = Map(
+        "format" -> format,
+        "columnName" -> columnName,
+        "columnType" -> columnType
+      )
+    )
   }
 
   def cannotResolveAttributeError(name: String, outputStr: String): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 968c204841e46..ff61470acce0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -18,13 +18,10 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
-
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
@@ -35,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
+import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -514,7 +512,8 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        disallowWritingIntervals(outputColumns.map(_.dataType), forbidAnsiIntervals = false)
+        val dataTypesWithNames = outputColumns.map(attr => (attr.dataType, attr.name))
+        disallowWritingIntervals(dataTypesWithNames, format.toString, forbidAnsiIntervals = false)
         val cmd = planForWritingFileFormat(format, mode, data)
         val qe = sparkSession.sessionState.executePlan(cmd)
         qe.assertCommandExecuted()
@@ -539,7 +538,8 @@ case class DataSource(
         }
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
-        disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
+        val dataTypesWithNames = data.schema.map(field => (field.dataType, field.name))
+        disallowWritingIntervals(dataTypesWithNames, format.toString, forbidAnsiIntervals = false)
         DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
         planForWritingFileFormat(format, mode, data)
       case _ => throw SparkException.internalError(
@@ -566,12 +566,15 @@ case class DataSource(
   }
 
   private def disallowWritingIntervals(
-      dataTypes: Seq[DataType],
+      dataTypesWithNames: Seq[(DataType, String)],
+      format: String,
       forbidAnsiIntervals: Boolean): Unit = {
-    dataTypes.foreach(
-      TypeUtils.invokeOnceForInterval(_, forbidAnsiIntervals) {
-        throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
-      })
+    dataTypesWithNames.foreach { case (dataType, columnName) =>
+      TypeUtils.invokeOnceForInterval(dataType, forbidAnsiIntervals) {
+        throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError(
+          format, toSQLId(columnName), toSQLType(dataType)
+        )}
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 195947b08a6f7..6661c4473c7b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -506,14 +506,23 @@ class FileBasedDataSourceSuite extends QueryTest
       withSQLConf(
         SQLConf.USE_V1_SOURCE_LIST.key -> useV1List,
         SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
+        val formatMapping = Map(
+          "csv" -> "CSV",
+          "json" -> "JSON",
+          "parquet" -> "Parquet",
+          "orc" -> "ORC"
+        )
         // write path
         Seq("csv", "json", "parquet", "orc").foreach { format =>
           checkError(
             exception = intercept[AnalysisException] {
               sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
             },
-            condition = "CANNOT_SAVE_INTERVAL_TO_EXTERNAL_STORAGE",
-            parameters = Map.empty
+            condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
+            parameters = Map(
+              "format" -> formatMapping(format),
+              "columnName" -> "`INTERVAL '1 days'`",
+              "columnType" -> "\"INTERVAL\"")
           )
         }
 

From 204871a66eed6d34924fe2870031fd53df07820f Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Tue, 8 Oct 2024 12:05:03 +0900
Subject: [PATCH 3/7] fix style

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ff61470acce0b..58a27de1b73d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -18,10 +18,13 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
+
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}

From 7b7fcdfad0dbc0a091b4f64324b189ecb6d108c9 Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Tue, 8 Oct 2024 18:26:41 +0900
Subject: [PATCH 4/7] fix style

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 58a27de1b73d1..5eb62175ab736 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -35,8 +35,8 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
-import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider

From 578bec521a94c3af4fceccaa25addfb6fc7b4dc5 Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Mon, 14 Oct 2024 10:30:05 +0900
Subject: [PATCH 5/7] fix

---
 .../sql/execution/datasources/DataSource.scala | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5eb62175ab736..d31def1cf5c27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -515,8 +515,8 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        val dataTypesWithNames = outputColumns.map(attr => (attr.dataType, attr.name))
-        disallowWritingIntervals(dataTypesWithNames, format.toString, forbidAnsiIntervals = false)
+        disallowWritingIntervals(
+          outputColumns.toStructType.asNullable, format.toString, forbidAnsiIntervals = false)
         val cmd = planForWritingFileFormat(format, mode, data)
         val qe = sparkSession.sessionState.executePlan(cmd)
         qe.assertCommandExecuted()
@@ -541,8 +541,7 @@ case class DataSource(
         }
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
-        val dataTypesWithNames = data.schema.map(field => (field.dataType, field.name))
-        disallowWritingIntervals(dataTypesWithNames, format.toString, forbidAnsiIntervals = false)
+        disallowWritingIntervals(data.schema, format.toString, forbidAnsiIntervals = false)
         DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
         planForWritingFileFormat(format, mode, data)
       case _ => throw SparkException.internalError(
@@ -569,13 +568,13 @@ case class DataSource(
   }
 
   private def disallowWritingIntervals(
-      dataTypesWithNames: Seq[(DataType, String)],
+      outputColumns: Seq[StructField],
       format: String,
       forbidAnsiIntervals: Boolean): Unit = {
-    dataTypesWithNames.foreach { case (dataType, columnName) =>
-      TypeUtils.invokeOnceForInterval(dataType, forbidAnsiIntervals) {
-        throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError(
-          format, toSQLId(columnName), toSQLType(dataType)
+    outputColumns.foreach { field =>
+      TypeUtils.invokeOnceForInterval(field.dataType, forbidAnsiIntervals) {
+        throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(
+          format, field
         )}
     }
   }

From 707c55c0b2729f0b24ab67a1d560e864e970e33d Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Mon, 14 Oct 2024 10:39:07 +0900
Subject: [PATCH 6/7] remove import

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d31def1cf5c27..e4870c9821f64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider

From c7356afd86fa340b3855d11389fa574e75da9063 Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Mon, 14 Oct 2024 10:40:17 +0900
Subject: [PATCH 7/7] remove cannotSaveIntervalIntoExternalStorageError

---
 .../spark/sql/errors/QueryCompilationErrors.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index e1030253f1370..3d3d9cb70bcf3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1691,20 +1691,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("className" -> className))
   }
 
-  def cannotSaveIntervalIntoExternalStorageError(
-      format: String,
-      columnName: String,
-      columnType: String): Throwable = {
-    new AnalysisException(
-      errorClass = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-      messageParameters = Map(
-        "format" -> format,
-        "columnName" -> columnName,
-        "columnType" -> columnType
-      )
-    )
-  }
-
   def cannotResolveAttributeError(name: String, outputStr: String): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1137",