diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index 765c1e2dda99f..f76a903dcc9cf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util._ * Provides helper methods for comparing plans. */ class PlanTest extends SparkFunSuite { - /** * Since attribute references are given globally unique ids during analysis, * we must normalize them to check if two different queries are identical. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 3649c2a97b5ef..407658cdc8d1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.columnar.InMemoryRelation -class QueryTest extends PlanTest { +abstract class QueryTest extends PlanTest { + + protected def sqlContext: SQLContext // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) @@ -56,18 +58,32 @@ class QueryTest extends PlanTest { * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. */ - protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = { + protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = { + val analyzedDF = try df catch { + case ae: AnalysisException => + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false) + val partiallyAnalyzedPlan = df.queryExecution.analyzed + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true) + fail( + s""" + |Failed to analyze query: $ae + |$partiallyAnalyzedPlan + | + |${stackTraceToString(ae)} + |""".stripMargin) + } + QueryTest.checkAnswer(analyzedDF, expectedAnswer) match { case Some(errorMessage) => fail(errorMessage) case None => } } - protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = { + protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = { checkAnswer(df, Seq(expectedAnswer)) } - protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { + protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = { checkAnswer(df, expectedAnswer.collect()) } @@ -96,7 +112,7 @@ object QueryTest { * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. */ - def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { + def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Option[String] = { val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty def prepareAnswer(answer: Seq[Row]): Seq[Row] = { // Converts data to types that we can do equality comparison using Scala collections.
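With `QueryTest` now abstract and `checkAnswer` taking its `DataFrame` by name, a concrete suite only has to supply `sqlContext` (for example via `SharedSQLContext`), and a query that fails analysis is reported as a test failure carrying the partially analyzed plan rather than throwing before the assertion runs. Below is a minimal sketch of a suite written against the new signature; the suite name, table name and query are illustrative and not part of this patch.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical example suite: SharedSQLContext provides the sqlContext that
// the now-abstract QueryTest declares.
class ExampleQuerySuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("checkAnswer evaluates the DataFrame lazily") {
    Seq((1, "a"), (2, "b")).toDF("id", "name").registerTempTable("example")

    // The by-name parameter means this expression is only evaluated inside
    // checkAnswer, so an AnalysisException would surface as a readable test
    // failure that includes the partially analyzed plan.
    checkAnswer(
      sqlContext.sql("SELECT id FROM example WHERE id > 1"),
      Row(2) :: Nil)
  }
}
```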
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala index b91438baea06f..c79f6eb63316d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -268,9 +268,7 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext { Row(3, 4)) intercept[AnalysisException] { - checkAnswer( - df.selectExpr("length(c)"), // int type of the argument is unacceptable - Row("5.0000")) + df.selectExpr("length(c)") // int type of the argument is unacceptable } } @@ -284,63 +282,46 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext { } test("number format function") { - val tuple = - ("aa", 1.asInstanceOf[Byte], 2.asInstanceOf[Short], - 3.13223f, 4, 5L, 6.48173d, Decimal(7.128381)) - val df = - Seq(tuple) - .toDF( - "a", // string "aa" - "b", // byte 1 - "c", // short 2 - "d", // float 3.13223f - "e", // integer 4 - "f", // long 5L - "g", // double 6.48173d - "h") // decimal 7.128381 - - checkAnswer( - df.select(format_number($"f", 4)), + val df = sqlContext.range(1) + + checkAnswer( + df.select(format_number(lit(5L), 4)), Row("5.0000")) checkAnswer( - df.selectExpr("format_number(b, e)"), // convert the 1st argument to integer + df.select(format_number(lit(1.toByte), 4)), // convert the 1st argument to integer Row("1.0000")) checkAnswer( - df.selectExpr("format_number(c, e)"), // convert the 1st argument to integer + df.select(format_number(lit(2.toShort), 4)), // convert the 1st argument to integer Row("2.0000")) checkAnswer( - df.selectExpr("format_number(d, e)"), // convert the 1st argument to double + df.select(format_number(lit(3.1322.toFloat), 4)), // convert the 1st argument to double Row("3.1322")) checkAnswer( - df.selectExpr("format_number(e, e)"), // not convert anything + df.select(format_number(lit(4), 4)), // not convert anything Row("4.0000")) checkAnswer( - df.selectExpr("format_number(f, e)"), // not convert anything + df.select(format_number(lit(5L), 4)), // not convert anything Row("5.0000")) checkAnswer( - df.selectExpr("format_number(g, e)"), // not convert anything + df.select(format_number(lit(6.48173), 4)), // not convert anything Row("6.4817")) checkAnswer( - df.selectExpr("format_number(h, e)"), // not convert anything + df.select(format_number(lit(BigDecimal(7.128381)), 4)), // not convert anything Row("7.1284")) intercept[AnalysisException] { - checkAnswer( - df.selectExpr("format_number(a, e)"), // string type of the 1st argument is unacceptable - Row("5.0000")) + df.select(format_number(lit("aaa"), 4)) // string type of the 1st argument is unacceptable } intercept[AnalysisException] { - checkAnswer( - df.selectExpr("format_number(e, g)"), // decimal type of the 2nd argument is unacceptable - Row("5.0000")) + df.selectExpr("format_number(4, 6.48173)") // non-integral type 2nd argument is unacceptable } // for testing the mutable state of the expression in code gen. 
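The rewritten test builds its inputs from typed `lit` literals on a one-row frame, so every argument type is explicit and the invalid cases fail while `select` is being analyzed, which is why the `intercept` blocks no longer need a `checkAnswer` wrapper. A rough sketch of the same pattern, with a hypothetical suite name:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.functions.{format_number, lit}
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical companion example, not part of this patch.
class FormatNumberSketchSuite extends QueryTest with SharedSQLContext {
  test("format_number with typed literals") {
    val df = sqlContext.range(1)

    // A double literal keeps the rounding behaviour explicit.
    checkAnswer(df.select(format_number(lit(6.48173), 4)), Row("6.4817"))

    // With eager analysis (the default), the bad argument type is rejected
    // as soon as select() is analyzed, so intercept is enough on its own.
    intercept[AnalysisException] {
      df.select(format_number(lit("aaa"), 4))
    }
  }
}
```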
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala index 3a87f374d94b0..956196673ce29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala @@ -31,13 +31,13 @@ import org.apache.spark.sql.catalyst.util._ * class's test helper methods can be used, see [[SortSuite]]. */ private[sql] abstract class SparkPlanTest extends SparkFunSuite { - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext /** * Creates a DataFrame from a local Seq of Product. */ implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = { - _sqlContext.implicits.localSeqToDataFrameHolder(data) + sqlContext.implicits.localSeqToDataFrameHolder(data) } /** @@ -98,7 +98,7 @@ private[sql] abstract class SparkPlanTest extends SparkFunSuite { planFunction: Seq[SparkPlan] => SparkPlan, expectedAnswer: Seq[Row], sortAnswers: Boolean = true): Unit = { - SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, _sqlContext) match { + SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, sqlContext) match { case Some(errorMessage) => fail(errorMessage) case None => } @@ -122,7 +122,7 @@ private[sql] abstract class SparkPlanTest extends SparkFunSuite { expectedPlanFunction: SparkPlan => SparkPlan, sortAnswers: Boolean = true): Unit = { SparkPlanTest.checkAnswer( - input, planFunction, expectedPlanFunction, sortAnswers, _sqlContext) match { + input, planFunction, expectedPlanFunction, sortAnswers, sqlContext) match { case Some(errorMessage) => fail(errorMessage) case None => } @@ -149,13 +149,13 @@ object SparkPlanTest { planFunction: SparkPlan => SparkPlan, expectedPlanFunction: SparkPlan => SparkPlan, sortAnswers: Boolean, - _sqlContext: SQLContext): Option[String] = { + sqlContext: SQLContext): Option[String] = { val outputPlan = planFunction(input.queryExecution.sparkPlan) val expectedOutputPlan = expectedPlanFunction(input.queryExecution.sparkPlan) val expectedAnswer: Seq[Row] = try { - executePlan(expectedOutputPlan, _sqlContext) + executePlan(expectedOutputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -170,7 +170,7 @@ object SparkPlanTest { } val actualAnswer: Seq[Row] = try { - executePlan(outputPlan, _sqlContext) + executePlan(outputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -210,12 +210,12 @@ object SparkPlanTest { planFunction: Seq[SparkPlan] => SparkPlan, expectedAnswer: Seq[Row], sortAnswers: Boolean, - _sqlContext: SQLContext): Option[String] = { + sqlContext: SQLContext): Option[String] = { val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan)) val sparkAnswer: Seq[Row] = try { - executePlan(outputPlan, _sqlContext) + executePlan(outputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -278,10 +278,10 @@ object SparkPlanTest { } } - private def executePlan(outputPlan: SparkPlan, _sqlContext: SQLContext): Seq[Row] = { + private def executePlan(outputPlan: SparkPlan, sqlContext: SQLContext): Seq[Row] = { // A very simple resolver to make writing tests easier. In contrast to the real resolver // this is always case sensitive and does not try to handle scoping or complex type resolution. 
- val resolvedPlan = _sqlContext.prepareForExecution.execute( + val resolvedPlan = sqlContext.prepareForExecution.execute( outputPlan transform { case plan: SparkPlan => val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 2864181cf91d5..713d1da1cb515 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -21,10 +21,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext private[json] trait TestJsonData { - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext def primitiveFieldAndType: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"string":"this is a simple string.", "integer":10, "long":21474836470, @@ -35,7 +35,7 @@ private[json] trait TestJsonData { }""" :: Nil) def primitiveFieldValueTypeConflict: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1, "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" :: """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null, @@ -46,14 +46,14 @@ private[json] trait TestJsonData { "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil) def jsonNullStruct: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" :: """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" :: """{"nullstr":"","ip":"27.31.100.29","headers":""}""" :: """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil) def complexFieldValueTypeConflict: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"num_struct":11, "str_array":[1, 2, 3], "array":[], "struct_array":[], "struct": {}}""" :: """{"num_struct":{"field":false}, "str_array":null, @@ -64,14 +64,14 @@ private[json] trait TestJsonData { "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil) def arrayElementTypeConflict: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}], "array2": [{"field":214748364700}, {"field":1}]}""" :: """{"array3": [{"field":"str"}, {"field":1}]}""" :: """{"array3": [1, 2, 3]}""" :: Nil) def missingFields: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"a":true}""" :: """{"b":21474836470}""" :: """{"c":[33, 44]}""" :: @@ -79,7 +79,7 @@ private[json] trait TestJsonData { """{"e":"str"}""" :: Nil) def complexFieldAndType1: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], @@ -95,7 +95,7 @@ private[json] trait TestJsonData { }""" :: Nil) def complexFieldAndType2: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], 
"complexArrayOfStruct": [ { @@ -149,7 +149,7 @@ private[json] trait TestJsonData { }""" :: Nil) def mapType1: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"map": {"a": 1}}""" :: """{"map": {"b": 2}}""" :: """{"map": {"c": 3}}""" :: @@ -157,7 +157,7 @@ private[json] trait TestJsonData { """{"map": {"e": null}}""" :: Nil) def mapType2: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" :: """{"map": {"b": {"field2": 2}}}""" :: """{"map": {"c": {"field1": [], "field2": 4}}}""" :: @@ -166,21 +166,21 @@ private[json] trait TestJsonData { """{"map": {"f": {"field1": null}}}""" :: Nil) def nullsInArrays: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{"field1":[[null], [[["Test"]]]]}""" :: """{"field2":[null, [{"Test":1}]]}""" :: """{"field3":[[null], [{"Test":"2"}]]}""" :: """{"field4":[[null, [1,2,3]]]}""" :: Nil) def jsonArray: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """[{"a":"str_a_1"}]""" :: """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" :: """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: """[]""" :: Nil) def corruptRecords: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{""" :: """""" :: """{"a":1, b:2}""" :: @@ -189,7 +189,7 @@ private[json] trait TestJsonData { """]""" :: Nil) def emptyRecords: RDD[String] = - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( """{""" :: """""" :: """{"a": {}}""" :: @@ -198,7 +198,7 @@ private[json] trait TestJsonData { """]""" :: Nil) - lazy val singleRow: RDD[String] = _sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) + lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) - def empty: RDD[String] = _sqlContext.sparkContext.parallelize(Seq[String]()) + def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index 91f3ce4d34c8b..853339331913d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -39,12 +39,14 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = { val fsPath = new Path(path) - val fs = fsPath.getFileSystem(configuration) + val fs = fsPath.getFileSystem(hadoopConfiguration) val parquetFiles = fs.listStatus(fsPath, new PathFilter { override def accept(path: Path): Boolean = pathFilter(path) }).toSeq.asJava - val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true) + + val footers = + ParquetFileReader.readAllFootersInParallel(hadoopConfiguration, parquetFiles, true) footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 08d2b9dee99b0..ed97a8dc4662f 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -207,7 +207,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("compression codec") { def compressionCodecFor(path: String): String = { val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(configuration)).getBlocks.asScala + .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala .flatMap(_.getColumns.asScala) .map(_.getCodec.name()) .distinct @@ -277,14 +277,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("write metadata") { withTempPath { file => val path = new Path(file.toURI.toString) - val fs = FileSystem.getLocal(configuration) + val fs = FileSystem.getLocal(hadoopConfiguration) val attributes = ScalaReflection.attributesFor[(Int, String)] - ParquetTypesConverter.writeMetaData(attributes, path, configuration) + ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration)) + val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration)) val actualSchema = metaData.getFileMetaData.getSchema val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) @@ -370,12 +370,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-6352 DirectParquetOutputCommitter") { - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) // Write to a parquet file and let it fail. // _temporary should be missing if direct output committer works. try { - configuration.set("spark.sql.parquet.output.committer.class", + hadoopConfiguration.set("spark.sql.parquet.output.committer.class", classOf[DirectParquetOutputCommitter].getCanonicalName) sqlContext.udf.register("div0", (x: Int) => x / 0) withTempPath { dir => @@ -383,23 +383,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath) } val path = new Path(dir.getCanonicalPath, "_temporary") - val fs = path.getFileSystem(configuration) + val fs = path.getFileSystem(hadoopConfiguration) assert(!fs.exists(path)) } } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) } } test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") { - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) // Write to a parquet file and let it fail. // _temporary should be missing if direct output committer works. 
try { - configuration.set("spark.sql.parquet.output.committer.class", + hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") sqlContext.udf.register("div0", (x: Int) => x / 0) withTempPath { dir => @@ -407,25 +407,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath) } val path = new Path(dir.getCanonicalPath, "_temporary") - val fs = path.getFileSystem(configuration) + val fs = path.getFileSystem(hadoopConfiguration) assert(!fs.exists(path)) } } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) } } test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { withTempPath { dir => - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) - configuration.set( + hadoopConfiguration.set( SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName) - configuration.set( + hadoopConfiguration.set( "spark.sql.parquet.output.committer.class", classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName) @@ -436,8 +436,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { assert(message === "Intentional exception for testing purposes") } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) } } } @@ -455,11 +455,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-7837 Do not close output writer twice when commitTask() fails") { - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) // Using a output committer that always fail when committing a task, so that both // `commitTask()` and `abortTask()` are invoked. - configuration.set( + hadoopConfiguration.set( "spark.sql.parquet.output.committer.class", classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName) @@ -483,8 +483,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index b7b70c2bbbd5c..3e1e764757848 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils * A test suite that tests various Parquet queries. 
*/ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext { + import testImplicits._ test("simple select queries") { withParquetTable((0 until 10).map(i => (i, i.toString)), "t") { @@ -203,8 +204,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } test("SPARK-10005 Schema merging for nested struct") { - val sqlContext = _sqlContext - import sqlContext.implicits._ withTempPath { dir => val path = dir.getCanonicalPath diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 5dbc7d1630f27..809ccc793fc3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} * Especially, `Tuple1.apply` can be used to easily wrap a single type/value. */ private[sql] trait ParquetTest extends SQLTestUtils { - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext /** * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f` @@ -43,7 +43,7 @@ private[sql] trait ParquetTest extends SQLTestUtils { (data: Seq[T]) (f: String => Unit): Unit = { withTempPath { file => - _sqlContext.createDataFrame(data).write.parquet(file.getCanonicalPath) + sqlContext.createDataFrame(data).write.parquet(file.getCanonicalPath) f(file.getCanonicalPath) } } @@ -55,7 +55,7 @@ private[sql] trait ParquetTest extends SQLTestUtils { protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag] (data: Seq[T]) (f: DataFrame => Unit): Unit = { - withParquetFile(data)(path => f(_sqlContext.read.parquet(path))) + withParquetFile(data)(path => f(sqlContext.read.parquet(path))) } /** @@ -67,14 +67,14 @@ private[sql] trait ParquetTest extends SQLTestUtils { (data: Seq[T], tableName: String) (f: => Unit): Unit = { withParquetDataFrame(data) { df => - _sqlContext.registerDataFrameAsTable(df, tableName) + sqlContext.registerDataFrameAsTable(df, tableName) withTempTable(tableName)(f) } } protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( data: Seq[T], path: File): Unit = { - _sqlContext.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath) + sqlContext.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath) } protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 53a0e53fd7719..33242a7dc369a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.{SQLConf, SQLContext, QueryTest} */ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll { private var sc: SparkContext = null - private var sqlContext: SQLContext = null + protected var sqlContext: SQLContext = null /** * Create a new [[SQLContext]] running in local-cluster mode with unsafe and codegen enabled. 
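For context, the `ParquetTest` helpers touched above are typically exercised as below once the trait's requirement is plain `sqlContext`; the suite and table names are illustrative only.

```scala
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical example: SharedSQLContext satisfies the sqlContext that
// ParquetTest now requires, and the helpers write/read through it.
class ParquetRoundTripSketch extends QueryTest with ParquetTest with SharedSQLContext {
  test("round-trips a small dataset through a temporary Parquet table") {
    withParquetTable((1 to 3).map(i => (i, i.toString)), "tmp") {
      checkAnswer(sqlContext.table("tmp"), (1 to 3).map(i => Row(i, i.toString)))
    }
  }
}
```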
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 9bc3f6bcf6fce..9bf9518506602 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var path: File = null override def beforeAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index d74d29fb0beb0..c0dbb38699aa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -21,11 +21,10 @@ import org.apache.spark.sql._ private[sql] abstract class DataSourceTest extends QueryTest { - protected def _sqlContext: SQLContext // We want to test some edge cases. protected lazy val caseInsensitiveContext: SQLContext = { - val ctx = new SQLContext(_sqlContext.sparkContext) + val ctx = new SQLContext(sqlContext.sparkContext) ctx.setConf(SQLConf.CASE_SENSITIVE, false) ctx } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 084d83f6e9bff..154d6584c09fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.util.Utils class InsertSuite extends DataSourceTest with SharedSQLContext { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var path: File = null override def beforeAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index f18546b4c2d9b..10d261368993d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.util.Utils class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var originalDefaultSource: String = null private var path: File = null private var df: DataFrame = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 1374a97476ca1..e87bb0b3e7308 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SQLImplicits} * A collection of sample data used in SQL tests. 
*/ private[sql] trait SQLTestData { self => - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext // Helper object to import SQL implicits without a concrete SQLContext private object internalImplicits extends SQLImplicits { - protected override def _sqlContext: SQLContext = self._sqlContext + protected override def _sqlContext: SQLContext = self.sqlContext } import internalImplicits._ @@ -37,14 +37,14 @@ private[sql] trait SQLTestData { self => // Note: all test data should be lazy because the SQLContext is not set up yet. protected lazy val testData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))).toDF() df.registerTempTable("testData") df } protected lazy val testData2: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) :: @@ -56,7 +56,7 @@ private[sql] trait SQLTestData { self => } protected lazy val testData3: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( TestData3(1, None) :: TestData3(2, Some(2)) :: Nil).toDF() df.registerTempTable("testData3") @@ -64,14 +64,14 @@ private[sql] trait SQLTestData { self => } protected lazy val negativeData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( (1 to 100).map(i => TestData(-i, (-i).toString))).toDF() df.registerTempTable("negativeData") df } protected lazy val largeAndSmallInts: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( LargeAndSmallInts(2147483644, 1) :: LargeAndSmallInts(1, 2) :: LargeAndSmallInts(2147483645, 1) :: @@ -83,7 +83,7 @@ private[sql] trait SQLTestData { self => } protected lazy val decimalData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( DecimalData(1, 1) :: DecimalData(1, 2) :: DecimalData(2, 1) :: @@ -95,7 +95,7 @@ private[sql] trait SQLTestData { self => } protected lazy val binaryData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( BinaryData("12".getBytes, 1) :: BinaryData("22".getBytes, 5) :: BinaryData("122".getBytes, 3) :: @@ -106,7 +106,7 @@ private[sql] trait SQLTestData { self => } protected lazy val upperCaseData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( UpperCaseData(1, "A") :: UpperCaseData(2, "B") :: UpperCaseData(3, "C") :: @@ -118,7 +118,7 @@ private[sql] trait SQLTestData { self => } protected lazy val lowerCaseData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( LowerCaseData(1, "a") :: LowerCaseData(2, "b") :: LowerCaseData(3, "c") :: @@ -128,7 +128,7 @@ private[sql] trait SQLTestData { self => } protected lazy val arrayData: RDD[ArrayData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3))) :: ArrayData(Seq(2, 3, 4), Seq(Seq(2, 3, 4))) :: Nil) rdd.toDF().registerTempTable("arrayData") @@ -136,7 +136,7 @@ private[sql] trait SQLTestData { self => } protected lazy val mapData: RDD[MapData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( MapData(Map(1 -> "a1", 2 -> 
"b1", 3 -> "c1", 4 -> "d1", 5 -> "e1")) :: MapData(Map(1 -> "a2", 2 -> "b2", 3 -> "c2", 4 -> "d2")) :: MapData(Map(1 -> "a3", 2 -> "b3", 3 -> "c3")) :: @@ -147,13 +147,13 @@ private[sql] trait SQLTestData { self => } protected lazy val repeatedData: RDD[StringData] = { - val rdd = _sqlContext.sparkContext.parallelize(List.fill(2)(StringData("test"))) + val rdd = sqlContext.sparkContext.parallelize(List.fill(2)(StringData("test"))) rdd.toDF().registerTempTable("repeatedData") rdd } protected lazy val nullableRepeatedData: RDD[StringData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( List.fill(2)(StringData(null)) ++ List.fill(2)(StringData("test"))) rdd.toDF().registerTempTable("nullableRepeatedData") @@ -161,7 +161,7 @@ private[sql] trait SQLTestData { self => } protected lazy val nullInts: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullInts(1) :: NullInts(2) :: NullInts(3) :: @@ -171,7 +171,7 @@ private[sql] trait SQLTestData { self => } protected lazy val allNulls: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullInts(null) :: NullInts(null) :: NullInts(null) :: @@ -181,7 +181,7 @@ private[sql] trait SQLTestData { self => } protected lazy val nullStrings: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullStrings(1, "abc") :: NullStrings(2, "ABC") :: NullStrings(3, null) :: Nil).toDF() @@ -190,13 +190,13 @@ private[sql] trait SQLTestData { self => } protected lazy val tableName: DataFrame = { - val df = _sqlContext.sparkContext.parallelize(TableName("test") :: Nil).toDF() + val df = sqlContext.sparkContext.parallelize(TableName("test") :: Nil).toDF() df.registerTempTable("tableName") df } protected lazy val unparsedStrings: RDD[String] = { - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( "1, A1, true, null" :: "2, B2, false, null" :: "3, C3, true, null" :: @@ -205,13 +205,13 @@ private[sql] trait SQLTestData { self => // An RDD with 4 elements and 8 partitions protected lazy val withEmptyParts: RDD[IntField] = { - val rdd = _sqlContext.sparkContext.parallelize((1 to 4).map(IntField), 8) + val rdd = sqlContext.sparkContext.parallelize((1 to 4).map(IntField), 8) rdd.toDF().registerTempTable("withEmptyParts") rdd } protected lazy val person: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( Person(0, "mike", 30) :: Person(1, "jim", 20) :: Nil).toDF() df.registerTempTable("person") @@ -219,7 +219,7 @@ private[sql] trait SQLTestData { self => } protected lazy val salary: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( Salary(0, 2000.0) :: Salary(1, 1000.0) :: Nil).toDF() df.registerTempTable("salary") @@ -227,7 +227,7 @@ private[sql] trait SQLTestData { self => } protected lazy val complexData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1, 1, 1), true) :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2, 2, 2), false) :: Nil).toDF() @@ -239,7 +239,7 @@ private[sql] trait SQLTestData { self => * Initialize all test data such that all temp tables are properly registered. 
*/ def loadTestData(): Unit = { - assert(_sqlContext != null, "attempted to initialize test data before SQLContext.") + assert(sqlContext != null, "attempted to initialize test data before SQLContext.") testData testData2 testData3 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index cdd691e035897..32fde239bb344 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -46,13 +46,15 @@ private[sql] trait SQLTestUtils with BeforeAndAfterAll with SQLTestData { self => - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext + + protected def sparkContext = sqlContext.sparkContext // Whether to materialize all test data before the first test is run private var loadTestDataBeforeTests = false // Shorthand for running a query using our SQLContext - protected lazy val sql = _sqlContext.sql _ + protected lazy val sql = sqlContext.sql _ /** * A helper object for importing SQL implicits. @@ -62,7 +64,7 @@ private[sql] trait SQLTestUtils * but the implicits import is needed in the constructor. */ protected object testImplicits extends SQLImplicits { - protected override def _sqlContext: SQLContext = self._sqlContext + protected override def _sqlContext: SQLContext = self.sqlContext } /** @@ -83,8 +85,8 @@ private[sql] trait SQLTestUtils /** * The Hadoop configuration used by the active [[SQLContext]]. */ - protected def configuration: Configuration = { - _sqlContext.sparkContext.hadoopConfiguration + protected def hadoopConfiguration: Configuration = { + sqlContext.sparkContext.hadoopConfiguration } /** @@ -95,12 +97,12 @@ private[sql] trait SQLTestUtils */ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { val (keys, values) = pairs.unzip - val currentValues = keys.map(key => Try(_sqlContext.conf.getConfString(key)).toOption) - (keys, values).zipped.foreach(_sqlContext.conf.setConfString) + val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption) + (keys, values).zipped.foreach(sqlContext.conf.setConfString) try f finally { keys.zip(currentValues).foreach { - case (key, Some(value)) => _sqlContext.conf.setConfString(key, value) - case (key, None) => _sqlContext.conf.unsetConf(key) + case (key, Some(value)) => sqlContext.conf.setConfString(key, value) + case (key, None) => sqlContext.conf.unsetConf(key) } } } @@ -132,7 +134,7 @@ private[sql] trait SQLTestUtils * Drops temporary table `tableName` after calling `f`. 
*/ protected def withTempTable(tableNames: String*)(f: => Unit): Unit = { - try f finally tableNames.foreach(_sqlContext.dropTempTable) + try f finally tableNames.foreach(sqlContext.dropTempTable) } /** @@ -141,7 +143,7 @@ private[sql] trait SQLTestUtils protected def withTable(tableNames: String*)(f: => Unit): Unit = { try f finally { tableNames.foreach { name => - _sqlContext.sql(s"DROP TABLE IF EXISTS $name") + sqlContext.sql(s"DROP TABLE IF EXISTS $name") } } } @@ -154,12 +156,12 @@ private[sql] trait SQLTestUtils val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}" try { - _sqlContext.sql(s"CREATE DATABASE $dbName") + sqlContext.sql(s"CREATE DATABASE $dbName") } catch { case cause: Throwable => fail("Failed to create temporary database", cause) } - try f(dbName) finally _sqlContext.sql(s"DROP DATABASE $dbName CASCADE") + try f(dbName) finally sqlContext.sql(s"DROP DATABASE $dbName CASCADE") } /** @@ -167,8 +169,8 @@ private[sql] trait SQLTestUtils * `f` returns. */ protected def activateDatabase(db: String)(f: => Unit): Unit = { - _sqlContext.sql(s"USE $db") - try f finally _sqlContext.sql(s"USE default") + sqlContext.sql(s"USE $db") + try f finally sqlContext.sql(s"USE default") } /** @@ -176,6 +178,6 @@ private[sql] trait SQLTestUtils * way to construct [[DataFrame]] directly out of local data without relying on implicits. */ protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = { - DataFrame(_sqlContext, plan) + DataFrame(sqlContext, plan) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index d23c6a0732669..d2d2d83335402 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -37,8 +37,7 @@ trait SharedSQLContext extends SQLTestUtils { * The [[TestSQLContext]] to use for all tests in this suite. */ protected def ctx: TestSQLContext = _ctx - protected def sqlContext: TestSQLContext = _ctx - protected override def _sqlContext: SQLContext = _ctx + protected def sqlContext: SQLContext = _ctx /** * Initialize the [[TestSQLContext]]. 
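After the rename, everything in `SQLTestUtils` hangs off the single `sqlContext` member that `SharedSQLContext` (or, on the Hive side, `TestHiveSingleton`) supplies. A brief sketch of how the renamed helpers read from a suite; the suite name and config value are made up for illustration.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical example suite, not part of this patch.
class ConfHelpersSketchSuite extends QueryTest with SharedSQLContext {
  test("renamed SQLTestUtils helpers") {
    // withSQLConf sets the given keys for the block and restores the
    // previous values afterwards.
    withSQLConf("spark.sql.shuffle.partitions" -> "5") {
      assert(sqlContext.getConf("spark.sql.shuffle.partitions") === "5")
    }

    // hadoopConfiguration (formerly `configuration`) and sparkContext are
    // just views onto the same underlying context.
    assert(hadoopConfiguration eq sparkContext.hadoopConfiguration)
  }
}
```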
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 92ef2f7d74ba1..d99d191ebe81e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -47,6 +47,6 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel } private object testData extends SQLTestData { - protected override def _sqlContext: SQLContext = self + protected override def sqlContext: SQLContext = self } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 57fea5d8db343..22705dcd29bc0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe -import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.{SQLContext, SQLConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.CacheTableCommand @@ -51,6 +51,11 @@ object TestHive // SPARK-8910 .set("spark.ui.enabled", "false"))) +trait TestHiveSingleton { + protected def sqlContext: SQLContext = TestHive + protected def hiveContext: HiveContext = TestHive +} + /** * A locally running test instance of Spark's Hive execution engine. * diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 39d315aaeab57..424046d601f61 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest} import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Utils -class CachedTableSuite extends QueryTest { +class CachedTableSuite extends QueryTest with TestHiveSingleton { def rddIdOf(tableName: String): Int = { val executedPlan = table(tableName).queryExecution.executedPlan diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala index 30f5313d2b812..cc8ebe9ae64e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import org.apache.spark.sql.hive.test.TestHiveSingleton + import scala.util.Try import org.scalatest.BeforeAndAfter @@ -27,7 +29,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.{AnalysisException, QueryTest} -class ErrorPositionSuite extends QueryTest with BeforeAndAfter { +class ErrorPositionSuite extends QueryTest with TestHiveSingleton 
with BeforeAndAfter { before { Seq((1, 1, 1)).toDF("a", "a", "b").registerTempTable("dupAttributes") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala index fb10f8583da99..63d3f2ee87e82 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{DataFrame, QueryTest} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.scalatest.BeforeAndAfterAll @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll // TODO ideally we should put the test suite into the package `sql`, as // `hive` package is optional in compiling, however, `SQLContext.sql` doesn't // support the `cube` or `rollup` yet. -class HiveDataFrameAnalyticsSuite extends QueryTest with BeforeAndAfterAll { +class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { private var testData: DataFrame = _ override def beforeAll() { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala index 52e782768cb75..03bc9f5838aa2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.hive +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{Row, QueryTest} import org.apache.spark.sql.hive.test.TestHive.implicits._ -class HiveDataFrameJoinSuite extends QueryTest { +class HiveDataFrameJoinSuite extends QueryTest with TestHiveSingleton { // We should move this into SQL package if we make case sensitivity configurable in SQL. 
test("join - self join auto resolve ambiguity with case insensitivity") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index c177cbdd991cf..c288074279ad8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.hive +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{Row, QueryTest} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -class HiveDataFrameWindowSuite extends QueryTest { +class HiveDataFrameWindowSuite extends QueryTest with TestHiveSingleton { test("reuse window partitionBy") { val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 574624d501f22..2d08d392402b2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.sources.DataSourceTest @@ -52,8 +52,10 @@ class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { } } -class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive +class DataSourceWithHiveMetastoreCatalogSuite + extends DataSourceTest with TestHiveSingleton with SQLTestUtils { + + override def sqlContext: SQLContext = TestHive import testImplicits._ private val testDF = range(1, 3).select( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index fe0db5228de16..dae3c9ee32757 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -17,15 +17,15 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.{QueryTest, Row, SQLContext} case class Cases(lower: String, UPPER: String) -class HiveParquetSuite extends QueryTest with ParquetTest { +class HiveParquetSuite extends QueryTest with TestHiveSingleton with ParquetTest { private val ctx = TestHive - override def _sqlContext: SQLContext = ctx + override def sqlContext: SQLContext = ctx test("Case insensitive attribute names") { withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index dc2d85f48624c..cc3855653cff6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{SQLContext, QueryTest} import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.sql.types.DecimalType @@ -273,6 +273,10 @@ object SparkSQLConfTest extends Logging { } object SPARK_9757 extends QueryTest with Logging { + import org.apache.spark.sql.functions._ + + var sqlContext: SQLContext = _ + def main(args: Array[String]): Unit = { Utils.configTestLog4j("INFO") @@ -282,9 +286,9 @@ object SPARK_9757 extends QueryTest with Logging { .set("spark.sql.hive.metastore.jars", "maven")) val hiveContext = new TestHiveContext(sparkContext) - import hiveContext.implicits._ + sqlContext = hiveContext - import org.apache.spark.sql.functions._ + import hiveContext.implicits._ val dir = Utils.createTempDir() dir.delete() @@ -292,24 +296,24 @@ object SPARK_9757 extends QueryTest with Logging { try { { val df = - hiveContext + sqlContext .range(10) .select(('id + 0.1) cast DecimalType(10, 3) as 'dec) df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t") - checkAnswer(hiveContext.table("t"), df) + checkAnswer(sqlContext.table("t"), df) } { val df = - hiveContext + sqlContext .range(10) .select(callUDF("struct", ('id + 0.2) cast DecimalType(10, 3)) as 'dec_struct) df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t") - checkAnswer(hiveContext.table("t"), df) + checkAnswer(sqlContext.table("t"), df) } } finally { dir.delete() - hiveContext.sql("DROP TABLE t") + sqlContext.sql("DROP TABLE t") sparkContext.stop() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index d33e81227db88..34dc72f3ed853 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.{QueryTest, _} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -35,7 +35,7 @@ case class TestData(key: Int, value: String) case class ThreeCloumntable(key: Int, value: String, key1: String) -class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { +class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter { import org.apache.spark.sql.hive.test.TestHive.implicits._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index d3388a9429e41..e3f75b44f3549 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -19,12 +19,12 @@ 
package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row -class ListTablesSuite extends QueryTest with BeforeAndAfterAll { +class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { import org.apache.spark.sql.hive.test.TestHive.implicits._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 20a50586d5201..03425e33db92f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.Logging import org.apache.spark.sql._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation @@ -39,10 +39,8 @@ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. */ -class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll - with Logging { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class MetastoreDataSourcesSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterAll with Logging { var jsonFilePath: String = _ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 997c667ec0d1b..efadc5ab1f7f6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -17,20 +17,17 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.{AnalysisException, QueryTest, SQLContext, SaveMode} -class MultiDatabaseSuite extends QueryTest with SQLTestUtils { - override val _sqlContext: HiveContext = TestHive - private val sqlContext = _sqlContext +class MultiDatabaseSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - private val df = sqlContext.range(10).coalesce(1) + private lazy val df = sqlContext.range(10).coalesce(1) private def checkTablePath(dbName: String, tableName: String): Unit = { - // val hiveContext = sqlContext.asInstanceOf[HiveContext] - val metastoreTable = sqlContext.catalog.client.getTable(dbName, tableName) - val expectedPath = sqlContext.catalog.client.getDatabase(dbName).location + "/" + tableName + val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) + val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName assert(metastoreTable.serdeProperties("path") === expectedPath) } @@ -220,7 +217,7 @@ class MultiDatabaseSuite extends 
QueryTest with SQLTestUtils { df.write.parquet(s"$path/p=2") sql("ALTER TABLE t ADD PARTITION (p=2)") - sqlContext.refreshTable("t") + hiveContext.refreshTable("t") checkAnswer( sqlContext.table("t"), df.withColumn("p", lit(1)).unionAll(df.withColumn("p", lit(2)))) @@ -252,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils { df.write.parquet(s"$path/p=2") sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)") - sqlContext.refreshTable(s"$db.t") + hiveContext.refreshTable(s"$db.t") checkAnswer( sqlContext.table(s"$db.t"), df.withColumn("p", lit(1)).unionAll(df.withColumn("p", lit(2)))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 91d7a48208e8d..49fb1301f8cec 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -23,13 +23,12 @@ import java.util.{Locale, TimeZone} import org.apache.hadoop.hive.conf.HiveConf import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.{Row, SQLConf, SQLContext} +import org.apache.spark.sql.{Row, SQLConf} -class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class ParquetHiveCompatibilitySuite + extends ParquetCompatibilityTest with TestHiveSingleton with BeforeAndAfterAll { /** * Set the staging directory (and hence path to ignore Parquet files under) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 1cc8a93e83411..ef8cb7d03e023 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -18,22 +18,20 @@ package org.apache.spark.sql.hive import com.google.common.io.Files -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql._ +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils -class QueryPartitionSuite extends QueryTest with SQLTestUtils { - - private lazy val ctx = org.apache.spark.sql.hive.test.TestHive - import ctx.implicits._ +class QueryPartitionSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - protected def _sqlContext = ctx + import testImplicits._ test("SPARK-5068: query data when path doesn't exist"){ withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { - val testData = ctx.sparkContext.parallelize( + val testData = sparkContext.parallelize( (1 to 10).map(i => TestData(i, i.toString))).toDF() testData.registerTempTable("testData") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index e4fec7e2c8a2a..77a8150f1dfd0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -17,6 +17,8 @@ package 
org.apache.spark.sql.hive +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag @@ -25,16 +27,8 @@ import org.apache.spark.sql.{Row, SQLConf, QueryTest} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ -class StatisticsSuite extends QueryTest with BeforeAndAfterAll { - - private lazy val ctx: HiveContext = { - val ctx = org.apache.spark.sql.hive.test.TestHive - ctx.reset() - ctx.cacheTables = false - ctx - } - - import ctx.sql +class StatisticsSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterAll { test("parse analyze commands") { def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { @@ -54,9 +48,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } } - // Ensure session state is initialized. - ctx.parseSql("use default") - assertAnalyzeCommand( "ANALYZE TABLE Table1 COMPUTE STATISTICS", classOf[HiveNativeCommand]) @@ -80,7 +71,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - ctx.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes + sqlContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -114,7 +105,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { |SELECT * FROM src """.stripMargin).collect() - assert(queryTotalSize("analyzeTable_part") === ctx.conf.defaultSizeInBytes) + assert(queryTotalSize("analyzeTable_part") === sqlContext.conf.defaultSizeInBytes) sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan") @@ -125,9 +116,9 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Try to analyze a temp table sql("""SELECT * FROM src""").registerTempTable("tempTable") intercept[UnsupportedOperationException] { - ctx.analyze("tempTable") + hiveContext.analyze("tempTable") } - ctx.catalog.unregisterTable(Seq("tempTable")) + sqlContext.catalog.unregisterTable(Seq("tempTable")) } test("estimates the size of a test MetastoreRelation") { @@ -155,8 +146,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { val sizes = df.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(0) <= ctx.conf.autoBroadcastJoinThreshold - && sizes(1) <= ctx.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(0) <= sqlContext.conf.autoBroadcastJoinThreshold + && sizes(1) <= sqlContext.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -167,8 +158,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(df, expectedAnswer) // check correctness of output - ctx.conf.settings.synchronized { - val tmp = ctx.conf.autoBroadcastJoinThreshold + sqlContext.conf.settings.synchronized { + val tmp = sqlContext.conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""") df = sql(query) @@ -211,8 +202,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(1) <= ctx.conf.autoBroadcastJoinThreshold - && 
sizes(0) <= ctx.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(1) <= sqlContext.conf.autoBroadcastJoinThreshold + && sizes(0) <= sqlContext.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -225,8 +216,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(df, answer) // check correctness of output - ctx.conf.settings.synchronized { - val tmp = ctx.conf.autoBroadcastJoinThreshold + sqlContext.conf.settings.synchronized { + val tmp = sqlContext.conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") df = sql(leftSemiJoinQuery) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala index 7ee1c8d13aa3f..4dff32ee47caa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHiveSingleton case class FunctionResult(f1: String, f2: String) -class UDFSuite extends QueryTest { +class UDFSuite extends QueryTest with TestHiveSingleton { private lazy val ctx = org.apache.spark.sql.hive.test.TestHive test("UDF case insensitive") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 4886a85948367..2a7bdcfa2e282 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -21,15 +21,15 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.execution.aggregate -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.hive.aggregate.{MyDoubleAvg, MyDoubleSum} -abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll { - override def _sqlContext: SQLContext = TestHive - protected val sqlContext = _sqlContext - import sqlContext.implicits._ +abstract class AggregationQuerySuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterAll { + + import testImplicits._ var originalUseAggregate2: Boolean = _ @@ -597,7 +597,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue sqlContext.conf.unsetConf("spark.sql.TungstenAggregate.testFallbackStartsAt") } - override protected def checkAnswer(actual: DataFrame, expectedAnswer: Seq[Row]): Unit = { + override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { (0 to 2).foreach { fallbackStartsAt => sqlContext.setConf( "spark.sql.TungstenAggregate.testFallbackStartsAt", @@ -626,12 +626,12 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue } // Override it to make sure we call the actually overridden checkAnswer. 
- override protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = { + override protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = { checkAnswer(df, Seq(expectedAnswer)) } // Override it to make sure we call the actually overridden checkAnswer. - override protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { + override protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = { checkAnswer(df, expectedAnswer.collect()) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 11d7a872dff09..0e589d90d0ca9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -18,16 +18,14 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{SQLContext, QueryTest} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.test.SQLTestUtils /** * A set of tests that validates support for Hive Explain command. */ -class HiveExplainSuite extends QueryTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class HiveExplainSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { test("explain extended command") { checkExistence(sql(" explain select * from src where key=123 "), true, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala index efbef68cd4447..c7ef31cbfaf21 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{Row, QueryTest} import org.apache.spark.sql.hive.test.TestHive._ /** * A set of tests that validates commands can also be queried by like a table */ -class HiveOperatorQueryableSuite extends QueryTest { +class HiveOperatorQueryableSuite extends QueryTest with TestHiveSingleton { test("SPARK-5324 query result of describe command") { loadTestTable("src") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala index ba56a8a6b689c..ef482d8469d6c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} -class HivePlanTest extends QueryTest { +class HivePlanTest extends QueryTest with TestHiveSingleton { import TestHive._ import TestHive.implicits._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 9c10ffe1113dc..4e8268c7412ff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -27,8 +27,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.util.Utils @@ -43,10 +44,10 @@ case class ListStringCaseClass(l: Seq[String]) /** * A test suite for Hive custom UDFs. */ -class HiveUDFSuite extends QueryTest { +class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { + import testImplicits._ - import TestHive.{udf, sql} - import TestHive.implicits._ + private def udf = sqlContext.udf test("spark sql udf test that returns a struct") { udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1ff1d9a2934cc..2bac17621e7e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries} import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation} @@ -65,9 +65,7 @@ class MyDialect extends DefaultParserDialect * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is * valid, but Hive currently cannot execute it. 
*/ -class SQLQuerySuite extends QueryTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class SQLQuerySuite extends QueryTest with TestHiveSingleton with SQLTestUtils { test("UDTF") { sql(s"ADD JAR ${TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath()}") @@ -509,19 +507,20 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { checkAnswer( sql("SELECT f1.f2.f3 FROM nested"), Row(1)) - checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"), - Seq.empty[Row]) + + sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") checkAnswer( sql("SELECT * FROM test_ctas_1234"), sql("SELECT * FROM nested").collect().toSeq) intercept[AnalysisException] { - sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect() + sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() } } test("test CTAS") { - checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row]) + sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") + checkAnswer( sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 9aca40f15ac15..068fdb37ae8d7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -26,13 +26,10 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.{UnaryNode, SparkPlan, SparkPlanTest} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.types.StringType -class ScriptTransformationSuite extends SparkPlanTest { - - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { private val noSerdeIOSchema = HiveScriptIOSchema( inputRowFormat = Seq.empty, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index deec0048d24b8..c490cb728811a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -24,10 +24,9 @@ import org.apache.spark.sql.sources.HadoopFsRelationTest import org.apache.spark.sql.types._ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { - override val dataSourceName: String = classOf[DefaultSource].getCanonicalName + import testImplicits._ - import sqlContext._ - import sqlContext.implicits._ + override val dataSourceName: String = classOf[DefaultSource].getCanonicalName test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => @@ -48,7 +47,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - read.options(Map( + sqlContext.read.options(Map( "path" -> 
file.getCanonicalPath, "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala index a46ca9a2c9706..3e0884ae5d1eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.util.Utils @@ -38,7 +38,7 @@ case class OrcParData(intField: Int, stringField: String) case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) // TODO This test suite duplicates ParquetPartitionDiscoverySuite a lot -class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll { +class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultStrVal def withTempDir(f: File => Unit): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 80c38084f293d..2f5ee277e17b8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.orc import java.io.File +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.test.TestHive._ @@ -26,7 +27,7 @@ import org.apache.spark.sql.{QueryTest, Row} case class OrcData(intField: Int, stringField: String) -abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { +abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { var orcTableDir: File = null var orcTableAsDir: File = null diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala index f7ba20ff41d8d..5ca2db31ff618 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc import java.io.File +import org.apache.spark.sql.hive.test.TestHiveSingleton + import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -26,11 +28,8 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.test.SQLTestUtils -private[sql] trait OrcTest extends SQLTestUtils { this: SparkFunSuite => - protected override def _sqlContext: SQLContext = org.apache.spark.sql.hive.test.TestHive - protected val sqlContext = _sqlContext - import sqlContext.implicits._ - import sqlContext.sparkContext +private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton { this: SparkFunSuite => + import testImplicits._ /** * Writes `data` to a Orc file, which is then passed 
to `f` and will be deleted after `f` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 34d3434569f58..3f4170c0e87f0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation @@ -684,9 +684,8 @@ class ParquetSourceSuite extends ParquetPartitioningTest { /** * A collection of tests for parquet data with various forms of partitioning. */ -abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll { - override def _sqlContext: SQLContext = TestHive - protected val sqlContext = _sqlContext +abstract class ParquetPartitioningTest + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterAll { var partitionedTableDir: File = null var normalTableDir: File = null diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala index b4640b1616281..846b2400bb157 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala @@ -21,13 +21,12 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.test.SQLTestUtils -class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class CommitFailureTestRelationSuite + extends SparkFunSuite with TestHiveSingleton with SQLTestUtils { // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. 
val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index 8ca3a17085194..0ebb79066b8f2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = "json" - import sqlContext._ + import testImplicits._ test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => @@ -47,7 +47,7 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - read.format(dataSourceName) + sqlContext.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } @@ -65,14 +65,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { val data = Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) :: Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil - val df = createDataFrame(sparkContext.parallelize(data), schema) + val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema) // Write the data out. df.write.format(dataSourceName).save(file.getCanonicalPath) // Read it back and check the result. checkAnswer( - read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), + sqlContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), df ) } @@ -90,14 +90,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { Row(new BigDecimal("10.02")) :: Row(new BigDecimal("20000.99")) :: Row(new BigDecimal("10000")) :: Nil - val df = createDataFrame(sparkContext.parallelize(data), schema) + val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema) // Write the data out. df.write.format(dataSourceName).save(file.getCanonicalPath) // Read it back and check the result. 
checkAnswer( - read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), + sqlContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), df ) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index cb4cedddbfddd..8be82b7e793b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -26,12 +26,10 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} - class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { - override val dataSourceName: String = "parquet" + import testImplicits._ - import sqlContext._ - import sqlContext.implicits._ + override val dataSourceName: String = "parquet" test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => @@ -51,7 +49,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - read.format(dataSourceName) + sqlContext.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } @@ -69,7 +67,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { .format("parquet") .save(s"${dir.getCanonicalPath}/_temporary") - checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect()) + checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df.collect()) } } @@ -97,7 +95,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { // This shouldn't throw anything. df.write.format("parquet").mode(SaveMode.Overwrite).save(path) - checkAnswer(read.format("parquet").load(path), df) + checkAnswer(sqlContext.read.format("parquet").load(path), df) } } @@ -107,7 +105,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { // Parquet doesn't allow field names with spaces. Here we are intentionally making an // exception thrown from the `ParquetRelation2.prepareForWriteJob()` method to trigger // the bug. Please refer to spark-8079 for more details. 
- range(1, 10) + sqlContext.range(1, 10) .withColumnRenamed("id", "a b") .write .format("parquet") @@ -125,7 +123,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { val summaryPath = new Path(path, "_metadata") val commonSummaryPath = new Path(path, "_common_metadata") - val fs = summaryPath.getFileSystem(configuration) + val fs = summaryPath.getFileSystem(hadoopConfiguration) fs.delete(summaryPath, true) fs.delete(commonSummaryPath, true) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index e8975e5f5cd08..1d93020c7f20c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName - import sqlContext._ - test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) @@ -44,7 +42,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - read.format(dataSourceName) + sqlContext.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 7966b43596e75..dcb7c865070bf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -28,15 +28,13 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHive} import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ -abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - protected val sqlContext = _sqlContext - import sqlContext.implicits._ +abstract class HadoopFsRelationTest extends QueryTest with TestHiveSingleton with SQLTestUtils { + import testImplicits._ val dataSourceName: String @@ -504,17 +502,17 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } test("SPARK-8578 specified custom output committer will not be used to append data") { - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) try { val df = sqlContext.range(1, 10).toDF("i") withTempPath { dir => df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - configuration.set( + hadoopConfiguration.set( SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[AlwaysFailOutputCommitter].getName) // Since Parquet has its own output committer setting, also set it // to AlwaysFailParquetOutputCommitter at here. 
- configuration.set("spark.sql.parquet.output.committer.class", + hadoopConfiguration.set("spark.sql.parquet.output.committer.class", classOf[AlwaysFailParquetOutputCommitter].getName) // Because there data already exists, // this append should succeed because we will use the output committer associated @@ -533,12 +531,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } withTempPath { dir => - configuration.set( + hadoopConfiguration.set( SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[AlwaysFailOutputCommitter].getName) // Since Parquet has its own output committer setting, also set it // to AlwaysFailParquetOutputCommitter at here. - configuration.set("spark.sql.parquet.output.committer.class", + hadoopConfiguration.set("spark.sql.parquet.output.committer.class", classOf[AlwaysFailParquetOutputCommitter].getName) // Because there is no existing data, // this append will fail because AlwaysFailOutputCommitter is used when we do append @@ -549,8 +547,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) } } @@ -570,7 +568,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } test("SPARK-9899 Disable customized output committer when speculation is on") { - val clonedConf = new Configuration(configuration) + val clonedConf = new Configuration(hadoopConfiguration) val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) @@ -580,7 +578,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { sqlContext.sparkContext.conf.set("spark.speculation", "true") // Uses a customized output committer which always fails - configuration.set( + hadoopConfiguration.set( SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[AlwaysFailOutputCommitter].getName) @@ -597,8 +595,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } finally { // Hadoop 1 doesn't have `Configuration.unset` - configuration.clear() - clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) } }
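The recurring change across the suites above is the switch from per-suite `_sqlContext` plumbing to a shared `TestHiveSingleton` mixin. The trait itself is not included in this diff; the following is only a minimal sketch, assuming it needs to expose the shared `TestHive` instance under the names the rewritten suites use (`hiveContext`, `sqlContext`, `sparkContext`, `testImplicits`). The actual trait may differ, for example `testImplicits` may come from `SQLTestUtils` instead, and the real mixin may also reset state between suites.

package org.apache.spark.sql.hive.test

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SQLImplicits}
import org.scalatest.Suite

// Sketch only: a minimal singleton mixin with the members the rewritten suites use.
trait TestHiveSingletonSketch { self: Suite =>
  // Every suite shares the one process-wide TestHive context.
  protected val hiveContext: TestHiveContext = TestHive
  protected def sqlContext: SQLContext = hiveContext
  protected def sparkContext: SparkContext = hiveContext.sparkContext

  // Lets suites write `import testImplicits._` instead of `import hiveContext.implicits._`.
  protected object testImplicits extends SQLImplicits {
    protected override def _sqlContext: SQLContext = hiveContext
  }
}

Centralizing this in one trait is what lets the individual suites above shrink to a one-line `extends QueryTest with TestHiveSingleton with ...` declaration.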
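`SPARK_9757` is not a ScalaTest suite but an object launched through spark-submit, so it cannot pick up its context from a mixin; instead it overrides the `sqlContext` member with a `var` and assigns the freshly built `TestHiveContext` inside `main`. A stripped-down sketch of that shape follows; the object name, the SparkConf setting, and the query are invented for illustration.

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
import org.apache.spark.sql.hive.test.TestHiveContext

// Hypothetical skeleton of a spark-submit-driven regression check in the style
// of SPARK_9757: the inherited checkAnswer helpers need a sqlContext, and a
// standalone object has to wire one up by hand.
object RegressionCheckSketch extends QueryTest with Logging {

  var sqlContext: SQLContext = _ // satisfies QueryTest's abstract sqlContext

  def main(args: Array[String]): Unit = {
    // Master and app name are expected to come from spark-submit.
    val sparkContext = new SparkContext(new SparkConf().set("spark.ui.enabled", "false"))
    val hiveContext = new TestHiveContext(sparkContext)
    sqlContext = hiveContext
    try {
      checkAnswer(sqlContext.range(3).selectExpr("id"), Seq(Row(0L), Row(1L), Row(2L)))
    } finally {
      sparkContext.stop()
    }
  }
}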
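`QueryPartitionSuite` keeps its `withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { ... }` block; that helper comes from `SQLTestUtils`, which most of these suites now mix in. For readers unfamiliar with it, the snippet below is roughly the save/set/restore shape it implements. The name `withSQLConfSketch` and the exact restore logic are illustrative, not the real implementation.

import scala.util.Try

// Rough sketch of a withSQLConf-style loan helper: apply the given SQL conf
// pairs for the duration of `body`, then restore (or unset) the previous values.
protected def withSQLConfSketch(pairs: (String, String)*)(body: => Unit): Unit = {
  val previous = pairs.map { case (key, _) => key -> Try(sqlContext.getConf(key)).toOption }
  pairs.foreach { case (key, value) => sqlContext.setConf(key, value) }
  try body finally {
    previous.foreach {
      case (key, Some(oldValue)) => sqlContext.setConf(key, oldValue)
      case (key, None)           => sqlContext.conf.unsetConf(key) // key was not set before
    }
  }
}

The loan pattern is why the partition-path test can flip `HIVE_VERIFY_PARTITION_PATH` without leaking the setting into later tests that share the same singleton context.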
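The overrides at the end of the aggregation suites change `checkAnswer(df: DataFrame, ...)` to `checkAnswer(df: => DataFrame, ...)`. The by-name parameter is what allows `TungstenAggregationQueryWithControlledFallbackSuite` to re-plan and re-execute the same query once per value of the fallback threshold instead of checking a single cached result three times. A condensed sketch of that idea, assuming a suite that already has `sqlContext` and ScalaTest's `fail` in scope (the method name and error message are invented):

// Condensed sketch: because `actual` is by-name, the block at the call site is
// evaluated again for every value of the fallback threshold.
protected def checkAnswerForEachFallback(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
  try {
    (0 to 2).foreach { fallbackStartsAt =>
      sqlContext.setConf("spark.sql.TungstenAggregate.testFallbackStartsAt", fallbackStartsAt.toString)
      QueryTest.checkAnswer(actual, expectedAnswer) match {
        case Some(errorMessage) =>
          fail(s"Fallback starting at $fallbackStartsAt produced a wrong answer:\n$errorMessage")
        case None => // this configuration passed
      }
    }
  } finally {
    sqlContext.conf.unsetConf("spark.sql.TungstenAggregate.testFallbackStartsAt")
  }
}

Passing `actual` straight through to the by-name `QueryTest.checkAnswer` keeps the query unevaluated until each iteration actually needs a result.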
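In `HiveUDFSuite` the old `import TestHive.{udf, sql}` becomes a `private def udf = sqlContext.udf`, so UDF registration goes through whichever context the singleton supplies. A small example of that pattern, meant to live inside such a suite; the function name `plusOne` and the query are made up for illustration.

// Hypothetical example in the style of HiveUDFSuite's Scala UDF tests.
test("scala UDF registered through sqlContext.udf") {
  udf.register("plusOne", (x: Long) => x + 1) // `udf` is just sqlContext.udf here
  checkAnswer(
    sqlContext.range(1).selectExpr("plusOne(id)"), // id = 0, so plusOne(id) = 1
    Row(1L))
}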
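`OrcTest` keeps its loan-pattern helpers that write a dataset to a temporary ORC file and hand the location to the test body; after this patch they simply draw `sqlContext` and `testImplicits` from the singleton. Below is a sketch of what such a helper can look like, assuming the `withTempPath` utility from `SQLTestUtils` that other tests in this diff already use; the helper name is invented and the real one may differ.

import scala.reflect.runtime.universe.TypeTag

// Sketch of an OrcTest-style loan helper: materialize `data` as an ORC file in a
// temp directory, pass the path to `f`, and rely on withTempPath for cleanup.
protected def withOrcFileSketch[T <: Product: TypeTag](data: Seq[T])(f: String => Unit): Unit = {
  withTempPath { file =>
    sqlContext.createDataFrame(data).write.format("orc").save(file.getCanonicalPath)
    f(file.getCanonicalPath)
  }
}

A test can then call it as `withOrcFileSketch(Seq(OrcData(1, "a"))) { path => checkAnswer(sqlContext.read.format("orc").load(path), Row(1, "a")) }`.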
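The committer tests at the bottom clone `hadoopConfiguration`, mutate it, and then clear and re-apply every original entry in a `finally` block because, as the comment notes, Hadoop 1 has no `Configuration.unset`. That dance is identical in both tests and could live in a small wrapper; the sketch below reuses the exact restore logic visible in the diff, while the name `withClonedHadoopConf` is invented here and the code assumes the suite exposes `hadoopConfiguration` the way `HadoopFsRelationTest` now does.

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

// Sketch of a save/restore wrapper around the shared Hadoop configuration:
// mutate it freely inside `body`, then restore every original entry.
protected def withClonedHadoopConf(body: => Unit): Unit = {
  val clonedConf = new Configuration(hadoopConfiguration)
  try {
    body
  } finally {
    // Hadoop 1 doesn't have `Configuration.unset`, so clear and re-apply instead.
    hadoopConfiguration.clear()
    clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
  }
}

With such a helper, each SPARK-8578/SPARK-9899 block above would shrink to a `withClonedHadoopConf { ... }` around the part that installs the always-failing committers.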