Skip to content

StreamingActionBase

Jozef Dúc edited this page Nov 11, 2016 · 4 revisions

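StreamingActionBase contains methods for testing Spark Streaming actions. Because actions don't return a DStream, there is no output stream to compare against expected values; instead, you will need to verify the results of your test against mocks or another observable side effect, such as the accumulator used in the example below.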

Example:

/** Example suite showing how to test a DStream action with `runAction`.
  *
  * The action under test returns `Unit`, so the test observes its effect
  * through an accumulator instead of comparing an output stream.
  */
class SampleStreamingActionTest extends FunSuite with StreamingActionBase {

  test("a simple action") {
    // Two batches, one word each: "hi" (2 chars) and "bye" (3 chars).
    val batches = List(List("hi"), List("bye"))
    val lengthTotal = sc.accumulator(0)

    runAction(batches, countWordsLength(lengthTotal))

    // 2 + 3 == 5
    assert(5 === lengthTotal.value)
  }

  /** Builds an action that adds the length of every string seen on the
    * stream to `acc`.
    *
    * @param acc accumulator that receives the summed string lengths
    * @return a function that registers the summing `foreachRDD` on a stream
    */
  def countWordsLength(acc: Accumulator[Int]): (DStream[String] => Unit) =
    (stream: DStream[String]) =>
      stream.foreachRDD { (rdd: RDD[String]) =>
        rdd.foreach { (word: String) => acc += word.length }
      }
}