Skip to content
Jozef Dúc edited this page Nov 11, 2016 · 12 revisions

StreamingSuiteBase simulates the way Spark Streaming works. It sends the input as batches, applies the given operation to every batch, and compares the output with the expected output. You can compare the output and the expected output in the same order (by setting the ordered flag to true) or compare them regardless of order (ordered = false).

Example(Unary operation):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {

  /** The operation under test: splits every element of the stream on single spaces. */
  def tokenize(f: DStream[String]): DStream[String] =
    f.flatMap(line => line.split(" "))

  test("really simple transformation") {
    // One inner List per input batch.
    val batches = List(List("hi"), List("hi holden"), List("bye"))
    // Tokens expected for the matching input batch.
    val tokens = List(List("hi"), List("hi", "holden"), List("bye"))

    testOperation[String, String](batches, tokenize _, tokens, ordered = false)
  }

}

Example(Binary Operation):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {

  /** The binary operation under test: combines the two streams with `SampleStreamingTest.subtractRDDs`. */
  def subtract(d1: DStream[String], d2: DStream[String]): DStream[String] =
    d1.transformWith(d2, SampleStreamingTest.subtractRDDs _)

  test("simple two stream streaming test") {
    // Batches fed to the first and second stream respectively.
    val left = List(List("hi", "pandas"), List("hi holden"), List("bye"))
    val right = List(List("hi"), List("pandas"), List("byes"))
    // Per batch: what is left of the first stream after subtracting the second.
    val difference = List(List("pandas"), List("hi holden"), List("bye"))

    testOperation[String, String, String](left, right, subtract _, difference, ordered = false)
  }

}

// NOTE(review): presumably this helper lives in an object rather than in the suite class
// so that the function handed to transformWith does not capture the test instance — confirm.
object SampleStreamingTest {
  /**
   * Subtracts one RDD from another.
   *
   * @param r1 the RDD to subtract from
   * @param r2 the RDD whose elements are subtracted
   * @return the result of `r1.subtract(r2)`
   */
  def subtractRDDs(r1: RDD[String], r2: RDD[String]): RDD[String] = {
    r1.subtract(r2)
  }
}

Example(Window Operation):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {

  test("CountByWindow with windowDuration 3s and slideDuration=2s") {
    // There should be 2 windows: {batch2, batch1} and {batch4, batch3, batch2}
    // (assumes the suite's batch duration is 1 second — TODO confirm).
    val batch1 = List("a", "b")
    val batch2 = List("d", "f", "a")
    val batch3 = List("f", "g", "h")
    val batch4 = List("a")
    val input = List(batch1, batch2, batch3, batch4)
    // First window: 2 + 3 = 5 elements; second window: 3 + 3 + 1 = 7 elements.
    val expected = List(List(5L), List(7L))

    // The operation under test: counts the elements in each sliding window.
    def countByWindow(ds: DStream[String]): DStream[Long] = {
      ds.countByWindow(windowDuration = Seconds(3), slideDuration = Seconds(2))
    }

    testOperation[String, Long](input, countByWindow _, expected, ordered = true)
  }
}

You can provide each input batch as a List of values, or as null to simulate an empty batch.

Example(Empty Batch):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {

  test("empty batch by using null") {
    // The operation under test: triples every element of the stream.
    def multiply(numbers: DStream[Int]): DStream[Int] =
      numbers.map(value => value * 3)

    // The null entry in the middle stands for an empty batch.
    val batches = List(List(1), null, List(10))
    // Only two output batches are expected for the three input batches.
    val tripled = List(List(3), List(30))

    testOperation(batches, multiply _, tripled, ordered = false)
  }

}

You can also compare the output and the expected output with a custom equality method by providing an implicit custom Equality object.

Example(Custom Equality):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {
  test("custom equality object (Integer)") {
    val input = List(List(-1), List(-2, 3, -4), List(5, -6))
    // Same values as the input, but all positive.
    val expected = List(List(1), List(2, 3, 4), List(5, 6))

    // Custom equality: two Ints are equal when their absolute values match;
    // an Int never equals a value of any other type.
    // The explicit type annotation keeps implicit resolution predictable.
    implicit val integerCustomEquality: Equality[Int] =
      new Equality[Int] {
        override def areEqual(a: Int, b: Any): Boolean =
          b match {
            case n: Int => math.abs(a) == math.abs(n)
            case _ => false
          }
      }

    // Identity operation: the stream is returned unchanged.
    def doNothing(ds: DStream[Int]): DStream[Int] = ds

    // Run the comparison both unordered and ordered.
    testOperation[Int, Int](input, doNothing _, expected, ordered = false)
    testOperation[Int, Int](input, doNothing _, expected, ordered = true)
  }
}

The simulation times out after 10 seconds. To increase that duration, override the maxWaitTimeMillis value.

Example(Longer timeout duration):

class SampleStreamingTest extends FunSuite with StreamingSuiteBase {
  // Raise the wait limit to 20000 ms (20 seconds).
  override def maxWaitTimeMillis: Int = 20000

  test("increase duration more than 10 seconds") {
    // 1000 single-element batches holding 1, 2, ..., 1000.
    val batches = List.tabulate(1000)(index => List(index + 1))
    // Every element doubled, batch by batch.
    val doubled = batches.map(batch => batch.map(value => 2 * value))

    // The operation under test: doubles every element of the stream.
    def multiply(ds: DStream[Int]): DStream[Int] =
      ds.map(value => value * 2)

    testOperation[Int, Int](batches, multiply _, doubled, ordered = true)
  }
}