[BEAM-5999] Reconcile timer proto representation. #6986

Merged: mxm merged 2 commits into apache:master from robertwb:simplify-timers on Nov 13, 2018

Contributor

robertwb commented Nov 8, 2018

This adds a coder field to the timer spec, and moves the Java executor to use this spec alone when constructing executable graphs (as Python already does). It should unblock Python timers working on Flink once #6981 also goes in.

See also the thread at http://mail-archives.apache.org/mod_mbox/beam-dev/201810.mbox/%3CCAFFRZHVecq4r3F_XgkEMqHJ9AqTMfTvGNZkOg764H9L2fSL85g@mail.gmail.com%3E


Contributor Author

robertwb commented Nov 8, 2018

Run Java PreCommit

Contributor Author

robertwb commented Nov 8, 2018

Run JavaPortabilityApi PreCommit

Contributor Author

robertwb commented Nov 8, 2018

Member

lukecwik commented Nov 8, 2018

It seems odd to have the pipeline during job creation represent it one way and for the ProcessBundleDescriptor to create a PCollection to satisfy how the Data API is able to connect inputs/outputs. One of the side effects of this change is that timers are implicitly reserving their local names in the input map and aren't part of that input map.

In the thread you mentioned that this seems a lot closer to state. Were you planning a follow-up change to migrate timers to a different, state-like API, or to the state API itself, where timers are "read" like a bag state, it is up to the SDK to request them, and it would be an error not to do so?

Contributor Author

robertwb commented Nov 8, 2018

True. We were already mutating it by cloning the output collection, but this is more invasive. I still like the idea of timers being pushed to the SDK, but I didn't fully flesh out all the options. (On the write side, setting them as one sets state is easy.) I think it's more important to get the runner representation fixed; the fn-execution one is ephemeral and easier to evolve.

Fair point that timers are implicitly reserving their names, rather than explicitly doing so, which might be more motivation for the above change.

@robertwb robertwb force-pushed the robertwb:simplify-timers branch from 136ebde to 675187b Nov 9, 2018
}

/** The PTransform that uses this timer. */
public abstract PipelineNode.PTransformNode transform();
/** The local name the referencing PTransform uses to refer to this timer. */
public abstract String localName();
/** The PCollection that backs this timer. */
public abstract PipelineNode.PCollectionNode collection();


mxm Nov 9, 2018

Contributor

As far as I understand, at runtime, timers will still be represented as items of a PCollection? If we remove the collection here, how do we discover timers for an input PCollection?

I suppose we go entirely through TimerSpec in ProcessBundleDescriptors?


robertwb Nov 9, 2018

Author Contributor

I don't plan on changing the representation at runtime now, but perhaps it will be in the future. For now, you can look up the collection in the inputs and outputs dict of a process bundle descriptor using the local name.


mxm Nov 13, 2018

Contributor

Ok, found it, for example:

stageBundleFactory
    .getProcessBundleDescriptor()
    // yes this is required twice
    .getProcessBundleDescriptor()
    .getPcollectionsMap()
    .get("ParDo(Anonymous)2/ParMultiDo(Anonymous).foo.out:0")
    .getUniqueName()

=> "ParDo(Anonymous)2/ParMultiDo(Anonymous).foo"


robertwb Nov 13, 2018

Author Contributor

You should be able to get the name of the PCollection from ref.transform().getTransform().getInputsOrThrow(ref.localName()) as well.
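
A minimal sketch of that lookup, using plain Java maps in place of the Beam classes. `Transform`, `TimerRef`, and `timerCollectionId` are hypothetical stand-ins introduced for illustration, not Beam API; only the `getInputsOrThrow(localName)` call shape comes from the comment above. Because the lookup is scoped to one transform, two transforms that both declare a timer named `foo` resolve to different collections.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for the lookup described above; not the Beam API. */
final class TimerInputLookupSketch {

  /** Hypothetical stand-in for a transform: maps local input names to PCollection ids. */
  static final class Transform {
    private final Map<String, String> inputs = new HashMap<>();

    Transform withInput(String localName, String pcollectionId) {
      inputs.put(localName, pcollectionId);
      return this;
    }

    /** Fails on a missing key, as generated protobuf map getters do. */
    String getInputsOrThrow(String localName) {
      String id = inputs.get(localName);
      if (id == null) {
        throw new IllegalArgumentException("No input named " + localName);
      }
      return id;
    }
  }

  /** Hypothetical stand-in for a timer reference: the owning transform plus the local name. */
  static final class TimerRef {
    final Transform transform;
    final String localName;

    TimerRef(Transform transform, String localName) {
      this.transform = transform;
      this.localName = localName;
    }
  }

  /** Resolves the PCollection id backing a timer through its own transform's inputs. */
  static String timerCollectionId(TimerRef ref) {
    return ref.transform.getInputsOrThrow(ref.localName);
  }
}
```

Note that this goes from a timer reference to a collection id; it does not by itself answer the reverse question of finding the timer reference for a given collection id.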


mxm Nov 13, 2018

Contributor

To be able to resolve the TimerReference, I need to resolve the PCollectionId which I receive ("ParDo(Anonymous)2/ParMultiDo(Anonymous).foo.out:0"). I suppose it would work to loop over all TimerReferences and resolve by the local name, but couldn't that cause problems with duplicate timer names across different transforms?


mxm Nov 13, 2018

Contributor

Actually, I don't have the local timer name available, only the timer PCollection id. If we remove TimerReference#collection, I will have to strip ".out:0" from the input collection id to look up the corresponding TimerReference.
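
The suffix-stripping workaround could look like the sketch below. `stripTimerOutputSuffix` is a hypothetical helper, and the `.out:0` suffix is assumed only from the ids quoted in this thread; it is not a documented naming contract, so the helper fails loudly when the suffix is missing instead of guessing.

```java
/** Illustrative only: derives a lookup key by stripping an assumed id suffix. */
final class TimerIdSuffixSketch {

  /** Suffix seen on the timer collection ids quoted in this thread (an assumption). */
  static final String TIMER_OUTPUT_SUFFIX = ".out:0";

  /** Returns the id without the suffix, or throws if the id does not end with it. */
  static String stripTimerOutputSuffix(String collectionId) {
    if (!collectionId.endsWith(TIMER_OUTPUT_SUFFIX)) {
      throw new IllegalArgumentException(
          "Expected id ending in " + TIMER_OUTPUT_SUFFIX + " but got " + collectionId);
    }
    return collectionId.substring(0, collectionId.length() - TIMER_OUTPUT_SUFFIX.length());
  }
}
```

Parsing ids this way ties the caller to a naming convention, which is the fragility that motivates keeping an explicit accessor.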


mxm Nov 13, 2018

Contributor

Maybe I'm missing something here, otherwise I'd keep the collection method.


robertwb Nov 13, 2018

Author Contributor

Could you provide a pointer to the code you're trying to use this from?


mxm Nov 13, 2018

Contributor

Here I use the collection() method:

To resolve the Timer here:

final String timerPCollectionId = extractTimerCollectionId(inputCollectionId);


mxm Nov 13, 2018

Contributor

FYI, I've solved it like this: 49ea0b9

Contributor

mxm commented Nov 13, 2018

Run Java PreCommit

Contributor

mxm commented Nov 13, 2018

Run JavaPortabilityApi PreCommit

@mxm mxm merged commit a0bc60b into apache:master Nov 13, 2018
7 checks passed
Go ("Run Go PreCommit") SUCCESS
Java ("Run Java PreCommit") SUCCESS
JavaPortabilityApi ("Run JavaPortabilityApi PreCommit") SUCCESS
Java_Examples_Dataflow ("Run Java_Examples_Dataflow PreCommit") SUCCESS
Python ("Run Python PreCommit") SUCCESS
RAT ("Run RAT PreCommit") SUCCESS
Spotless ("Run Spotless PreCommit") SUCCESS
Member

lukecwik commented Nov 13, 2018

@robertwb, can you update https://s.apache.org/beam-portability-timers to reflect the new representation? I want to keep the docs up to date.

mxm added a commit to mxm/beam that referenced this pull request Nov 13, 2018
…roto representation."

This reverts commit a0bc60b, reversing
changes made to fca3d98.
mxm added a commit to mxm/beam that referenced this pull request Nov 14, 2018
…roto representation."

This reverts commit a0bc60b, reversing
changes made to fca3d98.
Contributor

mxm commented Nov 14, 2018

I think there is a problem with this PR. It doesn't produce unique names anymore for the timer collection ids. For example, timer foo:

stageBundleFactory
    .getProcessBundleDescriptor()
    .getProcessBundleDescriptor()
    .getPcollectionsMap()
    .get("ParDo(Anonymous)2/ParMultiDo(Anonymous).foo.out:0")
    .getUniqueName()

Should produce: "ParDo(Anonymous)2/ParMultiDo(Anonymous).foo"
Actual: "ParDo(Anonymous)2/ParMultiDo(Anonymous).output"

I couldn't find a way to resolve the timer input collection id from the timer output collection id. So I had to revert the commit for #6981.
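
To illustrate why duplicate unique names break this resolution, here is a sketch of a reverse index from unique name to collection id. `indexByUniqueName` is a hypothetical helper, not Beam code, and it is an assumption that a second timer on the same transform would also receive the `....output` name. Under that assumption the index cannot be built, because one unique name would have to point at two collection ids.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: reverse lookup from unique name to collection id. */
final class UniqueNameIndexSketch {

  /**
   * Inverts a map of collection id to unique name.
   * Throws if two collection ids share a unique name, since the reverse lookup is then ambiguous.
   */
  static Map<String, String> indexByUniqueName(Map<String, String> idToUniqueName) {
    Map<String, String> index = new HashMap<>();
    for (Map.Entry<String, String> entry : idToUniqueName.entrySet()) {
      String previousId = index.put(entry.getValue(), entry.getKey());
      if (previousId != null) {
        throw new IllegalStateException(
            "Unique name " + entry.getValue() + " is shared by " + previousId
                + " and " + entry.getKey());
      }
    }
    return index;
  }
}
```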

Contributor Author

robertwb commented Nov 14, 2018

@robertwb robertwb mentioned this pull request Nov 14, 2018
0 of 2 tasks complete
Contributor Author

robertwb commented Nov 14, 2018

#7036 should now generate unique names. Looks like the original expansion had the same issue, which is why I didn't see it. What are these unique names being used for?

Contributor

mxm commented Nov 14, 2018

> What are these unique names being used for?

We use them to resolve the timer input collection id from the timer output collection id. Like in the above example.
