
[BEAM-3741] Proto changes for reporting backlog/splitting/finalizing bundles. #6963

Merged
lukecwik merged 3 commits into apache:master from lukecwik:splittabledofn3
Nov 9, 2018

Member

lukecwik commented Nov 6, 2018

This change contains the recommended proto changes from:

Note that we only have partial implementations within some shared code, since no runner has an end-to-end working solution over portability yet. If this compiles and doesn't break tests, we should be able to submit it to unblock further work on SDF.



Member Author

lukecwik commented Nov 6, 2018

R: @swegner
CC: @iemejia

Member Author

lukecwik commented Nov 6, 2018

CC: @ananvay

Member Author

lukecwik commented Nov 6, 2018

Run Java PreCommit

Member Author

lukecwik commented Nov 6, 2018

Run Java PreCommit

Member

iemejia commented Nov 7, 2018

Run Java PreCommit

ProcessBundleRequest process_bundle = 1001;
ProcessBundleProgressRequest process_bundle_progress = 1002;
ProcessBundleSplitRequest process_bundle_split = 1003;
FinalizeBundleRequest finalize_bundle = 1004;

Contributor

What does it mean that InstructionRequest is marked Stable? Is there any expectation that modifications here are backward/forward compatible?

Member Author

It's a hint that this proto is unlikely to change. There are no compatibility guarantees yet, since we don't have a full portability API designed and implemented.

repeated DelayedBundleApplication residual_roots = 2;
}

message FinalizeBundleRequest {

Contributor

FinalizeBundleRequest is packaged inside of an InstructionRequest. Is the instruction_reference here redundant with the InstructionRequest.instruction_id?

Member Author

No. The FinalizeBundleRequest.instruction_id is used to know which FinalizeBundleResponse corresponds to it, whereas the instruction_reference refers to the instruction_id of the ProcessBundleRequest that requested finalization.
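
To make the distinction between the two ids concrete, here is a minimal Python sketch. It is not Beam's implementation: the dataclasses are simplified stand-ins for the protos, and `InstructionResponse`, `handle_finalize`, and `pending_finalizers` are hypothetical names introduced only for illustration. It assumes the SDK keeps finalization callbacks keyed by the instruction_id of the bundle that registered them.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class FinalizeBundleRequest:
    # instruction_id of the ProcessBundleRequest that requested finalization.
    instruction_reference: str


@dataclass(frozen=True)
class InstructionRequest:
    # Unique id of this request; the matching response echoes it back.
    instruction_id: str
    finalize_bundle: Optional[FinalizeBundleRequest] = None


@dataclass(frozen=True)
class InstructionResponse:
    # Hypothetical stand-in for the response envelope.
    instruction_id: str


def handle_finalize(
    request: InstructionRequest,
    pending_finalizers: Dict[str, Callable[[], None]],
) -> InstructionResponse:
    """Run the callback registered by the referenced bundle, then answer this request."""
    finalize = request.finalize_bundle
    if finalize is None:
        raise ValueError("request does not carry a FinalizeBundleRequest")
    # Look up by the *referenced* bundle id; raises KeyError if nothing is pending.
    callback = pending_finalizers.pop(finalize.instruction_reference)
    callback()
    # Reply under the id of the finalize request itself, not the bundle's id.
    return InstructionResponse(instruction_id=request.instruction_id)
```

In this sketch a request with instruction_id "instr-42" and instruction_reference "bundle-7" runs the callback stored under "bundle-7" and yields a response tagged "instr-42", so the two fields carry different information.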

message Application {
// (Required) The primitive transform to which to pass the element
string ptransform_id = 1;
// One of the applications specifying the scope of work for a bundle.

Contributor

Do you think it would be valuable to link from these protos to a glossary of concept definitions? If I remember correctly one of the SDF docs had a good definition of a "bundle application".

Member Author

Added a link.

// One of the applications specifying the scope of work for a bundle.
message BundleApplication {
// (Required) The primitive transform to which to pass the element
string ptransform_id = 1;

Contributor

In this layer of the architecture do we talk about the difference between 'PTransform' and 'applied PTransform'? If I understand correctly, PTransform is an SDK concept which can be applied many times in a pipeline; and a bundle application refers to a specific application of a PTransform.

Member Author

Applied PTransforms are an internal representation detail of how an SDK tracks "duplication" of the same PTransform within a graph.

From the perspective of the Beam Portability APIs, the pipeline proto only represents applied PTransforms, as there are no unapplied PTransforms.


// (Required) The encoded element to pass to the transform.
bytes element = 3;
// Lower bound on timestamps of elements that this PTransform

Contributor

I find this description somewhat hard to grok. Some thoughts:

  1. Start with what the map is keyed by (rather than ending with it), since this context is useful for the rest of the explanation.
  2. Is the explanation here trying to define what an output watermark is? I find the explanation makes it harder to understand the usage of this particular field. Maybe it'd be simpler to just refer to it as an output watermark and let it be defined more fully somewhere else.

Member Author

Reworded, thanks.

// * a shared partitioned resource should use the partition identifier.
// * a uniquely partitioned resource such as a file range should set this to
// file name + start offset.
bytes partition = 1;

Contributor

Should this be a string (versus bytes)? The documentation refers to the empty string "", and a string would be consistent with the other identifiers, which are strings.

Member Author

It's meant to be bytes; "" is a canonical representation for an empty byte string.
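
As a small illustration of why bytes can be the more natural type here, the Python sketch below builds a partition key from a file name plus a start offset. The encoding (UTF-8 name followed by an 8-byte big-endian offset) and the names `EMPTY_PARTITION` and `file_range_partition` are assumptions made for this example; the proto comment only says "file name + start offset" and does not prescribe a layout.

```python
import struct

# The empty byte string; "" in the proto documentation denotes this value.
EMPTY_PARTITION = b""


def file_range_partition(file_name: str, start_offset: int) -> bytes:
    """Build an opaque partition key from a file name and a start offset.

    Assumed layout (not specified by the proto): UTF-8 file name followed by
    the offset packed as an unsigned 8-byte big-endian integer.
    """
    return file_name.encode("utf-8") + struct.pack(">Q", start_offset)
```

A key built this way is generally not valid UTF-8 (an offset of 255 ends in the byte 0xFF, for instance), so it could not be stored in a proto string field without an extra text encoding step.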


// Root applications that should replace the current bundle.
repeated Application primary_roots = 1;
// Contains additional monitoring information related to this application.

Contributor

"related to" is ambiguous; can this be more precise? Is this monitoring info produced by the application so far? Or baseline monitoring data to be augmented by the current application?

Also, I see that ProcessBundleProgressResponse also contains monitoring_infos; is one of these fields redundant?

Member Author

Added a link to a doc that describes these additional signals.

These monitoring infos give insight into a specific application, while the ProcessBundleProgressResponse contains monitoring info that is the aggregate of many applications.

message DelayedBundleApplication {
// Recommended amount of delay before the application should be executed
// by the runner.
google.protobuf.Duration delay = 1;

Contributor

Is the expectation that the runner applies this delay from the time when it gets around to processing the response? Would it be more accurate for the SDK to send the delay as an actual wall-time timestamp calculated from when it produces the response?

Member Author

I was wondering which would make more sense here as well. Either could work, and this delay doesn't have to be honored by a Runner; it is just a recommendation.

I'll swap to timestamp in a follow-up PR.
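
The difference between the two options discussed above can be sketched in a few lines of Python. This is only an illustration under assumed names (`resume_at_from_delay`, `resume_at_from_timestamp`); neither function exists in Beam, and a runner remains free to ignore the recommendation either way.

```python
from datetime import datetime, timedelta, timezone


def resume_at_from_delay(runner_processed_at: datetime, delay: timedelta) -> datetime:
    """Relative delay: the clock starts when the runner handles the response."""
    return runner_processed_at + delay


def resume_at_from_timestamp(sdk_produced_at: datetime, delay: timedelta) -> datetime:
    """Absolute timestamp: the SDK fixes the resume time when it produces the response."""
    return sdk_produced_at + delay


# Example: the SDK asks for 10 s, but the runner handles the response 3 s late.
produced = datetime(2018, 11, 9, 12, 0, 0, tzinfo=timezone.utc)
processed = produced + timedelta(seconds=3)
delay = timedelta(seconds=10)

relative = resume_at_from_delay(processed, delay)      # 12:00:13
absolute = resume_at_from_timestamp(produced, delay)   # 12:00:10
drift = relative - absolute                            # 3 s of added latency
```

With a relative delay, any time the response spends queued on the runner is added on top of the requested wait; an absolute timestamp avoids that drift but assumes the SDK and runner clocks are reasonably synchronized.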

// doing yet, so that it can be done in a separate bundle (perhaps in
// parallel with the current bundle).
// (Required) Specifies that the Runner would like the bundle to split itself
// such that it performs no more work then the backlog specified for each

Contributor

then -> than

Member Author

Done

//
// The value is relative to the current scope of work of the bundle.
google.protobuf.DoubleValue fraction_of_remainder = 2;
// If the backlog is unspecified for a PTransform, it would like that

Contributor

Can you update it to be more precise, e.g. "the runner requests that the SDK process all data received for the PTransform"?

Member Author

Done

Member Author

lukecwik commented Nov 9, 2018

Run Java PreCommit

Contributor

swegner commented Nov 9, 2018

LGTM, thanks. Please squash and merge.

lukecwik merged commit 17c2da6 into apache:master Nov 9, 2018