Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
if (t.bucketSpec.isDefined) {
throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
}
if (t.partitionColumnNames.nonEmpty && query.isDefined) {
val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
"create a partitioned table using Hive's file formats. " +
"Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
"OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
"CTAS statement."
throw new AnalysisException(errorMessage)
}

val defaultStorage = HiveSerDe.getDefaultStorage(conf)
val options = new HiveOptions(t.storage.properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ case class CreateHiveTableAsSelectCommand(
compressed = tableDesc.storage.compressed)

val withSchema = if (withFormat.schema.isEmpty) {
// Hive doesn't support specifying the column list for target table in CTAS
// However we don't think SparkSQL should follow that.
tableDesc.copy(schema = query.output.toStructType)
tableDesc.copy(schema = query.schema)
} else {
withFormat
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,12 +1353,6 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 2, 'b'")
checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)

val e = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
}
assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
"to create a partitioned table using Hive"))

val e2 = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
}
Expand All @@ -1371,6 +1365,22 @@ class HiveDDLSuite
}
}

// Regression test: creating a partitioned Hive serde table via CTAS used to be rejected
// with an AnalysisException. Covers both the DataFrameWriter.saveAsTable path (create,
// then overwrite with different partition columns) and the SQL CTAS syntax.
test("create partitioned hive serde table as select") {
  withTable("t", "t1") {
    // Dynamic-partition inserts need nonstrict mode because no static partition is given.
    withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
      // Partition columns are moved to the end of the schema, hence Row("y", 10): (j, i).
      Seq(10 -> "y").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t")
      checkAnswer(spark.table("t"), Row("y", 10) :: Nil)

      // Overwrite the same table with a different schema and different partition columns.
      // NOTE(review): a reviewer suggested trimming repeated overwrite coverage — create the
      // table once and overwrite it once — and splitting this test into smaller focused
      // tests (basic / overwrite-and-append / partitioned saveAsTable).
      Seq((1, 2, 3)).toDF("i", "j", "k").write.mode("overwrite").format("hive")
        .partitionBy("j", "k").saveAsTable("t")
      checkAnswer(spark.table("t"), Row(1, 2, 3) :: Nil)

      // SQL CTAS path: partition column "i" is moved to the end, hence Row("a", 1): (j, i).
      spark.sql("create table t1 using hive partitioned by (i) as select 1 as i, 'a' as j")
      checkAnswer(spark.table("t1"), Row("a", 1) :: Nil)
    }
  }
}

test("read/write files with hive data source is not allowed") {
withTempDir { dir =>
val e = intercept[AnalysisException] {
Expand All @@ -1390,7 +1400,7 @@ class HiveDDLSuite
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
}

withTable("t", "t1", "t2", "t3", "t4") {
withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
assert(getTableColumns("t") == Seq("a", "c", "d", "b"))

Expand All @@ -1411,7 +1421,14 @@ class HiveDDLSuite
sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))

// TODO: add test for creating partitioned hive serde table as select, once we support it.
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql("CREATE TABLE t5 USING hive PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
// Reviewer note: please also test DataFrameWriter (the t6 case below covers this).
assert(getTableColumns("t5") == Seq("a", "c", "d", "b"))

Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.format("hive")
.partitionBy("d", "b").saveAsTable("t6")
assert(getTableColumns("t6") == Seq("a", "c", "d", "b"))
}
}
}
}