
[Test][SPARK-16002][Follow-up] Fix flaky test in StreamingQueryListenerSuite #15497

Closed
wants to merge 3 commits

Conversation

@lw-lin (Contributor) commented Oct 15, 2016

## What changes were proposed in this pull request?

StreamingQueryListenerSuite #test(s"single listener, check trigger statuses") is flaky; the following is one possible flaky execution:

```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed !
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed !
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|       _isWaiting = false          |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

This patch's fix is to disallow advance(...) more than once while some thread -- e.g. StreamExecution -- is still in wait(10). In other words, we do not want to advance() "too early"; in this sense, this is a follow-up to SPARK-16002.
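
To illustrate the idea, here is a minimal, hedged sketch of such a guard. The names _isWaiting, _readyForFirstPeek, isWaitingAndReadyForFirstPeek, advance and waitTillTime are the ones discussed in the review below; the class name and body are simplified and are not the exact patch.

```scala
// Simplified sketch of the guarded manual clock idea; not the actual Spark code.
class GuardedManualClock(private var time: Long) {
  private var _isWaiting = false          // a thread is blocked inside waitTillTime()
  private var _readyForFirstPeek = false  // that waiting thread has not been advance()d yet

  def getTimeMillis(): Long = synchronized { time }

  // The test thread may only advance the clock when this returns true.
  def isWaitingAndReadyForFirstPeek: Boolean = synchronized {
    _isWaiting && _readyForFirstPeek
  }

  def advance(timeToAdd: Long): Unit = synchronized {
    time += timeToAdd
    _readyForFirstPeek = false  // a second advance() must wait for a new waitTillTime()
    notifyAll()
  }

  def waitTillTime(targetTime: Long): Long = synchronized {
    _isWaiting = true
    _readyForFirstPeek = true
    try {
      while (time < targetTime) wait(10)
      time
    } finally {
      _isWaiting = false
      _readyForFirstPeek = false
    }
  }
}
```

With this guard, the testing thread polls isWaitingAndReadyForFirstPeek before each advance(...), so the second and third advance(...) in the table above would be blocked until StreamExecution enters waitTillTime again.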

## How was this patch tested?

Ran the flaky test 1000 times; all runs passed.

@lw-lin (Contributor, Author) commented Oct 15, 2016

@tdas @zsxwing could you take a look, thanks!

@SparkQA commented Oct 15, 2016

Test build #67002 has finished for PR 15497 at commit 5bc47b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin (Contributor, Author) commented Oct 15, 2016

Jenkins retest this please

@SparkQA commented Oct 15, 2016

Test build #67006 has finished for PR 15497 at commit 5bc47b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 15, 2016

Test build #3343 has finished for PR 15497 at commit 5bc47b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) left a comment:

Thank you very much for identifying this potential source of flakiness. But it's not clear to me why another variable _readyForFirstPeek is needed over _isWaiting. Could you add more docs? Or at least explain why a single variable _isWaiting is not sufficient to get this logic right?

```diff
@@ -27,6 +27,7 @@ package org.apache.spark.util
 private[spark] class ManualClock(private var time: Long) extends Clock {

   private var _isWaiting = false
+  private var _readyForFirstPeek = false
```
Contributor:

Can you add more docs on what peek means and what this variable is supposed to do?

Contributor:

It's really not obvious what purpose _readyForFirstPeek serves over _isWaiting.

@lw-lin (Contributor, Author), Oct 17, 2016:

_isWaiting indicates that the main StreamExecution thread is waiting, while _readyForFirstPeek indicates whether this is the first time the test thread has learned that the main thread is waiting.

When the test thread learns for a second or third time, via isWaitingAndReadyForFirstPeek(), that the main thread is waiting, the test thread itself should block, to prevent advance()ing too early.

> what purpose does _readyForFirstPeek serve over _isWaiting

Please refer to advance(), where we mark _readyForFirstPeek as false to indicate that the test thread has already advance()d once; we do not touch _isWaiting.

@lw-lin (Contributor, Author), Oct 17, 2016:

@tdas if we're in the right direction, we should definitely pick good names for _readyForFirstPeek and isWaitingAndReadyForFirstPeek() :)

@tdas (Contributor), Oct 17, 2016:

I think semantically this ManualClock is becoming a very complex and confusing API to use. The caller has to reason carefully about when it is safe to call advance().

How about changing the API to something like this.

  • ManualClock: It has a method called isThreadWaitingAt(timeWhenWaitStarted: Long): Boolean. It returns true when another thread has started waiting while the time was timeWhenWaitStarted. This replaces isWaiting.
  • StreamTest: It keeps track of the expected time when the manual clock is being used. For every AdvanceManualClock, it first calls isThreadWaitingAt(expectedTime) until it returns true, then advances the manual clock as well as the expected time.

This will ensure that successive AdvanceManualClock actions wait for the StreamExecution thread to be unblocked from the previous wait, and start a new wait at the expected time.

How does this sound?

If this sounds good, then there should be unit tests to test this ManualClock behavior thoroughly.
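
Roughly, the proposal could look like the sketch below. Only the signature isThreadWaitingAt(timeWhenWaitStarted: Long): Boolean comes from the comment above; the class name, the waitStartTime field, and the polling loop in the trailing comment are illustrative assumptions.

```scala
// Hedged sketch of the proposed isThreadWaitingAt(...) API; not the final implementation.
class ManualClockWithWaitTracking(private var currentTime: Long) {
  // Clock time at which some thread entered waitTillTime(), if it is still waiting there.
  private var waitStartTime: Option[Long] = None

  def isThreadWaitingAt(timeWhenWaitStarted: Long): Boolean = synchronized {
    waitStartTime.contains(timeWhenWaitStarted)
  }

  def advance(timeToAdd: Long): Unit = synchronized {
    currentTime += timeToAdd
    notifyAll()
  }

  def waitTillTime(targetTime: Long): Long = synchronized {
    waitStartTime = Some(currentTime)  // remember when this wait started
    try {
      while (currentTime < targetTime) wait(10)
      currentTime
    } finally {
      waitStartTime = None
    }
  }
}

// In StreamTest (sketch), the testing thread tracks the expected clock time and only
// advances once the StreamExecution thread is blocked at exactly that time:
//   var expectedTime = 0L
//   ...
//   while (!clock.isThreadWaitingAt(expectedTime)) Thread.sleep(10)
//   clock.advance(delta)
//   expectedTime += delta
```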

@lw-lin (Contributor, Author), Oct 17, 2016:

This sounds good!

One thing to confirm: we are assuming only one main thread and one test thread, right? Because:

  • given two main threads m1, m2, and one testing thread t, should isThreadWaitingAt(time) return true or false if m1 has reached waitTillTime(time) but m2 has not yet?
  • given one main thread m and two testing threads t1, t2, should isThreadWaitingAt(time) return true for only one of t1 and t2, or for both?

Contributor:

I could reproduce this issue as well in a Jenkins run in my PR - https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3351/consoleFull

@tdas (Contributor), Oct 17, 2016:

I pulled your current fix into my PR to test for flakiness by running it in a loop in Jenkins. At least that will give us confidence that the theory of the bug is right. Nonetheless, please change the API and code as I suggested.

My PR: #15492. See Jenkins builds 3352, 3353, and 3354. Each of them is expected to run StreamingQueryListenerSuite about 50 times before Jenkins times out.

```scala
def isWaiting: Boolean = synchronized { _isWaiting }
def isWaitingAndReadyForFirstPeek: Boolean = synchronized { _isWaiting && _readyForFirstPeek }
```
Contributor:

I don't like this name. Why not keep it simple, like isWaiting?

Contributor:

Guess it boils down to what the purpose of _readyForFirstPeek is, which is not clear to me.

@SparkQA commented Oct 17, 2016

Test build #3347 has finished for PR 15497 at commit 5bc47b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   AssertOnLastQueryStatus { status: StreamingQueryStatus =>
     // Check the correctness of the trigger info of the last completed batch reported by
     // onQueryProgress
-    assert(status.triggerDetails.get("triggerId") == "0")
+    assert(Seq("-1", "0").contains(status.triggerDetails.get("triggerId")))
```
Contributor:

Look at my PR: I replaced .get with .containsKey, because all we need to test here is that the triggerId key is set. The exact trigger id in which the data was found, etc., is not important.

#15492

Could you change this PR to do the same?
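
For reference, a hedged sketch of the suggested assertion, assuming (as in the diff above) that status.triggerDetails is a java.util.Map:

```scala
// Only check that the triggerId key is set; the exact trigger in which the
// data was processed can legitimately vary between runs.
assert(status.triggerDetails.containsKey("triggerId"))
```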

@lw-lin (Contributor, Author):

Sure -- let me do the change, thanks!

@lw-lin (Contributor, Author) commented Oct 17, 2016

Jenkins retest this please

@tdas (Contributor) commented Oct 17, 2016

I thought about it, and I still don't like this design. It adds more complexity to a general class, ManualClock, for functionality needed only by StreamExecution. And that leads to these sorts of questions: should a general feature like isThreadWaiting work with multiple threads, etc.?

I think we need to do it differently. I think it's best to create a custom ManualClock for StreamExecution, one that adds the functionality StreamExecution needs.

Mind if I take over this PR and work this out (in the interest of time, the 2.0.2 cutoff is imminent)?
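
As a rough illustration of that direction, a StreamExecution-specific clock could subclass the generic ManualClock, as in the sketch below. The merged commit quoted further down calls this class StreamManualClock and places it in StreamTest; the method name isStreamWaitingAt and the body here are assumptions, not the merged code.

```scala
// Hedged sketch only. ManualClock is private[spark], so a subclass must live
// somewhere under the org.apache.spark package tree (the real one sits in StreamTest).
package org.apache.spark.sql.streaming

import org.apache.spark.util.ManualClock

class StreamManualClockSketch(time: Long = 0L) extends ManualClock(time) {
  // Clock time at which the StreamExecution thread started its current wait, if any.
  private var waitStartTime: Option[Long] = None

  override def waitTillTime(targetTime: Long): Long = synchronized {
    waitStartTime = Some(getTimeMillis())
    try super.waitTillTime(targetTime)
    finally waitStartTime = None
  }

  // True iff the stream thread is currently blocked in a wait that began at `time`.
  def isStreamWaitingAt(time: Long): Boolean = synchronized {
    waitStartTime == Some(time)
  }
}
```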

@SparkQA commented Oct 17, 2016

Test build #67074 has finished for PR 15497 at commit 7ae7782.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) commented Oct 17, 2016

I opened a PR, #15519, after modifying your branch. Since you did the initial investigation, I will mark you as the author when I merge it.

@lw-lin (Contributor, Author) commented Oct 18, 2016

Please go ahead and take over -- let's make it into 2.0.2, thanks!

@lw-lin lw-lin closed this Oct 18, 2016
@lw-lin lw-lin deleted the metrics-flaky-test branch October 18, 2016 07:38
asfgit pushed a commit that referenced this pull request Oct 18, 2016
This work has largely been done by lw-lin in his PR #15497. This is a slight refactoring of it.

## What changes were proposed in this pull request?
There were two sources of flakiness in StreamingQueryListener test.

- When testing with a manual clock, consecutive attempts to advance the clock can occur without the stream execution thread being unblocked and doing some work between the two attempts. Hence the following can happen with the current ManualClock.
```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed !
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed !
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|       _isWaiting = false          |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

- The second source of flakiness is that data added to the memory stream may get processed in any trigger, not just the first trigger.

My fix is to make the manual clock wait for the stream execution thread to start waiting for the clock at the right wait start time. That is, `advance(200)` (see above) will wait for the stream execution thread to complete the wait that started at time 0, and to start a new wait at time 200 (i.e. the timestamp after the previous `advance(100)`).

In addition, since this feature is used solely by StreamExecution, I removed all the non-generic code from ManualClock and put it in StreamManualClock inside StreamTest.

## How was this patch tested?
Ran existing unit test MANY TIMES in Jenkins

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Liwei Lin <lwlin7@gmail.com>

Closes #15519 from tdas/metrics-flaky-test-fix.

(cherry picked from commit 7d878cf)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
asfgit pushed a commit that referenced this pull request Oct 18, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017