[BEAM-8157][backport] Add LengthPrefixCoder around unknown key coder for stateful ProcessBundleDescriptor#10077
Merged
mxm merged 3 commits intoapache:release-2.17.0from Nov 13, 2019
Merged
Conversation
…ul ProcessBundleDescriptor
Flink and other Runners use the serialized key in stateful ExecutableStages to
key partition the data. When state requests are issued from the SDK Harness the
key is also sent over serialized. However, the binary representation of that key
currently does not always match the Runner's representation. This causes
troubles with persisting keyed state for checkpoints.
When the key coder is known by the Runner, then it uses the same encoding scheme
as the SDK Harness. However, when the coder is unknown, it will be replaced by a
"LengthPrefixCoder<ByteArrayCoder>" which is problematic because the SDK Harness
does not add this coder and thus may produce a different encoding as the Runner.
The solution is to add a LengthPrefixCoder around unknown key coders in the
ProcessBundleDescriptor, such that the SDK Harness will length prefix the key
correctly.
Otherwise we run into this error:
```
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 168, in _execute
response = task()
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 201, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in do_instruction
request.instruction_id)
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 667, in process_bundle
data.ptransform_id].process_encoded(data.data)
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "pricingrealtime/event_processing/stateful_event_processing.py", line 55, in process
recent_events_map = StatefulEventDoFn._load_recent_events_map(recent_events_state)
File "pricingrealtime/event_processing/stateful_event_processing.py", line 127, in _load_recent_events_map
items_in_recent_events_bag = [e for e in recent_events_state.read()]
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
for elem in self.first:
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
self._underlying.get_raw(state_key, continuation_token)
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
continuation_token=continuation_token)))
File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: The current key '[1, -104, -97, -93, -34, -73, -128, -42, 36]' with key group index '274' does not belong to the key group range 'KeyGroupRange{startKeyGroup=153, endKeyGroup=154}'. Runner KeyCoder: LengthPrefixCoder(ByteArrayCoder). Ptransformid: ref_AppliedPTransform_process_events_with_stateful_dofn_23 Userstateid: recent_events
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:531)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:387)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:309)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:303)
at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:468)
at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:415)
at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:206)
at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[while running 'process_events_with_stateful_dofn']
```
tweise
approved these changes
Nov 12, 2019
Contributor
Author
|
Run Python PreCommit |
Contributor
Author
|
It looks like the Dataflow image has not been built for 2.17.0. Merging. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Backport of #9997.
CC @tweise @lukecwik @Ardagan
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.