[BEAM-1582, BEAM-1562] Stop streaming tests on EOT Watermark. #2168
4d1222f
to
3bc74ab
Compare
Refer to this link for build results (access rights to CI server needed):
Failed Tests: 2
beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 2
--none--
Pushed an update that should avoid multi-context issues.
Run Spark RunnableOnService
Run Spark RunnableOnService
retest this please
Run Spark RunnableOnService
retest this please
Run Spark RunnableOnService
retest this please
Okay, ran 3x ROS (RunnableOnService) and 3x Maven_Install; both run all streaming tests, so a total of 6 runs. All green. Do I believe it's totally stable now? Hopeful. R: @staslev, note the changes I made to
A couple of comments on points of customization that I don't really understand.
Since it is in TestSparkOptions, I don't care too much, and deflaking is definitely worth the tradeoff in the near term.
So LGTM to deflake, but I am still interested in the answers to my questions, and I suspect the options should either be more heavily documented or have JIRAs filed to remove them.
@@ -32,4 +36,22 @@
boolean isForceStreaming();
void setForceStreaming(boolean forceStreaming);

@Description("A hard-coded expected number of assertions for this test pipeline.")
@Nullable
Integer getExpectedAssertions();
Since we have PAssert.countAssertions(Pipeline), what is this for?
Some of the Spark runner tests exercise recovery/resume from a checkpoint. So while "countAssertions" might expect 1 assertion, it is fair for that assertion to happen only after recovery: the first execution of the pipeline has 0 assertions, and the following execution, which resumes from the checkpoint, has the actual expected 1 assertion. A manual override seemed to fit here.
Usually I'd expect runners not to test the underlying engine's features (such as recovery from checkpoint), but since Spark doesn't provide things like recovering metric values and the runner implements those, we have to test them.
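To make the override idea concrete, here is a minimal sketch of how a test runner might choose the number of assertions to wait for. It assumes the runner falls back to the statically counted value when the option is unset; `ExpectedAssertions` and `resolve` are hypothetical names for illustration and are not taken from this PR.

```java
/** Hypothetical helper (not from the PR): picks the assertion count a test run waits for. */
final class ExpectedAssertions {
  private ExpectedAssertions() {}

  /**
   * @param override value of the test option; null when the test did not set it
   * @param counted number of assertions found by inspecting the pipeline
   * @return the override when present, otherwise the counted value
   */
  static int resolve(Integer override, int counted) {
    return override != null ? override : counted;
  }
}
```

Under these assumptions, a resume-from-checkpoint test would pass an override of 0 for the first (interrupted) execution and leave the option unset, or set it to 1, for the execution that resumes.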
Integer getExpectedAssertions();
void setExpectedAssertions(Integer expectedAssertions);

@Description("A customizable EOT watermark in Millis.")
What is this for? If it stays, I'd spell out "end of time" in the docstring.
Again, relating to my previous comment, I'd like to be able to terminate the pipeline on the "end-of-time" watermark. But for the first execution I'd like to do it "mid-life" to simulate a failure/stop, so the watermark is not at infinity yet.
I really don't know why I decided to save words in documentation 🙃, I'll fix that.
@kennknowles your comments all relate to a test that tests resuming from checkpoint -
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 953.97 KB...]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.maven.plugin.compiler.CompilationFailureException: Compilation failure
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java:[246,14] cannot find symbol
 symbol: method setEndOfTimeWatermark(long)
 location: variable options of type org.apache.beam.runners.spark.TestSparkPipelineOptions
 at org.apache.maven.plugin.compiler.AbstractCompilerMojo.execute(AbstractCompilerMojo.java:1029)
 at org.apache.maven.plugin.compiler.TestCompilerMojo.execute(TestCompilerMojo.java:170)
 at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
 at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
 ... 31 more
2017-03-07T10:14:31.734 [ERROR]
2017-03-07T10:14:31.734 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-07T10:14:31.734 [ERROR]
2017-03-07T10:14:31.734 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-03-07T10:14:31.735 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
2017-03-07T10:14:31.735 [ERROR]
2017-03-07T10:14:31.735 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-07T10:14:31.735 [ERROR] mvn -rf :beam-runners-spark
channel stopped
Setting status of b39783e to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8166/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
b39783e
to
c3bc49a
Compare
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 953.19 KB...]
 at hudson.remoting.UserRequest.perform(UserRequest.java:153)
 at hudson.remoting.UserRequest.perform(UserRequest.java:50)
 at hudson.remoting.Request$2.run(Request.java:336)
 at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:68)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.maven.plugin.MojoFailureException: You have 1 Checkstyle violation.
 at org.apache.maven.plugin.checkstyle.CheckstyleViolationCheckMojo.execute(CheckstyleViolationCheckMojo.java:588)
 at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
 at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
 ... 31 more
2017-03-07T12:13:48.085 [ERROR]
2017-03-07T12:13:48.085 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-07T12:13:48.085 [ERROR]
2017-03-07T12:13:48.085 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-03-07T12:13:48.085 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
2017-03-07T12:13:48.086 [ERROR]
2017-03-07T12:13:48.086 [ERROR] After correcting the problems, you can resume the build with the command
2017-03-07T12:13:48.086 [ERROR] mvn -rf :beam-runners-spark
channel stopped
Setting status of c3bc49a to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8168/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
c3bc49a
to
b079349
Compare
Remove timeout since it is already a pipeline option.
Just noting this still LGTM, and thanks for indulging my curiosity. Not sure what you want to do as far as squashing (looks like there's at least one fixup commit), so I'm not merging; go ahead when ready.
Waiting for @staslev to review as well regarding
@@ -92,6 +93,13 @@ public static SparkTimerInternals forStreamFromSources(
slowestLowWatermark, slowestHighWatermark, synchronizedProcessingTime);
}

/** Build a global {@link TimerInternals} for all feeding streams.*/
public static SparkTimerInternals forStreamFromSources(
forStreamFromSources's parameters are not sources per se; perhaps it could be renamed to reflect its params better.
Changed this specific method to global, since it really provides a "global" SparkTimerInternals.
} while ((timeoutMillis -= batchDurationMillis) > 0
&& globalWatermark.isBefore(stopPipelineWatermark));

result.stop();
L127 - L145 could be encapsulated in a neat method so it looks something like:
result = delegate.run(pipeline);
awaitWatermark(testSparkPipelineOptions, result);
result.stop();
Agree
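For illustration, the suggested extraction could look roughly like the sketch below. It mirrors the loop condition quoted above, but the signature is an assumption: the watermark is read through a `LongSupplier` of epoch millis instead of the runner's own watermark holder, and `WatermarkWaiter` is a hypothetical class name, not the code that was merged.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of an awaitWatermark helper; names and parameter types are assumptions. */
final class WatermarkWaiter {
  private WatermarkWaiter() {}

  /**
   * Polls once per batch interval until the global watermark reaches
   * stopWatermarkMillis or the timeout budget is used up.
   *
   * @return true if the watermark reached the stop value, false on timeout or interruption
   */
  static boolean awaitWatermark(
      LongSupplier globalWatermarkMillis,
      long stopWatermarkMillis,
      long timeoutMillis,
      long batchDurationMillis) {
    long watermark;
    do {
      try {
        Thread.sleep(batchDurationMillis); // give one micro-batch time to complete
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
      watermark = globalWatermarkMillis.getAsLong();
    } while ((timeoutMillis -= batchDurationMillis) > 0 && watermark < stopWatermarkMillis);
    return watermark >= stopWatermarkMillis;
  }
}
```

Returning a boolean is a design choice of this sketch only; it lets a caller tell a watermark-driven stop apart from a timeout before calling `result.stop()`.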

private PipelineRule(Duration forcedTimeout) {
this.delegate = new SparkStreamingPipelineRule(forcedTimeout, testName);
private PipelineRule(SparkPipelineRule delegate) {
Don't PipelineRule.SparkStreamingPipelineRule#after(..) and TestSparkRunner#run() both try to delete the checkpoint? Do we need both?
Well, that's me being "over protective" 😄
PipelineRule might not work for all test scenarios, so just in case. Better to remove an empty directory (or none) rather than risking junk in Jenkins.
Would you agree?
@@ -102,11 +101,9 @@
private static final String TOPIC = "kafka_beam_test_topic";

@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
Typo: noContextRe*sue* => noContextReuse
I wonder if ReuseSparkContextRule can be merged into PipelineRule to simplify things and have a single rule that can be used in the context of Spark pipeline tests.
At the moment we have cases with up to 3 different rules in a single test (e.g. CreateStreamTest).
This could be a separate ticket: "enhance PipelineRule".
Should be doable since it uses RuleChain.
Could you open a ticket?
@@ -149,6 +150,9 @@ public MetricResults metrics() {
@Override
protected void stop() {
SparkContextFactory.stopSparkContext(javaSparkContext);
if (Objects.equals(state, State.RUNNING)) {
state == State.RUNNING?
http://stackoverflow.com/questions/34486832/objects-equals-and-object-equals
I'll leave it that way until the PipelineResult API officially stops using null as a valid State.
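As a side note on the two forms being compared: for enum values, both `Objects.equals(a, b)` and `a == b` tolerate a null left-hand side, and only the instance call `a.equals(b)` throws. The sketch below shows this with a stand-in `State` enum; it is not the Beam `PipelineResult.State` type, and `EnumComparison` is a hypothetical name.

```java
import java.util.Objects;

/** Demo of null handling when comparing enum values; State is a stand-in enum. */
final class EnumComparison {
  private EnumComparison() {}

  /** Stand-in for a pipeline state enum, reduced to what the demo needs. */
  enum State { RUNNING, DONE }

  /** Null-safe: Objects.equals(null, x) is false for any non-null x. */
  static boolean viaObjectsEquals(State state) {
    return Objects.equals(state, State.RUNNING);
  }

  /** Also null-safe: a reference comparison never dereferences state. */
  static boolean viaIdentity(State state) {
    return state == State.RUNNING;
  }

  /** Not null-safe: throws NullPointerException when state is null. */
  static boolean viaInstanceEquals(State state) {
    return state.equals(State.RUNNING);
  }
}
```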
throw beamExceptionFrom(e);
} finally {
SparkContextFactory.stopSparkContext(javaSparkContext);
if (Objects.equals(state, State.RUNNING)) {
state == State.RUNNING?
this.state = State.DONE;
break;
default:
this.state = null;
I think we can avoid mutating state here, and just return the appropriate State back to the caller.
Since SparkPipelineResult#waitUntilFinish() stores the state returned by awaitTermination(...) anyway, the bottom line should be the same, with the advantage that we'll only have one place where state is set.
Also, perhaps if the state is neither DONE nor RUNNING, then UNKNOWN should be returned so that users are not taken by surprise getting a null here.
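A rough sketch of that suggestion, under stated assumptions: the switch returns a value instead of assigning a field, and the default branch yields UNKNOWN rather than null. `EngineStatus` is a hypothetical stand-in for whatever the real code switches on, `State` is a reduced stand-in for the pipeline state enum, and `TerminationStates` is an illustrative name.

```java
/** Hypothetical sketch: map an engine status to a pipeline state without mutating fields. */
final class TerminationStates {
  private TerminationStates() {}

  /** Stand-in for the pipeline state enum, reduced to the values discussed here. */
  enum State { RUNNING, DONE, UNKNOWN }

  /** Hypothetical engine-side status; the real switch subject is not shown in the diff. */
  enum EngineStatus { ACTIVE, STOPPED, INITIALIZED }

  /** Pure function: the caller decides where (and whether) to store the result. */
  static State toState(EngineStatus status) {
    switch (status) {
      case ACTIVE:
        return State.RUNNING;
      case STOPPED:
        return State.DONE;
      default:
        return State.UNKNOWN; // never null, so callers need no null check
    }
  }
}
```

With this shape, a single caller (in the comment's terms, waitUntilFinish()) would be the only place that assigns the returned value to the state field.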
b079349
to
157ab10
Compare
@staslev I addressed some of your comments, and responded on others where I thought it's a no-op.
LGTM, pending the tests to confirm.
Merging, thanks!