Skip to content

[BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission#7138

Merged
tweise merged 3 commits intoapache:masterfrom
mxm:BEAM-5197
Nov 28, 2018
Merged

[BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission#7138
tweise merged 3 commits intoapache:masterfrom
mxm:BEAM-5197

Conversation

@mxm
Copy link
Contributor

@mxm mxm commented Nov 27, 2018

A small wait interval could deadlock on some platforms because the checkpoint
lock would be held too often, making it impossible for the source to make
progress.

CC @kennknowles @tweise

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
Build Status --- --- ---

A small wait interval could deadlock on some platforms because the checkpoint
lock would be held too often, making it impossible for the source to make
progress.
@mxm mxm requested review from kennknowles and tweise November 27, 2018 13:24
@mxm
Copy link
Contributor Author

mxm commented Nov 27, 2018

I observed that this would fail even on MacOs with a small enough wait interval, e.g. 10 milliseconds. The lock is not held while sleeping but the wait interval is small enough that the source thread may not acquire the lock fast enough before it is acquired again by the test.

Would appreciate if someone with Linux could verify this.


// Consider that UnboundedSourceWrapper needs to acquire the checkpoint lock below.
// So wait for enough time for that to happen.
Thread.sleep(200);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a CountdownLatch or something like that instead of a sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is tricky because the processing time is expected to be advanced for the event time timers to progress.

I've refactored the test to wait for the readers to finish; only then the time is advanced and the final trigger executed. That avoids constantly holding the lock.

@tweise
Copy link
Contributor

tweise commented Nov 27, 2018

Here is my attempt at it: #7140

@tweise
Copy link
Contributor

tweise commented Nov 27, 2018

Just wondering why this test is needed at all? Doesn't testValueEmission already verify that the watermarks are generated?

@mxm
Copy link
Contributor Author

mxm commented Nov 27, 2018

Good point. testValueEmission is more sophisticated though because it emulates the runtime behavior by reading from multiple sharded instances of UnboundedSoure.

I've extended the test to at least check that two watermarks are emitted after the watermark emission is triggered twice. That should make it a good addition to the value test where only the final watermark is checked.

@mxm
Copy link
Contributor Author

mxm commented Nov 27, 2018

Test failures in latest PreCommit are unrelated to this PR.

edit: Also had to fix checkstyle, so tests are running again.

synchronized (checkpointLock) {
testHarness.setProcessingTime(Instant.now().getMillis());
}
// Need to advance this so that the watermark timers in the source wrapper fire
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment also needs update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK the comment still holds true.

// Need to advance this so that the watermark timers in the source wrapper fire
// Synchronize is necessary because this can interfere with updating the PriorityQueue
// of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
synchronized (testHarness.getCheckpointLock()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved up to initialization and synchronized won't be needed then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is not possible. The watermark timer is not registered then so this would not to lead to a watermark emission, hence the waiting before for the readers before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see that you changed the initial value to Long.MIN_VALUE. What is the significance of changing the processing time to 0 in the next line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to make this test more meaningful by testing emission of multiple watermarks by the source. The initial value before was arbitrary as well, as it was System.currentTimeMillis().

Afterwards we advance to 0, but it could be any value between MIN_VALUE and MAX_VALUE, but it has to be larger than currentTime + autowatermark interval. This emits a watermark from the TestCoutingSource.

Then we advance it to MAX_VALUE to test the emission of the final watermark.

@mxm
Copy link
Contributor Author

mxm commented Nov 27, 2018

ElasticSearchIO test failures.

@tweise
Copy link
Contributor

tweise commented Nov 27, 2018

Run Java PreCommit

@tweise tweise changed the title [BEAM-5197] Increase test time progressing interval to avoid deadlock [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission Nov 27, 2018
@tweise tweise merged commit 02c763b into apache:master Nov 28, 2018
ajamato pushed a commit to ajamato/beam that referenced this pull request Nov 28, 2018
…7138)

* Avoid holding checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
* Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
ajamato pushed a commit to ajamato/beam that referenced this pull request Nov 28, 2018
…7138)

* Avoid holding checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
* Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
ajamato pushed a commit to ajamato/beam that referenced this pull request Nov 30, 2018
…7138)

* Avoid holding checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
* Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
ajamato pushed a commit to ajamato/beam that referenced this pull request Nov 30, 2018
…7138)

* Avoid holding checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
* Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants