From 6d052130051a21b9aa7c3ffce56a556bee129a5e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 30 Nov 2018 17:23:32 +0800 Subject: [PATCH 1/3] if case sensitive, validate partitions with original column names --- .../datasources/PartitioningUtils.scala | 14 ++++++---- .../datasources/FileIndexSuite.scala | 28 +++++++++++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9d2c9ba0c1a5b..9f0959a7f046f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -155,7 +155,8 @@ object PartitioningUtils { "root directory of the table. If there are multiple root directories, " + "please load them separately and then union them.") - val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone) + val resolvedPartitionValues = + resolvePartitions(pathsWithPartitionValues, caseSensitive, timeZone) // Creates the StructType which represents the partition columns. val fields = { @@ -345,15 +346,18 @@ object PartitioningUtils { */ def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)], + caseSensitive: Boolean, timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - // TODO: Selective case sensitivity. - val distinctPartColNames = - pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct + val distinctPartColNames = if (caseSensitive) { + pathsWithPartitionValues.map(_._2.columnNames) + } else { + pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) + } assert( - distinctPartColNames.size == 1, + distinctPartColNames.distinct.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) // Resolves possible type conflicts for each column diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index fdb0511f01a22..559ccc05de299 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -65,6 +65,34 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26230: if case sensitive, validate partitions with original column names") { + withTempDir { dir => + val partitionDirectory = new File(dir, s"a=1") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val partitionDirectory2 = new File(dir, s"A=2") + partitionDirectory2.mkdir() + val file2 = new File(partitionDirectory2, "text.txt") + stringToFile(file2, "text") + val path = new Path(dir.getCanonicalPath) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + val partitionValues = fileIndex.partitionSpec().partitions.map(_.values) + assert(partitionValues.length == 2) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val msg = intercept[AssertionError] { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + fileIndex.partitionSpec() + }.getMessage + assert(msg.contains("Conflicting partition column names detected")) + } + } + } + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") From 313366d58075daba055c392682abaa01fbb574ee Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sat, 1 Dec 2018 16:57:47 +0800 Subject: [PATCH 2/3] address comments --- .../sql/execution/datasources/PartitioningUtils.scala | 4 ++-- .../spark/sql/execution/datasources/FileIndexSuite.scala | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9f0959a7f046f..d66cb09bda0cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -351,13 +351,13 @@ object PartitioningUtils { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - val distinctPartColNames = if (caseSensitive) { + val partColNames = if (caseSensitive) { pathsWithPartitionValues.map(_._2.columnNames) } else { pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) } assert( - distinctPartColNames.distinct.size == 1, + partColNames.distinct.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) // Resolves possible type conflicts for each column diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 559ccc05de299..b3629c31e74d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -52,7 +52,7 @@ class FileIndexSuite extends SharedSQLContext { test("SPARK-26188: don't infer data types of partition columns if user specifies schema") { withTempDir { dir => - val partitionDirectory = new File(dir, s"a=4d") + val partitionDirectory = new File(dir, "a=4d") partitionDirectory.mkdir() val file = new File(partitionDirectory, "text.txt") stringToFile(file, "text") @@ -67,11 +67,11 @@ class FileIndexSuite extends SharedSQLContext { test("SPARK-26230: if case sensitive, validate partitions with original column names") { withTempDir { dir => - val partitionDirectory = new File(dir, s"a=1") + val partitionDirectory = new File(dir, "a=1") partitionDirectory.mkdir() val file = new File(partitionDirectory, "text.txt") stringToFile(file, "text") - val partitionDirectory2 = new File(dir, s"A=2") + val partitionDirectory2 = new File(dir, "A=2") partitionDirectory2.mkdir() val file2 = new File(partitionDirectory2, "text.txt") stringToFile(file2, "text") @@ -88,6 +88,8 @@ class FileIndexSuite extends SharedSQLContext { val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) fileIndex.partitionSpec() }.getMessage + msg.matches("Partition column name list #[0,1]: A") + msg.matches("Partition column name list #[0,1]: a") assert(msg.contains("Conflicting partition column names detected")) } } From 88d8f0a0bb9b408df6c4135448035336e329f3e5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 3 Dec 2018 12:59:46 +0800 Subject: [PATCH 3/3] update test case --- .../spark/sql/execution/datasources/FileIndexSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b3629c31e74d3..ec552f7ddf47a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -88,9 +88,9 @@ class FileIndexSuite extends SharedSQLContext { val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) fileIndex.partitionSpec() }.getMessage - msg.matches("Partition column name list #[0,1]: A") - msg.matches("Partition column name list #[0,1]: a") assert(msg.contains("Conflicting partition column names detected")) + assert("Partition column name list #[0-1]: A".r.findFirstIn(msg).isDefined) + assert("Partition column name list #[0-1]: a".r.findFirstIn(msg).isDefined) } } }