From 2470538583435b190c5e91b2ac5f99c6cb1e7827 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 13 Aug 2015 01:33:52 +0800 Subject: [PATCH 1/3] Explicit define which data types can be used as dynamic partition columns --- .../datasources/PartitioningUtils.scala | 2 ++ .../datasources/ResolvedDataSource.scala | 9 ++++++++- .../execution/datasources/WriterContainer.scala | 2 +- .../spark/sql/execution/datasources/rules.scala | 16 ++++++++++++++-- .../sql/sources/hadoopFsRelationSuites.scala | 17 +++++++++++++++++ 5 files changed, 42 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 66dfcc308ceca..c8cc735736bbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -270,6 +270,8 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) + val validPartitionColumnTypes: Set[DataType] = upCastingOrder.toSet + /** * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" * types. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 7770bbd712f04..1aef67e9ef841 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging { new ResolvedDataSource(clazz, relation) } - private def partitionColumnsSchema( + def partitionColumnsSchema( schema: StructType, partitionColumns: Array[String]): StructType = { StructType(partitionColumns.map { col => @@ -179,6 +179,13 @@ object ResolvedDataSource extends Logging { val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } + + partitionColumnsSchema(data.schema, partitionColumns).foreach { field => + if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { + throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") + } + } + val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) val r = dataSource.createRelation( sqlContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 2f11f40422402..d36197e50d448 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer( PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType)) val str = If(IsNull(c), Literal(defaultPartitionName), escaped) val partitionName = Literal(c.name + "=") :: str :: Nil - if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName + if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName } // Returns the 
partition path given a partition key. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 40ca8bf4095d8..9c25031b0e5b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -116,6 +116,12 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } + r.partitionColumns.fields.foreach { field => + if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { + failAnalysis(s"Cannot use ${field.dataType} for partition column") + } + } + // Get all input data source relations of the query. val srcRelations = query.collect { case LogicalRelation(src: BaseRelation) => src @@ -138,10 +144,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => + case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (catalog.tableExists(Seq(tableName))) { + if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) { // Need to remove SubQuery operator. EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match { // Only do the check if the table is a data source table @@ -164,6 +170,12 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } + ResolvedDataSource.partitionColumnsSchema(query.schema, partitionColumns).foreach { field => + if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { + throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") + } + } + case _ => // OK } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 2a69d331b6e52..afa3557342440 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import java.sql.Date + import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration @@ -554,6 +556,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } + + test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") { + val df = Seq( + (1, "v1", Date.valueOf("2015-08-10")), + (2, "v2", Date.valueOf("2015-08-11")), + (3, "v3", Date.valueOf("2015-08-12"))).toDF("a", "b", "c") + withTempDir { file => + intercept[AnalysisException] { + df.write.format(dataSourceName).partitionBy("c").save(file.getCanonicalPath) + } + } + intercept[AnalysisException] { + df.write.format(dataSourceName).partitionBy("c").saveAsTable("t") + } + } } // This class is used to test SPARK-8578. 
We should not use any custom output committer when From 93d2788e2434df1520579b4f61618602260cece6 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 13 Aug 2015 17:01:12 +0800 Subject: [PATCH 2/3] extend allowed types --- .../execution/datasources/PartitioningUtils.scala | 13 ++++++++++++- .../execution/datasources/ResolvedDataSource.scala | 6 +----- .../spark/sql/execution/datasources/rules.scala | 12 ++---------- .../spark/sql/sources/hadoopFsRelationSuites.scala | 10 +++++----- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index c8cc735736bbf..9f21271add3a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -26,6 +26,7 @@ import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Shell +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ @@ -270,7 +271,17 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) - val validPartitionColumnTypes: Set[DataType] = upCastingOrder.toSet + def checkPartitionColumnOfValidDataType( + schema: StructType, + partitionColumns: Array[String]): Unit = { + + ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field => + field.dataType match { + case _: AtomicType | NullType => // OK + case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") + } + } + } /** * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 1aef67e9ef841..aa33afefbdc3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -180,11 +180,7 @@ object ResolvedDataSource extends Logging { path.makeQualified(fs.getUri, fs.getWorkingDirectory) } - partitionColumnsSchema(data.schema, partitionColumns).foreach { field => - if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { - throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") - } - } + PartitioningUtils.checkPartitionColumnOfValidDataType(data.schema, partitionColumns) val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) val r = dataSource.createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 9c25031b0e5b3..ab17401336195 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -116,11 +116,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - r.partitionColumns.fields.foreach { field => - 
if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { - failAnalysis(s"Cannot use ${field.dataType} for partition column") - } - } + PartitioningUtils.checkPartitionColumnOfValidDataType(r.schema, part.keySet.toArray) // Get all input data source relations of the query. val srcRelations = query.collect { @@ -170,11 +166,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - ResolvedDataSource.partitionColumnsSchema(query.schema, partitionColumns).foreach { field => - if (!PartitioningUtils.validPartitionColumnTypes.contains(field.dataType)) { - throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") - } - } + PartitioningUtils.checkPartitionColumnOfValidDataType(query.schema, partitionColumns) case _ => // OK } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index afa3557342440..59f2864625a6e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -559,16 +559,16 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") { val df = Seq( - (1, "v1", Date.valueOf("2015-08-10")), - (2, "v2", Date.valueOf("2015-08-11")), - (3, "v3", Date.valueOf("2015-08-12"))).toDF("a", "b", "c") + (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")), + (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")), + (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e") withTempDir { file => intercept[AnalysisException] { - df.write.format(dataSourceName).partitionBy("c").save(file.getCanonicalPath) + df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath) } } intercept[AnalysisException] { - df.write.format(dataSourceName).partitionBy("c").saveAsTable("t") + df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t") } } } From d926a615bbbcb40aaeba2a977c9c8c4b1787ecd2 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 13 Aug 2015 18:20:43 +0800 Subject: [PATCH 3/3] remove nullType pattern --- .../spark/sql/execution/datasources/PartitioningUtils.scala | 4 ++-- .../spark/sql/execution/datasources/ResolvedDataSource.scala | 2 +- .../org/apache/spark/sql/execution/datasources/rules.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9f21271add3a1..0a2007e15843c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -271,13 +271,13 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) - def checkPartitionColumnOfValidDataType( + def validatePartitionColumnDataTypes( schema: StructType, partitionColumns: Array[String]): Unit = { ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field => field.dataType match { - case _: AtomicType | NullType => // OK + case _: AtomicType => // OK 
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index aa33afefbdc3e..8fbaf3a3059db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -180,7 +180,7 @@ object ResolvedDataSource extends Logging { path.makeQualified(fs.getUri, fs.getWorkingDirectory) } - PartitioningUtils.checkPartitionColumnOfValidDataType(data.schema, partitionColumns) + PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns) val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) val r = dataSource.createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index ab17401336195..9d3d35692ffcc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -116,7 +116,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - PartitioningUtils.checkPartitionColumnOfValidDataType(r.schema, part.keySet.toArray) + PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray) // Get all input data source relations of the query. val srcRelations = query.collect { @@ -166,7 +166,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - PartitioningUtils.checkPartitionColumnOfValidDataType(query.schema, partitionColumns) + PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns) case _ => // OK }
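
Notes on the end state of the series: after patch 3, the write path in ResolvedDataSource.apply and both checks in PreWriteCheck (rules.scala) funnel into a single helper, PartitioningUtils.validatePartitionColumnDataTypes, which accepts only atomic types as dynamic partition columns. Patch 2 had also admitted NullType; patch 3 drops it. Below is a minimal, standalone sketch of that check, not the exact Spark code: the real helper resolves the columns through ResolvedDataSource.partitionColumnsSchema (which is why patch 1 removes that method's private modifier) and pattern-matches on the internal AtomicType hierarchy, whereas this sketch enumerates the public atomic types and throws a plain exception so it compiles outside the org.apache.spark.sql package.

    import org.apache.spark.sql.types._

    // Standalone approximation of PartitioningUtils.validatePartitionColumnDataTypes.
    // The real helper throws org.apache.spark.sql.AnalysisException with the same
    // message; a plain IllegalArgumentException stands in here to keep the sketch
    // self-contained.
    def validatePartitionColumnDataTypes(
        schema: StructType,
        partitionColumns: Array[String]): Unit = {
      partitionColumns.foreach { col =>
        val field = schema.fields.find(_.name == col).getOrElse(
          throw new IllegalArgumentException(s"Partition column $col not found in schema"))
        field.dataType match {
          // Atomic (scalar) types may be used as dynamic partition columns.
          case StringType | BooleanType | ByteType | ShortType | IntegerType | LongType |
               FloatType | DoubleType | BinaryType | DateType | TimestampType => // OK
          case _: DecimalType => // OK (DecimalType carries precision/scale, match by class)
          // Complex types (arrays, maps, structs) and NullType are rejected.
          case other =>
            throw new IllegalArgumentException(s"Cannot use $other for partition column")
        }
      }
    }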
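
For the user-visible effect, here is a driver-side usage sketch in the spirit of the SPARK-8887 regression test added in patch 2. It assumes a running SQLContext named sqlContext and the built-in parquet source; the output path is made up for illustration.

    import org.apache.spark.sql.AnalysisException

    val df = sqlContext.createDataFrame(Seq(
      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1")),
      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2")))).toDF("a", "b", "c", "d")

    // With this series applied, partitioning by the array or map columns fails at
    // analysis time instead of proceeding into the write path.
    try {
      df.write.format("parquet").partitionBy("c", "d").save("/tmp/spark-8887-demo") // path is illustrative
    } catch {
      case e: AnalysisException =>
        // Expected message, along the lines of:
        //   Cannot use ArrayType(IntegerType,false) for partition column
        println(s"Rejected: ${e.getMessage}")
    }

The point of the series is exactly this: surface the unsupported partition column type as an AnalysisException up front, rather than letting the failure appear somewhere later in the write path.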