Skip to content

Commit

Permalink
[SPARK-21345][SQL][TEST][TEST-MAVEN] SparkSessionBuilderSuite should …
Browse files Browse the repository at this point in the history
…clean up stopped sessions.

## What changes were proposed in this pull request?

`SparkSessionBuilderSuite` should clean up stopped sessions. Otherwise, it leaves behind some stopped `SparkContext`s interfering with other test suites using `SharedSQLContext`.

Recently, the master branch fails consecutively.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/

## How was this patch tested?

Pass the Jenkins with an updated suite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#18567 from dongjoon-hyun/SPARK-SESSION.
  • Loading branch information
dongjoon-hyun authored and cloud-fan committed Jul 8, 2017
1 parent 330bf5c commit 0b8dd2d
Showing 1 changed file with 18 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,49 @@

package org.apache.spark.sql

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.internal.SQLConf

/**
* Test cases for the builder pattern of [[SparkSession]].
*/
class SparkSessionBuilderSuite extends SparkFunSuite {
class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {

private var initialSession: SparkSession = _
override def afterEach(): Unit = {
// This suite should not interfere with the other test suites.
SparkSession.getActiveSession.foreach(_.stop())
SparkSession.clearActiveSession()
SparkSession.getDefaultSession.foreach(_.stop())
SparkSession.clearDefaultSession()
}

private lazy val sparkContext: SparkContext = {
initialSession = SparkSession.builder()
test("create with config options and propagate them to SparkContext and SparkSession") {
val session = SparkSession.builder()
.master("local")
.config("spark.ui.enabled", value = false)
.config("some-config", "v2")
.getOrCreate()
initialSession.sparkContext
}

test("create with config options and propagate them to SparkContext and SparkSession") {
// Creating a new session with config - this works by just calling the lazy val
sparkContext
assert(initialSession.sparkContext.conf.get("some-config") == "v2")
assert(initialSession.conf.get("some-config") == "v2")
SparkSession.clearDefaultSession()
assert(session.sparkContext.conf.get("some-config") == "v2")
assert(session.conf.get("some-config") == "v2")
}

test("use global default session") {
val session = SparkSession.builder().getOrCreate()
val session = SparkSession.builder().master("local").getOrCreate()
assert(SparkSession.builder().getOrCreate() == session)
SparkSession.clearDefaultSession()
}

test("config options are propagated to existing SparkSession") {
val session1 = SparkSession.builder().config("spark-config1", "a").getOrCreate()
val session1 = SparkSession.builder().master("local").config("spark-config1", "a").getOrCreate()
assert(session1.conf.get("spark-config1") == "a")
val session2 = SparkSession.builder().config("spark-config1", "b").getOrCreate()
assert(session1 == session2)
assert(session1.conf.get("spark-config1") == "b")
SparkSession.clearDefaultSession()
}

test("use session from active thread session and propagate config options") {
val defaultSession = SparkSession.builder().getOrCreate()
val defaultSession = SparkSession.builder().master("local").getOrCreate()
val activeSession = defaultSession.newSession()
SparkSession.setActiveSession(activeSession)
val session = SparkSession.builder().config("spark-config2", "a").getOrCreate()
Expand All @@ -73,16 +72,14 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
SparkSession.clearActiveSession()

assert(SparkSession.builder().getOrCreate() == defaultSession)
SparkSession.clearDefaultSession()
}

test("create a new session if the default session has been stopped") {
val defaultSession = SparkSession.builder().getOrCreate()
val defaultSession = SparkSession.builder().master("local").getOrCreate()
SparkSession.setDefaultSession(defaultSession)
defaultSession.stop()
val newSession = SparkSession.builder().master("local").getOrCreate()
assert(newSession != defaultSession)
newSession.stop()
}

test("create a new session if the active thread session has been stopped") {
Expand All @@ -91,11 +88,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
activeSession.stop()
val newSession = SparkSession.builder().master("local").getOrCreate()
assert(newSession != activeSession)
newSession.stop()
}

test("create SparkContext first then SparkSession") {
sparkContext.stop()
val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
val sparkContext2 = new SparkContext(conf)
val session = SparkSession.builder().config("key2", "value2").getOrCreate()
Expand All @@ -105,11 +100,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
// We won't update conf for existing `SparkContext`
assert(!sparkContext2.conf.contains("key2"))
assert(sparkContext2.conf.get("key1") == "value1")
session.stop()
}

test("create SparkContext first then pass context to SparkSession") {
sparkContext.stop()
val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
val newSC = new SparkContext(conf)
val session = SparkSession.builder().sparkContext(newSC).config("key2", "value2").getOrCreate()
Expand All @@ -121,14 +114,12 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
// the conf of this sparkContext will not contain the conf set through the API config.
assert(!session.sparkContext.conf.contains("key2"))
assert(session.sparkContext.conf.get("spark.app.name") == "test")
session.stop()
}

test("SPARK-15887: hive-site.xml should be loaded") {
val session = SparkSession.builder().master("local").getOrCreate()
assert(session.sessionState.newHadoopConf().get("hive.in.test") == "true")
assert(session.sparkContext.hadoopConfiguration.get("hive.in.test") == "true")
session.stop()
}

test("SPARK-15991: Set global Hadoop conf") {
Expand All @@ -140,7 +131,6 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
assert(session.sessionState.newHadoopConf().get(mySpecialKey) == mySpecialValue)
} finally {
session.sparkContext.hadoopConfiguration.unset(mySpecialKey)
session.stop()
}
}
}

0 comments on commit 0b8dd2d

Please sign in to comment.