[BEAM-9132] De-register state request handler last when closing ActiveBundle #10611

Closed
mxm wants to merge 1 commit into apache:master from mxm:BEAM-9132

mxm commented Jan 16, 2020

We have observed these errors in a state-intensive application:

```
Error processing instruction 107. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "redacted.py", line 56, in process
    recent_events_map = load_recent_events_map(recent_events_state)
  File "redacted.py", line 128, in _load_recent_events_map
    items_in_recent_events_bag = list(recent_events_state.read())
  File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
    for elem in self.first:
  File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
    self._state_key, self._coder_impl, is_cached=self._is_cached)
  File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
    self._materialize_iter(state_key, coder))
  File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
    continuation_token=continuation_token)))
  File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '107'
```

Note that the error originates on the Runner side; the SDK worker merely
re-raises the error it receives in the state response. The solution is to
de-register the state request handler only after ensuring that no more elements
are in flight, i.e. after the input receivers are closed.
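
To illustrate the ordering problem described above, here is a minimal toy model in Python. It is only a sketch: `StateRegistry`, `ToyBundle`, and the `deregister_last` flag are hypothetical names invented for this example and are not Beam classes or the code changed in this PR. It assumes that closing the input receivers can still trigger state requests for in-flight elements.

```python
class StateRegistry:
    """Toy runner-side map from instruction id to a state request handler."""

    def __init__(self):
        self._handlers = {}

    def register(self, instruction_id, handler):
        self._handlers[instruction_id] = handler

    def deregister(self, instruction_id):
        self._handlers.pop(instruction_id, None)

    def handle(self, instruction_id, request):
        handler = self._handlers.get(instruction_id)
        if handler is None:
            # Same message as in the traceback above.
            raise RuntimeError(
                "Unknown process bundle instruction id '%s'" % instruction_id)
        return handler(request)


class ToyBundle:
    """Hypothetical stand-in for an active bundle with in-flight elements."""

    def __init__(self, instruction_id, registry, pending_elements):
        self.instruction_id = instruction_id
        self.registry = registry
        self.pending = list(pending_elements)
        self.results = []
        registry.register(
            instruction_id, lambda request: "state for %s" % request)

    def _close_input_receivers(self):
        # Assumption: draining in-flight elements still issues state requests.
        while self.pending:
            element = self.pending.pop(0)
            self.results.append(
                self.registry.handle(self.instruction_id, element))

    def close(self, deregister_last):
        if deregister_last:
            # Proposed order: drain the inputs first, then de-register.
            self._close_input_receivers()
            self.registry.deregister(self.instruction_id)
        else:
            # Problematic order: the handler is gone while elements remain.
            self.registry.deregister(self.instruction_id)
            self._close_input_receivers()
```

Calling `close(deregister_last=False)` on a bundle that still has pending elements raises the same `RuntimeError` message as in the traceback, while `close(deregister_last=True)` drains the elements first and completes normally.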

mxm requested a review from lukecwik January 16, 2020 13:28

mxm commented Jan 16, 2020

Run Java PreCommit

lukecwik left a comment

I believe we shouldn't get the process bundle response until the SDK has received responses for all outstanding state requests, and hence we should be able to deregister where the code currently does.

It's an error if we get a process bundle response back while the SDK still has outstanding state requests. There were some ideas about adding the id of the last state request to the process bundle response, as a way to let the SDK send the state requests and then the process bundle response without needing to wait for pending state requests (to support something like blind writes).

Either an SDK isn't following what is expected of it, or the bundle has errored out and the error handling isn't correct.
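
The expectation described in this comment can be sketched as a small invariant check. This is only an illustration under stated assumptions: `BundleStateTracker` and its methods are hypothetical names, not part of the Beam SDK or the Fn API, and the string returned by `finish_bundle` merely stands in for a real process bundle response.

```python
class BundleStateTracker:
    """Hypothetical SDK-side bookkeeping of state requests for one bundle."""

    def __init__(self):
        self._outstanding = set()
        self._next_id = 0

    def send_state_request(self):
        # Record the request as outstanding until its response arrives.
        request_id = self._next_id
        self._next_id += 1
        self._outstanding.add(request_id)
        return request_id

    def receive_state_response(self, request_id):
        self._outstanding.discard(request_id)

    def finish_bundle(self):
        # Invariant from the comment above: the bundle response must not be
        # sent while any state request is still awaiting its response.
        if self._outstanding:
            raise RuntimeError(
                "%d state request(s) still outstanding: %s"
                % (len(self._outstanding), sorted(self._outstanding)))
        return "ProcessBundleResponse"
```

If this invariant holds on the SDK side, the runner can safely deregister the state request handler as soon as the process bundle response arrives, which is the argument made in the comment.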

mxm commented Jan 16, 2020

I had assumed that state requests are only guaranteed to be finished once the input receivers are closed. However, as you pointed out, we shouldn't be receiving state requests after the ProcessBundleResponse has arrived.

Interesting optimization to send over the last pending state request id. I wonder how much this would improve performance because the bundle response would still be mostly blocked on the state requests before the last. For what it's worth, we have async writes already in the Python SDK.

I'm trying to gather some more information on the matter. As far as I can see, the Python SDK should be working correctly in terms of finishing the bundle. I'm kind of suspicious of the bundle processor caching logic which may be selecting the wrong state request handler when the environment expires for any reason.

mxm commented Jan 27, 2020

Closing and opening a new PR as I've now identified the root cause.
