Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
refactor in stop context
Browse files Browse the repository at this point in the history
  • Loading branch information
compae committed Jun 20, 2016
1 parent f6b1a9e commit ad78a0e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<scala.binary.version>2.10</scala.binary.version>
<joda.version>2.8.2</joda.version>
<scalatest.version>2.2.2</scalatest.version>
<scalatest.version>2.2.5</scalatest.version>
<junit.version>4.8.1</junit.version>
<scala.check.version>1.11.3</scala.check.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.streaming.datasource.models.InputSentences
import org.apache.spark.streaming.datasource.receiver.DatasourceDStream._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.{Job, RateController, StreamInputInfo}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

private[streaming]
Expand Down Expand Up @@ -134,10 +134,26 @@ class DatasourceDStream[C <: SQLContext](
computingStopped = true

Option(datasourceRDD)
} else {
if (inputSentences.stopConditions.isDefined && inputSentences.stopConditions.get.finishContextWhenEmpty)
_ssc.stop(stopSparkContext, stopGracefully)
None
} else None
}

/**
 * Generates the Spark job to run for the given batch `time`.
 *
 * If an RDD was produced for this batch, the job simply materializes it by
 * running an empty function over every partition (forces computation without
 * collecting results to the driver). If no RDD was produced and computation
 * has stopped, the whole StreamingContext is optionally stopped when the
 * configured stop conditions request it via `finishContextWhenEmpty`.
 *
 * @param time batch time for which to generate the job
 * @return the job to run for this batch, or None when there is nothing to do
 */
override private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        // Empty per-partition function: we only want the RDD computed,
        // not its elements returned.
        val emptyFunc = { (iterator: Iterator[Row]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }

      Some(new Job(time, jobFunc))
    case None =>
      // `exists` replaces the unsafe `isDefined && get` pattern; behavior
      // is identical: stop only when a stop condition is configured and it
      // asks to finish the context on empty batches.
      if (computingStopped &&
        inputSentences.stopConditions.exists(_.finishContextWhenEmpty)
      )
        _ssc.stop(stopSparkContext, stopGracefully)

      None
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,12 @@
package org.apache.spark.streaming.datasource

import org.apache.spark.Logging
import org.scalatest.concurrent.Timeouts
import org.scalatest.{FunSuite, Outcome}
import org.scalatest.FunSuite
import org.scalatest.concurrent.TimeLimitedTests
import org.scalatest.time.SpanSugar._

private[datasource] trait DatasourceSuite extends FunSuite with Timeouts with Logging {
private[datasource] trait DatasourceSuite extends FunSuite with TimeLimitedTests with Logging {

val timeLimit = 1 minutes

/**
* Log the suite name and the test name before and after each test.
*
* Subclasses should never override this method. If they wish to run
* custom code before and after each test, they should mix in the
* {{org.scalatest.BeforeAndAfter}} trait instead.
*/
final protected override def withFixture(test: NoArgTest): Outcome = {
val testName = test.text
val suiteName = this.getClass.getName
val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
try {
logInfo(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n")
test()
} finally {
logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
}
}
}

0 comments on commit ad78a0e

Please sign in to comment.