From 6d7bc5af454341f6d9bfc1e903148ad7ba8de6f9 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 6 Sep 2018 23:35:02 -0700
Subject: [PATCH] [SPARK-25267][SQL][TEST] Disable ConvertToLocalRelation in
 the test cases of sql/core and sql/hive

## What changes were proposed in this pull request?

In SharedSparkSession and TestHive, we need to disable the rule ConvertToLocalRelation for better test case coverage: when that rule eagerly collapses small in-memory datasets into a LocalRelation, other optimization rules, such as ConstantPropagation, never see the plans the tests are meant to exercise.

With the rule disabled, a few tests needed adjusting. Tests whose expected output depends on partitioning (e.g. rand(seed)) now pin the layout with repartition(1) or a REPARTITION(1) hint, and tests that expect runtime errors now intercept Exception rather than RuntimeException, since such errors can surface wrapped in a SparkException once evaluation happens at execution time instead of inside the optimizer.
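As background, the knob used here is `spark.sql.optimizer.excludedRules` (exposed in code as `SQLConf.OPTIMIZER_EXCLUDED_RULES`). Below is a minimal sketch, not part of this patch, of how the same exclusion looks in a standalone session; the object name and sample data are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation

// Illustrative only; mirrors what SharedSparkSession/TestHive now do for tests.
object ExcludedRuleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ExcludedRuleSketch")
      // Same setting this patch applies via SQLConf.OPTIMIZER_EXCLUDED_RULES.
      .config("spark.sql.optimizer.excludedRules", ConvertToLocalRelation.ruleName)
      .getOrCreate()
    import spark.implicits._

    // Without the exclusion, the optimizer may eagerly evaluate this small
    // in-memory query into a single LocalRelation, hiding it from rules like
    // ConstantPropagation. With the exclusion, the operators stay in the plan.
    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").filter($"id" > 1)
    df.explain(true)

    spark.stop()
  }
}
```

Running `explain(true)` with and without the `.config(...)` line should show the difference in the optimized logical plan.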
## How was this patch tested?

Identified the failures after excluding the "ConvertToLocalRelation" rule, and fixed the affected test cases.

Closes #22270 from dilipbiswal/SPARK-25267-final.

Authored-by: Dilip Biswal
Signed-off-by: gatorsmile
---
 .../scala/org/apache/spark/ml/util/MLTest.scala    | 10 +++++++++-
 .../sql-tests/inputs/group-by-ordinal.sql          |  4 +++-
 .../sql-tests/results/group-by-ordinal.sql.out     |  4 +++-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 14 ++++++++------
 .../org/apache/spark/sql/DataFrameSuite.scala      |  5 ++---
 .../apache/spark/sql/test/SharedSparkSession.scala |  6 ++++++
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  8 +++++++-
 8 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
index 76d41f9b23715..acac171346a85 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -21,12 +21,13 @@ import java.io.File
 
 import org.scalatest.Suite
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
 import org.apache.spark.ml.{PredictionModel, Transformer}
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.TestSparkSession
 import org.apache.spark.util.Utils
@@ -36,6 +37,13 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
   @transient var sc: SparkContext = _
   @transient var checkpointDir: String = _
 
+  protected override def sparkConf = {
+    new SparkConf()
+      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+  }
+
   protected override def createSparkSession: TestSparkSession = {
     new TestSparkSession(new SparkContext("local[2]", "MLlibUnitTest", sparkConf))
   }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
index 928f766b4add2..3144833b608be 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
@@ -38,7 +38,9 @@ select a, b, sum(b) from data group by 3;
 select a, b, sum(b) + 2 from data group by 3;
 
 -- negative case: nondeterministic expression
-select a, rand(0), sum(b) from data group by a, 2;
+select a, rand(0), sum(b)
+from
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2;
 
 -- negative case: star
 select * from data group by a, b, 1;
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
index 9ecbe19078dd6..cf5add6a71af2 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
@@ -135,7 +135,9 @@ aggregate functions are not allowed in GROUP BY, but found (sum(CAST(data.`b` AS
 
 
 -- !query 13
-select a, rand(0), sum(b) from data group by a, 2
+select a, rand(0), sum(b)
+from
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2
 -- !query 13 schema
 struct<>
 -- !query 13 output
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 85b3ca11383f7..ed110f751645d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -558,7 +558,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-18004 limit + aggregates") {
     withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
+      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
       val limit2Df = df.limit(2)
       checkAnswer(
         limit2Df.groupBy("id").count().select($"id"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 156e54300e38b..4b83e51fa8992 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -85,14 +85,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     }
 
     val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg1 = intercept[Exception] {
       df5.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg1.contains("Cannot use null as map key!"))
 
     val df6 = Seq((Seq(1, 2), Seq("a"))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg2 = intercept[Exception] {
       df6.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg2.contains("The given two arrays should have the same length"))
   }
 
   test("struct with column name") {
@@ -2377,7 +2379,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(ex2.getMessage.contains(
       "The number of lambda function arguments '3' does not match"))
 
-    val ex3 = intercept[RuntimeException] {
+    val ex3 = intercept[Exception] {
       dfExample1.selectExpr("transform_keys(i, (k, v) -> v)").show()
     }
     assert(ex3.getMessage.contains("Cannot use null as map key!"))
@@ -2697,7 +2699,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-24734: Fix containsNull of Concat for array type") {
     val df = Seq((Seq(1), Seq[Integer](null), Seq("a", "b"))).toDF("k1", "k2", "v")
-    val ex = intercept[RuntimeException] {
+    val ex = intercept[Exception] {
       df.select(map_from_arrays(concat($"k1", $"k2"), $"v")).show()
     }
     assert(ex.getMessage.contains("Cannot use null as map key"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d43fcf3c6f5de..45b17b3d4958f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
 import org.apache.spark.sql.test.SQLTestData.{NullInts, NullStrings, TestData2}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.random.XORShiftRandom
 
 class DataFrameSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -1729,10 +1730,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-9083: sort with non-deterministic expressions") {
-    import org.apache.spark.util.random.XORShiftRandom
-
     val seed = 33
-    val df = (1 to 100).map(Tuple1.apply).toDF("i")
+    val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
     val random = new XORShiftRandom(seed)
     val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
     val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 8968dbf36d507..e7e0ce64963a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -39,6 +40,11 @@ trait SharedSparkSession
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+      // Disable ConvertToLocalRelation for better test coverage. Test cases built
+      // on LocalRelation exercise the optimizer rules better with this rule
+      // disabled, because it can otherwise keep other rules, such as
+      // ConstantPropagation, from ever being triggered.
+      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ee3f99ab7e9bb..71f15a45d162a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -59,7 +60,12 @@ object TestHive
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set("spark.ui.enabled", "false")
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")))
+        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+        // Disable ConvertToLocalRelation for better test coverage. Test cases
+        // built on LocalRelation exercise the optimizer rules better with this
+        // rule disabled, because it can otherwise keep other rules, such as
+        // ConstantPropagation, from ever being triggered.
+        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
 
 
 case class TestHiveVersion(hiveClient: HiveClient)