
[BEAM-10189] Add ReadModifyWriteState user state to python sdk #11916

Merged
merged 5 commits on Jun 25, 2020

Conversation

@y1chi (Contributor) commented Jun 4, 2020


Post-Commit Tests Status (on master branch): build-status badge matrix for the Go, Java, Python, and XLang SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (badges not reproduced here).

Pre-Commit Tests Status (on master branch): build-status badges for the Java, Python, Go, and Website pre-commit jobs (non-portable and portable).
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@y1chi (Contributor, Author) commented Jun 4, 2020

R: @angoenka @robertwb

@robertwb (Contributor) commented Jun 4, 2020

Last time someone started adding this feature, we decided to call it ReadModifyWrite state.

@y1chi (Contributor, Author) commented Jun 4, 2020

> Last time someone started adding this feature, we decided to call it ReadModifyWrite state.

The Java SDK still calls this ValueState; ReadModifyWriteState is only used in beam_runner_api. Should we change the Java SDK as well if we want to call it ReadModifyWriteState in Python?

@robertwb (Contributor) commented Jun 4, 2020

Yes, the plan was to consider changing Java too, though that's harder due to backwards compatibility issues.

@y1chi (Contributor, Author) commented Jun 5, 2020

> Yes, the plan was to consider changing Java too, though that's harder due to backwards compatibility issues.

Renamed to ReadModifyWriteState.
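To make the rename concrete, here is a minimal, Beam-free sketch of the read/modify/write cell the new state type exposes (`read()`, `write()`, `clear()`). The class and method names mirror the Python SDK API; the plain dict below is only a stand-in for runner-managed state, and `count_per_key` is a hypothetical helper, not part of the SDK.

```python
# Illustrative only: a sketch of ReadModifyWriteState semantics.
# In the real SDK the cell is declared with ReadModifyWriteStateSpec and
# injected into a DoFn via beam.DoFn.StateParam.

class ReadModifyWriteState:
    """A single mutable value cell, one instance per (key, window)."""

    _EMPTY = object()  # sentinel distinguishing "unset" from a stored None

    def __init__(self):
        self._value = self._EMPTY

    def read(self, default=None):
        return default if self._value is self._EMPTY else self._value

    def write(self, value):
        self._value = value

    def clear(self):
        self._value = self._EMPTY


def count_per_key(pairs):
    # Mimics a stateful DoFn keeping a running count per key.
    cells = {}
    out = []
    for key, _ in pairs:
        state = cells.setdefault(key, ReadModifyWriteState())
        count = state.read(default=0) + 1   # read
        state.write(count)                  # modify + write
        out.append((key, count))
    return out
```

The sentinel object lets a stored `None` be told apart from an unset cell, which is the usual reason such cells are not backed by a bare dict lookup.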

@robertwb (Contributor) left a review comment

Looks good, modulo minor comments.

Review comments on sdks/python/apache_beam/runners/direct/direct_userstate.py and sdks/python/apache_beam/transforms/userstate_test.py (outdated, resolved).
@angoenka (Contributor) commented
Retest this please

@ajamato commented Jun 18, 2020

@committer This is approved. Could someone commit this please?

@y1chi y1chi changed the title [BEAM-10189] Add ValueState user state to python sdk [BEAM-10189] Add ReadModifyWriteState user state to python sdk Jun 18, 2020
@y1chi (Contributor, Author) commented Jun 19, 2020

retest this please

@y1chi (Contributor, Author) commented Jun 19, 2020

Run Python2_PVR_Flink PreCommit

@y1chi (Contributor, Author) commented Jun 19, 2020

@mxm It seems like the Flink runner is sending a cache token per state, instead of one user-state token per bundle. This will cause the Python SDK to fail:

# There should only be one user state token present

Is this a known issue, and is it being worked on?

@mxm (Contributor) commented Jun 20, 2020

> @mxm It seems like the Flink runner is sending a cache token per state, instead of one user-state token per bundle. This will cause the Python SDK to fail:
>
> # There should only be one user state token present
>
> Is this a known issue, and is it being worked on?

The Flink Runner only sends one cache token per worker and application attempt. Please see

@y1chi y1chi closed this Jun 22, 2020
@y1chi y1chi reopened this Jun 22, 2020
@y1chi
Copy link
Contributor Author

y1chi commented Jun 22, 2020

> > @mxm It seems like the Flink runner is sending a cache token per state, instead of one user-state token per bundle. This will cause the Python SDK to fail:
> >
> > # There should only be one user state token present
> >
> > Is this a known issue, and is it being worked on?
>
> The Flink Runner only sends one cache token per worker and application attempt. Please see

It looks like the fn-execution core module is trying to add a cache token for each state key:

for (BeamFnApi.ProcessBundleRequest.CacheToken cacheToken : handler.getCacheTokens()) {

@mxm (Contributor) commented Jun 22, 2020

It is true that the fn protocol supports one cache token per handler (e.g. a user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is; cache tokens have been working fine so far. Could you provide some logs or test cases that show there is a problem?

@y1chi (Contributor, Author) commented Jun 22, 2020

> It is true that the fn protocol supports one cache token per handler (e.g. a user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is; cache tokens have been working fine so far. Could you provide some logs or test cases that show there is a problem?

Last time I checked with @lukecwik, he mentioned the SDK expects one global cache token per bundle for all user states, and one cache token per side input.

The problem is that we can't declare more than one user state in a stateful DoFn; otherwise the SDK fails, as in the test_pardo_state_only_test I'm trying to update in this PR.

09:13:30 [flink-runner-job-invoker] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - Error during job invocation test_pardo_state_only_1592842407.43_aafef3e7-73a3-4a63-be4c-4be6c6fc7b7d.
09:13:30 java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
09:13:30 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
09:13:30 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
09:13:30 	at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
09:13:30 	at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
09:13:30 	at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
09:13:30 	at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
09:13:30 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
09:13:30 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
09:13:30 	at java.lang.Thread.run(Thread.java:748)
09:13:30 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
09:13:30 	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
09:13:30 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
09:13:30 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
09:13:30 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
09:13:30 	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
09:13:30 	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
09:13:30 	at akka.dispatch.OnComplete.internal(Future.scala:264)
09:13:30 	at akka.dispatch.OnComplete.internal(Future.scala:261)
09:13:30 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
09:13:30 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
09:13:30 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30 	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
09:13:30 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
09:13:30 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
09:13:30 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
09:13:30 	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
09:13:30 	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
09:13:30 	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
09:13:30 	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
09:13:30 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30 	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30 	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
09:13:30 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
09:13:30 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
09:13:30 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
09:13:30 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
09:13:30 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
09:13:30 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
09:13:30 Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
09:13:30 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
09:13:30 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
09:13:30 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
09:13:30 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
09:13:30 	at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
09:13:30 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
09:13:30 	at java.lang.reflect.Method.invoke(Method.java:498)
09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
09:13:30 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
09:13:30 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
09:13:30 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
09:13:30 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
09:13:30 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30 	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
09:13:30 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
09:13:30 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
09:13:30 	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
09:13:30 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
09:13:30 	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
09:13:30 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
09:13:30 	... 4 more
09:13:30 Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 247, in _execute
09:13:30     response = task()
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 304, in <lambda>
09:13:30     lambda: self.create_worker().do_instruction(request), request)
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 473, in do_instruction
09:13:30     getattr(request, request_type), request.instruction_id)
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 505, in process_bundle
09:13:30     instruction_id, request.cache_tokens):
09:13:30   File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
09:13:30     return self.gen.next()
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 894, in process_instruction_id
09:13:30     assert not user_state_cache_token
09:13:30 AssertionError
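The AssertionError above comes from the SDK's per-bundle cache-token check. As a simplified sketch of that invariant (the `CacheToken` class and `extract_user_state_token` helper below are illustrative stand-ins for the `ProcessBundleRequest.cache_tokens` protos, not the actual sdk_worker code):

```python
# Illustrative sketch: the SDK expects at most ONE user-state cache token per
# bundle (plus any number of side-input tokens). The Flink batch runner was
# violating this by emitting one user-state token per declared state.

class CacheToken:
    def __init__(self, token, is_user_state):
        self.token = token
        self.is_user_state = is_user_state


def extract_user_state_token(cache_tokens):
    user_state_token = None
    for t in cache_tokens:
        if t.is_user_state:
            # Mirrors the failing assertion in process_instruction_id.
            assert user_state_token is None, (
                'There should only be one user state token present')
            user_state_token = t.token
    return user_state_token
```

With a DoFn declaring two user states, a runner emitting one token per state trips this assertion on the second token, which matches the traceback above.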

@mxm (Contributor) commented Jun 23, 2020

Thanks for the stack trace; that helped figure out what's going on here. The issue is only present in batch mode, where Flink does not use its own state backend but Beam's InMemoryBagUserStateFactory.

We have to adapt the implementation to return the same cache token for all InMemorySingleKeyBagState instances, see:


@mxm (Contributor) commented Jun 23, 2020

I've created a PR which fixes the problem: #12062
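The direction of that fix can be sketched as follows. This is a hypothetical, self-contained illustration (the `InMemoryUserStateFactory` class and its methods are invented names, not the actual Java `InMemoryBagUserStateFactory`): the factory generates one cache token and hands the same token to every state handler it creates, so a bundle reports exactly one user-state token no matter how many states are declared.

```python
import uuid

# Illustrative sketch of the fix: one shared user-state cache token per
# factory instead of a fresh token per state cell.

class InMemoryUserStateFactory:
    def __init__(self):
        # Generated once; shared by every handler from this factory.
        self._cache_token = uuid.uuid4().bytes
        self._handlers = []

    def for_state(self, state_key):
        # Every handler carries the same token, regardless of state_key.
        handler = {'state_key': state_key, 'cache_token': self._cache_token}
        self._handlers.append(handler)
        return handler

    def cache_tokens(self):
        # Deduplicated: always a single-element set once any state exists.
        return {h['cache_token'] for h in self._handlers}
```

Under this scheme the SDK-side "only one user state token" assertion holds even for DoFns with multiple declared states.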

@mxm (Contributor) commented Jun 25, 2020

Run Python2_PVR_Flink PreCommit

@mxm (Contributor) commented Jun 25, 2020

Run Python PreCommit

@angoenka angoenka merged commit 6c95955 into apache:master Jun 25, 2020
@y1chi y1chi deleted the BEAM-10189 branch June 25, 2020 18:39
@mxm (Contributor) commented Jun 26, 2020

Glad to see we were able to unblock the changes here!

@y1chi (Contributor, Author) commented Jun 26, 2020

> Glad to see we were able to unblock the changes here!

Thanks for the fix!

@mxm (Contributor) commented Jun 26, 2020

My pleasure!

yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
…e#11916)

* [BEAM-10189] Add ValueState user state to python sdk

* Rename to ReadModifyWriteState

* Rename _ValueStateTag to _ReadModifyWriteStateTag

* Use deterministic teststream in tests and minor fix to direct_userstate

* lint

5 participants