Skip to content

Conversation

@jto
Copy link
Contributor

@jto jto commented Nov 29, 2023

See #29547 for context


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@jto jto force-pushed the jto/flink-fix-view branch from 797cf67 to 8e38182 Compare November 29, 2023 09:45
@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @johnjcasey added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Abacn
Copy link
Contributor

Abacn commented Nov 29, 2023

Run Flink ValidatesRunner

@Abacn
Copy link
Contributor

Abacn commented Nov 29, 2023

Run Flink ValidatesRunner Java 11

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix! The failure was in BatchWithDataStream mode while this fix code path is generic. Could this affect the default (withdatastream not enabled or streaming) ? Relatedly, would you mind explain a little bit how the fix works?

@jto
Copy link
Contributor Author

jto commented Nov 29, 2023

I shamelessly copied the implementation from the DirectRunner:

if (view.getViewFn() instanceof PCollectionViews.IsSingletonView) {
iterable =
input
.apply(
MapElements.into(TypeDescriptors.iterables(input.getTypeDescriptor()))
.via(Lists::newArrayList))
.setCoder(IterableCoder.of(input.getCoder()));
.

The DataflowRunner also does something very similar:

if (view.getViewFn() instanceof PCollectionViews.IsSingletonView) {
elements =
input.apply(
ParDo.of(
new DoFn<ElemT, List<ElemT>>() {
@DoFn.ProcessElement
public void process(@Element ElemT elemT, OutputReceiver<List<ElemT>> o) {
List<ElemT> elements = Lists.newArrayListWithExpectedSize(1);
elements.add(elemT);
o.output(elements);
}
}));
} else {
elements = input.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults());
}

There's an explanation of how that works in the DF fix: #25940

Co-authored-by: Yi Hu <huuyyi@gmail.com>
@jto jto force-pushed the jto/flink-fix-view branch from 0b8cd58 to bc969af Compare November 29, 2023 17:57
Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@Abacn Abacn merged commit 2d58d4b into apache:master Nov 29, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants