[SPARK-21977][HOTFIX] Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor

## What changes were proposed in this pull request?

Adjust EnsureStatefulOpPartitioningSuite to use the scalatest lifecycle normally, creating shared test state in `beforeAll()` instead of in the suite constructor; this fixes:

```
*** RUN ABORTED ***
  org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
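
The underlying problem: a `super.beforeAll()` call sat in the suite's class body, so it ran while the class was being constructed, before scalatest had started the test lifecycle, and that is where the SparkContext ended up being created. Below is a minimal sketch of the anti-pattern and the fix, assuming scalatest on the classpath and using hypothetical names (`LifecycleSketchSuite`, `fixture`); it is illustrative only, not the Spark suite itself:

```scala
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class LifecycleSketchSuite extends FunSuite with BeforeAndAfterAll {

  // Anti-pattern (what the old suite did): a statement here, in the class
  // body, runs during object construction, before scalatest drives the
  // lifecycle. In the Spark suite this is what created a SparkContext at
  // <init> time:
  //   super.beforeAll()

  private var fixture: String = _   // assigned in beforeAll, like baseDf in the fix

  override def beforeAll(): Unit = {
    super.beforeAll()               // let mixed-in traits set up first
    fixture = "shared resource"     // stands in for Seq(...).toDF("num", "char")
  }

  test("fixture is initialized before any test body runs") {
    assert(fixture == "shared resource")
  }
}
```

scalatest invokes `beforeAll()` once per suite execution, so shared state built there is created only when the runner actually executes the suite, not whenever the class happens to be instantiated.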

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19306 from srowen/SPARK-21977.2.
srowen committed Sep 21, 2017
1 parent 1270e71 commit f10cbf1
Showing 1 changed file with 49 additions and 43 deletions.
EnsureStatefulOpPartitioningSuite.scala:

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -32,66 +33,71 @@ import org.apache.spark.sql.test.SharedSQLContext
 class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLContext {
 
   import testImplicits._
-  super.beforeAll()
 
-  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+  private var baseDf: DataFrame = null
 
-  testEnsureStatefulOpPartitioning(
-    "ClusteredDistribution generates Exchange with HashPartitioning",
-    baseDf.queryExecution.sparkPlan,
-    requiredDistribution = keys => ClusteredDistribution(keys),
-    expectedPartitioning =
-      keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
-    expectShuffle = true)
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+  }
+
+  test("ClusteredDistribution generates Exchange with HashPartitioning") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.queryExecution.sparkPlan,
+      requiredDistribution = keys => ClusteredDistribution(keys),
+      expectedPartitioning =
+        keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning",
-    baseDf.coalesce(1).queryExecution.sparkPlan,
-    requiredDistribution = keys => ClusteredDistribution(keys),
-    expectedPartitioning =
-      keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
-    expectShuffle = true)
+  test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.coalesce(1).queryExecution.sparkPlan,
+      requiredDistribution = keys => ClusteredDistribution(keys),
+      expectedPartitioning =
+        keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "AllTuples generates Exchange with SinglePartition",
-    baseDf.queryExecution.sparkPlan,
-    requiredDistribution = _ => AllTuples,
-    expectedPartitioning = _ => SinglePartition,
-    expectShuffle = true)
+  test("AllTuples generates Exchange with SinglePartition") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.queryExecution.sparkPlan,
+      requiredDistribution = _ => AllTuples,
+      expectedPartitioning = _ => SinglePartition,
+      expectShuffle = true)
+  }
 
-  testEnsureStatefulOpPartitioning(
-    "AllTuples with coalesce(1) doesn't need Exchange",
-    baseDf.coalesce(1).queryExecution.sparkPlan,
-    requiredDistribution = _ => AllTuples,
-    expectedPartitioning = _ => SinglePartition,
-    expectShuffle = false)
+  test("AllTuples with coalesce(1) doesn't need Exchange") {
+    testEnsureStatefulOpPartitioning(
+      baseDf.coalesce(1).queryExecution.sparkPlan,
+      requiredDistribution = _ => AllTuples,
+      expectedPartitioning = _ => SinglePartition,
+      expectShuffle = false)
+  }
 
   /**
    * For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan
    * `inputPlan`, ensures that the incremental planner adds exchanges, if required, in order to
    * ensure the expected partitioning.
    */
   private def testEnsureStatefulOpPartitioning(
-      testName: String,
       inputPlan: SparkPlan,
       requiredDistribution: Seq[Attribute] => Distribution,
       expectedPartitioning: Seq[Attribute] => Partitioning,
       expectShuffle: Boolean): Unit = {
-    test(testName) {
-      val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
-      val executed = executePlan(operator, OutputMode.Complete())
-      if (expectShuffle) {
-        val exchange = executed.children.find(_.isInstanceOf[Exchange])
-        if (exchange.isEmpty) {
-          fail(s"Was expecting an exchange but didn't get one in:\n$executed")
-        }
-        assert(exchange.get ===
-          ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
-          s"Exchange didn't have expected properties:\n${exchange.get}")
-      } else {
-        assert(!executed.children.exists(_.isInstanceOf[Exchange]),
-          s"Unexpected exchange found in:\n$executed")
-      }
-    }
+    val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
+    val executed = executePlan(operator, OutputMode.Complete())
+    if (expectShuffle) {
+      val exchange = executed.children.find(_.isInstanceOf[Exchange])
+      if (exchange.isEmpty) {
+        fail(s"Was expecting an exchange but didn't get one in:\n$executed")
+      }
+      assert(exchange.get ===
+        ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
+        s"Exchange didn't have expected properties:\n${exchange.get}")
+    } else {
+      assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+        s"Unexpected exchange found in:\n$executed")
+    }
   }
```
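
Worth noting alongside the `beforeAll()` change: the helper no longer takes a `testName` and no longer calls `test(...)` itself. In the old shape, each call site passed `baseDf.queryExecution.sparkPlan` as an ordinary argument, and arguments are evaluated where the call appears, i.e. in the class body during construction. Wrapping each call in `test("...") { ... }` defers plan construction until the test body actually executes, after `beforeAll()` has populated `baseDf`. A small sketch of that evaluation-order point, again with hypothetical names (`buildPlan`, `checkPlan`) and assuming scalatest on the classpath:

```scala
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class EvaluationOrderSketch extends FunSuite with BeforeAndAfterAll {

  private var baseDf: String = _                 // stands in for the real DataFrame

  override def beforeAll(): Unit = {
    super.beforeAll()
    baseDf = "df"
  }

  // Stands in for baseDf.queryExecution.sparkPlan: only safe once baseDf is set.
  private def buildPlan(): String = baseDf + ".plan"

  // Old shape: the argument expression would run right here, during
  // construction, while baseDf was still null:
  //   checkPlan("some test name", buildPlan())

  // New shape: construction merely registers the test; buildPlan() runs when
  // scalatest executes the body, after beforeAll() has done its work.
  test("plan is built lazily, after beforeAll") {
    assert(buildPlan() == "df.plan")
  }
}
```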

