Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8157] Ensure key encoding for state requests is consistent across SDKs #9484

Merged
merged 2 commits into from Sep 24, 2019

Conversation

@mxm
Copy link
Contributor

mxm commented Sep 5, 2019

The key encoding for state requests across the SDKs, i.e. Python and Java, is
not consistent. Java uses a NESTED encoding, whereas Python uses OUTER. We
should settle on one encoding.

The Flink runner so far uses OUTER encoding, but it seems sensible to switch to
NESTED encoding since inferring the OUTER encoding for a key coder in a KV<K,V>
structure is a potential source of error.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status --- --- Build Status
XLang --- --- --- Build Status --- --- ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@mxm mxm requested a review from tweise Sep 5, 2019
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 5, 2019

@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 5, 2019

It might be worth to add a test for the state handler, to guard against changes in the future.

@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 5, 2019

Test failures unrelated to changes in this PR: https://builds.apache.org/job/beam_PreCommit_Java_Commit/7598/

Test Result (2 failures / +2)

    org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
    org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
@tweise

This comment has been minimized.

Copy link
Contributor

tweise commented Sep 6, 2019

Unfortunately with this change the test pipeline fails:

RuntimeError: java.lang.RuntimeException: Failed to remove nested context from key: �XXXXXX
	at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.removeNestedContext(FlinkKeyUtils.java:85)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:370)
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 7, 2019

Thanks for testing. I've pushed an update. It was wrong to assume we could simply strip the length prefix from the ByteBuffer, as it simply holds whatever bytes the original coder produced. We need to perform an encoding round-trip to recover the original key in OUTER encoding.

@tweise Could you please test again with the new update?

@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 7, 2019

Run Java Flink PortableValidatesRunner Streaming

@mxm mxm force-pushed the mxm:BEAM-8157 branch 2 times, most recently from f17ac5f to 98d2f53 Sep 8, 2019
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 8, 2019

Updated the description and squashed the commit. After we give this another test, this should be good to be merged.

@tweise

This comment has been minimized.

Copy link
Contributor

tweise commented Sep 9, 2019

This actually fails the test that I had provided in the gist.

RuntimeError: java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.removeNestedContext(FlinkKeyUtils.java:81)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:351)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:283)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:278)
        at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:435)
        at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:400)
        at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:205)
        at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
        at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 1 bytes; 107 bytes expected
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 9, 2019

@tweise Are you sure you tested with the most recent version of this PR? I had updated the PR. Sorry if that was not clear. We need the orginal coder used to removed the nested encoding, since the ByteStringCoder in the state handler does not provide any information of the key encoding used.

@tweise

This comment has been minimized.

Copy link
Contributor

tweise commented Sep 9, 2019

I'm reasonably sure. But if you tell me that you tested your latest change with the pipeline in the gist then I can repeat it.

@lukecwik

This comment has been minimized.

Copy link
Member

lukecwik commented Sep 11, 2019

The coder for the main input's key should match the coder being chosen for the state key.

e.g., if the main input coder is KV<LP<K>, LP<V>> then the state key coder should be LP<K> since encode(LP<K>) != encode(K) and even if the runner drops the length prefixing from encode(LP<K>)'s bytes, it still may not be equal to encode(K).

@mxm mxm force-pushed the mxm:BEAM-8157 branch 2 times, most recently from 89a29d9 to 3480907 Sep 19, 2019
@mxm mxm changed the title [BEAM-8157] Remove length prefix from state key for Flink's state backend [BEAM-8157] Ensure key encoding for state requests is consistent across SDKs Sep 19, 2019
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 19, 2019

@lukecwik @tweise @robertwb PR is updated. Encoding for state keys was not consistent across the Java and Python SDK. This has been addressed here by letting them both use the NESTED context, i.e. adjusting the key encoding in the Python SDK.

@mxm mxm requested review from tweise and robertwb Sep 19, 2019
@mxm mxm force-pushed the mxm:BEAM-8157 branch 2 times, most recently from b9e2dc1 to 92331d8 Sep 19, 2019
mxm added 2 commits Sep 19, 2019
…ss SDKs

The key encoding for state requests across the SDKs, i.e. Python and Java, is
not consistent. Java uses a NESTED encoding, whereas Python uses OUTER. We
should settle on one encoding.

