Skip to content

Commit

Permalink
Use fs.delete returned value, revert the change to old tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Aug 14, 2017
1 parent 13defbb commit 2b0fd74
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,8 @@ case class InsertIntoHiveTable(
try {
createdTempDir.foreach { path =>
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
// If we successfully delete the staging directory, remove it from FileSystem's cache.
if (!fs.exists(path)) {
if (fs.delete(path, true)) {
// If we successfully delete the staging directory, remove it from FileSystem's cache.
fs.cancelDeleteOnExit(path)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,71 +156,69 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("SPARK-6851: Self-joined converted parquet tables") {
withTable("orders", "orderupdates") {
val orders = Seq(
Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))

val orderUpdates = Seq(
Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))

orders.toDF.createOrReplaceTempView("orders1")
orderUpdates.toDF.createOrReplaceTempView("orderupdates1")
val orders = Seq(
Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))

val orderUpdates = Seq(
Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))

orders.toDF.createOrReplaceTempView("orders1")
orderUpdates.toDF.createOrReplaceTempView("orderupdates1")

sql(
"""CREATE TABLE orders(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)
sql(
"""CREATE TABLE orders(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)

sql(
"""CREATE TABLE orderupdates(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)
sql(
"""CREATE TABLE orderupdates(
| id INT,
| make String,
| type String,
| price INT,
| pdate String,
| customer String,
| city String)
|PARTITIONED BY (state STRING, month INT)
|STORED AS PARQUET
""".stripMargin)

sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")

checkAnswer(
sql(
"""
|select orders.state, orders.month
|from orders
|join (
| select distinct orders.state,orders.month
| from orders
| join orderupdates
| on orderupdates.id = orders.id) ao
| on ao.state = orders.state and ao.month = orders.month
""".stripMargin),
(1 to 6).map(_ => Row("CA", 20151)))
}
checkAnswer(
sql(
"""
|select orders.state, orders.month
|from orders
|join (
| select distinct orders.state,orders.month
| from orders
| join orderupdates
| on orderupdates.id = orders.id) ao
| on ao.state = orders.state and ao.month = orders.month
""".stripMargin),
(1 to 6).map(_ => Row("CA", 20151)))
}

test("show functions") {
Expand Down Expand Up @@ -391,25 +389,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("CTAS with WITH clause") {
withTable("with_table1") {
withTempView("table1") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.createOrReplaceTempView("table1")
val df = Seq((1, 1)).toDF("c1", "c2")
df.createOrReplaceTempView("table1")

sql(
"""
|CREATE TABLE with_table1 AS
|WITH T AS (
| SELECT *
| FROM table1
|)
|SELECT *
|FROM T
""".stripMargin)
val query = sql("SELECT * FROM with_table1")
checkAnswer(query, Row(1, 1) :: Nil)
}
}
sql(
"""
|CREATE TABLE with_table1 AS
|WITH T AS (
| SELECT *
| FROM table1
|)
|SELECT *
|FROM T
""".stripMargin)
val query = sql("SELECT * FROM with_table1")
checkAnswer(query, Row(1, 1) :: Nil)
}

test("explode nested Field") {
Expand Down Expand Up @@ -691,11 +685,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
sql("SELECT key, value FROM src ORDER BY key, value"))
}
sql("DROP TABLE ctas1")
sql("DROP TABLE ctas2")
sql("DROP TABLE ctas3")
sql("DROP TABLE ctas4")
sql("DROP TABLE ctas5")
}

test("specifying the column list for CTAS") {
Expand Down Expand Up @@ -767,46 +756,40 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("double nested data") {
withTable("test_ctas_1234") {
sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
.toDF().createOrReplaceTempView("nested")
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
Row(1))
sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
.toDF().createOrReplaceTempView("nested")
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
Row(1))

sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested")
checkAnswer(
sql("SELECT * FROM test_ctas_1234"),
sql("SELECT * FROM nested").collect().toSeq)
sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested")
checkAnswer(
sql("SELECT * FROM test_ctas_1234"),
sql("SELECT * FROM nested").collect().toSeq)

intercept[AnalysisException] {
sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect()
}
intercept[AnalysisException] {
sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect()
}
}

test("test CTAS") {
withTable("test_ctas_123") {
sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src")
checkAnswer(
sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
}
sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src")
checkAnswer(
sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
}

test("SPARK-4825 save join to table") {
withTable("test", "test1", "test2") {
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
sql("CREATE TABLE test1 (key INT, value STRING)")
testData.write.mode(SaveMode.Append).insertInto("test1")
sql("CREATE TABLE test2 (key INT, value STRING)")
testData.write.mode(SaveMode.Append).insertInto("test2")
testData.write.mode(SaveMode.Append).insertInto("test2")
sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
}
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
sql("CREATE TABLE test1 (key INT, value STRING)")
testData.write.mode(SaveMode.Append).insertInto("test1")
sql("CREATE TABLE test2 (key INT, value STRING)")
testData.write.mode(SaveMode.Append).insertInto("test2")
testData.write.mode(SaveMode.Append).insertInto("test2")
sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
}

test("SPARK-3708 Backticks aren't handled correctly is aliases") {
Expand Down Expand Up @@ -2040,15 +2023,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
withTable("test1") {
withTable("test21721") {
val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
deleteOnExitField.setAccessible(true)

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]

val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
sql("CREATE TABLE test1 (key INT, value STRING)")
sql("CREATE TABLE test21721 (key INT, value STRING)")
val pathSizeToDeleteOnExit = setOfPath.size()

(0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
Expand Down

0 comments on commit 2b0fd74

Please sign in to comment.