[SPARK-19329][SQL][BRANCH-2.1] Reading from or writing to a datasource table with a non pre-existing location should succeed

## What changes were proposed in this pull request?

This is a backport PR of #16672 into branch-2.1.
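For context, the write-side behavior being backported can be sketched outside the test harness roughly as follows (a spark-shell sketch mirroring the new tests below; the table name, temp path, and `deleteRecursively` helper are illustrative, not part of this patch): inserting into a data source table whose location has been deleted should recreate the location and succeed.

```scala
// Paste into spark-shell (which provides the `spark` session).
// After this patch the INSERT below recreates the deleted location instead of failing.
import java.io.File
import java.nio.file.Files

// Small illustrative helper; the real tests use Spark's internal Utils.deleteRecursively.
def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) f.listFiles.foreach(deleteRecursively)
  f.delete()
}

val dir = Files.createTempDirectory("t_loc").toFile

spark.sql("DROP TABLE IF EXISTS t")
spark.sql(
  s"""
     |CREATE TABLE t(a string, b int)
     |USING parquet
     |OPTIONS(path "$dir")
   """.stripMargin)

// Remove the table's location, then write to it: the directory is recreated.
deleteRecursively(dir)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
spark.table("t").show()   // one row: a = c, b = 1
```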

## How was this patch tested?
Existing tests.

Author: windpiger <songjun@outlook.com>

Closes #17317 from windpiger/backport-insertnotexists.
windpiger authored and gatorsmile committed Mar 16, 2017
1 parent 0622546 commit 9d032d0
Showing 2 changed files with 119 additions and 1 deletion.
@@ -299,7 +299,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             options = table.storage.properties ++ pathOption)

         LogicalRelation(
-          dataSource.resolveRelation(),
+          dataSource.resolveRelation(checkFilesExist = false),
           expectedOutputAttributes = Some(simpleCatalogRelation.output),
           catalogTable = Some(table))
       }
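The one-line change above is the core of the fix: when `FindDataSourceTable` converts a catalog entry into a `LogicalRelation`, passing `checkFilesExist = false` to `resolveRelation` skips the file-existence check during resolution, so a table whose location has disappeared still resolves. The user-visible read-side effect, as a spark-shell sketch (the table name `t_read` and the temp path are illustrative):

```scala
// Reading a table whose location is missing now resolves and returns no rows
// rather than throwing during analysis.
import java.nio.file.Files

val dir2 = Files.createTempDirectory("t_read_loc").toFile

spark.sql("DROP TABLE IF EXISTS t_read")
spark.sql(s"""CREATE TABLE t_read(a string, b int) USING parquet OPTIONS(path "$dir2")""")

dir2.delete()                        // the still-empty directory disappears
spark.sql("REFRESH TABLE t_read")    // clear any cached file listing, as the new tests do
spark.sql("SELECT * FROM t_read").show()   // empty result, no error
```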
@@ -1760,4 +1760,122 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
    val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
    assert(rows.length > 0)
  }

test("insert data to a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath
assert(table.location == expectedPath)

dir.delete
val tableLocFile = new File(table.location.stripPrefix("file:"))
assert(!tableLocFile.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists)
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDir = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir.getAbsolutePath)
assert(!newDir.exists)

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(newDir.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
}
}
}

test("insert into a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|OPTIONS(path '$dir')
|PARTITIONED BY(a, b)
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath
assert(table.location == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(dir, "a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition which location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
}
}
}

test("read data from a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath
assert(table.location == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir.getAbsolutePath)
assert(!newDir.exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|OPTIONS(path "$dir")
|PARTITIONED BY(a, b)
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

// select from a partition which location has been deleted.
Utils.deleteRecursively(dir)
assert(!dir.exists())
spark.sql("REFRESH TABLE t")
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}
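The partition-level behavior exercised by the partitioned tests above can be reproduced the same way; a spark-shell sketch (the table name `t_part`, temp path, and `deleteRecursively` helper are illustrative): overwriting a partition whose directory was removed recreates it, and a read over a fully deleted location returns an empty result.

```scala
import java.io.File
import java.nio.file.Files

// Illustrative helper; the real tests use Spark's internal Utils.deleteRecursively.
def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) f.listFiles.foreach(deleteRecursively)
  f.delete()
}

val partDir = Files.createTempDirectory("t_part_loc").toFile

spark.sql("DROP TABLE IF EXISTS t_part")
spark.sql(
  s"""
     |CREATE TABLE t_part(a int, b int, c int, d int)
     |USING parquet
     |OPTIONS(path "$partDir")
     |PARTITIONED BY(a, b)
   """.stripMargin)
spark.sql("INSERT INTO TABLE t_part PARTITION(a=1, b=2) SELECT 3, 4")

// Delete the a=1 partition directory, then overwrite that partition: it is recreated.
deleteRecursively(new File(partDir, "a=1"))
spark.sql("INSERT OVERWRITE TABLE t_part PARTITION(a=1, b=2) SELECT 7, 8")
spark.sql("SELECT * FROM t_part WHERE a = 1 AND b = 2").show()   // one row: c=7, d=8, a=1, b=2

// Remove the whole location and read: the query succeeds with no rows.
deleteRecursively(partDir)
spark.sql("REFRESH TABLE t_part")
spark.sql("SELECT * FROM t_part").show()   // empty result
```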
