From 92a116d3fed9acac9710d66e318b04a860b49ba2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 22 May 2015 11:34:00 +0800 Subject: [PATCH 1/4] Fixes minor styling issues --- .../sql/hive/MetastoreDataSourcesSuite.scala | 266 +++++++++--------- 1 file changed, 133 insertions(+), 133 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 9623ef06aa9b0..809325a484cd1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -51,12 +51,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test ("persistent JSON table") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -66,16 +66,16 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test ("persistent JSON table with a user specified schema") { sql( s""" - |CREATE TABLE jsonTable ( - |a string, - |b String, - |`c_!@(3)` int, - |`` Struct<`d!`:array, `=`:array>>) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable ( + |a string, + |b String, + |`c_!@(3)` int, + |`` Struct<`d!`:array, `=`:array>>) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) read.json(filePath).registerTempTable("expectedJsonTable") @@ -89,12 +89,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // field values based on field names. 
sql( s""" - |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) val innerStruct = StructType( StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil) @@ -114,12 +114,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("resolve shortened provider names") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -129,12 +129,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("drop table") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -147,7 +147,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { } assert( - (new File(filePath)).exists(), + new File(filePath).exists(), "The table with specified path is considered as an external table, " + "its data should not deleted after DROP TABLE.") } @@ -160,12 +160,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -198,12 +198,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -217,12 +217,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) // New table should reflect new schema. 
checkAnswer( @@ -234,12 +234,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("invalidate cache and reload") { sql( s""" - |CREATE TABLE jsonTable (`c_!@(3)` int) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable (`c_!@(3)` int) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) read.json(filePath).registerTempTable("expectedJsonTable") @@ -263,22 +263,22 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("CTAS") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) sql( s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${tempPath}' - |) AS - |SELECT * FROM jsonTable - """.stripMargin) + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) assert(table("ctasJsonTable").schema === table("jsonTable").schema) @@ -290,34 +290,34 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("CTAS with IF NOT EXISTS") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) sql( s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${tempPath}' - |) AS - |SELECT * FROM jsonTable - """.stripMargin) + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) // Create the table again should trigger a AnalysisException. val message = intercept[AnalysisException] { sql( s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${tempPath}' - |) AS - |SELECT * FROM jsonTable - """.stripMargin) + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) }.getMessage assert(message.contains("Table ctasJsonTable already exists."), "We should complain that ctasJsonTable already exists") @@ -327,13 +327,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // The actual table's schema and data should not be changed. sql( s""" - |CREATE TABLE IF NOT EXISTS ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${tempPath}' - |) AS - |SELECT a FROM jsonTable - """.stripMargin) + |CREATE TABLE IF NOT EXISTS ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT a FROM jsonTable + """.stripMargin) // Discard the cached relation. 
invalidateTable("ctasJsonTable") @@ -349,12 +349,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("CTAS a managed table") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${filePath}' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$filePath' + |) + """.stripMargin) val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") val filesystemPath = new Path(expectedPath) @@ -364,22 +364,22 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // It is a managed table when we do not specify the location. sql( s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |AS - |SELECT * FROM jsonTable - """.stripMargin) + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |AS + |SELECT * FROM jsonTable + """.stripMargin) assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.") sql( s""" - |CREATE TABLE loadedTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${expectedPath}' - |) - """.stripMargin) + |CREATE TABLE loadedTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$expectedPath' + |) + """.stripMargin) assert(table("ctasJsonTable").schema === table("loadedTable").schema) @@ -395,12 +395,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("SPARK-5286 Fail to drop an invalid table when using the data source API") { sql( s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path 'it is not a path at all!' - |) - """.stripMargin) + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'it is not a path at all!' + |) + """.stripMargin) sql("DROP TABLE jsonTable").collect().foreach(println) } @@ -408,7 +408,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") { val originalDefaultSource = conf.defaultDataSourceName - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") @@ -417,21 +417,21 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), - (1 to 4).map(i => Row(i, s"str${i}"))) + (1 to 4).map(i => Row(i, s"str$i"))) checkAnswer( sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), - (6 to 10).map(i => Row(i, s"str${i}"))) + (6 to 10).map(i => Row(i, s"str$i"))) invalidateTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), - (1 to 4).map(i => Row(i, s"str${i}"))) + (1 to 4).map(i => Row(i, s"str$i"))) checkAnswer( sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), - (6 to 10).map(i => Row(i, s"str${i}"))) + (6 to 10).map(i => Row(i, s"str$i"))) // Drop table will also delete the data. 
sql("DROP TABLE savedJsonTable") @@ -442,7 +442,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("save table") { val originalDefaultSource = conf.defaultDataSourceName - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") @@ -500,7 +500,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("create external table") { val originalDefaultSource = conf.defaultDataSourceName - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") @@ -564,7 +564,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { setConf("spark.sql.hive.convertMetastoreParquet", "true") setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) read.json(rdd).registerTempTable("jt") sql( """ @@ -661,11 +661,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("SPARK-6024 wide schema support") { // We will need 80 splits for this schema if the threshold is 4000. - val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true))) + val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) assert( - schema.json.size > conf.schemaStringLengthThreshold, + schema.json.length > conf.schemaStringLengthThreshold, "To correctly test the fix of SPARK-6024, the value of " + - s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}") + s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.length}") // Manually create a metastore data source table. 
catalog.createDataSourceTable( tableName = "wide_schema", From dfe45d0231dc8c8a09c41952c3c9d4799babfa5b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 22 May 2015 18:30:42 +0800 Subject: [PATCH 2/4] Refactors MetastoreDataSourcesSuite to workaround SPARK-7684 --- .../org/apache/spark/sql/QueryTest.scala | 4 + .../apache/spark/sql/test/SQLTestUtils.scala | 12 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 1358 +++++++++-------- 3 files changed, 711 insertions(+), 663 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index bbf9ab113ca43..98ba3c99283a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -67,6 +67,10 @@ class QueryTest extends PlanTest { checkAnswer(df, Seq(expectedAnswer)) } + protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { + checkAnswer(df, expectedAnswer.collect()) + } + def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext) { test(sqlString) { checkAnswer(sqlContext.sql(sqlString), expectedAnswer) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index ca66cdc48272d..17a8b0cca09df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -75,14 +75,18 @@ trait SQLTestUtils { /** * Drops temporary table `tableName` after calling `f`. */ - protected def withTempTable(tableName: String)(f: => Unit): Unit = { - try f finally sqlContext.dropTempTable(tableName) + protected def withTempTable(tableNames: String*)(f: => Unit): Unit = { + try f finally tableNames.foreach(sqlContext.dropTempTable) } /** * Drops table `tableName` after calling `f`. */ - protected def withTable(tableName: String)(f: => Unit): Unit = { - try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName") + protected def withTable(tableNames: String*)(f: => Unit): Unit = { + try f finally { + tableNames.foreach { name => + sqlContext.sql(s"DROP TABLE IF EXISTS $name") + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 809325a484cd1..44a450b2f90b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -23,768 +23,808 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.InvalidInputException -import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql._ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. 
*/ -class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { +class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils { + override val sqlContext = TestHive - override def afterEach(): Unit = { - reset() - Utils.deleteRecursively(tempPath) - } + val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + + test("persistent JSON table") { + withTable("jsonTable") { + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) - val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile - var tempPath: File = Utils.createTempDir() - tempPath.delete() - - test ("persistent JSON table") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - read.json(filePath).collect().toSeq) + checkAnswer( + sql("SELECT * FROM jsonTable"), + read.json(jsonFilePath).collect().toSeq) + } } - test ("persistent JSON table with a user specified schema") { - sql( - s""" - |CREATE TABLE jsonTable ( - |a string, - |b String, - |`c_!@(3)` int, - |`` Struct<`d!`:array, `=`:array>>) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - read.json(filePath).registerTempTable("expectedJsonTable") - - checkAnswer( - sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), - sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM expectedJsonTable").collect().toSeq) + test("persistent JSON table with a user specified schema") { + withTable("jsonTable") { + sql( + s"""CREATE TABLE jsonTable ( + |a string, + |b String, + |`c_!@(3)` int, + |`` Struct<`d!`:array, `=`:array>>) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) + + withTempTable("expectedJsonTable") { + read.json(jsonFilePath).registerTempTable("expectedJsonTable") + checkAnswer( + sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), + sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM expectedJsonTable")) + } + } } - test ("persistent JSON table with a user specified schema with a subset of fields") { - // This works because JSON objects are self-describing and JSONRelation can get needed - // field values based on field names. - sql( - s""" - |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - val innerStruct = StructType( - StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil) - val expectedSchema = StructType( - StructField("", innerStruct, true) :: - StructField("b", StringType, true) :: Nil) - - assert(expectedSchema === table("jsonTable").schema) - - read.json(filePath).registerTempTable("expectedJsonTable") - - checkAnswer( - sql("SELECT b, ``.`=` FROM jsonTable"), - sql("SELECT b, ``.`=` FROM expectedJsonTable").collect().toSeq) + test("persistent JSON table with a user specified schema with a subset of fields") { + withTable("jsonTable") { + // This works because JSON objects are self-describing and JSONRelation can get needed + // field values based on field names. 
+ sql( + s"""CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) + + val innerStruct = StructType(Seq( + StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))))) + + val expectedSchema = StructType(Seq( + StructField("", innerStruct, true), + StructField("b", StringType, true))) + + assert(expectedSchema === table("jsonTable").schema) + + withTempTable("expectedJsonTable") { + read.json(jsonFilePath).registerTempTable("expectedJsonTable") + checkAnswer( + sql("SELECT b, ``.`=` FROM jsonTable"), + sql("SELECT b, ``.`=` FROM expectedJsonTable")) + } + } } test("resolve shortened provider names") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - read.json(filePath).collect().toSeq) + withTable("jsonTable") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + read.json(jsonFilePath).collect().toSeq) + } } test("drop table") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - read.json(filePath).collect().toSeq) - - sql("DROP TABLE jsonTable") - - intercept[Exception] { - sql("SELECT * FROM jsonTable").collect() - } + withTable("jsonTable") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) - assert( - new File(filePath).exists(), - "The table with specified path is considered as an external table, " + - "its data should not deleted after DROP TABLE.") + checkAnswer( + sql("SELECT * FROM jsonTable"), + read.json(jsonFilePath)) + + sql("DROP TABLE jsonTable") + + intercept[Exception] { + sql("SELECT * FROM jsonTable").collect() + } + + assert( + new File(jsonFilePath).exists(), + "The table with specified path is considered as an external table, " + + "its data should not deleted after DROP TABLE.") + } } test("check change without refresh") { - val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir()) - tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF() - .toJSON.saveAsTextFile(tempDir.getCanonicalPath) - - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - Row("a", "b")) - - Utils.deleteRecursively(tempDir) - sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF() - .toJSON.saveAsTextFile(tempDir.getCanonicalPath) - - // Schema is cached so the new column does not show. The updated values in existing columns - // will show. 
- checkAnswer( - sql("SELECT * FROM jsonTable"), - Row("a1", "b1")) - - sql("REFRESH TABLE jsonTable") - - // Check that the refresh worked - checkAnswer( - sql("SELECT * FROM jsonTable"), - Row("a1", "b1", "c1")) - Utils.deleteRecursively(tempDir) + withTempPath { tempDir => + withTable("jsonTable") { + (("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + Row("a", "b")) + + Utils.deleteRecursively(tempDir) + (("a1", "b1", "c1") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + // Schema is cached so the new column does not show. The updated values in existing columns + // will show. + checkAnswer( + sql("SELECT * FROM jsonTable"), + Row("a1", "b1")) + + sql("REFRESH TABLE jsonTable") + + // Check that the refresh worked + checkAnswer( + sql("SELECT * FROM jsonTable"), + Row("a1", "b1", "c1")) + } + } } test("drop, change, recreate") { - val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir()) - tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF() - .toJSON.saveAsTextFile(tempDir.getCanonicalPath) - - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - Row("a", "b")) - - Utils.deleteRecursively(tempDir) - sparkContext.parallelize(("a", "b", "c") :: Nil).toDF() - .toJSON.saveAsTextFile(tempDir.getCanonicalPath) - - sql("DROP TABLE jsonTable") - - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) - - // New table should reflect new schema. - checkAnswer( - sql("SELECT * FROM jsonTable"), - Row("a", "b", "c")) - Utils.deleteRecursively(tempDir) + withTempPath { tempDir => + (("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + withTable("jsonTable") { + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + Row("a", "b")) + + Utils.deleteRecursively(tempDir) + (("a", "b", "c") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + sql("DROP TABLE jsonTable") + + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + // New table should reflect new schema. 
+ checkAnswer( + sql("SELECT * FROM jsonTable"), + Row("a", "b", "c")) + } + } } test("invalidate cache and reload") { - sql( - s""" - |CREATE TABLE jsonTable (`c_!@(3)` int) - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) + withTable("jsonTable") { + sql( + s"""CREATE TABLE jsonTable (`c_!@(3)` int) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) - read.json(filePath).registerTempTable("expectedJsonTable") + withTempTable("expectedJsonTable") { + read.json(jsonFilePath).registerTempTable("expectedJsonTable") - checkAnswer( - sql("SELECT * FROM jsonTable"), - sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) - // Discard the cached relation. - invalidateTable("jsonTable") + // Discard the cached relation. + invalidateTable("jsonTable") - checkAnswer( - sql("SELECT * FROM jsonTable"), - sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) - invalidateTable("jsonTable") - val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) + invalidateTable("jsonTable") + val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) - assert(expectedSchema === table("jsonTable").schema) + assert(expectedSchema === table("jsonTable").schema) + } + } } test("CTAS") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - sql( - s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$tempPath' - |) AS - |SELECT * FROM jsonTable - """.stripMargin) - - assert(table("ctasJsonTable").schema === table("jsonTable").schema) - - checkAnswer( - sql("SELECT * FROM ctasJsonTable"), - sql("SELECT * FROM jsonTable").collect()) + withTempPath { tempPath => + withTable("jsonTable", "ctasJsonTable") { + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) + + sql( + s"""CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) + } + } } test("CTAS with IF NOT EXISTS") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - sql( - s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$tempPath' - |) AS - |SELECT * FROM jsonTable - """.stripMargin) - - // Create the table again should trigger a AnalysisException. 
- val message = intercept[AnalysisException] { + withTempPath { path => + val tempPath = path.getCanonicalPath + + withTable("jsonTable", "ctasJsonTable") { + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$jsonFilePath' + |) + """.stripMargin) + + sql( + s"""CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + // Create the table again should trigger a AnalysisException. + val message = intercept[AnalysisException] { + sql( + s"""CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + }.getMessage + + assert( + message.contains("Table ctasJsonTable already exists."), + "We should complain that ctasJsonTable already exists") + + // The following statement should be fine if it has IF NOT EXISTS. + // It tries to create a table ctasJsonTable with a new schema. + // The actual table's schema and data should not be changed. + sql( + s"""CREATE TABLE IF NOT EXISTS ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$tempPath' + |) AS + |SELECT a FROM jsonTable + """.stripMargin) + + // Discard the cached relation. + invalidateTable("ctasJsonTable") + + // Schema should not be changed. + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + // Table data should not be changed. + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) + } + } + } + + test("CTAS a managed table") { + withTable("jsonTable", "ctasJsonTable", "loadedTable") { sql( - s""" - |CREATE TABLE ctasJsonTable + s"""CREATE TABLE jsonTable |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path '$tempPath' - |) AS - |SELECT * FROM jsonTable + | path '$jsonFilePath' + |) """.stripMargin) - }.getMessage - assert(message.contains("Table ctasJsonTable already exists."), - "We should complain that ctasJsonTable already exists") - - // The following statement should be fine if it has IF NOT EXISTS. - // It tries to create a table ctasJsonTable with a new schema. - // The actual table's schema and data should not be changed. - sql( - s""" - |CREATE TABLE IF NOT EXISTS ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$tempPath' - |) AS - |SELECT a FROM jsonTable - """.stripMargin) - - // Discard the cached relation. - invalidateTable("ctasJsonTable") - - // Schema should not be changed. - assert(table("ctasJsonTable").schema === table("jsonTable").schema) - // Table data should not be changed. - checkAnswer( - sql("SELECT * FROM ctasJsonTable"), - sql("SELECT * FROM jsonTable").collect()) - } - - test("CTAS a managed table") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$filePath' - |) - """.stripMargin) - - val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") - val filesystemPath = new Path(expectedPath) - val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) - if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) - - // It is a managed table when we do not specify the location. 
- sql( - s""" - |CREATE TABLE ctasJsonTable - |USING org.apache.spark.sql.json.DefaultSource - |AS - |SELECT * FROM jsonTable - """.stripMargin) - - assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.") - - sql( - s""" - |CREATE TABLE loadedTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '$expectedPath' - |) - """.stripMargin) - - assert(table("ctasJsonTable").schema === table("loadedTable").schema) - - checkAnswer( - sql("SELECT * FROM ctasJsonTable"), - sql("SELECT * FROM loadedTable").collect() - ) - - sql("DROP TABLE ctasJsonTable") - assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.") - } - test("SPARK-5286 Fail to drop an invalid table when using the data source API") { - sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path 'it is not a path at all!' - |) - """.stripMargin) - - sql("DROP TABLE jsonTable").collect().foreach(println) - } - - test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") { - val originalDefaultSource = conf.defaultDataSourceName + val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") + val filesystemPath = new Path(expectedPath) + val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - val df = read.json(rdd) - - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") - // Save the df as a managed table (by not specifiying the path). - df.write.saveAsTable("savedJsonTable") + // It is a managed table when we do not specify the location. + sql( + s"""CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |AS + |SELECT * FROM jsonTable + """.stripMargin) - checkAnswer( - sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), - (1 to 4).map(i => Row(i, s"str$i"))) + assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.") - checkAnswer( - sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), - (6 to 10).map(i => Row(i, s"str$i"))) + sql( + s"""CREATE TABLE loadedTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '$expectedPath' + |) + """.stripMargin) - invalidateTable("savedJsonTable") + assert(table("ctasJsonTable").schema === table("loadedTable").schema) - checkAnswer( - sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), - (1 to 4).map(i => Row(i, s"str$i"))) + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM loadedTable")) - checkAnswer( - sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), - (6 to 10).map(i => Row(i, s"str$i"))) + sql("DROP TABLE ctasJsonTable") + assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.") + } + } - // Drop table will also delete the data. - sql("DROP TABLE savedJsonTable") + test("SPARK-5286 Fail to drop an invalid table when using the data source API") { + withTable("jsonTable") { + sql( + s"""CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'it is not a path at all!' 
+ |) + """.stripMargin) - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + sql("DROP TABLE jsonTable").collect().foreach(println) + } } - test("save table") { - val originalDefaultSource = conf.defaultDataSourceName + test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") { + withTable("savedJsonTable") { + // Save the df as a managed table (by not specifying the path). + (1 to 10) + .map(i => i -> s"str$i") + .toDF("a", "b") + .write + .format("json") + .saveAsTable("savedJsonTable") - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - val df = read.json(rdd) + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str$i"))) - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") - // Save the df as a managed table (by not specifiying the path). - df.write.saveAsTable("savedJsonTable") + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str$i"))) - checkAnswer( - sql("SELECT * FROM savedJsonTable"), - df.collect()) + invalidateTable("savedJsonTable") - // Right now, we cannot append to an existing JSON table. - intercept[RuntimeException] { - df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable") - } + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str$i"))) - // We can overwrite it. - df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable") - checkAnswer( - sql("SELECT * FROM savedJsonTable"), - df.collect()) - - // When the save mode is Ignore, we will do nothing when the table already exists. - df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable") - assert(df.schema === table("savedJsonTable").schema) - checkAnswer( - sql("SELECT * FROM savedJsonTable"), - df.collect()) - - // Drop table will also delete the data. - sql("DROP TABLE savedJsonTable") - intercept[InvalidInputException] { - read.json(catalog.hiveDefaultTableFilePath("savedJsonTable")) + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str$i"))) } + } - // Create an external table by specifying the path. - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - df.write - .format("org.apache.spark.sql.json") - .mode(SaveMode.Append) - .option("path", tempPath.toString) - .saveAsTable("savedJsonTable") - checkAnswer( - sql("SELECT * FROM savedJsonTable"), - df.collect()) - - // Data should not be deleted after we drop the table. - sql("DROP TABLE savedJsonTable") - checkAnswer( - read.json(tempPath.toString), - df.collect()) - - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + test("save table") { + withTempPath { path => + val tempPath = path.getCanonicalPath + + withTable("savedJsonTable") { + val df = (1 to 10).map(i => i -> s"str$i").toDF("a", "b") + + withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") { + // Save the df as a managed table (by not specifying the path). + df.write.saveAsTable("savedJsonTable") + + checkAnswer(sql("SELECT * FROM savedJsonTable"), df) + + // Right now, we cannot append to an existing JSON table. + intercept[RuntimeException] { + df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable") + } + + // We can overwrite it. 
+ df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable") + checkAnswer(sql("SELECT * FROM savedJsonTable"), df) + + // When the save mode is Ignore, we will do nothing when the table already exists. + df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable") + assert(df.schema === table("savedJsonTable").schema) + checkAnswer(sql("SELECT * FROM savedJsonTable"), df) + + // Drop table will also delete the data. + sql("DROP TABLE savedJsonTable") + intercept[InvalidInputException] { + read.json(catalog.hiveDefaultTableFilePath("savedJsonTable")) + } + } + + // Create an external table by specifying the path. + withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") { + df.write + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .option("path", tempPath.toString) + .saveAsTable("savedJsonTable") + + checkAnswer(sql("SELECT * FROM savedJsonTable"), df) + } + + // Data should not be deleted after we drop the table. + sql("DROP TABLE savedJsonTable") + checkAnswer(read.json(tempPath.toString), df) + } + } } test("create external table") { - val originalDefaultSource = conf.defaultDataSourceName - - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - val df = read.json(rdd) - - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - df.write.format("org.apache.spark.sql.json") - .mode(SaveMode.Append) - .option("path", tempPath.toString) - .saveAsTable("savedJsonTable") - - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") - createExternalTable("createdJsonTable", tempPath.toString) - assert(table("createdJsonTable").schema === df.schema) - checkAnswer( - sql("SELECT * FROM createdJsonTable"), - df.collect()) - - var message = intercept[AnalysisException] { - createExternalTable("createdJsonTable", filePath.toString) - }.getMessage - assert(message.contains("Table createdJsonTable already exists."), - "We should complain that ctasJsonTable already exists") - - // Data should not be deleted. - sql("DROP TABLE createdJsonTable") - checkAnswer( - read.json(tempPath.toString), - df.collect()) - - // Try to specify the schema. 
- conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - val schema = StructType(StructField("b", StringType, true) :: Nil) - createExternalTable( - "createdJsonTable", - "org.apache.spark.sql.json", - schema, - Map("path" -> tempPath.toString)) - checkAnswer( - sql("SELECT * FROM createdJsonTable"), - sql("SELECT b FROM savedJsonTable").collect()) - - sql("DROP TABLE createdJsonTable") - - message = intercept[RuntimeException] { - createExternalTable( - "createdJsonTable", - "org.apache.spark.sql.json", - schema, - Map.empty[String, String]) - }.getMessage - assert( - message.contains("'path' must be specified for json data."), - "We should complain that path is not specified.") - - sql("DROP TABLE savedJsonTable") - conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + withTempPath { tempPath => + withTable("savedJsonTable", "createdJsonTable") { + val df = read.json(sparkContext.parallelize((1 to 10).map { i => + s"""{ "a": $i, "b": "str$i" }""" + })) + + withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") { + df.write + .format("json") + .mode(SaveMode.Append) + .option("path", tempPath.toString) + .saveAsTable("savedJsonTable") + } + + withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") { + createExternalTable("createdJsonTable", tempPath.toString) + assert(table("createdJsonTable").schema === df.schema) + checkAnswer(sql("SELECT * FROM createdJsonTable"), df) + + assert( + intercept[AnalysisException] { + createExternalTable("createdJsonTable", jsonFilePath.toString) + }.getMessage.contains("Table createdJsonTable already exists."), + "We should complain that createdJsonTable already exists") + } + + // Data should not be deleted. + sql("DROP TABLE createdJsonTable") + checkAnswer(read.json(tempPath.toString), df) + + // Try to specify the schema. 
+ withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") { + val schema = StructType(StructField("b", StringType, true) :: Nil) + createExternalTable( + "createdJsonTable", + "org.apache.spark.sql.json", + schema, + Map("path" -> tempPath.toString)) + + checkAnswer( + sql("SELECT * FROM createdJsonTable"), + sql("SELECT b FROM savedJsonTable")) + + sql("DROP TABLE createdJsonTable") + + assert( + intercept[RuntimeException] { + createExternalTable( + "createdJsonTable", + "org.apache.spark.sql.json", + schema, + Map.empty[String, String]) + }.getMessage.contains("'path' must be specified for json data."), + "We should complain that path is not specified.") + } + } + } } if (HiveShim.version == "0.13.1") { test("scan a parquet table created through a CTAS statement") { - val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true") - val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") - setConf("spark.sql.hive.convertMetastoreParquet", "true") - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") - - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - read.json(rdd).registerTempTable("jt") - sql( - """ - |create table test_parquet_ctas STORED AS parquET - |AS select tmp.a from jt tmp where tmp.a < 5 - """.stripMargin) - - checkAnswer( - sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "), - Row(3) :: Row(4) :: Nil - ) - - table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: ParquetRelation2) => // OK - case _ => - fail( - "test_parquet_ctas should be converted to " + - s"${classOf[ParquetRelation2].getCanonicalName}") + withSQLConf( + "spark.sql.hive.convertMetastoreParquet" -> "true", + SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") { + + withTempTable("jt") { + (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt") + + withTable("test_parquet_ctas") { + sql( + """CREATE TABLE test_parquet_ctas STORED AS PARQUET + |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5 + """.stripMargin) + + checkAnswer( + sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "), + Row(3) :: Row(4) :: Nil) + + table("test_parquet_ctas").queryExecution.optimizedPlan match { + case LogicalRelation(p: ParquetRelation2) => // OK + case _ => + fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}") + } + } + } } - - // Clenup and reset confs. - sql("DROP TABLE IF EXISTS jt") - sql("DROP TABLE IF EXISTS test_parquet_ctas") - setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore) - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) } } test("Pre insert nullability check (ArrayType)") { - val df1 = - createDataFrame(Tuple1(Seq(Int.box(1), null.asInstanceOf[Integer])) :: Nil).toDF("a") - val expectedSchema1 = - StructType( - StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil) - assert(df1.schema === expectedSchema1) - df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet") - - val df2 = - createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a") - val expectedSchema2 = - StructType( - StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil) - assert(df2.schema === expectedSchema2) - df2.write.mode(SaveMode.Append).insertInto("arrayInParquet") - createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) - .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto. 
- createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write - .mode(SaveMode.Append).saveAsTable("arrayInParquet") - refreshTable("arrayInParquet") - - checkAnswer( - sql("SELECT a FROM arrayInParquet"), - Row(ArrayBuffer(1, null)) :: - Row(ArrayBuffer(2, 3)) :: - Row(ArrayBuffer(4, 5)) :: - Row(ArrayBuffer(6, null)) :: Nil) - - sql("DROP TABLE arrayInParquet") + withTable("arrayInParquet") { + { + val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a") + val expectedSchema = + StructType( + StructField( + "a", + ArrayType(IntegerType, containsNull = true), + nullable = true) :: Nil) + + assert(df.schema === expectedSchema) + + df.write + .format("parquet") + .mode(SaveMode.Overwrite) + .saveAsTable("arrayInParquet") + } + + { + val df = (Tuple1(Seq(2, 3)) :: Nil).toDF("a") + val expectedSchema = + StructType( + StructField( + "a", + ArrayType(IntegerType, containsNull = false), + nullable = true) :: Nil) + + assert(df.schema === expectedSchema) + + df.write + .format("parquet") + .mode(SaveMode.Append) + .insertInto("arrayInParquet") + } + + (Tuple1(Seq(4, 5)) :: Nil).toDF("a") + .write + .mode(SaveMode.Append) + .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto. + + (Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a") + .write + .mode(SaveMode.Append) + .saveAsTable("arrayInParquet") + + refreshTable("arrayInParquet") + + checkAnswer( + sql("SELECT a FROM arrayInParquet"), + Row(ArrayBuffer(1, null)) :: + Row(ArrayBuffer(2, 3)) :: + Row(ArrayBuffer(4, 5)) :: + Row(ArrayBuffer(6, null)) :: Nil) + } } test("Pre insert nullability check (MapType)") { - val df1 = - createDataFrame(Tuple1(Map(1 -> null.asInstanceOf[Integer])) :: Nil).toDF("a") - val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = true) - val expectedSchema1 = - StructType( - StructField("a", mapType1, nullable = true) :: Nil) - assert(df1.schema === expectedSchema1) - df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet") - - val df2 = - createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a") - val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = false) - val expectedSchema2 = - StructType( - StructField("a", mapType2, nullable = true) :: Nil) - assert(df2.schema === expectedSchema2) - df2.write.mode(SaveMode.Append).insertInto("mapInParquet") - createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) - .saveAsTable("mapInParquet") // This one internally calls df2.insertInto. 
- createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write - .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet") - refreshTable("mapInParquet") - - checkAnswer( - sql("SELECT a FROM mapInParquet"), - Row(Map(1 -> null)) :: - Row(Map(2 -> 3)) :: - Row(Map(4 -> 5)) :: - Row(Map(6 -> null)) :: Nil) - - sql("DROP TABLE mapInParquet") + withTable("mapInParquet") { + { + val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a") + val expectedSchema = + StructType( + StructField( + "a", + MapType(IntegerType, IntegerType, valueContainsNull = true), + nullable = true) :: Nil) + + assert(df.schema === expectedSchema) + + df.write + .format("parquet") + .mode(SaveMode.Overwrite) + .saveAsTable("mapInParquet") + } + + { + val df = (Tuple1(Map(2 -> 3)) :: Nil).toDF("a") + val expectedSchema = + StructType( + StructField( + "a", + MapType(IntegerType, IntegerType, valueContainsNull = false), + nullable = true) :: Nil) + + assert(df.schema === expectedSchema) + + df.write + .format("parquet") + .mode(SaveMode.Append) + .insertInto("mapInParquet") + } + + (Tuple1(Map(4 -> 5)) :: Nil).toDF("a") + .write + .format("parquet") + .mode(SaveMode.Append) + .saveAsTable("mapInParquet") // This one internally calls df2.insertInto. + + (Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a") + .write + .format("parquet") + .mode(SaveMode.Append) + .saveAsTable("mapInParquet") + + refreshTable("mapInParquet") + + checkAnswer( + sql("SELECT a FROM mapInParquet"), + Row(Map(1 -> null)) :: + Row(Map(2 -> 3)) :: + Row(Map(4 -> 5)) :: + Row(Map(6 -> null)) :: Nil) + } } test("SPARK-6024 wide schema support") { - // We will need 80 splits for this schema if the threshold is 4000. - val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) - assert( - schema.json.length > conf.schemaStringLengthThreshold, - "To correctly test the fix of SPARK-6024, the value of " + - s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.length}") - // Manually create a metastore data source table. - catalog.createDataSourceTable( - tableName = "wide_schema", - userSpecifiedSchema = Some(schema), - partitionColumns = Array.empty[String], - provider = "json", - options = Map("path" -> "just a dummy path"), - isExternal = false) - - invalidateTable("wide_schema") - - val actualSchema = table("wide_schema").schema - assert(schema === actualSchema) + withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD -> "4000") { + withTable("wide_schema") { + // We will need 80 splits for this schema if the threshold is 4000. + val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) + + // Manually create a metastore data source table. 
+ catalog.createDataSourceTable( + tableName = "wide_schema", + userSpecifiedSchema = Some(schema), + partitionColumns = Array.empty[String], + provider = "json", + options = Map("path" -> "just a dummy path"), + isExternal = false) + + invalidateTable("wide_schema") + + val actualSchema = table("wide_schema").schema + assert(schema === actualSchema) + } + } } test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") { val tableName = "spark6655" - val schema = StructType(StructField("int", IntegerType, true) :: Nil) - - val hiveTable = HiveTable( - specifiedDatabase = Some("default"), - name = tableName, - schema = Seq.empty, - partitionColumns = Seq.empty, - properties = Map( - "spark.sql.sources.provider" -> "json", - "spark.sql.sources.schema" -> schema.json, - "EXTERNAL" -> "FALSE"), - tableType = ManagedTable, - serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(tableName))) - - catalog.client.createTable(hiveTable) - - invalidateTable(tableName) - val actualSchema = table(tableName).schema - assert(schema === actualSchema) - sql(s"drop table $tableName") + withTable(tableName) { + val schema = StructType(StructField("int", IntegerType, true) :: Nil) + val hiveTable = HiveTable( + specifiedDatabase = Some("default"), + name = tableName, + schema = Seq.empty, + partitionColumns = Seq.empty, + properties = Map( + "spark.sql.sources.provider" -> "json", + "spark.sql.sources.schema" -> schema.json, + "EXTERNAL" -> "FALSE"), + tableType = ManagedTable, + serdeProperties = Map( + "path" -> catalog.hiveDefaultTableFilePath(tableName))) + + catalog.client.createTable(hiveTable) + + invalidateTable(tableName) + val actualSchema = table(tableName).schema + assert(schema === actualSchema) + } } test("Saving partition columns information") { - val df = - sparkContext.parallelize(1 to 10, 4).map { i => - Tuple4(i, i + 1, s"str$i", s"str${i + 1}") - }.toDF("a", "b", "c", "d") - + val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d") val tableName = s"partitionInfo_${System.currentTimeMillis()}" - df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) - invalidateTable(tableName) - val metastoreTable = catalog.client.getTable("default", tableName) - val expectedPartitionColumns = - StructType(df.schema("d") :: df.schema("b") :: Nil) - val actualPartitionColumns = - StructType( - metastoreTable.partitionColumns.map(c => - StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType)))) - // Make sure partition columns are correctly stored in metastore. - assert( - expectedPartitionColumns.sameType(actualPartitionColumns), - s"Partitions columns stored in metastore $actualPartitionColumns is not the " + - s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.") - - // Check the content of the saved table. - checkAnswer( - table(tableName).selectExpr("c", "b", "d", "a"), - df.selectExpr("c", "b", "d", "a").collect()) - - sql(s"drop table $tableName") + + withTable(tableName) { + df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) + invalidateTable(tableName) + val metastoreTable = catalog.client.getTable("default", tableName) + val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) + val actualPartitionColumns = + StructType( + metastoreTable.partitionColumns.map(c => + StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType)))) + // Make sure partition columns are correctly stored in metastore. 
+ assert( + expectedPartitionColumns.sameType(actualPartitionColumns), + s"Partitions columns stored in metastore $actualPartitionColumns is not the " + + s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.") + + // Check the content of the saved table. + checkAnswer( + table(tableName).select("c", "b", "d", "a"), + df.select("c", "b", "d", "a")) + } } test("insert into a table") { - def createDF(from: Int, to: Int): DataFrame = - createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2") + def createDF(from: Int, to: Int) = (from to to).map(i => i -> s"str$i").toDF("c1", "c2") - createDF(0, 9).write.format("parquet").saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), - (6 to 9).map(i => Row(i, s"str$i"))) + withTable("insertParquet") { + createDF(0, 9).write.format("parquet").saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), + (6 to 9).map(i => Row(i, s"str$i"))) - intercept[AnalysisException] { - createDF(10, 19).write.format("parquet").saveAsTable("insertParquet") - } + intercept[AnalysisException] { + createDF(10, 19).write.format("parquet").saveAsTable("insertParquet") + } - createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), - (6 to 19).map(i => Row(i, s"str$i"))) + createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) - createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"), - (6 to 24).map(i => Row(i, s"str$i"))) + createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"), + (6 to 24).map(i => Row(i, s"str$i"))) - intercept[AnalysisException] { - createDF(30, 39).write.saveAsTable("insertParquet") - } + intercept[AnalysisException] { + createDF(30, 39).write.saveAsTable("insertParquet") + } - createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), - (6 to 34).map(i => Row(i, s"str$i"))) - - createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), - (6 to 44).map(i => Row(i, s"str$i"))) - - createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"), - (52 to 54).map(i => Row(i, s"str$i"))) - createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p"), - (50 to 59).map(i => Row(i, s"str$i"))) - - createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet") - checkAnswer( - sql("SELECT p.c1, c2 FROM insertParquet p"), - (70 to 79).map(i => Row(i, s"str$i"))) + createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), + (6 to 34).map(i => Row(i, s"str$i"))) + + createDF(40, 
49).write.mode(SaveMode.Append).insertInto("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), + (6 to 44).map(i => Row(i, s"str$i"))) + + createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"), + (52 to 54).map(i => Row(i, s"str$i"))) + createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p"), + (50 to 59).map(i => Row(i, s"str$i"))) + + createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p"), + (70 to 79).map(i => Row(i, s"str$i"))) + } } } From a423d48856dfb369d34cd8cdce552ccd26256668 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 23 May 2015 22:12:16 +0800 Subject: [PATCH 3/4] Fixes Scala style issue --- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 44a450b2f90b6..ab2f647040203 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -776,7 +776,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils { } test("insert into a table") { - def createDF(from: Int, to: Int) = (from to to).map(i => i -> s"str$i").toDF("c1", "c2") + def createDF(from: Int, to: Int): DataFrame = { + (from to to).map(i => i -> s"str$i").toDF("c1", "c2") + } withTable("insertParquet") { createDF(0, 9).write.format("parquet").saveAsTable("insertParquet") From 26939aa4fde2269e1eece82f4c39a890fa99c5a5 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 27 May 2015 11:04:36 -0700 Subject: [PATCH 4/4] Move the initialization of jsonFilePath to beforeAll. --- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index ab2f647040203..58e2d1fbfa73e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -21,6 +21,8 @@ import java.io.File import scala.collection.mutable.ArrayBuffer +import org.scalatest.BeforeAndAfterAll + import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.InvalidInputException @@ -38,10 +40,14 @@ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. */ -class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils { +class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll { override val sqlContext = TestHive - val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + var jsonFilePath: String = _ + + override def beforeAll(): Unit = { + jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + } test("persistent JSON table") { withTable("jsonTable") {
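
Refactoring summary for PATCH 2/4: the suite's manual cleanup (an afterEach hook calling reset() and Utils.deleteRecursively(tempPath)) is replaced by the scoped helpers in SQLTestUtils, withTable and withTempTable (both now accepting multiple table names), together with withTempPath and withSQLConf, and QueryTest gains a checkAnswer(df, expectedAnswer: DataFrame) overload. The sketch below shows how these helpers are meant to compose; it is an illustrative sketch only, not part of the patches: the suite name, test name, table name, and data are hypothetical, and it assumes the same context the refactored suite sets up (QueryTest with SQLTestUtils mixed in, TestHive as the sqlContext, and its implicits in scope).

// Illustrative sketch (hypothetical suite, not from the patch series).
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.test.SQLTestUtils

class ExampleScopedCleanupSuite extends QueryTest with SQLTestUtils {
  override val sqlContext = TestHive

  test("example: persist a DataFrame as a JSON table and read it back") {
    withTempPath { dir =>               // scratch directory, removed when the block exits
      withTable("exampleJsonTable") {   // DROP TABLE IF EXISTS runs even if an assertion fails
        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
          val df = (1 to 3).map(i => i -> s"str$i").toDF("a", "b")

          // Persist as an external JSON table rooted at the temporary directory.
          df.write
            .option("path", dir.getCanonicalPath)
            .saveAsTable("exampleJsonTable")

          // Uses the checkAnswer(DataFrame, DataFrame) overload added to QueryTest.
          checkAnswer(sql("SELECT * FROM exampleJsonTable"), df)
        }
      }
    }
  }
}

Scoping cleanup this way keeps each test hermetic even when an assertion throws, and it lets the suite drop only the tables and temporary paths it created instead of calling reset() after every test, which is presumably the SPARK-7684 workaround referred to in the PATCH 2/4 subject line.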