[SPARK-25267][SQL][TEST] Disable ConvertToLocalRelation in the test cases of sql/core and sql/hive

## What changes were proposed in this pull request?

In SharedSparkSession and TestHive, we disable the rule ConvertToLocalRelation for better test case coverage: when this rule collapses queries over local data into a LocalRelation, it can prevent other optimizer rules, such as ConstantPropagation, from being exercised by the tests.

## How was this patch tested?

Identified and fixed the test failures that appear after excluding the "ConvertToLocalRelation" rule.
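
For context, here is a minimal standalone sketch (not part of this patch; the object and app name are hypothetical) of how an optimizer rule is excluded through `spark.sql.optimizer.excludedRules`, the same conf that SharedSparkSession and TestHive now set:

```scala
import org.apache.spark.sql.SparkSession

object ExcludedRuleSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical standalone app: exclude ConvertToLocalRelation so that queries
    // over small in-memory datasets are not pre-collapsed into a LocalRelation.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ExcludedRuleSketch")
      .config("spark.sql.optimizer.excludedRules",
        "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
      .getOrCreate()
    import spark.implicits._

    // With the rule excluded, the optimized plan keeps a scan over the local data,
    // so rules such as ConstantPropagation are actually exercised.
    Seq((1, "a"), (2, "b")).toDF("id", "name")
      .filter($"id" === 1)
      .explain(true)

    spark.stop()
  }
}
```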

Closes #22270 from dilipbiswal/SPARK-25267-final.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
dilipbiswal authored and gatorsmile committed Sep 7, 2018
1 parent ed249db commit 6d7bc5a
Showing 8 changed files with 39 additions and 14 deletions.
10 changes: 9 additions & 1 deletion mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -21,12 +21,13 @@ import java.io.File

import org.scalatest.Suite

import org.apache.spark.SparkContext
import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
import org.apache.spark.ml.{PredictionModel, Transformer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.util.Utils
@@ -36,6 +37,13 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
@transient var sc: SparkContext = _
@transient var checkpointDir: String = _

protected override def sparkConf = {
new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
}

protected override def createSparkSession: TestSparkSession = {
new TestSparkSession(new SparkContext("local[2]", "MLlibUnitTest", sparkConf))
}
@@ -38,7 +38,9 @@ select a, b, sum(b) from data group by 3;
select a, b, sum(b) + 2 from data group by 3;

-- negative case: nondeterministic expression
select a, rand(0), sum(b) from data group by a, 2;
select a, rand(0), sum(b)
from
(select /*+ REPARTITION(1) */ a, b from data) group by a, 2;

-- negative case: star
select * from data group by a, b, 1;
@@ -135,7 +135,9 @@ aggregate functions are not allowed in GROUP BY, but found (sum(CAST(data.`b` AS


-- !query 13
select a, rand(0), sum(b) from data group by a, 2
select a, rand(0), sum(b)
from
(select /*+ REPARTITION(1) */ a, b from data) group by a, 2
-- !query 13 schema
struct<a:int,rand(0):double,sum(b):bigint>
-- !query 13 output
@@ -558,7 +558,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {

test("SPARK-18004 limit + aggregates") {
withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
val limit2Df = df.limit(2)
checkAnswer(
limit2Df.groupBy("id").count().select($"id"),
@@ -85,14 +85,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
}

val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
intercept[RuntimeException] {
val msg1 = intercept[Exception] {
df5.select(map_from_arrays($"k", $"v")).collect
}
}.getMessage
assert(msg1.contains("Cannot use null as map key!"))

val df6 = Seq((Seq(1, 2), Seq("a"))).toDF("k", "v")
intercept[RuntimeException] {
val msg2 = intercept[Exception] {
df6.select(map_from_arrays($"k", $"v")).collect
}
}.getMessage
assert(msg2.contains("The given two arrays should have the same length"))
}

test("struct with column name") {
@@ -2377,7 +2379,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
assert(ex2.getMessage.contains(
"The number of lambda function arguments '3' does not match"))

val ex3 = intercept[RuntimeException] {
val ex3 = intercept[Exception] {
dfExample1.selectExpr("transform_keys(i, (k, v) -> v)").show()
}
assert(ex3.getMessage.contains("Cannot use null as map key!"))
@@ -2697,7 +2699,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {

test("SPARK-24734: Fix containsNull of Concat for array type") {
val df = Seq((Seq(1), Seq[Integer](null), Seq("a", "b"))).toDF("k1", "k2", "v")
val ex = intercept[RuntimeException] {
val ex = intercept[Exception] {
df.select(map_from_arrays(concat($"k1", $"k2"), $"v")).show()
}
assert(ex.getMessage.contains("Cannot use null as map key"))
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
import org.apache.spark.sql.test.SQLTestData.{NullInts, NullStrings, TestData2}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom

class DataFrameSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -1729,10 +1730,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-9083: sort with non-deterministic expressions") {
import org.apache.spark.util.random.XORShiftRandom

val seed = 33
val df = (1 to 100).map(Tuple1.apply).toDF("i")
val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
val random = new XORShiftRandom(seed)
val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually

import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.internal.SQLConf

/**
@@ -39,6 +40,11 @@ trait SharedSparkSession
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
}

/**
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -59,7 +60,12 @@ object TestHive
.set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
// SPARK-8910
.set("spark.ui.enabled", "false")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")))
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))


case class TestHiveVersion(hiveClient: HiveClient)
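
For illustration, a minimal sketch of what the new default means for a suite built on SharedSQLContext (the suite name is hypothetical and the test is not part of this patch): the shared session carries the excluded rule, and queries over local Seqs return the same results while keeping a real scan in the optimized plan.

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite, not part of this patch.
class ExcludedRuleSanitySuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("shared test session excludes ConvertToLocalRelation") {
    // The shared session sets spark.sql.optimizer.excludedRules, so the rule
    // name should be visible in the session conf.
    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES.key)
      .contains("ConvertToLocalRelation"))

    // Results are unchanged; only the optimized plan differs, since the local
    // Seq is no longer collapsed into a LocalRelation.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name").filter($"id" === 1)
    checkAnswer(df, Row(1, "a"))
  }
}
```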
