[BEAM-1393] [BEAM-1394] [BEAM-1445] Update Flink Runner to Flink 1.2.0 & Use Flink InternalTimerService for TimerInternals #1960
R: @aljoscha
Refer to this link for build results (access rights to CI server needed):
Hi @JingsongLi Could you please rebase on top of master and provide one commit (or several, if there are independent features)? With several merge commits in between, it's hard for me to reconcile this with the current master when merging. I also think we need to figure out the checkpointing of the side-input state before we can merge. Having a regression in the checkpointing would not be good. Unfortunately, I haven't managed to come up with a good solution for that yet. I'll keep you posted.
Hi @aljoscha I'm so sorry. I wanted to rebase some commits, but that brought many extra commits into this PR. Is there a way to get rid of them?
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 10990 lines...]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.codehaus.groovy.reflection.CachedConstructor.invoke(CachedConstructor.java:83)
at org.codehaus.groovy.reflection.CachedConstructor.doConstructorInvoke(CachedConstructor.java:77)
at org.codehaus.groovy.runtime.callsite.ConstructorSite$ConstructorSiteNoUnwrap.callConstructor(ConstructorSite.java:84)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCallConstructor(CallSiteArray.java:60)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callConstructor(AbstractCallSite.java:235)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callConstructor(AbstractCallSite.java:247)
at org.codehaus.mojo.findbugs.FindbugsViolationCheckMojo.execute(FindbugsViolationCheckMojo.groovy:529)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
... 31 more
2017-02-09T16:44:36.775 [ERROR]
2017-02-09T16:44:36.775 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-02-09T16:44:36.775 [ERROR]
2017-02-09T16:44:36.775 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-02-09T16:44:36.775 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
2017-02-09T16:44:36.776 [ERROR]
2017-02-09T16:44:36.776 [ERROR] After correcting the problems, you can resume the build with the command
2017-02-09T16:44:36.776 [ERROR] mvn -rf :beam-runners-flink_2.10
channel stopped
Setting status of ed8f6ab to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7245/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
Don't worry. 😃 I responded on Jira regarding the checkpointing of side input and pushed-back events.
This is indeed a bit more complicated. I will do some tests in our internal Flink cluster.
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11121 lines...]
at org.codehaus.mojo.findbugs.FindbugsViolationCheckMojo.execute(FindbugsViolationCheckMojo.groovy:529)
Setting status of 2671e7e to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7328/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11123 lines...]
at org.codehaus.mojo.findbugs.FindbugsViolationCheckMojo.execute(FindbugsViolationCheckMojo.groovy:529)
Setting status of 26e7f16 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7334/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11119 lines...]
at org.codehaus.mojo.findbugs.FindbugsViolationCheckMojo.execute(FindbugsViolationCheckMojo.groovy:529)
Setting status of 429a7ad to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7335/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
I figured it out: adding -Prelease to mvn install runs the findbugs plugin.
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11368 lines...]
^
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java:441: error: bad use of '>'
* Restore the state (stateName -> (valueCoder && (namespace -> value)))
^
Command line was: /usr/local/asfpackages/java/jdk1.8.0_121/jre/../bin/javadoc @options @packages
Refer to the generated Javadoc files in '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/runner/target/apidocs' dir.
at org.apache.maven.plugin.javadoc.AbstractJavadocMojo.executeJavadocCommandLine(AbstractJavadocMojo.java:5188)
at org.apache.maven.plugin.javadoc.AbstractJavadocMojo.executeReport(AbstractJavadocMojo.java:2075)
at org.apache.maven.plugin.javadoc.JavadocJar.execute(JavadocJar.java:188)
... 33 more
2017-02-13T03:48:43.332 [ERROR]
2017-02-13T03:48:43.332 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-02-13T03:48:43.332 [ERROR]
2017-02-13T03:48:43.332 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-02-13T03:48:43.332 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
2017-02-13T03:48:43.333 [ERROR]
2017-02-13T03:48:43.333 [ERROR] After correcting the problems, you can resume the build with the command
2017-02-13T03:48:43.333 [ERROR] mvn -rf :beam-runners-flink_2.10
channel stopped
Setting status of 35c6f71 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7336/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
Nice work. I just had some inline comments about how we can put the finishing touches on this. What do you think about them?
Once all this is in, I think we can also make WindowDoFnOperator very slim. It basically only has to provide the GroupAlsoBy... function.
Also, I would like to add some tests that check the side input and pushback behaviour, and also tests for all the checkpoint/restore code. A (non-working) basic test is already available in DoFnOperatorTest.testSideInputs(); we should expand on that.
StateHandle<Serializable> sideInputStateHandle =
    (StateHandle) sideInputStateBackend.checkpointStateSerializable(
        sideInputSnapshot, checkpointId, timestamp);
// We can't get all timerServices, so we just snapshot our timerService
We can override getInternalTimerService(...), always pass through to super, and just record the names of the timer services in existence.
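The suggestion above can be sketched roughly as follows. This is only an illustration under stated assumptions: `BaseOperatorSketch` is a hypothetical stand-in for the operator base class, and its one-argument `getInternalTimerService(String)` is simplified, since the real Flink method takes further arguments (the reply below mentions a namespaceSerializer, for example). The names `NameRecordingOperatorSketch` and `recordedTimerServiceNames()` are likewise invented for this example.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the operator base class. This is NOT Flink's real
// API; the real getInternalTimerService(...) takes more arguments.
class BaseOperatorSketch {
  Object getInternalTimerService(String name) {
    return "timer-service:" + name;
  }
}

// Overrides the lookup, always delegates to super, and remembers each name.
class NameRecordingOperatorSketch extends BaseOperatorSketch {
  private final Set<String> timerServiceNames = new LinkedHashSet<>();

  @Override
  Object getInternalTimerService(String name) {
    // Record the name first, then pass the call through unchanged.
    timerServiceNames.add(name);
    return super.getInternalTimerService(name);
  }

  // Read-only view of every timer service name requested so far.
  Set<String> recordedTimerServiceNames() {
    return Collections.unmodifiableSet(timerServiceNames);
  }
}
```

With the names recorded, a snapshot routine could iterate over them instead of handling only a single, known timer service.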
We don't have a getInternalTimerServiceByName(String name) method. There may be a problem with invoking HeapInternalTimerService.startTimerService() with a null namespaceSerializer or a null Trigger.
 * Choose keyGroup of input and addInput to accumulator.
 */
void addInput(InputT input) {
  int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(
I think whenever the FlinkKeyGroupStateInternals are used we also have access to a proper Flink KeyedStateBackend. This has a method getCurrentKeyGroupIndex() that could be used here instead of manually deriving the key-group index, which might change depending on the underlying Flink implementation.
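A minimal sketch of that idea, assuming a backend that exposes getCurrentKeyGroupIndex() as described in the comment. `KeyGroupIndexSourceSketch` is a hypothetical one-method stand-in, not Flink's KeyedStateBackend, and `KeyGroupAccumulatorSketch` with its per-key-group list is invented for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in exposing only the method named in the review comment.
interface KeyGroupIndexSourceSketch {
  int getCurrentKeyGroupIndex();
}

// Groups inputs by the key-group index reported by the backend, rather than
// computing the index from the key by hand.
class KeyGroupAccumulatorSketch<InputT> {
  private final KeyGroupIndexSourceSketch backend;
  private final Map<Integer, List<InputT>> inputsByKeyGroup = new HashMap<>();

  KeyGroupAccumulatorSketch(KeyGroupIndexSourceSketch backend) {
    this.backend = backend;
  }

  void addInput(InputT input) {
    // Ask the backend which key group the current key belongs to.
    int keyGroupIdx = backend.getCurrentKeyGroupIndex();
    inputsByKeyGroup.computeIfAbsent(keyGroupIdx, k -> new ArrayList<>()).add(input);
  }

  // Returns the inputs recorded for a key group, or an empty list if none.
  List<InputT> inputsFor(int keyGroupIdx) {
    return inputsByKeyGroup.getOrDefault(keyGroupIdx, new ArrayList<>());
  }
}
```

The design point is that the index comes from a single authority (the backend), so the accumulator does not need to change if the way key groups are assigned changes.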
/**
 * Get map(namespace->T) from index 0.
 */
Map<String, T> getMap() throws Exception {
I think we have to make this part of the code sensitive to whether we are operator-0 or another operator. On operator-0 the code should do what your code does right now.
On other operators we have to check whether the iterator is non-empty (this happens when we restored and have the state that was checkpointed on operator-0). If it is, extract the map and work with that without putting it back into the broadcast state. If there is nothing, simply create a new map and work with that for the rest of the operator lifetime, since we are not operator-0 and don't have to do any checkpointing.
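The branching described above might look roughly like this. It is a sketch under assumptions: the broadcast state is modelled as a plain `List` of maps, the operator-0 branch (keep the map at index 0 of the state) is a guess at the existing behaviour based on the getMap() Javadoc, and `SideInputMapHolderSketch` and its fields are hypothetical names.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class SideInputMapHolderSketch<T> {
  private final int subtaskIndex;
  // Stand-in for the broadcast state; using a plain list is an assumption.
  private final List<Map<String, T>> broadcastState;
  // Map used by operators other than operator-0; never written back to state.
  private Map<String, T> localMap;

  SideInputMapHolderSketch(int subtaskIndex, List<Map<String, T>> broadcastState) {
    this.subtaskIndex = subtaskIndex;
    this.broadcastState = broadcastState;
  }

  Map<String, T> getMap() {
    if (subtaskIndex == 0) {
      // Operator-0 (assumed current behaviour): the map lives at index 0 of
      // the state, so it is included in checkpoints.
      if (broadcastState.isEmpty()) {
        broadcastState.add(new HashMap<String, T>());
      }
      return broadcastState.get(0);
    }
    if (localMap == null) {
      // Other operators: reuse restored state if present, otherwise start
      // with a fresh map. Either way, nothing is put back into the state.
      Iterator<Map<String, T>> restored = broadcastState.iterator();
      localMap = restored.hasNext() ? restored.next() : new HashMap<String, T>();
    }
    return localMap;
  }
}
```

Caching `localMap` is what makes a non-zero operator keep working with the same map for the rest of its lifetime.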
I also rebased/cherry-picked your changes on top of master and squashed them into a single commit. The result of this is here: https://github.com/aljoscha/beam/tree/finish-pr-1969-flink12. If you could continue working on that one it will be easier to merge things into master in the end. 😃
Force-pushed from 35c6f71 to 7a2830e.
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11381 lines...]
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java:441: error: bad use of '>'
* Restore the state (stateName -> (valueCoder && (namespace -> value)))
at org.apache.maven.plugin.javadoc.JavadocJar.execute(JavadocJar.java:188)
Setting status of 7a2830e to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7388/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
Because TwoInputStreamOperatorTestHarness does not support KeyedStateBackend, testing KeyedSideInput and snapshot/restore is a bit difficult.
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11383 lines...]
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java:435: error: bad use of '>'
* Restore the state (stateName -> (valueCoder && (namespace -> value)))
at org.apache.maven.plugin.javadoc.JavadocJar.execute(JavadocJar.java:188)
Setting status of f411503 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7398/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
Ha wow, you are quick. 😃 Testing the keyed operator with side input does in fact work when using I would propose to add more extensive tests and resolve some other rough edges in follow-up work. What do you think?
OK, very good!
The good thing is that
@davorbonaci What's the command to ask Jenkins to re-run? I can't figure out what the problem is in the "Maven clean install" hook.
retest this please
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 11383 lines...]
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java:435: error: bad use of '>'
* Restore the state (stateName -> (valueCoder && (namespace -> value)))
at org.apache.maven.plugin.javadoc.JavadocJar.execute(JavadocJar.java:188)
Setting status of f411503 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7400/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
Looks like a javadoc problem, but no details.
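For reference, the failing builds report `error: bad use of '>'` on the Javadoc line `Restore the state (stateName -> (valueCoder && (namespace -> value)))` in FlinkKeyGroupStateInternals.java. A common way to get such a comment past the JDK 8 Javadoc checker is to wrap the arrow notation in `{@code ...}`, so that `>` is not parsed as HTML. The sketch below shows only that pattern; the class, the method, and its body are hypothetical, and the thread does not say whether the eventual fix took this form.

```java
import java.util.HashMap;
import java.util.Map;

class JavadocArrowSketch {
  /**
   * Restore the state {@code (stateName -> (valueCoder && (namespace -> value)))}.
   *
   * <p>Hypothetical body for illustration: it only builds an empty nested map
   * for the given state name and does not read any checkpoint data.
   */
  static Map<String, Map<String, String>> emptyRestoredState(String stateName) {
    Map<String, Map<String, String>> state = new HashMap<>();
    state.put(stateName, new HashMap<String, String>());
    return state;
  }
}
```

An alternative with the same effect is to escape the character as `&gt;`, at the cost of a less readable source comment.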
I think I figured it out, see my fix here: #2012. I'll wait for Jenkins to give the green light and then I'll merge. 👍
I finally managed to resolve all the issues; I was chasing master there for a bit. 😄 Thanks for your very good work, @JingsongLi 👍
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #> in the title with the actual Jira issue number, if there is one.
Individual Contributor License Agreement.