[SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing

### What changes were proposed in this pull request?
Before writing to a static partition, check during analysis that no partition column in the spec has an empty string value, and fail the query immediately if one does.

### Why are the changes needed?
Previously, when writing to a static partition whose value is an empty string, the error was reported only after all tasks had completed, wasting the work already done. Checking the partition spec up front surfaces the problem before any task runs.
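
A minimal illustration (the table `t1` here is hypothetical; it mirrors the Hive regression test added below):

```scala
spark.sql("CREATE TABLE t1 (c1 INT) PARTITIONED BY (d STRING)")

// The static value for partition column d is empty. Before this change the
// failure surfaced only after all write tasks had finished; with this change
// the statement fails during analysis with an AnalysisException saying the
// partition spec contains an empty partition column value.
spark.sql("INSERT OVERWRITE TABLE t1 PARTITION (d='') SELECT 1")
```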

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.

Closes #29316 from cxzl25/SPARK-32508.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cxzl25 authored and cloud-fan committed Sep 17, 2020
1 parent bd38e0b commit 92b75dc
Showing 3 changed files with 62 additions and 4 deletions.
@@ -386,7 +386,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def preprocess(
       insert: InsertIntoStatement,
       tblName: String,
-      partColNames: Seq[String]): InsertIntoStatement = {
+      partColNames: Seq[String],
+      catalogTable: Option[CatalogTable]): InsertIntoStatement = {
 
     val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
       insert.partitionSpec, partColNames, tblName, conf.resolver)
@@ -402,6 +403,18 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
         s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
+    if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
+      // Reject any static partition column whose value is an empty string.
+      if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+        val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+
     val newQuery = TableOutputResolver.resolveOutputColumns(
       tblName, expectedColumns, insert.query, byName = false, conf)
     if (normalizedPartSpec.nonEmpty) {
@@ -427,13 +440,14 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
-          preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
+          preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames,
+            Some(metadata))
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
-          preprocess(i, tblName, h.partitionSchema.map(_.name))
+          preprocess(i, tblName, h.partitionSchema.map(_.name), catalogTable)
         case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
-          preprocess(i, tblName, Nil)
+          preprocess(i, tblName, Nil, catalogTable)
         case _ => i
       }
     }
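The guard added in the second hunk above is the heart of the change. Here is a self-contained sketch of the same check, assuming the normalized spec is a `Map[String, Option[String]]` as `normalizePartitionSpec` produces (`None` marks a dynamic partition column, `Some(value)` a static one); `AnalysisException` is swapped for `IllegalArgumentException` so the snippet runs without Spark on the classpath:

```scala
object EmptyPartitionValueCheck {
  // Throws if any *static* partition value (Some(...)) is an empty string.
  // Dynamic columns (None) are resolved at runtime and are not checked here.
  def validate(normalizedPartSpec: Map[String, Option[String]]): Unit = {
    if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
      val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
      throw new IllegalArgumentException(
        s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
    }
  }
}

// Usage:
//   EmptyPartitionValueCheck.validate(Map("part1" -> Some("1"), "part2" -> Some("")))
//     => throws: part2 has an empty static value
//   EmptyPartitionValueCheck.validate(Map("part1" -> Some("1"), "part2" -> None))
//     => passes: part2 is dynamic
```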
@@ -866,6 +866,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }.getMessage
     assert(message.contains("LOCAL is supported only with file: scheme"))
   }
+
+  test("SPARK-32508 " +
+    "Disallow empty part col values in partition spec before static partition writing") {
+    withTable("insertTable") {
+      sql(
+        """
+          |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET
+          |PARTITIONED BY (part1, part2)
+        """.stripMargin)
+      val msg = "Partition spec is invalid"
+      assert(intercept[AnalysisException] {
+        sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1")
+      }.getMessage.contains(msg))
+      assert(intercept[AnalysisException] {
+        sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1, '' AS part2")
+      }.getMessage.contains(msg))
+
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1")
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1, '2' AS part2")
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1, '' AS part2")
+    }
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {
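Note the asymmetry the last statement in the test above pins down: the new check applies only to static values written in the PARTITION clause, while an empty value flowing into a dynamic partition column still passes analysis. A contrast sketch reusing the test's `insertTable`:

```scala
// Rejected at analysis time: part2 is static and its value is empty.
sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='') SELECT 1")

// Accepted at analysis time: part2 is dynamic, so its (empty) value is only
// known at runtime and is handled by the existing dynamic-partition path.
sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1, '' AS part2")
```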
@@ -847,4 +847,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       }
     }
   }
+
+  test("SPARK-32508 " +
+    "Disallow empty part col values in partition spec before static partition writing") {
+    withTable("t1") {
+      spark.sql(
+        """
+          |CREATE TABLE t1 (c1 int)
+          |PARTITIONED BY (d string)
+        """.stripMargin)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          """
+            |INSERT OVERWRITE TABLE t1 PARTITION(d='')
+            |SELECT 1
+          """.stripMargin)
+      }.getMessage
+
+      assert(!e.contains("get partition: Value for key d is null or empty"))
+      assert(e.contains("Partition spec is invalid"))
+    }
+  }
 }
