
Commit 8092a4d
revert spurious test change; this came from another PR
jose-torres committed Jan 16, 2018
1 parent a7c5181 commit 8092a4d
Showing 1 changed file with 1 addition and 25 deletions.
@@ -21,7 +21,6 @@ import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
 
-import scala.collection.mutable
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
@@ -30,7 +29,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -246,29 +245,6 @@ class ContinuousSuite extends ContinuousSuiteBase {
 class ContinuousStressSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  // Continuous processing tasks end asynchronously, so test that they actually end.
-  private val tasksEndedListener = new SparkListener() {
-    val activeTaskIds = mutable.Set[Long]()
-
-    override def onTaskStart(start: SparkListenerTaskStart): Unit = {
-      activeTaskIds.add(start.taskInfo.taskId)
-    }
-
-    override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
-      activeTaskIds.remove(end.taskInfo.taskId)
-    }
-  }
-  override def beforeEach(): Unit = {
-    spark.sparkContext.addSparkListener(tasksEndedListener)
-  }
-
-  override def afterEach(): Unit = {
-    eventually(timeout(streamingTimeout)) {
-      assert(tasksEndedListener.activeTaskIds.isEmpty)
-    }
-    spark.sparkContext.removeSparkListener(tasksEndedListener)
-  }
-
   test("only one epoch") {
     val df = spark.readStream
       .format("rate")

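For reference, below is the reverted listener pattern as a standalone, compilable sketch: a SparkListener that tracks which task IDs have started but not yet ended, so a test can assert that asynchronously-ending continuous processing tasks actually terminate. The TasksEndedListener and TasksEndedDemo names, the local[2] master, and the Thread.sleep poll are illustrative assumptions, not part of the commit; the reverted test instead registered the listener in beforeEach and asserted emptiness inside ScalaTest's eventually(timeout(streamingTimeout)) in afterEach.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.SparkSession

// Tracks task IDs that have started but not yet ended. Listener callbacks
// are delivered from Spark's listener bus, and the plain mutable.Set here
// mirrors what the reverted test code used.
class TasksEndedListener extends SparkListener {
  val activeTaskIds = mutable.Set[Long]()

  override def onTaskStart(start: SparkListenerTaskStart): Unit = {
    activeTaskIds.add(start.taskInfo.taskId)
  }

  override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
    activeTaskIds.remove(end.taskInfo.taskId)
  }
}

// Hypothetical driver, for illustration only.
object TasksEndedDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("tasks-ended-demo")
      .getOrCreate()
    val listener = new TasksEndedListener
    spark.sparkContext.addSparkListener(listener)

    spark.range(1000).count()  // run one job so tasks start and end

    // Listener events arrive asynchronously; the reverted test polled with
    // ScalaTest's eventually(timeout(streamingTimeout)) rather than sleeping.
    Thread.sleep(2000)
    assert(listener.activeTaskIds.isEmpty, "some tasks never reported an end event")

    spark.sparkContext.removeSparkListener(listener)
    spark.stop()
  }
}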