fix tests
brkyvz committed Aug 25, 2019
1 parent 008b8dd commit d21efa6
Showing 3 changed files with 53 additions and 109 deletions.
@@ -28,6 +28,11 @@ class DataSourceV2DataFrameSuite
     spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
   }
 
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+  }
+
   override protected val catalogAndNamespace: String = "testcat.ns1.ns2.tbls"
   override protected val v2Format: String = classOf[FakeV2Provider].getName
 
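The cleanup hook added above uses ScalaTest's BeforeAndAfter trait, whose `after { ... }` block runs once per test. A minimal, self-contained sketch of the pattern (the suite and field names are invented for illustration, not taken from this commit):

import org.scalatest.{BeforeAndAfter, FunSuite}

// The block registered with `after` runs after every test in the suite,
// mirroring how the hunk above resets catalog and conf state so it cannot
// leak from one test into the next.
class CleanupPatternSuite extends FunSuite with BeforeAndAfter {
  private var scratch: List[Int] = Nil

  after {
    scratch = Nil // reset shared mutable state, like the reset()/clear() calls above
  }

  test("each test starts from a clean slate") {
    assert(scratch.isEmpty)
    scratch = List(1, 2, 3)
  }
}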
@@ -19,18 +19,15 @@ package org.apache.spark.sql.sources.v2
 
 import scala.collection.JavaConverters._
 
-import org.scalatest.BeforeAndAfter
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG}
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
 import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
-import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -41,7 +38,7 @@ class DataSourceV2SQLSuite
 
   private val orc2 = classOf[OrcDataSourceV2].getName
   private val v2Source = classOf[FakeV2Provider].getName
-  override protected val v2Format = "foo"
+  override protected val v2Format = v2Source
   override protected val catalogAndNamespace = "testcat.ns1.ns2."
 
   private def catalog(name: String): CatalogPlugin = {
@@ -1422,15 +1419,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("InsertInto: append") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
-      sql(s"INSERT INTO $t1 SELECT id, data FROM source")
-      checkAnswer(spark.table(t1), spark.table("source"))
-    }
-  }
-
   test("InsertInto: append - across catalog") {
     val t1 = "testcat.ns1.ns2.tbl"
     val t2 = "testcat2.db.tbl"
@@ -1442,89 +1430,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("InsertInto: append to partitioned table - without PARTITION clause") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
-      sql(s"INSERT INTO TABLE $t1 SELECT * FROM source")
-      checkAnswer(spark.table(t1), spark.table("source"))
-    }
-  }
-
-  test("InsertInto: append to partitioned table - with PARTITION clause") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
-      sql(s"INSERT INTO TABLE $t1 PARTITION (id) SELECT * FROM source")
-      checkAnswer(spark.table(t1), spark.table("source"))
-    }
-  }
-
-  test("InsertInto: dynamic PARTITION clause fails with non-partition column") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
-
-      val exc = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM source")
-      }
-
-      assert(spark.table(t1).count === 0)
-      assert(exc.getMessage.contains("PARTITION clause cannot contain a non-partition column name"))
-      assert(exc.getMessage.contains("data"))
-    }
-  }
-
-  test("InsertInto: static PARTITION clause fails with non-partition column") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (data)")
-
-      val exc = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM source")
-      }
-
-      assert(spark.table(t1).count === 0)
-      assert(exc.getMessage.contains("PARTITION clause cannot contain a non-partition column name"))
-      assert(exc.getMessage.contains("id"))
-    }
-  }
-
-  test("InsertInto: fails when missing a column") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING foo")
-      val exc = intercept[AnalysisException] {
-        sql(s"INSERT INTO $t1 SELECT id, data FROM source")
-      }
-
-      assert(spark.table(t1).count === 0)
-      assert(exc.getMessage.contains(s"Cannot write to '$t1', not enough data columns"))
-    }
-  }
-
-  test("InsertInto: fails when an extra column is present") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
-      val exc = intercept[AnalysisException] {
-        sql(s"INSERT INTO $t1 SELECT id, data, 'fruit' FROM source")
-      }
-
-      assert(spark.table(t1).count === 0)
-      assert(exc.getMessage.contains(s"Cannot write to '$t1', too many data columns"))
-    }
-  }
-
-  test("InsertInto: overwrite non-partitioned table") {
-    val t1 = "testcat.ns1.ns2.tbl"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 USING foo AS SELECT * FROM source")
-      sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM source2")
-      checkAnswer(spark.table(t1), spark.table("source2"))
-    }
-  }
-
   test("tableCreation: partition column case insensitive resolution") {
     val testCatalog = catalog("testcat").asTableCatalog
     val sessionCatalog = catalog("session").asTableCatalog
@@ -30,15 +30,15 @@ abstract class InsertIntoTests(
   import testImplicits._
 
   test("insertInto: append") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
     doInsert(t1, df)
     verifyTable(t1, df)
   }
 
   test("insertInto: append by position") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
     val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
@@ -48,7 +48,7 @@
   }
 
   test("insertInto: append partitioned table") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
       val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
@@ -58,7 +58,7 @@
   }
 
   test("insertInto: overwrite non-partitioned table") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
     val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
@@ -69,7 +69,7 @@
 
   test("insertInto: overwrite partitioned table in static mode") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
-      val t1 = "tbl"
+      val t1 = s"${catalogAndNamespace}tbl"
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
       val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
       doInsert(t1, init)
@@ -83,7 +83,7 @@
 
   test("insertInto: overwrite partitioned table in static mode by position") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
-      val t1 = "tbl"
+      val t1 = s"${catalogAndNamespace}tbl"
       withTable(t1) {
         sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
         val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
@@ -99,19 +99,20 @@
   }
 
   test("insertInto: fails when missing a column") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format")
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
     val exc = intercept[AnalysisException] {
       doInsert(t1, df)
     }
 
     verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
-    assert(exc.getMessage.contains(s"Cannot write to 'default.$t1', not enough data columns"))
+    val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
+    assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns"))
   }
 
   test("insertInto: fails when an extra column is present") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
       val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
@@ -120,12 +121,13 @@
       }
 
       verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
-      assert(exc.getMessage.contains(s"Cannot write to 'default.$t1', too many data columns"))
+      val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
+      assert(exc.getMessage.contains(s"Cannot write to '$tableName', too many data columns"))
     }
   }
 
   dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic mode") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
       val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
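The `tableName` guard introduced in the two hunks above encodes the naming rule these assertions depend on: with no catalog-and-namespace prefix, the bare table name resolves through the session catalog, which reports it qualified with the `default` database; with a v2 catalog prefix, the full multi-part name is used as-is. A tiny standalone sketch of that rule (the helper name is invented):

object TableNameRule extends App {
  // Mirrors the `val tableName = if (catalogAndNamespace.isEmpty) ...` guard:
  // an empty prefix means session-catalog resolution under `default`.
  def expectedErrorName(catalogAndNamespace: String, table: String = "tbl"): String = {
    val t1 = s"$catalogAndNamespace$table"
    if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
  }

  assert(expectedErrorName("") == "default.tbl")
  assert(expectedErrorName("testcat.ns1.ns2.") == "testcat.ns1.ns2.tbl")
}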
@@ -139,7 +141,7 @@
   }
 
   dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic mode by position") {
-    val t1 = "tbl"
+    val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
       val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
@@ -215,6 +217,38 @@ private[v2] trait InsertIntoSQLTests extends QueryTest with SharedSparkSession w
     }
   }
 
+  test("InsertInto: static PARTITION clause fails with non-partition column") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTableAndData(t1) { view =>
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (data)")
+
+      val exc = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM $view")
+      }
+
+      verifyTable(t1, spark.emptyDataFrame)
+      assert(exc.getMessage.contains(
+        "PARTITION clause cannot contain a non-partition column name"))
+      assert(exc.getMessage.contains("id"))
+    }
+  }
+
+  test("InsertInto: dynamic PARTITION clause fails with non-partition column") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTableAndData(t1) { view =>
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+
+      val exc = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM $view")
+      }
+
+      verifyTable(t1, spark.emptyDataFrame)
+      assert(exc.getMessage.contains(
+        "PARTITION clause cannot contain a non-partition column name"))
+      assert(exc.getMessage.contains("data"))
+    }
+  }
+
   test("InsertInto: overwrite - dynamic clause - static mode") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
       val t1 = s"${catalogAndNamespace}tbl"
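The static- and dynamic-mode overwrite tests above differ in which existing partitions survive: with no static PARTITION values, static mode clears the table before writing, while dynamic mode replaces only the partitions that appear in the incoming rows (so a (4L, "keep") row survives). A toy model of the two behaviors using plain maps keyed by partition value; the names are invented and this is not Spark's implementation:

object PartitionOverwriteModel extends App {
  type Table = Map[Long, Seq[String]]

  // Static mode with no static partition filter: drop everything, then write.
  def overwriteStatic(existing: Table, incoming: Table): Table =
    incoming

  // Dynamic mode: only partitions present in the incoming data are replaced.
  def overwriteDynamic(existing: Table, incoming: Table): Table =
    existing ++ incoming

  val existing: Table = Map(2L -> Seq("dummy"), 4L -> Seq("keep"))
  val incoming: Table = Map(2L -> Seq("a"), 3L -> Seq("b"))

  assert(overwriteStatic(existing, incoming) == Map(2L -> Seq("a"), 3L -> Seq("b")))
  assert(overwriteDynamic(existing, incoming) ==
    Map(2L -> Seq("a"), 3L -> Seq("b"), 4L -> Seq("keep")))
}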
