Test case for DStream.countByWindow #59

Closed
matmsh opened this issue Feb 6, 2016 · 2 comments

@matmsh

matmsh commented Feb 6, 2016

Hi,
I am trying to write a test case for DStream.countByWindow using StreamingSuiteBase:

```scala
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

class WindowTest extends StreamingSuiteBase {
  test("CountByWindow with windowDuration 3s and slideDuration=2s") {
    // There should be 2 windows: {batch2, batch1}, {batch4, batch3, batch2}
    val batch1 = List("a", "b")
    val batch2 = List("d", "f", "a")
    val batch3 = List("f", "g", "h")
    val batch4 = List("a")
    val input = List(batch1, batch2, batch3, batch4)
    val expected = List(List(5L), List(7L))
    def countByWindow(ds: DStream[String]): DStream[Long] = {
      ds.countByWindow(windowDuration = Seconds(3), slideDuration = Seconds(2))
    }
    testOperation[String, Long](input, countByWindow _, expected)
  }
}
```
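The expected values in the test comment can be checked by hand. The following is a Spark-free sketch of the window arithmetic, not Spark's implementation. It assumes a 1 s batch interval (consistent with the manual clock advancing 2000 ms for 2 batches in the log) and that a window is emitted whenever the elapsed time is a multiple of the slide duration. `WindowModel`, `windows`, and `countsPerWindow` are hypothetical names, not part of Spark or spark-testing-base.

```scala
// Spark-free model of countByWindow, for checking expected test output.
// Assumptions (not Spark's code): batch i (1-based) ends at i * batchMs, and a
// window is emitted whenever that end time is a multiple of slideMs.
object WindowModel {
  /** 1-based indices of the batches covered by each emitted window.
    * A window closing at time t covers batches whose end time is in (t - windowMs, t]. */
  def windows(numBatches: Int, batchMs: Long, windowMs: Long, slideMs: Long): List[List[Int]] =
    (1 to numBatches).toList
      .filter(i => (i * batchMs) % slideMs == 0) // batches at which a window closes
      .map { last =>
        val t = last * batchMs
        (1 to last).filter(i => i * batchMs > t - windowMs).toList
      }

  /** Element count per window, shaped like the `expected` argument of testOperation.
    * batchMs defaults to 1000 (1 s), an assumption matching the log's clock. */
  def countsPerWindow[T](batches: List[List[T]], windowMs: Long, slideMs: Long,
                         batchMs: Long = 1000L): List[List[Long]] =
    windows(batches.size, batchMs, windowMs, slideMs).map { idx =>
      List(idx.map(i => batches(i - 1).size.toLong).sum)
    }
}
```

For the four batches in the test, `windows(4, 1000L, 3000L, 2000L)` gives `List(List(1, 2), List(2, 3, 4))` and `countsPerWindow` gives `List(List(5L), List(7L))`, matching `expected`. Under this model, the second window only exists once all four batches have been processed.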

But I get a timeout exception when I run it:

```
Testing started at 15:31 ...
16/02/06 15:31:44 INFO [ScalaTest-run] WindowTest: Using manual clock
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: numBatches = 2, numExpectedOutput = 2
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: Manual clock before advancing = 0
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: Manual clock after advancing = 2000
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 0, numExpectedOutput = 2
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 0, numExpectedOutput = 2
....
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 1, numExpectedOutput = 2
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: Output generated in 10022 milliseconds
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: [5]

assertion failed: Operation timed out after 10022 ms
java.lang.AssertionError: assertion failed: Operation timed out after 10022 ms
    at scala.Predef$.assert(Predef.scala:179)
```


Is my setup correct? (I cannot find an example of testing DStream.countByWindow in the test cases.)

Thanks in advance for your assistance!
Shing

@mahmoudhanafy
Collaborator

I think the problem is that the number of batches sent is derived from the length of the expected output rather than the input. I will fix that and make it depend on the length of the input instead.

A quick fix you can use now is to pass the number of batches explicitly, like this:

```scala
testOperation[String, Long](input, countByWindow _, expected, 4, ordered = true)
```
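Why 4? The log shows `numBatches = 2` (the length of `expected`) and the manual clock stopping at 2000 ms, so only the window closing at 2 s is computed and the output is `[5]`. Below is a minimal sketch of the batch arithmetic, assuming a 1 s batch interval and windows emitted at multiples of the slide duration. `BatchCount`, `windowsFired`, and `batchesNeeded` are hypothetical helpers, not part of spark-testing-base.

```scala
// Spark-free sketch of how many batches a windowed test must run.
// Assumption (a simplified model): a window is emitted after batch i
// whenever i * batchMs is a multiple of slideMs.
object BatchCount {
  /** Window outputs emitted if the test clock runs through numBatches batches. */
  def windowsFired(numBatches: Int, batchMs: Long, slideMs: Long): Int =
    (1 to numBatches).count(i => (i * batchMs) % slideMs == 0)

  /** Smallest number of batches that yields at least expectedOutputs windows. */
  def batchesNeeded(expectedOutputs: Int, batchMs: Long, slideMs: Long): Int =
    Iterator.from(1).find(n => windowsFired(n, batchMs, slideMs) >= expectedOutputs).get
}
```

With a 2 s slide over 1 s batches, each window output needs two batches: `windowsFired(2, 1000L, 2000L)` is 1 (the timed-out run), while `batchesNeeded(2, 1000L, 2000L)` is 4, which here equals `input.size`.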

mahmoudhanafy added a commit to mahmoudhanafy/spark-testing-base that referenced this issue Feb 29, 2016
@matmsh
Author

matmsh commented Mar 8, 2016

I have verified the suggested quick fix works.
Thanks!

Shing
