Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-310] Add RootTransformEvaluatorFactory #1051

Closed
wants to merge 4 commits into from

Conversation

tgroh
Copy link
Member

@tgroh tgroh commented Oct 5, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

Use for Root Transforms.

These transforms generate their own initial inputs, which the Evaluator
is responsible for providing back to them to generate elements from the
root PCollections.

Update ExecutorServiceParallelExecutor to schedule roots based on the
provided transforms.

Some tests which have become no longer relevant are deleted within this PR.

@tgroh tgroh changed the title Add RootTransformEvaluatorFactory [BEAM-310] Add RootTransformEvaluatorFactory Oct 5, 2016
Use for Root Transforms.

These transforms generate their own initial inputs, which the Evaluator
is responsible for providing back to them to generate elements from the
root PCollections.

Update ExecutorServiceParallelExecutor to schedule roots based on the
provided transforms.
This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.
@tgroh
Copy link
Member Author

tgroh commented Oct 5, 2016

R: @lukecwik

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets talk about my comments about the interface hierarchy before we proceed further.

Note to self, review tests.

}

@Override
public void cleanup() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup -> cleanUp

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the existing naming. If it's really important to you, I'm happy doing it in a separate PR, but it's not really relevant to this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not important

return createInitialSplits((AppliedPTransform) transform);
}

private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is to later replace this with actual splitting in a follow up PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

rootNodes = ImmutableList.copyOf(roots);
for (AppliedPTransform<?, ?, ?> root : roots) {
ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
pending.addAll(registry.getInitialInputs(root));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we really limited to a single root per bundle in the model or is this an implementation detail.
I'm imagining that with SDF that this could potentially be multiple "root sources" per bundle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A root bundle has a single transform associated with it, and that transform generates initial inputs sufficient to produce outputs - for SDF, this would be producing the initial restriction, or set of such restrictions, which would then be provided to the downstream Fn

if (ExecutorState.ACTIVE == startingState || (ExecutorState.PROCESSING == startingState
&& noWorkOutstanding)) {
if (ExecutorState.ACTIVE == startingState
|| (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also move && noWork... to next line for readability

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please still make this change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, sorry. Done.

@@ -419,6 +434,7 @@ private void fireTimers() throws Exception {
}
KeyedWorkItem<?, Object> work =
KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
LOG.warn("Delivering {} timers for {}", delivery.size(), keyTimers.getKey().getKey());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be there at all, actually.

@@ -31,21 +34,32 @@
* The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
* {@link PTransform}.
*/
class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
Copy link
Member

@lukecwik lukecwik Oct 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to split this into two implementations, one which is expected as part of a flatten with upstream transforms and one flatten which is not. Otherwise the methods that you have will have to be "dual" purposed and may cause issues about how they are used in the future.

For example cleanUp may need to do something for both the root version and something for the non-root version and then you may have to add switching logic to figure out which is which.

This would also allow you to guard against the usage of forApplication for something that never produced output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change removes all the logic associated with handling a nonexistent input (if the Flatten is a root)

* {@link Pipeline}. Provides a way to get initial inputs, which will cause the {@link PTransform}
* to produce all of the appropriate output.
*/
interface RootTransformEvaluatorFactory extends TransformEvaluatorFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it extend?

It seems as though the purpose of RootTransformEvaluatorFactory and TransformEvaluatorFactory are completely different, for example with the Flatten PTransform with 0 pcollection inputs, you would want it to be just a RootTransformEvaluatorFactory and not a regular TransformEvaluatorFactory

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not. I've split it into an independent interface. However, I'm (generally) keeping the input providers within the Evaluator Factory implementations, as the initial inputs have to match the type of the actual Evaluator.

@@ -77,15 +77,33 @@ private TransformEvaluatorRegistry(
}

@Override
public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example where splitting the two interfaces might be wise so that you have a mapping from certain root ptransforms to evaluators for roots, and this mapping for the non-root ones.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Done.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments and then LGTM

I like the usage of the static inner class to keep the code close together but with a defined boundary.

.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
.commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extraneous new lines

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* See the License for the specific language governing permissions and
* limitations under the License.
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/**
* Created by tgroh on 10/6/16.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* See the License for the specific language governing permissions and
* limitations under the License.
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (ExecutorState.ACTIVE == startingState || (ExecutorState.PROCESSING == startingState
&& noWorkOutstanding)) {
if (ExecutorState.ACTIVE == startingState
|| (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please still make this change

@lukecwik
Copy link
Member

lukecwik commented Oct 6, 2016

LGTM, merging

@asfgit asfgit closed this in ecbc641 Oct 6, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants