[SPARK-16033][SQL] insertInto() can't be used together with partitionBy()

## What changes were proposed in this pull request?

When inserting into an existing partitioned table, the partitioning columns should always be determined by the catalog metadata of the target table. An extra `partitionBy()` call doesn't make sense and can corrupt existing data, because the newly inserted data may end up with a wrong partition directory layout.
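For illustration, here is a minimal sketch of the contract this patch enforces (the table name `t` is hypothetical, mirroring the new test case below):

```scala
// Given an active SparkSession `spark` and `import spark.implicits._`.
// Assumes a pre-existing partitioned table, e.g.:
//   CREATE TABLE t (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)
val df = Seq((1, 2, 3)).toDF("a", "b", "c")

// Rejected after this patch: the partition columns of `t` are already fixed
// by its catalog metadata, so an extra partitionBy() is at best redundant and
// at worst writes a conflicting directory layout.
df.write.partitionBy("b", "c").insertInto("t")  // throws AnalysisException

// Correct usage: let the target table's metadata drive the partitioning.
df.write.insertInto("t")
```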

## How was this patch tested?

New test case added in `InsertIntoHiveTableSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13747 from liancheng/spark-16033-insert-into-without-partition-by.
liancheng authored and yhuai committed Jun 18, 2016
1 parent ebb9a3b commit 10b6714
Showing 2 changed files with 46 additions and 3 deletions.
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

@@ -21,8 +21,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}

@@ -243,7 +241,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
-    val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
+
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(
+        "insertInto() can't be used together with partitionBy(). " +
+          "Partition columns are defined by the table into which is being inserted."
+      )
+    }
+
+    val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
     // A partitioned relation's schema can be different from the input logicalPlan, since
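A usage note on the behavior that remains (a hedged illustration, not part of the patch): with `partitionBy()` ruled out, `insertInto()` resolves every partition column dynamically, and it matches input columns to the table schema by position rather than by name, with partition columns expected last:

```scala
// Assumes the hypothetical table from the sketch above:
//   CREATE TABLE t (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)
// insertInto() is position-based: the last two DataFrame columns feed the
// partition columns b and c in order, regardless of their DataFrame names.
Seq((1, 2, 3)).toDF("x", "y", "z").write.insertInto("t")  // a = 1, b = 2, c = 3
```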
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

@@ -346,6 +346,43 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Data source table - $testName") {
+      val dsTable = "ds_table"
+
+      withTable(dsTable) {
+        sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
+        f(dsTable)
+      }
+    }
+  }
+
+  private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+    testPartitionedHiveSerDeTable(testName)(f)
+    testPartitionedDataSourceTable(testName)(f)
+  }
+
+  testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+    val cause = intercept[AnalysisException] {
+      Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
+    }
+
+    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+  }
+
   test("InsertIntoTable#resolved should include dynamic partitions") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
