[BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. #5407
Conversation
/** Creates a mapping from PCollection id to output tag integer. */
- private static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
+ static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
Please move this into a shared utility class.
BiMap<String, Integer> outputMap =
    FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
A TreeMap of what? The intent here is to order the coders (the values of the map) by their tags (the keys).
I have cleaned up this portion of the code.
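The intent discussed above can be sketched as follows. This is a minimal, self-contained illustration using only `java.util` (a plain `Map` plus a manually built inverse stands in for Guava's `BiMap`); the class and method names are illustrative, not the actual translator code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TagOrderSketch {
  /** Returns the local output names ordered by their integer tags. */
  static List<String> namesInTagOrder(Map<String, Integer> outputMap) {
    // TreeMap sorts by key, so inverting the map (tag -> name) and walking
    // its values yields the names in ascending tag order.
    TreeMap<Integer, String> byTag = new TreeMap<>();
    for (Map.Entry<String, Integer> e : outputMap.entrySet()) {
      byTag.put(e.getValue(), e.getKey());
    }
    return List.copyOf(byTag.values());
  }

  public static void main(String[] args) {
    Map<String, Integer> outputMap = new LinkedHashMap<>();
    outputMap.put("side", 2);
    outputMap.put("main", 0);
    outputMap.put("late", 1);
    System.out.println(namesInTagOrder(outputMap)); // [main, late, side]
  }
}
```

Making the tag ordering explicit in a helper like this also answers the "a TreeMap of what?" question directly at the call site.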
String inputPCollectionId =
    Iterables.getOnlyElement(transform.getInputsMap().values());
Use ExecutableStage and rely on its getInputPCollection().getId() method.
Ditto for the batch pipeline translator.
As above, we need to pass a serializable representation to operators. We could create an ExecutableStage here and then reconstruct it on the runner. In that case, we need to ensure that two ExecutableStages constructed from the same ExecutableStagePayload are always equivalent (including any synthetic ids that may be generated). That should be the case as of now, but we need to be careful.
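The equivalence concern above hinges on synthetic ids being generated deterministically from the payload contents. A minimal sketch of that property, with purely illustrative names (this is not the Beam API, just the invariant we need to preserve):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SyntheticIdSketch {
  /** Derives a synthetic id deterministically from a base name and the existing ids. */
  static String syntheticId(String base, Set<String> existing) {
    String candidate = base;
    int i = 0;
    // Deterministic probing: the result depends only on the inputs, never
    // on e.g. object identity or insertion order of a HashSet iteration.
    while (existing.contains(candidate)) {
      candidate = base + "_" + (i++);
    }
    return candidate;
  }

  public static void main(String[] args) {
    Set<String> ids = new HashSet<>(List.of("stage", "stage_0"));
    // Two independent "reconstructions" from the same inputs agree, which is
    // the property two ExecutableStages built from one payload must have.
    String first = syntheticId("stage", ids);
    String second = syntheticId("stage", ids);
    System.out.println(first.equals(second)); // true
    System.out.println(first); // stage_1
  }
}
```

If id generation ever consulted mutable global state (a shared counter, say), two reconstructions of the stage could disagree, which is exactly the failure mode being flagged.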
Unfortunately, the coders and other properties of the ExecutableProcessBundleDescriptor are constructed during execution. It would be better if all of these details were fixed during pipeline translation so that they cannot change during execution. Having Flink rely on calling WireCoders.instantiateRunnerWireCoder(...) is an anti-pattern for encapsulation.
So if we need to construct the executable stage payload twice, we could make the contract guarantee that it is stable regardless of how many times it is constructed. I just want to push more of the input/output/coder/state/side input information up to translation time instead of having it deep within execution. In my opinion, only service (ApiServiceDescriptor) binding should happen there.
Unfortunately, Flink needs to have an associated serializer (TypeInformation, aka Coder) with each distributed collection. This TypeInformation needs to be known at pipeline construction time. It need not match the exact coder being used to materialize elements over gRPC, but it does need to match the in-memory element type.
We could get around this partially by representing everything as bytes. The downside is that each runner-native operation that requires structure (e.g., GBK) will require an additional operation to break elements into their constituent parts. This step itself also requires knowledge of the coded type, so we ultimately run into the same issue.
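The downside described above can be made concrete with a small sketch: if elements cross the runner as opaque bytes, a structure-dependent operation like GBK needs an extra decode step first, and that step itself requires knowledge of the coded type. The "key:value" string encoding and class name here are purely illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BytesGroupingSketch {
  static Map<String, List<String>> groupByKey(List<byte[]> encoded) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (byte[] bytes : encoded) {
      // The extra "break elements into constituent parts" step: without
      // knowing the coder, we could not split the bytes into key and value.
      String[] kv = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
      groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<byte[]> encoded = List.of(
        "a:1".getBytes(StandardCharsets.UTF_8),
        "b:2".getBytes(StandardCharsets.UTF_8),
        "a:3".getBytes(StandardCharsets.UTF_8));
    System.out.println(groupByKey(encoded)); // {a=[1, 3], b=[2]}
  }
}
```

So pushing everything to bytes only moves the coder-knowledge requirement into an extra operation; it does not remove it.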
I'm just arguing for pushing most of the manipulation done within ExecutableProcessBundleDescriptor into the ExecutableStage payload (minus the ApiServiceDescriptor binding) so that it doesn't need modification. This would allow the ExecutableStage to concretely answer what the input coders, output coders, side input coders, state coders, etc. are, in addition to any other information.
Longer term, it would make sense to have a way for the runner to declare whether it needs a keyed input context or a grouped keyed output context. These are the cases I know of:
- KV<Key, Value> for SplittableDoFn input, StatefulDoFn input, GBK input, multimap side input materialization input, and window mapping input
- KV<Key, Iterable<Value>> for GBK output
Do you know of any others?
I added a test for serialization. If we agree on the repeated instantiation of ExecutableStage, I can take this up in a separate PR (for both batch and streaming translation). I would do that once we have test and end-to-end coverage; right now the translators are still not wired up.
throw new RuntimeException("executable stage translation not implemented");
// TODO: is this still relevant?
// we assume that the transformation does not change the windowing strategy.
RunnerApi.WindowingStrategy windowingStrategyProto =
ExecutableStages can change the windowing strategy because they may execute window assignment internally, so a previous executable stage may have changed the windowing strategy.
Removed; it was a remnant of translation code from the old runner (only used in the case of stateful ParDo). Since key handling and output encoding will happen in the SDK harness, we probably won't need to know the windowing strategy in the operator.
// we assume that the transformation does not change the windowing strategy.
RunnerApi.WindowingStrategy windowingStrategyProto =
    pipeline.getComponents().getWindowingStrategiesOrThrow(
        pipeline.getComponents().getPcollectionsOrThrow(
You should rely on getInputPCollection().getPCollection() from the ExecutableStage that you could construct above from the payload.
Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
  String collectionId = outputs.get(localOutputName);
  Coder<WindowedValue<?>> windowCoder = (Coder) instantiateCoder(collectionId, components);
Rely on creating an ExecutableStage here and its getOutputPCollections method. You can pass this ExecutableStage forward to the ExecutableStageDoFnOperator. Anywhere below where you need the input PCollection (and, in the future, state/timer or side input information), you can get it from this object.
I can see how having access to the coders is important here, and the ExecutableStage should contain this information without needing to bind the data/state service the way the ExecutableProcessBundleDescriptor does (currently the ExecutableProcessBundleDescriptor is munging the payload and making small albeit important modifications).
Unfortunately, we cannot simply pass the ExecutableStage onto operators here. Flink requires that operators be serializable. Operator constructors run on the client JVM, but operators are initialized via lifecycle methods on TaskManagers. For this reason, we use the executable stage payload in the batch translator. The same applies here.
We could decide to use a different serialized representation for the operators, but it seems convenient to reuse what we already have.
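The pattern described above (serializable payload shipped with the operator, heavyweight object rebuilt on the TaskManager) can be sketched in plain JDK serialization. The class and field names are illustrative stand-ins, not the actual Flink/Beam types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StageOperatorSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final byte[] stagePayload; // serializable proto bytes, shipped to tasks
  private transient String stage;    // reconstructed per task, never serialized

  StageOperatorSketch(byte[] stagePayload) {
    this.stagePayload = stagePayload;
  }

  /** Stands in for the operator's open() lifecycle method on the TaskManager. */
  void open() {
    // In the real code this would be ExecutableStage.fromPayload(...).
    this.stage = new String(stagePayload);
  }

  String stage() {
    return stage;
  }

  public static void main(String[] args) throws Exception {
    StageOperatorSketch original = new StageOperatorSketch("payload".getBytes());
    // Simulate Flink shipping the operator from the client to a task.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(original);
    oos.flush();
    StageOperatorSketch shipped = (StageOperatorSketch)
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
    shipped.open(); // reconstruction happens only after deserialization
    System.out.println(shipped.stage()); // payload
  }
}
```

The transient field makes the constraint explicit: anything not recoverable from the serializable payload would be lost in transit, which is why the payload (rather than the ExecutableStage itself) is what travels.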
As commented above, we should be able to recreate the ExecutableStage multiple times; only the service (ApiServiceDescriptor) binding should happen upstream.
public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
  ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
  int outputIndex = 0;
  for (String tag : localOutputs) {
Sort the localOutputs to get a stable indexing; otherwise multiple calls to createOutputMap won't produce the same mapping.
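The suggested fix can be sketched as follows, using only `java.util` (a plain `Map` stands in for Guava's `ImmutableBiMap`, and the class name is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

public class StableOutputMapSketch {
  /** Creates a mapping from output name to tag integer with a stable order. */
  static Map<String, Integer> createOutputMap(Iterable<String> localOutputs) {
    // TreeSet imposes a deterministic (natural) order on the names, so the
    // assigned indices no longer depend on the caller's iteration order.
    TreeSet<String> sorted = new TreeSet<>();
    for (String tag : localOutputs) {
      sorted.add(tag);
    }
    Map<String, Integer> outputMap = new LinkedHashMap<>();
    int outputIndex = 0;
    for (String tag : sorted) {
      outputMap.put(tag, outputIndex++);
    }
    return outputMap;
  }

  public static void main(String[] args) {
    System.out.println(createOutputMap(java.util.List.of("out2", "main", "out1")));
    // {main=0, out1=1, out2=2} regardless of the input order
  }
}
```

With the sort in place, constructing the map twice (e.g. once at translation time and once when the operator is reinitialized on a TaskManager) yields identical tags.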
@Mock private RuntimeContext runtimeContext;
@Mock private DistributedCache distributedCache;
@Mock private FlinkExecutableStageContext stageContext;
@Mock private StageBundleFactory stageBundleFactory;
It might be easier to follow the testing strategy employed here:
and use the InProcessSdkHarness TestRule to set up the tests instead of mocks.
If not, I can review the tests as is.
I think the mock-based test is good for covering just the operator class, without other dependencies. InProcessServerFactory might be a good way to write an integration test that also covers the translator, outside of the ValidatesRunner suite. I can probably do that as a follow-up.
Executable stage translation for streaming mode, based on the generic Flink streaming operator. Stage execution and tests adapted from the batch translation.