[BEAM-115] Unify Java and Python WindowingStrategy representations.#3222
robertwb wants to merge 4 commits
|
R: @tgroh |
| // This URN says that the WindowFn is just a UDF blob the Java SDK understands | ||
| // TODO: standardize such things | ||
| public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; | ||
| public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1"; |
Just curious - does the v add anything in particular?
Makes it clearer that this number is a version field.
We use 'v' elsewhere (e.g. in the package name itself). Otherwise it looks like a ratio or something.
It is in the package name to make it a valid package name. Not a huge deal. If you feel strongly about it I'm fine with it.
| package org.apache.beam.runner_api.v1; | ||
|
|
||
| option java_package = "org.apache.beam.sdk.common.runner.v1"; | ||
| option java_outer_classname = "RunnerApiPayloads"; |
Let's name the classes and files consistently so they are easy to find.
I'm open for suggestions (haven't come up with names that feel right yet).
Here, how about StandardWindowFns in standard_window_fns.proto?
Sounds good. Done.
|
R: @kennknowles |
| SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1" | ||
| SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1" | ||
| PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1" | ||
| GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1" |
We had discussed using a proto definition like
message WellKnownUrns {
  string global_windows = 1 [default = "urn:beam:windowfn:global_windows:0.1"];
  ...
}
But I guess this is proto 2 only. For normal proto use, I agree with this change, but it is too bad we don't have that hack available. Any other ideas? A data file seems mostly pointless, because whatever you look up the URN by is effectively just another unique name anyhow (though it would probably prevent some errors).
Not that I think there is a huge danger of these getting out of sync, since they can never change once they are really used. But it would be nice for new SDKs, etc.
We could have a script that goes through the proto file (or something else) and pulls the name out and generates a .java, .py, .go, ... file of constants. Perhaps overkill though. Ideally this'd be co-located with the semantic definition of the urn as well, if needed.
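The generator idea floated above could look roughly like the following. This is only a sketch: the `URNS` mapping stands in for whatever single source of truth would be parsed (the proto file or something else), and the function names and the `Urns` class name are hypothetical. The two URN strings are copied from the diff under review.

```python
# Hypothetical single source of truth: constant name -> URN string.
# In the proposal this would be extracted from the proto file (or similar);
# here it is hard-coded to keep the sketch self-contained.
URNS = {
    "GLOBAL_WINDOWS_FN": "beam:windowfn:global_windows:v0.1",
    "SESSION_WINDOWS_FN": "beam:windowfn:session_windows:v0.1",
}


def render_python(urns):
    """Render a Python module body with one constant per URN."""
    return "".join(
        '%s = "%s"\n' % (name, urn) for name, urn in sorted(urns.items()))


def render_java(urns, class_name="Urns"):
    """Render a Java class with the same constants (class name is assumed)."""
    lines = ["public final class %s {" % class_name]
    for name, urn in sorted(urns.items()):
        lines.append('  public static final String %s = "%s";' % (name, urn))
    lines.append("}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_python(URNS))
    print(render_java(URNS))
```

Each additional SDK would only need one more `render_*` function, so the constants cannot drift apart between languages.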
| public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"; | ||
| // This URN says that the WindowFn is just a UDF blob the Java SDK understands | ||
| // TODO: standardize such things | ||
| public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; |
FWIW the urn: at the beginning is the URI scheme that makes it a URN. The official syntax is "urn:" NID ":" NSS, i.e. the scheme, then a namespace identifier, then a namespace-specific string
(https://tools.ietf.org/html/rfc8141#section-2)
I agree the scheme is mostly pointless but wanted to just note the departure.
(in particular, top level schemes and the remainder of the URI have exactly the same relationship as URN namespaces and the NSS, as far as I can tell)
I didn't know urns were an official thing with a spec... We can prefix everything with urn: if you want.
No, I just wanted to note it. I find the prefixing a bit tedious. I've been doing it because following internet standards is my first instinct, but I don't think it is worthwhile. We should lay out the namespaces in a section of the runner guide on the website.
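To make the departure discussed in this thread concrete, here is a small sketch that splits a name into namespace identifier and namespace-specific string while treating the leading `urn:` scheme as optional. The helper name `split_urn` is hypothetical and it is not a full RFC 8141 validator; it only shows that the old and new constants in this PR differ by the scheme prefix and the `v`.

```python
def split_urn(name):
    """Split a name into (nid, nss); a leading "urn:" scheme is optional.

    Illustrative only: this does not check the character rules of RFC 8141.
    """
    if name.lower().startswith("urn:"):
        name = name[len("urn:"):]
    nid, sep, nss = name.partition(":")
    if not sep or not nid or not nss:
        raise ValueError("expected NID:NSS, got %r" % name)
    return nid, nss


# Old-style constant from the diff, with the scheme prefix.
old = split_urn("urn:beam:windowfn:javasdk:0.1")
# New-style constant from the diff, without the scheme prefix.
new = split_urn("beam:windowfn:javasdk:v0.1")
```

Both calls yield the namespace identifier `beam`; only the namespace-specific string differs.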
|
(btw in commit message s/Stragegy/Strategy/) |
tgroh
left a comment
It seems that it would be worth adding the serialization as well here and testing the round tripping.
|
We have serialization and round-tripping tests. |
|
What I mean is that, as I understand it, we do not have any logic to serialize known types (so this new deserialization code is not exercised). I'm happy to be wrong though. |
|
Ah, yes, the serialization logic is only available in Python. |
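The kind of round-trip check requested in this thread can be sketched without any SDK dependency. Everything below is a stand-in: `FixedWindows` is a plain dataclass rather than the Beam class, the URN is made up, and JSON replaces the proto payload purely to keep the example self-contained.

```python
import json
from dataclasses import dataclass


# Stand-in for a known WindowFn; NOT the Beam class. Values are milliseconds.
@dataclass(frozen=True)
class FixedWindows:
    size_ms: int
    offset_ms: int = 0


FIXED_WINDOWS_URN = "example:windowfn:fixed_windows:v0.1"  # hypothetical URN


def serialize(window_fn):
    """Known type -> (urn, payload bytes). JSON stands in for a proto payload."""
    if isinstance(window_fn, FixedWindows):
        payload = json.dumps(
            {"size_ms": window_fn.size_ms, "offset_ms": window_fn.offset_ms})
        return FIXED_WINDOWS_URN, payload.encode("utf-8")
    raise ValueError("Unknown or unsupported WindowFn: %r" % (window_fn,))


def deserialize(urn, payload):
    """(urn, payload bytes) -> known type; the inverse of serialize()."""
    if urn == FIXED_WINDOWS_URN:
        return FixedWindows(**json.loads(payload.decode("utf-8")))
    raise ValueError("Unknown or unsupported WindowFn: %s" % urn)


def check_round_trip(window_fn):
    """Exercise both directions, so the deserialization branch is covered too."""
    return deserialize(*serialize(window_fn)) == window_fn
```

A test that only deserializes hand-built payloads would miss a mismatch between the two directions; `check_round_trip` fails as soon as either side drifts.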
|
PTAL |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; | ||
| import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; | ||
| import org.apache.beam.sdk.transforms.windowing.Trigger; | ||
| import org.apache.beam.sdk.transforms.windowing.*; |
Yeah, just disabled that setting in IntelliJ...
| windowFnSpec.getSpec().getParameter().unpack( | ||
| RunnerApiPayloads.SlidingWindowsPayload.class); | ||
| return SlidingWindows.of( | ||
| Duration.millis(Durations.toMillis(slidingParams.getSize()))) |
The formatting here is pretty funky.
These are long lines, you have to break somewhere. I generally abide by the rectangle rule when I have to break up large expressions.
FWIW since you mentioned IntelliJ there's google-java-format plugin which is stellar.
| .build()))) | ||
| .build(); | ||
| // TODO: Set environment IDs | ||
| if (windowFn instanceof GlobalWindows) { |
These should probably be translated similarly to how we're translating the PTransform payloads, i.e. with a registrar and some interface that can translate to and from a known type in some meaningful format. We probably want to wait until we've determined that API, though.
Yeah, I'll defer on that.
|
Jenkins: retest this please |
|
Jenkins is happy, PTAL. |
|
Jenkins appears sad due to a typo in an attribute name. |
kennknowles
left a comment
LGTM once you get Jenkins happy.
| return (WindowFn<?, ?>) deserializedWindowFn; | ||
| default: | ||
| throw new IllegalArgumentException( | ||
| "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn()); |
nit: use WindowFn.class.getSimpleName()
| self.assertEqual( | ||
| strategy, | ||
| DataflowRunner.deserialize_windowing_strategy( | ||
| DataflowRunner.serialize_windwoing_strategy(strategy))) |
| offset=proto_utils.from_micros( | ||
| timestamp_pb2.Timestamp, self.offset.micros))) | ||
|
|
||
| @urns.RunnerApiFn.register_urn( |
One design decision on the Java side (which I don't feel wedded to) is that URNs and payloads are entirely divorced from their Java native classes. I had imagined adding PTransform.getUrn() but instead we have a registrar from Java class to URN and payload which lives entirely outside the core SDK to keep proto off the API surface.
I don't entirely understand the ramifications of the Python SDK making both of these decisions separately. (PTransform having their URN, proto messages being right there too). I'd love your thoughts on it sometime, and whether we can easily make Java more natural.
There are certainly pros and cons to each, but we should be consistent. Certainly going from the urn to object requires some kind of registration (even if the other way around can be handled by polymorphism).
Probably wouldn't hurt to put together a doc detailing these design choices, which wouldn't have to all be resolved now (but before the Runner/Fn Api is public we would make sure all SDKs are consistent).
I generally think that two-way translation is better via separate registered translators than dynamic dispatch. So that is where the design comes from. But actually, we don't really do it, for a specific reason. When we are parsing a proto that has URN "beam:transforms:pardo" we don't try to make a Java ParDo object because the types wouldn't work out, etc. Instead all transforms just become RawPTransform that can vend its URN and language-agnostic bits of the payload.
I am currently fleshing out the kinds of access runners will need and will then add all this to the runner guide.
Just notes, nothing particularly needed here.
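The asymmetry raised in this thread (object to URN can use polymorphism, URN to object needs registration) can be illustrated with a small sketch. It is loosely modelled on the decorator style visible in the Python diff, but the registry, the `register_urn` decorator here, the dict payloads, and the URN are all hypothetical and not the SDK's API.

```python
_FROM_PAYLOAD = {}  # urn -> callable that rebuilds an object from a payload


def register_urn(urn):
    """Class decorator: remember how to rebuild instances for `urn`."""
    def wrapper(cls):
        cls.URN = urn
        _FROM_PAYLOAD[urn] = cls.from_payload
        return cls
    return wrapper


@register_urn("example:windowfn:sliding_windows:v0.1")  # hypothetical URN
class SlidingWindows:
    def __init__(self, size, period):
        self.size, self.period = size, period

    def to_spec(self):
        # Object -> (urn, payload): no lookup needed, dynamic dispatch suffices.
        return self.URN, {"size": self.size, "period": self.period}

    @classmethod
    def from_payload(cls, payload):
        return cls(payload["size"], payload["period"])


def from_spec(urn, payload):
    # (urn, payload) -> object: there is no receiver to dispatch on,
    # so a registry lookup is unavoidable.
    try:
        factory = _FROM_PAYLOAD[urn]
    except KeyError:
        raise ValueError("Unknown or unsupported WindowFn: %s" % urn)
    return factory(payload)
```

Moving `to_spec` out of the class and into a second registry keyed by type would give the fully separate translator design described for the Java side, at the cost of one more registration per class.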
| SerializableUtils.serializeToByteArray(windowFn))) | ||
| .build()))) | ||
| .build(); | ||
| } else if (windowFn instanceof GlobalWindows) { |
We should have some sort of registration scheme for WindowFns in the medium-long term, even if only to simplify this method. File a JIRA?