From 1fbcd6fb247bb0be98aa0d23a14d18e55bdab1ee Mon Sep 17 00:00:00 2001
From: Vladimir Golubev
Date: Tue, 8 Oct 2024 08:03:36 +0000
Subject: [PATCH] Introduce and use CONFLICTING_DIRECTORY_STRUCTURES error

---
 .../src/main/resources/error/error-conditions.json    | 10 ++++++++++
 .../spark/sql/errors/QueryExecutionErrors.scala       | 10 ++++++++++
 .../sql/execution/datasources/PartitioningUtils.scala | 11 +++--------
 .../sql/execution/datasources/FileIndexSuite.scala    |  4 ++--
 .../parquet/ParquetPartitionDiscoverySuite.scala      |  8 ++++----
 5 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index e3bffea0b62eb..8100f0580b21f 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -669,6 +669,16 @@
     ],
     "sqlState" : "40000"
   },
+  "CONFLICTING_DIRECTORY_STRUCTURES" : {
+    "message" : [
+      "Conflicting directory structures detected.",
+      "Suspicious paths:",
+      "<discoveredBasePaths>",
+      "If provided paths are partition directories, please set \"basePath\" in the options of the data source to specify the root directory of the table.",
+      "If there are multiple root directories, please load them separately and then union them."
+    ],
+    "sqlState" : "KD009"
+  },
   "CONFLICTING_PARTITION_COLUMN_NAMES" : {
     "message" : [
       "Conflicting partition column names detected:",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bc6c7681ea1a5..301880f1bfc61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2845,6 +2845,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
     )
   }
 
+  def conflictingDirectoryStructuresError(
+      discoveredBasePaths: Seq[String]): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "CONFLICTING_DIRECTORY_STRUCTURES",
+      messageParameters = Map(
+        "discoveredBasePaths" -> discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n")
+      )
+    )
+  }
+
   def conflictingPartitionColumnNamesError(
       distinctPartColLists: Seq[String],
       suspiciousPaths: Seq[Path]): SparkRuntimeException = {
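For reviewers, a minimal sketch of the new builder's observable behavior, ahead of the call-site change below. It assumes code running inside Spark's own org.apache.spark.sql scope (QueryExecutionErrors is private[sql]), uses getErrorClass from SparkThrowable, and the HDFS paths are made up for illustration:

    import org.apache.spark.SparkRuntimeException
    import org.apache.spark.sql.errors.QueryExecutionErrors

    // One partitioned and one unpartitioned base path, as partition discovery
    // would collect them from a mixed directory layout.
    val ex: SparkRuntimeException = QueryExecutionErrors.conflictingDirectoryStructuresError(
      Seq("hdfs://host:9000/path/a=10", "hdfs://host:9000/path"))
    assert(ex.getErrorClass == "CONFLICTING_DIRECTORY_STRUCTURES")
    // mkString("\n\t", "\n\t", "\n") renders each distinct path on its own
    // tab-indented line, filling the <discoveredBasePaths> slot of the template.
    assert(ex.getMessage.contains("\n\thdfs://host:9000/path/a=10"))
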
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index ffdca65151052..402b70065d8e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -173,14 +173,9 @@ object PartitioningUtils extends SQLConfHelper {
     //   "hdfs://host:9000/path"
     // TODO: Selective case sensitivity.
     val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
-    assert(
-      ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1,
-      "Conflicting directory structures detected. Suspicious paths:\b" +
-        discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
-        "If provided paths are partition directories, please set " +
-        "\"basePath\" in the options of the data source to specify the " +
-        "root directory of the table. If there are multiple root directories, " +
-        "please load them separately and then union them.")
+    if (!ignoreInvalidPartitionPaths && discoveredBasePaths.distinct.size != 1) {
+      throw QueryExecutionErrors.conflictingDirectoryStructuresError(discoveredBasePaths)
+    }
 
     val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, caseSensitive)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 31b7380889158..e9f78f9f598e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -566,7 +566,7 @@ class FileIndexSuite extends SharedSparkSession {
       new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined"))
 
       // By default, we expect the invalid path assertion to trigger.
-      val ex = intercept[AssertionError] {
+      val ex = intercept[SparkRuntimeException] {
         spark.read
           .format("parquet")
           .load(directoryPath.getCanonicalPath)
@@ -585,7 +585,7 @@
 
       // Data source option override takes precedence.
       withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
-        val ex = intercept[AssertionError] {
+        val ex = intercept[SparkRuntimeException] {
          spark.read
            .format("parquet")
            .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 52d67a0954325..eb4618834504c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -111,7 +111,7 @@ abstract class ParquetPartitionDiscoverySuite
       "hdfs://host:9000/path/a=10/b=20",
       "hdfs://host:9000/path/a=10.5/b=hello")
 
-    var exception = intercept[AssertionError] {
+    var exception = intercept[SparkRuntimeException] {
       parsePartitions(
         paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false)
     }
@@ -173,7 +173,7 @@
       "hdfs://host:9000/path/a=10/b=20",
       "hdfs://host:9000/path/path1")
 
-    exception = intercept[AssertionError] {
+    exception = intercept[SparkRuntimeException] {
       parsePartitions(
         paths.map(new Path(_)),
         true,
@@ -197,7 +197,7 @@
       "hdfs://host:9000/tmp/tables/nonPartitionedTable1",
       "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
 
-    exception = intercept[AssertionError] {
+    exception = intercept[SparkRuntimeException] {
       parsePartitions(
         paths.map(new Path(_)),
         true,
@@ -878,7 +878,7 @@
 
       checkAnswer(twoPartitionsDF, df.filter("b != 3"))
 
-      intercept[AssertionError] {
+      intercept[SparkRuntimeException] {
        spark
         .read
         .parquet(
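
Beyond the unit tests above, a hedged end-to-end sketch of how the new condition surfaces to users. The directory names are hypothetical, intercept comes from ScalaTest as in the suites, and the layout mirrors the FileIndexSuite scenario of one partition directory and one plain directory under the same root:

    import org.apache.spark.SparkRuntimeException

    // Hypothetical layout producing two distinct discovered base paths:
    //   /tmp/tbl/a=1/part-....parquet    (partition directory -> base /tmp/tbl)
    //   /tmp/tbl/other/part-....parquet  (plain directory -> base /tmp/tbl/other)
    spark.range(1).write.parquet("/tmp/tbl/a=1")
    spark.range(1).write.parquet("/tmp/tbl/other")

    val ex = intercept[SparkRuntimeException] {
      spark.read.parquet("/tmp/tbl")  // partition discovery runs here
    }
    assert(ex.getErrorClass == "CONFLICTING_DIRECTORY_STRUCTURES")

    // Workarounds named in the message: anchor discovery with basePath, or
    // load the conflicting roots separately and union them.
    val partitioned = spark.read.option("basePath", "/tmp/tbl").parquet("/tmp/tbl/a=1")
    // The check is also skipped entirely when SQLConf.IGNORE_INVALID_PARTITION_PATHS
    // ("spark.sql.files.ignoreInvalidPartitionPaths") is true, per the new guard.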