Skip to content

Commit

Permalink
[SPARK-13655] Improve isolation between tests in KinesisBackedBlockRD…
Browse files Browse the repository at this point in the history
…DSuite

This patch modifies `KinesisBackedBlockRDDTests` to increase the isolation between tests in order to fix a bug which causes the tests to hang.

See #11558 for more details.

/cc zsxwing srowen

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11564 from JoshRosen/SPARK-13655.
  • Loading branch information
JoshRosen authored and zsxwing committed Mar 7, 2016
1 parent b6071a7 commit e9e67b3
Showing 1 changed file with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.spark.streaming.kinesis

import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}

abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
extends KinesisFunSuite with BeforeAndAfterAll {
extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {

private val testData = 1 to 8

Expand All @@ -35,10 +35,10 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
private var shardIdToRange: Map[String, SequenceNumberRange] = null
private var allRanges: Seq[SequenceNumberRange] = null

private var sc: SparkContext = null
private var blockManager: BlockManager = null

override def beforeAll(): Unit = {
super.beforeAll()
runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KPLBasedKinesisTestUtils()
testUtils.createStream()
Expand All @@ -55,19 +55,23 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
(shardId, seqNumRange)
}
allRanges = shardIdToRange.values.toSeq

val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
sc = new SparkContext(conf)
blockManager = sc.env.blockManager
}
}

override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
sc = new SparkContext(conf)
blockManager = sc.env.blockManager
}

override def afterAll(): Unit = {
if (testUtils != null) {
testUtils.deleteStream()
}
if (sc != null) {
sc.stop()
try {
if (testUtils != null) {
testUtils.deleteStream()
}
} finally {
super.afterAll()
}
}

Expand Down

0 comments on commit e9e67b3

Please sign in to comment.