Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19329][SQL][BRANCH-2.1]Reading from or writing to a datasource table with a non pre-existing location should succeed #17317

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption)

LogicalRelation(
dataSource.resolveRelation(),
dataSource.resolveRelation(checkFilesExist = false),
expectedOutputAttributes = Some(simpleCatalogRelation.output),
catalogTable = Some(table))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,4 +1760,122 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
assert(rows.length > 0)
}

// Inserting into a data source table must succeed (and recreate the directory) when the
// table's location does not exist on disk: after the original location is removed, after
// it is removed again before an INSERT OVERWRITE, and after the table is repointed to a
// location that was never created.
test("insert data to a data source table which has a not existed location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      val tableId = TableIdentifier("t")
      // Always re-reads the metadata from the session catalog.
      def currentMetadata = spark.sessionState.catalog.getTableMetadata(tableId)
      val expectedRows = Seq(Row("c", 1))

      spark.sql(
        s"""
           |CREATE TABLE t(a string, b int)
           |USING parquet
           |OPTIONS(path "$dir")
           |""".stripMargin)
      val created = currentMetadata
      assert(created.location == dir.getAbsolutePath)

      // Case 1: the location is removed before a plain INSERT INTO.
      val locationOnDisk = new File(created.location.stripPrefix("file:"))
      dir.delete()
      assert(!locationOnDisk.exists)
      spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
      assert(locationOnDisk.exists)
      checkAnswer(spark.table("t"), expectedRows)

      // Case 2: the location (now holding data) is removed before an INSERT OVERWRITE.
      Utils.deleteRecursively(dir)
      assert(!locationOnDisk.exists)
      spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
      assert(locationOnDisk.exists)
      checkAnswer(spark.table("t"), expectedRows)

      // Case 3: the table is repointed to a sub-directory that has not been created.
      val relocated = new File(dir, "x")
      spark.sql(s"ALTER TABLE t SET LOCATION '$relocated'")
      spark.sessionState.catalog.refreshTable(tableId)

      val altered = currentMetadata
      assert(altered.location == relocated.getAbsolutePath)
      assert(!relocated.exists)

      spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
      assert(relocated.exists)
      checkAnswer(spark.table("t"), expectedRows)
    }
  }
}

// INSERT OVERWRITE into a partition must succeed, and recreate the partition directory,
// after that directory has been deleted from disk.
test("insert into a data source table with no existed partition location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      spark.sql(
        s"""
           |CREATE TABLE t(a int, b int, c int, d int)
           |USING parquet
           |OPTIONS(path '$dir')
           |PARTITIONED BY(a, b)
           |""".stripMargin)
      val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(metadata.location == dir.getAbsolutePath)

      // Seed the partition (a=1, b=2) with a single row.
      spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
      checkAnswer(spark.table("t"), Seq(Row(3, 4, 1, 2)))

      // Remove the top-level "a=1" directory under the table location.
      val partitionDir = new File(dir, "a=1")
      Utils.deleteRecursively(partitionDir)
      assert(!partitionDir.exists())

      // Insert overwrite into a partition whose location has been deleted.
      spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
      assert(partitionDir.exists())
      checkAnswer(spark.table("t"), Seq(Row(7, 8, 1, 2)))
    }
  }
}

// Reading from a data source table must succeed and return no rows when the table's
// location does not exist on disk: both after the original location is deleted and
// after the table is repointed to a location that was never created.
test("read data from a data source table which has a not existed location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      spark.sql(
        s"""
           |CREATE TABLE t(a string, b int)
           |USING parquet
           |OPTIONS(path "$dir")
           |""".stripMargin)
      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      val expectedPath = dir.getAbsolutePath
      assert(table.location == expectedPath)

      dir.delete()
      // File.delete() reports failure only through its return value. Check that the
      // location is really gone, otherwise the read below would run against an existing
      // directory and the test would pass without covering the missing-location case.
      assert(!dir.exists())
      checkAnswer(spark.table("t"), Nil)

      // Repoint the table to a sub-directory that has not been created.
      val newDir = new File(dir, "x")
      spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

      val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(table1.location == newDir.getAbsolutePath)
      assert(!newDir.exists())
      checkAnswer(spark.table("t"), Nil)
    }
  }
}

// Querying a specific partition must succeed and return no rows after the table
// directory (and with it the partition's directory) has been deleted from disk.
test("read data from a data source table with no existed partition location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      spark.sql(
        s"""
           |CREATE TABLE t(a int, b int, c int, d int)
           |USING parquet
           |OPTIONS(path "$dir")
           |PARTITIONED BY(a, b)
           |""".stripMargin)

      // Seed the partition (a=1, b=2) with a single row and confirm it is readable.
      spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
      checkAnswer(spark.table("t"), Seq(Row(3, 4, 1, 2)))

      // Select from a partition whose location has been deleted.
      Utils.deleteRecursively(dir)
      assert(!dir.exists())
      spark.sql("REFRESH TABLE t")
      val partitionQuery = spark.sql("select * from t where a=1 and b=2")
      checkAnswer(partitionQuery, Nil)
    }
  }
}
}