Skip to content

Flink*DoFnFunction: fix check for single-output dofns#3263

Closed
dhalperi wants to merge 2 commits into
apache:masterfrom
dhalperi:flink-output-manager
Closed

Flink*DoFnFunction: fix check for single-output dofns#3263
dhalperi wants to merge 2 commits into
apache:masterfrom
dhalperi:flink-output-manager

Conversation

@dhalperi
Copy link
Copy Markdown
Contributor

Fixes Findbugs and (presumably) increases efficiency by using the right OutputManager.

R: @aljoscha but since this is a PostCommit break we may merge sooner if tests pass.

R: @tgroh @kennknowles

Fixes Findbugs and (presumably) increases efficiency
@coveralls
Copy link
Copy Markdown

Coverage Status

Changes Unknown when pulling 1590032 on dhalperi:flink-output-manager into ** on apache:master**.

@asfbot
Copy link
Copy Markdown

asfbot commented May 31, 2017

--none--

@dhalperi
Copy link
Copy Markdown
Contributor Author

Build was not green - failed in integration tests.

@aljoscha
Copy link
Copy Markdown
Contributor

Do you have a link to the failed integration tests?

@dhalperi
Copy link
Copy Markdown
Contributor Author

@aljoscha :

SEVERE: Error in task code:  CHAIN MapPartition (MapPartition at ParMultiDo(AddTimestamp)) -> FlatMap (FlatMap at ParDo(AddTimestamp)/ParMultiDo(AddTimestamp).out0) -> FlatMap (Window.Into()/Window.Assign.out) (1/1)
org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
	at org.apache.beam.examples.WindowedWordCount$AddTimestampFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:149)
	at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:433)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:420)
	at org.apache.beam.examples.WindowedWordCount$AddTimestampFn.processElement(WindowedWordCount.java:120)

This is from running WindowedWordCountIT in batch mode.

Since the old code would simply never have invoked the single-output DoFnFunction, maybe it's broken?

@dhalperi
Copy link
Copy Markdown
Contributor Author

dhalperi commented May 31, 2017

@aljoscha in e0434d3 I copied the code that replaces the value with a RawUnionValue, and that seems to have made the tests pass. Error messaging could be better here.

@dhalperi
Copy link
Copy Markdown
Contributor Author

And flink module + flink ITs passed.

@dhalperi
Copy link
Copy Markdown
Contributor Author

Going ahead and self-merging to unbreak the build. Please continue to review when you have time, @aljoscha

Thanks!

@asfgit asfgit closed this in 4884d48 May 31, 2017
@dhalperi dhalperi deleted the flink-output-manager branch May 31, 2017 18:36
@coveralls
Copy link
Copy Markdown

Coverage Status

Changes Unknown when pulling e0434d3 on dhalperi:flink-output-manager into ** on apache:master**.

@asfbot
Copy link
Copy Markdown

asfbot commented May 31, 2017

Build finished.
--none--

@aljoscha
Copy link
Copy Markdown
Contributor

aljoscha commented Jun 1, 2017

@dhalperi LGTM! Thanks for fixing this quickly and merging already. 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants