Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4285] Implement Flink batch side input handler #5688

Closed
wants to merge 3 commits into from

Conversation

bsidhom
Copy link
Contributor

@bsidhom bsidhom commented Jun 19, 2018

This implements a state request handler for batch side inputs in the portable Flink runner. The core implementation is the same as in the non-portable runner: PCollections used as side inputs are materialized as (in-memory) broadcast variables. These materialized elements are then exposed by a side input state handler.

Note that we construct a duplicate ExecutableProcessBundleDescriptor here rather than use an ExecutableStage. This is because the latter exposes side input details (most importantly coder structure) that are not available in ExecutableStage. It would be cleaner to migrate side input utilities from executable bundle descriptors directly to ExecutableStage, but that refactor should be done in a separate change.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

@bsidhom
Copy link
Contributor Author

bsidhom commented Jun 19, 2018

R: @jkff @lukecwik
CC: @angoenka @axelmagn

@jkff
Copy link
Contributor

jkff commented Jun 19, 2018

Thanks Ben; before I take a look: have you tried patching this into #5688 and checking whether wordcount starts working then? I think it's the only missing piece.

@angoenka
Copy link
Contributor

Wordcount will also need GCS credential propagation and Caching change.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

sideInputToCollection.get(
SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
checkArgument(collectionNode != null, "No side input for %s/%s", transformId, sideInputId);
List<Object> broadcastVariable = runtimeContext.getBroadcastVariable(collectionNode.getId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a List<WindowedValue<KV<K, V>>>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out it can! I assumed this could only be List<Object> because that's how it's used elsewhere. Thanks for pointing this out.

}

@Override
public Iterable<V> get(K key, W window) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, is this the main input window or the side input window? (if it's the latter then it's fine)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the side input window.

// below. This only uses the bundle descriptor for side input specs and effectively ignores
// data and state endpoints. We rely on the fact that PCollections and coders are structurally
// identical between instantiations here to prevent having to wire the original executable
// bundle descriptor here. The correct long-term fix is to move side input logic out of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this note. What is the problem with the current code, and which side input logic are you referring to that must be moved into ExecutableStage? Would that require also changing ExecutableStagePayload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, the problem is that ExecutableProcessBundleDescriptor does not do quite what we want right now. It was intended to be an instantiation ExecutableStage with the gRPC endpoints filled out. All the other logic should go into ExecutableStage itself. However, we now have side input logic and coder instantiation on EPBD in addition to gRPC endpoints. This general pipeline logic would be better refactored into ExecutableStage because we don't have access to the data or state API endpoints here and cannot fill those out.

Luckily, this is not a matter of correctness because EPBD construction (at least for the component ids that we care about) is deterministic. It's just strange to pass in API descriptors where we do not have them and where they will not be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I looked at the code of EPBD, and it seems that extracting that logic would take up less code in the current PR than taken up by this comment ;) So I recommend to do that in this PR.

Copy link
Contributor Author

@bsidhom bsidhom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed comments.

sideInputToCollection.get(
SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
checkArgument(collectionNode != null, "No side input for %s/%s", transformId, sideInputId);
List<Object> broadcastVariable = runtimeContext.getBroadcastVariable(collectionNode.getId());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out it can! I assumed this could only be List<Object> because that's how it's used elsewhere. Thanks for pointing this out.

}

@Override
public Iterable<V> get(K key, W window) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the side input window.

// below. This only uses the bundle descriptor for side input specs and effectively ignores
// data and state endpoints. We rely on the fact that PCollections and coders are structurally
// identical between instantiations here to prevent having to wire the original executable
// bundle descriptor here. The correct long-term fix is to move side input logic out of
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, the problem is that ExecutableProcessBundleDescriptor does not do quite what we want right now. It was intended to be an instantiation ExecutableStage with the gRPC endpoints filled out. All the other logic should go into ExecutableStage itself. However, we now have side input logic and coder instantiation on EPBD in addition to gRPC endpoints. This general pipeline logic would be better refactored into ExecutableStage because we don't have access to the data or state API endpoints here and cannot fill those out.

Luckily, this is not a matter of correctness because EPBD construction (at least for the component ids that we care about) is deterministic. It's just strange to pass in API descriptors where we do not have them and where they will not be used.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

// below. This only uses the bundle descriptor for side input specs and effectively ignores
// data and state endpoints. We rely on the fact that PCollections and coders are structurally
// identical between instantiations here to prevent having to wire the original executable
// bundle descriptor here. The correct long-term fix is to move side input logic out of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I looked at the code of EPBD, and it seems that extracting that logic would take up less code in the current PR than taken up by this comment ;) So I recommend to do that in this PR.

@jkff
Copy link
Contributor

jkff commented Jun 20, 2018

Actually, I retract my comment. Extracting that logic requires another change that I'm working on (#5709). For now I'm going to just fix one bug in here (that Ankur identified) and merge.

@asfgit asfgit closed this in 1c42e90 Jun 21, 2018
@bsidhom bsidhom deleted the flink-state-handler branch June 21, 2018 18:11
charlesccychen pushed a commit to charlesccychen/beam that referenced this pull request Jul 24, 2018
charlesccychen pushed a commit to charlesccychen/beam that referenced this pull request Jul 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants