[SPARK-19723][SQL] create datasource table with a non-existent location should work

windpiger committed Feb 24, 2017
1 parent d7e43b6 commit 89eb03a
Showing 3 changed files with 139 additions and 1 deletion.
@@ -69,7 +69,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
className = table.provider.get,
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption,
- catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+ // As discussed in SPARK-19583, we don't check whether the location exists
+ catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)

val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
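For context, resolveRelation() is where a file-based data source validates its input paths, and the checkFilesExist flag gates the per-path existence check that previously made CREATE TABLE fail when the LOCATION directory was missing. The snippet below is an illustrative sketch of that kind of check, not the actual Spark source: the helper name validatePaths and the use of IllegalArgumentException are assumptions for the example (Spark itself reports the failure as an AnalysisException).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Paraphrased sketch of the path check that checkFilesExist controls.
def validatePaths(paths: Seq[String], hadoopConf: Configuration, checkFilesExist: Boolean): Unit = {
  paths.foreach { p =>
    val path = new Path(p)
    val fs = path.getFileSystem(hadoopConf)
    // With checkFilesExist = false, as CREATE TABLE now passes, a missing LOCATION
    // directory is tolerated here instead of aborting the command.
    if (checkFilesExist && !fs.exists(path)) {
      // Spark reports this case as AnalysisException("Path does not exist: ...").
      throw new IllegalArgumentException(s"Path does not exist: $path")
    }
  }
}
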
@@ -1952,4 +1952,49 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}

test("create datasource table with an non-existent location") {
withTable("t", "t1") {
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t(a int, b int)
|USING parquet
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/"))

spark.sql("INSERT INTO TABLE t SELECT 1, 2")
assert(dir.exists())

checkAnswer(spark.table("t"), Row(1, 2))
}
// partition table
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t1(a int, b int)
|USING parquet
|PARTITIONED BY(a)
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/"))

spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")

val partDir = new File(dir, "a=1")
assert(partDir.exists())

checkAnswer(spark.table("t1"), Row(2, 1))
}
}
}
}
@@ -1587,4 +1587,96 @@ class HiveDDLSuite
}
}
}

test("create datasource table with an non-existent location") {
withTable("t", "t1") {
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t(a int, b int)
|USING parquet
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/"))

spark.sql("INSERT INTO TABLE t SELECT 1, 2")
assert(dir.exists())

checkAnswer(spark.table("t"), Row(1, 2))
}
// partition table
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t1(a int, b int)
|USING parquet
|PARTITIONED BY(a)
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/"))

spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")

val partDir = new File(dir, "a=1")
assert(partDir.exists())

checkAnswer(spark.table("t1"), Row(2, 1))
}
}
}

test("create hive table with an non-existent location") {
withTable("t", "t1") {
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t(a int, b int)
|USING hive
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t SELECT 1, 2")
assert(dir.exists())

checkAnswer(spark.table("t"), Row(1, 2))
}
// partition table
withTempDir {
dir =>
dir.delete()
spark.sql(
s"""
|CREATE TABLE t1(a int, b int)
|USING hive
|PARTITIONED BY(a)
|LOCATION '$dir'
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")

val partDir = new File(dir, "a=1")
assert(partDir.exists())

checkAnswer(spark.table("t1"), Row(2, 1))
}
}
}
}
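Taken together, the new tests pin down the following end-to-end behavior. The short spark-shell style sketch below restates it from a user's point of view; the table name demo and the path /tmp/spark_missing_location are hypothetical, and before this patch the CREATE TABLE statement itself failed because the missing location did not pass the file-existence check.

// Hypothetical spark-shell session; 'spark' is the SparkSession provided by the shell.
spark.sql(
  """
    |CREATE TABLE demo(a INT, b INT)
    |USING parquet
    |LOCATION '/tmp/spark_missing_location'
  """.stripMargin)                                // now succeeds even though the directory is missing

spark.sql("INSERT INTO TABLE demo SELECT 1, 2")   // the first write creates the directory
spark.sql("SELECT * FROM demo").show()            // returns the single row (1, 2)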