The Flink runner so far uses OUTER encoding, but it seems sensible to switch to
NESTED encoding since inferring the OUTER encoding for a key coder in a KV<K,V>
structure is a potential source of error.
@mxm mxm force-pushed the mxm:BEAM-8157 branch from 92331d8 to 61b8fa0 Sep 20, 2019
@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 20, 2019

Any opinions here? I think this warrants some attention to finally fix the inconsistent key encoding across SDKs.

@tweise
tweise approved these changes Sep 23, 2019
Copy link
Contributor

tweise left a comment

With these changes, the manual test passes. The only remaining question I have is regarding the test coverage. When the key encoding was changed prior to the Python SDK change, this PR was green, but my test failed. Are we missing (Flink runner) integration test coverage for state and timers for Python?

@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 24, 2019

When the key encoding was changed prior to the Python SDK change, this PR was green, but my test failed. Are we missing (Flink runner) integration test coverage for state and timers for Python?

I had assumed that the Python SDK would do nested encoding for the key in StateRequest, just like Java. Turns out, it did not. That was the reason why after removing the nested context for Java (first version of the PR), your tests failed because it would try removing the nested context from the Python encoded key, which was encoded using outer context.

Now, the real problem is that the inconsistent key encoding is shadowed by Flink <=1.8 because it accepts keys for the state backend which do not belong to the partition key range. The Flink 1.9 (#9296) made it visible that the Java SDK was using the nested encoding, contrary to Python and the Runner itself, which were using the outer encoding. The reason the Python PVR tests in the Flink 1.9 PR were not failing was that we are using parallelism 1, which I had advised against in the past (and I now have a better reason why). So we should increase the parallelism in a follow-up.

@mxm mxm merged commit fce6380 into apache:master Sep 24, 2019
8 checks passed
8 checks passed
Java ("Run Java PreCommit") SUCCESS
Details
JavaPortabilityApi ("Run JavaPortabilityApi PreCommit") SUCCESS
Details
Java_Examples_Dataflow ("Run Java_Examples_Dataflow PreCommit") SUCCESS
Details
Portable_Python ("Run Portable_Python PreCommit") SUCCESS
Details
Python ("Run Python PreCommit") SUCCESS
Details
Python_PVR_Flink ("Run Python_PVR_Flink PreCommit") SUCCESS
Details
RAT ("Run RAT PreCommit") SUCCESS
Details
Spotless ("Run Spotless PreCommit") SUCCESS
Details
@sunjincheng121

This comment has been minimized.

Copy link
Member

sunjincheng121 commented Sep 27, 2019

Hi @mxm, Sorry for late reply. I go through the changes of this PR and found that the solution in this PR is the same as #9464. The functionality of Java side changes is the same as #9464. Regarding to the Python part, it's a good catch. I think it will be better to recover the changes of #9464 and add fixes for the Python part on top of it. I don't think it's a best way to revert the contributions in this case. Even we revert a commit in some reason, we should also recover it if we finally find the original solution is correct. This is a respect for the contributors and the committers.

What do you think? @mxm @tweise @lukecwik

Best, Jincheng

@mxm

This comment has been minimized.

Copy link
Contributor Author

mxm commented Sep 27, 2019

I very much appreciate your work on this @sunjincheng121. Let it be said, your contributions are recognized. I also have to repeat that your solution in #9464 was incomplete. Not only did it not adhere to our contribution standard because it was originally without a JIRA ("hotfix"), but it also broke the Python support, as our tests showed. It required careful investigation to find out the root of the problem and to not break any other SDK as the result of a quick fix. We could not iterate on your PR because it was already merged, so reverting it was the only option.

The changes, while similar, are not identical. I could have based them on your original PR, but that would have still required to do more changes, since there are differences related to the code and the tests. I know that getting one's commit reverted is not a good feeling. It happens to everyone eventually, and it is not to diminish your contributors in any way. Keep up the great contributions. I know that everyone in the Beam community greatly appreciates your work.

@sunjincheng121

This comment has been minimized.

Copy link
Member

sunjincheng121 commented Sep 29, 2019

Thanks for your reply in both this PR and #9464 @mxm, I left my thoughts in #9464. (I suggest that we can discuss in one PR).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.