[BEAM-846] Add WindowMappingFn, WindowMappingFns #2284
Refer to this link for build results (access rights to CI server needed): |
 * window in a {@link PCollectionView} consumed as a
 * {@link ParDo.BoundMulti#withSideInputs(PCollectionView[]) side input}.
 */
public abstract class WindowMappingFn<W extends BoundedWindow> {
SourceWindowT and TargetWindowT?
I don't know if SourceWindowT is valuable - this can be provided to some arbitrarily windowed ParDo, at which point it should still behave appropriately.
I don't think it's possible to constrain on SourceWindowT, but I think we should keep the target window type.
Our conclusion was that WindowMappingFn was always used as a key and shouldn't be interacted with by the runner. Did you have other uses in mind?
It would actually be perfectly fine to have a WindowMappingFn that could only apply to certain main input window types and side input window types. But due to erasure we can't actually validate the main window type or side window type, unless we play tricks like we do with other Fns. I don't think it is worthwhile here, so I don't see the point of either one (conversely, I'm perfectly happy having both and leaving them as wildcards / BoundedWindow when a runner interacts with them).
The feature as it exists before this PR probably has ~zero users and may have just barely more afterwards. This tweak in the design is more valuable for clarifying the model and solving the GC issue than for real use cases.
I was thinking of two things:
- The type variables for internal type checking of implementations, which you can just do with their own type variables that don't show up in the interface.
- Just accurately describing the situation since it has no downside that I can think of and tends to pay off later in unforeseen ways.
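To make the erasure point concrete, here is a minimal sketch. Every name in it (ErasureSketch, MappingFn, the two window classes, erased(), declaredWindowTypes()) is a hypothetical stand-in and not the API in this PR. It shows that two differently parameterized instances share one runtime class, and that an anonymous subclass with concrete type arguments keeps them in its class metadata, which is roughly the kind of trick alluded to for other Fns.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class ErasureSketch {
  interface Window {}
  static final class IntervalWindow implements Window {}
  static final class GlobalWindow implements Window {}

  // Hypothetical mapping fn with both type parameters; not the class added in this PR.
  abstract static class MappingFn<SourceWindowT extends Window, TargetWindowT extends Window> {
    // The "trick": read the type arguments recorded on the direct superclass declaration.
    Type[] declaredWindowTypes() {
      Type superType = getClass().getGenericSuperclass();
      return superType instanceof ParameterizedType
          ? ((ParameterizedType) superType).getActualTypeArguments()
          : new Type[0];
    }
  }

  // Built through a generic method, so the caller's type arguments are erased.
  static <S extends Window, T extends Window> MappingFn<S, T> erased() {
    return new MappingFn<S, T>() {};
  }

  public static void main(String[] args) {
    MappingFn<IntervalWindow, GlobalWindow> a = erased();
    MappingFn<GlobalWindow, IntervalWindow> b = erased();
    // Both share one runtime class, so a runner cannot tell them apart.
    System.out.println(a.getClass().equals(b.getClass()));

    // An anonymous subclass with concrete arguments keeps them in its class metadata.
    MappingFn<IntervalWindow, GlobalWindow> c =
        new MappingFn<IntervalWindow, GlobalWindow>() {};
    System.out.println(c.declaredWindowTypes()[0]);
  }
}
```

The trick only works when the concrete arguments appear at the point where the subclass is declared, which is part of why it may not be worth the complexity here.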
A WindowFn should return a default WindowMappingFn that maps to the same window type as the WindowFn itself. (It can't, of course, place constraints on the main window type, so it should accept plain old BoundedWindow as a parameter and do any typechecking manually, if needed.)
Sure, when it comes via the defaults provided by a WindowFn that is a reasonable guideline - though if we had construction-time validation I would be OK if it was violated.
Manual typechecking is really a last resort, since it happens way too late. We have lots of other options.
If the codomain window type is a type parameter of WindowMappingFn, we can enforce WindowFn.defaultWindowMappingFn() respects it at compile time. It's harder to see what we can enforce at pipeline construction time. Are you suggesting we add another method for construction-time validation? (In particular, the prohibitions here seem to be "in the static set" and "not the global window" (where the latter should probably be phrased as "finite endpoint")).
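As a rough illustration of the compile-time enforcement described here, the sketch below ties the mapping fn's target type to the WindowFn's window type parameter. The types are simplified, hypothetical stand-ins (TargetTypeSketch, Window, MappingFn, and a one-parameter WindowFn), not Beam's actual signatures. The main input window stays a plain Window and is checked manually.

```java
class TargetTypeSketch {
  interface Window {}
  static final class IntervalWindow implements Window {}

  // Hypothetical mapping fn parameterized only by its target (codomain) window type.
  abstract static class MappingFn<TargetWindowT extends Window> {
    abstract TargetWindowT getSideInputWindow(Window mainWindow);
  }

  // Hypothetical, simplified WindowFn: the default mapping must target W.
  abstract static class WindowFn<W extends Window> {
    abstract MappingFn<W> getDefaultWindowMappingFn();
  }

  static final class IntervalWindows extends WindowFn<IntervalWindow> {
    @Override
    MappingFn<IntervalWindow> getDefaultWindowMappingFn() {
      // Returning a MappingFn with any other target type here would not compile.
      return new MappingFn<IntervalWindow>() {
        @Override
        IntervalWindow getSideInputWindow(Window mainWindow) {
          // The main window type is unconstrained, so it is checked by hand.
          if (!(mainWindow instanceof IntervalWindow)) {
            throw new IllegalArgumentException(
                "Unsupported main input window: " + mainWindow.getClass().getName());
          }
          return (IntervalWindow) mainWindow;
        }
      };
    }
  }

  public static void main(String[] args) {
    IntervalWindow main = new IntervalWindow();
    IntervalWindow side = new IntervalWindows().getDefaultWindowMappingFn().getSideInputWindow(main);
    System.out.println(side == main);
  }
}
```

The restrictions on which main input windows are acceptable (such as membership in a static set) are not expressible this way and would still need a runtime or construction-time check.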
/**
 */
public class WindowMappingFns {
  public static WindowMappingFn<?> fromKnownFn(WindowFn<?, ?> fn) {
I was thinking that a WindowFn itself might be allowed to provide a default choice. It wouldn't be a relevant part of the UDF as invoked by the Fn API, but would be extracted and embedded in the Runner API graph.
Done.
7958e2e to 688d648
688d648 to f833425
Can you tag with multiple JIRAs? I was just browsing my assigned issues and was surprised that BEAM-260 was not linked.
I'm going to add both the input and target types everywhere. Watch this space. (As an aside, I'm also going to expect that the target type everywhere except the actual declaration to always be |
Actually, just the target window type. I forgot how much of a nightmare using that sort of wildcard for inputs is in Java.
WindowMappingFn maps from a main input window to a side input window, and provides a maximum amount of lookback that is possible to generate using the MappingFn. This separates the logic of assigning windows from the logic of mapping main and side inputs. Additionally, it permits pipeline runners to garbage collect side inputs when they are no longer accessible. This is currently unused. See BEAM-846 and BEAM-260 for additional context.
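For readers skimming the thread, a self-contained sketch of the idea follows. It does not use Beam's classes: SketchWindow, FixedWindow, SketchWindowMappingFn, FixedWindowMappingFn and canCollect are hypothetical stand-ins, java.time replaces Joda-Time, and the garbage-collection rule is an assumption about how a runner might use the lookback bound (it ignores allowed lateness).

```java
import java.time.Duration;
import java.time.Instant;

class WindowMappingSketch {
  // Hypothetical stand-in for BoundedWindow: only a maximum timestamp.
  interface SketchWindow {
    Instant maxTimestamp();
  }

  // Fixed-size window covering [start, start + size).
  static final class FixedWindow implements SketchWindow {
    final Instant start;
    final Duration size;

    FixedWindow(Instant start, Duration size) {
      this.start = start;
      this.size = size;
    }

    @Override
    public Instant maxTimestamp() {
      return start.plus(size).minusMillis(1);
    }
  }

  // Maps a main input window to a side input window and bounds the lookback.
  abstract static class SketchWindowMappingFn<TargetWindowT extends SketchWindow> {
    abstract TargetWindowT getSideInputWindow(SketchWindow mainWindow);

    abstract Duration maximumLookback();
  }

  // Picks the fixed side input window that contains the main window's max timestamp.
  static final class FixedWindowMappingFn extends SketchWindowMappingFn<FixedWindow> {
    private final Duration size;

    FixedWindowMappingFn(Duration size) {
      this.size = size;
    }

    @Override
    FixedWindow getSideInputWindow(SketchWindow mainWindow) {
      long ts = mainWindow.maxTimestamp().toEpochMilli();
      long start = ts - Math.floorMod(ts, size.toMillis());
      return new FixedWindow(Instant.ofEpochMilli(start), size);
    }

    @Override
    Duration maximumLookback() {
      // The chosen side window never ends before the main window does.
      return Duration.ZERO;
    }
  }

  // Assumed GC rule: collect once the watermark passes window end plus lookback.
  static boolean canCollect(SketchWindow sideWindow, Duration maximumLookback, Instant watermark) {
    return watermark.isAfter(sideWindow.maxTimestamp().plus(maximumLookback));
  }

  public static void main(String[] args) {
    FixedWindowMappingFn fn = new FixedWindowMappingFn(Duration.ofHours(1));
    FixedWindow main = new FixedWindow(Instant.ofEpochMilli(5_400_000L), Duration.ofMinutes(10));
    FixedWindow side = fn.getSideInputWindow(main);
    System.out.println(side.start);
    System.out.println(canCollect(side, fn.maximumLookback(), Instant.ofEpochMilli(7_200_000L)));
  }
}
```

Keeping the mapping separate from window assignment means the runner only needs the mapping fn and its lookback bound to decide when a side input window can no longer be reached.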
Done. PTAL.
f833425 to 9d0da6e
LGTM.
Next time, could you make additional commits rather than re-pushing so it's easier to review what changed? (They can then all be squashed on merge.)
checkArgument(windows.get().contains(window),
    "StaticWindows only supports side input windows for main input windows that it contains");
return window;
public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
Surprised StaticWindows isn't parameterized... Should it be (before making the change is backwards incompatible)?
It's a package-private implementation detail of PAssert; I don't think it should be parameterized, and we can still change it if we desire.
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 2.58 MB...]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.maven.plugin.MojoFailureException: I/O operation failed
    at org.eluder.coveralls.maven.plugin.CoverallsReportMojo.execute(CoverallsReportMojo.java:259)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    ... 31 more
Caused by: java.io.IOException: Report submission to Coveralls API failed with HTTP status 504: Gateway Time-out (Coveralls API internal error)
    at org.eluder.coveralls.maven.plugin.httpclient.CoverallsClient.parseResponse(CoverallsClient.java:108)
    at org.eluder.coveralls.maven.plugin.httpclient.CoverallsClient.submit(CoverallsClient.java:85)
    at org.eluder.coveralls.maven.plugin.CoverallsReportMojo.submitData(CoverallsReportMojo.java:400)
    at org.eluder.coveralls.maven.plugin.CoverallsReportMojo.execute(CoverallsReportMojo.java:254)
    ... 33 more
Caused by: java.io.IOException: Coveralls API internal error
    at org.eluder.coveralls.maven.plugin.httpclient.CoverallsClient.parseResponse(CoverallsClient.java:97)
    ... 36 more
2017-03-23T21:04:35.866 [ERROR]
2017-03-23T21:04:35.866 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-03-23T21:04:35.866 [ERROR]
2017-03-23T21:04:35.866 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-03-23T21:04:35.866 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
channel stopped
Setting status of abf129e to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8733/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
@Override
public Duration maximumLookback() {
  // TODO: This may be unsafe.
  return Duration.millis(Long.MAX_VALUE);
Introduce a config?
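One way a Long.MAX_VALUE lookback could be unsafe, offered only as an assumption about what the TODO might mean, is arithmetic overflow when a caller adds the lookback to a window's end timestamp. The sketch below uses plain epoch-millisecond longs and hypothetical helper names (naiveExpiryMillis, saturatingExpiryMillis); it is not code from this PR.

```java
class LookbackOverflowSketch {
  // Hypothetical expiry computation: plain addition wraps around silently on overflow.
  static long naiveExpiryMillis(long windowEndMillis, long lookbackMillis) {
    return windowEndMillis + lookbackMillis;
  }

  // Saturating variant: clamps to Long.MAX_VALUE ("never expires") instead of wrapping.
  static long saturatingExpiryMillis(long windowEndMillis, long lookbackMillis) {
    if (lookbackMillis < 0) {
      throw new IllegalArgumentException("lookback must be non-negative");
    }
    try {
      return Math.addExact(windowEndMillis, lookbackMillis);
    } catch (ArithmeticException overflow) {
      // With a non-negative lookback, overflow can only happen in the positive direction.
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    long windowEnd = 1_000L;
    // The naive sum is negative, i.e. it looks like the window expired long ago.
    System.out.println(naiveExpiryMillis(windowEnd, Long.MAX_VALUE) < 0);
    System.out.println(saturatingExpiryMillis(windowEnd, Long.MAX_VALUE) == Long.MAX_VALUE);
  }
}
```

Whether to saturate, use a dedicated "unbounded" sentinel, or make the bound configurable is a design choice this sketch does not settle.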