Skip to content

[BEAM-920] Add support for Watermarks in the Spark runner.#1987

Closed
amitsela wants to merge 13 commits intoapache:masterfrom
amitsela:watermarks
Closed

[BEAM-920] Add support for Watermarks in the Spark runner.#1987
amitsela wants to merge 13 commits intoapache:masterfrom
amitsela:watermarks

Conversation

@amitsela
Copy link
Member

@amitsela amitsela commented Feb 12, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@amitsela
Copy link
Member Author

amitsela commented Feb 12, 2017

This PR is the first of 2-3 PRs that will complete BEAM-920 to support the Beam model for streaming.
I'm trying to break to as many separate PRs as possible to make reviews simpler and focused, though it's not always easy since a lot of things are entangled here.

This PR adds support to Watermarks via a global Broadcast to workers, that keeps updating every batch (asynchronously), and also allows the runner's UnboundedDatasets (representation of UNBOUNDED PCollections) to track their creating sources so in case of a flattened stream they could access the relevant WMs.

@amitsela
Copy link
Member Author

R: @kennknowles
CC: @staslev

@jbonofre
Copy link
Member

CC : @jbonofre

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 69.679% when pulling e3593b5 on amitsela:watermarks into 5fe11a2 on apache:master.

@asfbot
Copy link

asfbot commented Feb 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7330/
--none--


p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3));

System.out.println(WatermarksDoFn.strings);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.out.println

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops!


@ProcessElement
public void processElement(ProcessContext c) {
if (GlobalWatermarkHolder.get() == null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be able to be replaced by TestStream + triggers, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, next PR, lining it up as we speak.

Queue<SparkWatermarks> timesQueue = en.getValue();

// current state, if exists.
Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems about right. I admit that I have forgotten some of the details of our deep dive on watermarks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timesQueue = new ConcurrentLinkedQueue<>();
}
timesQueue.offer(sparkWatermarks);
sourceTimes.put(sourceId, timesQueue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the watermarks for unbounded sources are tracked here. What I don't quite follow is the behavior in fan-in situations where the watermark is the min of the inputs.

Copy link
Member Author

@amitsela amitsela Feb 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in cases such as Flatten where the WM is the min{WM1, WM2} it is up to whoever uses WMs to deduce that (such as GABW implementation).
This would be handled by SparkTimerInternals.
This WM mechanism only reflects the current-batch lo/hi WMs per source. Adding to that the information on upstream/origin sources to a PCollection is supposed to provide all necessary information to find the right WM for the computation, so for example a GABW following a Flatten would have to choose the Min{WM...}.
This is implemented in the next PR as well since it's related to triggers and generally a part of GABW implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amitsela
Copy link
Member Author

@kennknowles that's sort of the trade-off between gradual PRing with (fairly) small changes or one huge PR.
I've added some tests to make this as complete as possible but clearly it'll make more sense after the next PR.
I'll finish the polishing there and add a link to the branch here so you'd get a sneak-preview 😉

@amitsela
Copy link
Member Author

Rebased and removed leftover printouts.

@asfbot
Copy link

asfbot commented Feb 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7583/

Build result: FAILURE

[...truncated 12098 lines...] at hudson.remoting.UserRequest.perform(UserRequest.java:153) at hudson.remoting.UserRequest.perform(UserRequest.java:50) at hudson.remoting.Request$2.run(Request.java:332) at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:68) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)Caused by: org.apache.maven.plugin.MojoFailureException: You have 1 Checkstyle violation. at org.apache.maven.plugin.checkstyle.CheckstyleViolationCheckMojo.execute(CheckstyleViolationCheckMojo.java:588) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 31 more2017-02-18T21:40:39.238 [ERROR] 2017-02-18T21:40:39.238 [ERROR] Re-run Maven using the -X switch to enable full debug logging.2017-02-18T21:40:39.238 [ERROR] 2017-02-18T21:40:39.238 [ERROR] For more information about the errors and possible solutions, please read the following articles:2017-02-18T21:40:39.238 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException2017-02-18T21:40:39.239 [ERROR] 2017-02-18T21:40:39.239 [ERROR] After correcting the problems, you can resume the build with the command2017-02-18T21:40:39.239 [ERROR] mvn -rf :beam-runners-sparkchannel stoppedSetting status of c2415a0 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7583/ and message: 'Build finished. 'Using context: Jenkins: Maven clean install
--none--

@amitsela
Copy link
Member Author

retest this please.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 69.166% when pulling 7463d8c on amitsela:watermarks into 01de255 on apache:master.

@asfbot
Copy link

asfbot commented Feb 19, 2017

@amitsela
Copy link
Member Author

retest this please.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 69.166% when pulling 7463d8c on amitsela:watermarks into 01de255 on apache:master.

@asfbot
Copy link

asfbot commented Feb 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7588/
--none--

@amitsela
Copy link
Member Author

Test flake was weird, but I don't see a point in investing too much here since this test specifically is to be removed in the next PR where Watermarks are part of "TestStream-like" test pipelines.

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, LGTM. I understand that these things are going to have more changes as they develop, anyhow, so let's keep momentum. Sorry it took so long to grok this.

@amitsela
Copy link
Member Author

Added a fix to clear the state of singletons BEFORE pipeline execution via TestSparkRunner - this broke MetricsTest in post commit (I ran locally before merging). Simply moving this before executing the pipeline.
Merging when tests here pass and post commits pass locally for me.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.02%) to 69.173% when pulling 6ba7ed6 on amitsela:watermarks into 01de255 on apache:master.

@asfbot
Copy link

asfbot commented Feb 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7597/
--none--

@asfgit asfgit closed this in aa45ccb Feb 20, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants