From 13bd3ac68d1b82247ae44fed22c64c811ce78363 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 2 Jun 2016 16:14:50 -0700 Subject: [PATCH 1/2] [SPARK-15743][SQL] Prevent saving with all-column partitioning --- .../execution/datasources/DataSource.scala | 32 ++++++++--------- .../datasources/PartitioningUtils.scala | 8 +++-- .../sql/execution/datasources/rules.scala | 4 +-- .../execution/streaming/FileStreamSink.scala | 2 +- .../datasources/PartitioningUtilsSuite.scala | 36 +++++++++++++++++++ 5 files changed, 61 insertions(+), 21 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5f17fdf9467db..a1030fa8e6358 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.spark.sql.execution.datasources @@ -432,7 +432,7 @@ case class DataSource( } val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumnDataTypesAndCount( data.schema, partitionColumns, caseSensitive) // If we are appending to a table that already exists, make sure the partitioning matches diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 74f2993754f8f..0565e602c00c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -339,7 +339,7 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) - def validatePartitionColumnDataTypes( + def validatePartitionColumnDataTypesAndCount( schema: StructType, partitionColumns: Seq[String], caseSensitive: Boolean): Unit = { @@ -350,6 +350,10 @@ private[sql] object PartitioningUtils { case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") } } + + if (partitionColumns.size == schema.fields.size) { + throw new AnalysisException(s"Cannot use all columns for partition columns") + } } def partitionColumnsSchema( @@ -359,7 +363,7 @@ private[sql] object PartitioningUtils { val equality = columnNameEquality(caseSensitive) StructType(partitionColumns.map { col => schema.find(f => equality(f.name, col)).getOrElse { - throw new RuntimeException(s"Partition column $col not found in schema $schema") + throw new AnalysisException(s"Partition column $col not found in schema $schema") } }).asNullable } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 9afd715016d88..de19fccb2513a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumnDataTypesAndCount( r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) // Get all input data source relations of the query. 
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumnDataTypesAndCount( c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) for { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index e19101032967b..088e448569b88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -91,7 +91,7 @@ class FileStreamSinkWriter( hadoopConf: Configuration, options: Map[String, String]) extends Serializable with Logging { - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumnDataTypesAndCount( data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis) private val serializableConf = new SerializableConfiguration(hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala new file mode 100644 index 0000000000000..05ef6ac06431c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.test.SharedSQLContext + +class PartitioningUtilsSuite extends SharedSQLContext { + + test("prevent all column partitioning") { + withTempDir { dir => + val path = dir.getCanonicalPath + intercept[AnalysisException] { + spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) + } + intercept[AnalysisException] { + spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + } + } + } +} From 46d420849cab97581c9833c8601ee10e037dc45e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 8 Jun 2016 15:14:07 -0700 Subject: [PATCH 2/2] Address comments. 
--- .../execution/datasources/DataSource.scala | 2 +- .../datasources/PartitioningUtils.scala | 2 +- .../sql/execution/datasources/rules.scala | 4 +-- .../execution/streaming/FileStreamSink.scala | 2 +- .../datasources/PartitioningUtilsSuite.scala | 36 ------------------- .../test/DataFrameReaderWriterSuite.scala | 12 +++++++ 6 files changed, 17 insertions(+), 41 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a1030fa8e6358..d3273025b6885 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -432,7 +432,7 @@ case class DataSource( } val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - PartitioningUtils.validatePartitionColumnDataTypesAndCount( + PartitioningUtils.validatePartitionColumn( data.schema, partitionColumns, caseSensitive) // If we are appending to a table that already exists, make sure the partitioning matches diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 0565e602c00c5..2340ff0afed74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -339,7 +339,7 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) - def validatePartitionColumnDataTypesAndCount( + def validatePartitionColumn( schema: StructType, partitionColumns: Seq[String], caseSensitive: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index de19fccb2513a..7ac62fb191d40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypesAndCount( + PartitioningUtils.validatePartitionColumn( r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) // Get all input data source relations of the query. 
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypesAndCount( + PartitioningUtils.validatePartitionColumn( c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) for { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 088e448569b88..efb04912d76bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -91,7 +91,7 @@ class FileStreamSinkWriter( hadoopConf: Configuration, options: Map[String, String]) extends Serializable with Logging { - PartitioningUtils.validatePartitionColumnDataTypesAndCount( + PartitioningUtils.validatePartitionColumn( data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis) private val serializableConf = new SerializableConfiguration(hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala deleted file mode 100644 index 05ef6ac06431c..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PartitioningUtilsSuite.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.test.SharedSQLContext - -class PartitioningUtilsSuite extends SharedSQLContext { - - test("prevent all column partitioning") { - withTempDir { dir => - val path = dir.getCanonicalPath - intercept[AnalysisException] { - spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) - } - intercept[AnalysisException] { - spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) - } - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index 431a943304f5b..bf6063a4c457c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { cq.awaitTermination(2000L) } + + test("prevent all column partitioning") { + withTempDir { dir => + val path = dir.getCanonicalPath + intercept[AnalysisException] { + spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) + } + intercept[AnalysisException] { + spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + } + } + } }
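The check these two patches introduce is easiest to see from the DataFrameWriter API. Below is a minimal Scala sketch of the user-facing behavior, not part of the patches themselves: the object name, local master, and output paths are assumptions, but the rejection of all-column partitioning with an AnalysisException is what the renamed PartitioningUtils.validatePartitionColumn and the "prevent all column partitioning" test above enforce.

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object AllColumnPartitioningSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("all-column-partitioning-sketch")
          .getOrCreate()

        // spark.range(10) has a single column, "id", so partitioning by "id"
        // means partitioning by every column of the schema.
        val df = spark.range(10)

        try {
          // With the patches applied, this fails during write preparation,
          // rather than attempting a write that leaves no data columns.
          df.write.format("parquet").mode("overwrite").partitionBy("id")
            .save("/tmp/all-column-partitioning") // path is hypothetical
        } catch {
          case e: AnalysisException =>
            println(s"Rejected as expected: ${e.getMessage}")
        }

        // Keeping at least one non-partition column is still allowed.
        val withBucket = df.selectExpr("id", "id % 2 AS bucket")
        withBucket.write.format("parquet").mode("overwrite").partitionBy("bucket")
          .save("/tmp/partitioned-by-bucket") // path is hypothetical

        spark.stop()
      }
    }

The same guard runs on every caller of validatePartitionColumn shown in the hunks above: the batch write path in DataSource, the PreWriteCheck analysis rule, and the streaming FileStreamSinkWriter, so in each case the error is raised as an AnalysisException before any output files are produced.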