Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation #7258

Closed
wants to merge 2 commits into from

Conversation

Clarkkkkk
Copy link
Contributor

@Clarkkkkk Clarkkkkk commented Dec 7, 2018

What is the purpose of the change

This PR throws a hard exception to remind developers while there's no stream node between two split transformation and recommend developers to use side output instead of split stream.

Brief change log

  • Validate the SplitTransformation before actually transform it.

Verifying this change

This change added tests and can be verified as follows:
DataStreamTest::testConsecutiveSplitRejection
DataStreamTest::testSelectBetweenConsecutiveSplitRejection
DataStreamTest::testUnionBetweenConsecutiveSplitRejection
DataStreamTest::testKeybyBetweenConsecutiveSplitRejection

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? No.

@Clarkkkkk Clarkkkkk changed the title Flink 11084 [Flink-11084]Fix incorrect output after consecutive split and select Dec 7, 2018
@Clarkkkkk Clarkkkkk changed the title [Flink-11084]Fix incorrect output after consecutive split and select [FLINK-11084]Fix incorrect output after consecutive split and select Dec 7, 2018
@Clarkkkkk
Copy link
Contributor Author

cc @twalthr

@Clarkkkkk
Copy link
Contributor Author

cc @aljoscha

@dawidwys dawidwys self-assigned this Dec 13, 2018
…s no stream node between two split transformation
@Clarkkkkk Clarkkkkk changed the title [FLINK-11084]Fix incorrect output after consecutive split and select [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation Dec 17, 2018
@walterddr
Copy link
Contributor

Thanks for the PR @Clarkkkkk . based on the JIRA discussion, should we also put a @deprecate tagging on the .split method as well?

I fully agree consecutive split are buggy features within proper safeguarding and also can be done much more elegantly using side output. Any other concerns that we want to keep the splitTransform feature?

@Clarkkkkk
Copy link
Contributor Author

@walterddr Good point, I will tag it.

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think generally looks good. Had just a concern about applying split to side-output. Could you check if that combination works?


private <T> void validateSplitTransformation(StreamTransformation<T> input) {
if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
throw new IllegalStateException("Error while tranforming SplitTransformation, please use side output instead.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change the exception message to sth along the lines: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

@@ -643,4 +645,18 @@ private String determineSlotSharingGroup(String specifiedGroup, Collection<Integ
return inputGroup == null ? "default" : inputGroup;
}
}

private <T> void validateSplitTransformation(StreamTransformation<T> input) {
if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about SideOutputTransformation? Can we apply split on top of side-output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dawidwys Good point.


DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you extract common outputSelector?

@Clarkkkkk
Copy link
Contributor Author

cc @dawidwys

dawidwys pushed a commit to dawidwys/flink that referenced this pull request Jan 10, 2019
…s no stream node between two split transformation

This closes apache#7258
dawidwys pushed a commit to dawidwys/flink that referenced this pull request Jan 10, 2019
…s no stream node between two split transformation

This closes apache#7258
dawidwys pushed a commit to dawidwys/flink that referenced this pull request Jan 10, 2019
…s no stream node between two split transformation

This closes apache#7258
@dawidwys
Copy link
Contributor

+1, merging when my travis gives green

dawidwys pushed a commit that referenced this pull request Jan 11, 2019
@dawidwys dawidwys closed this in e0efabe Jan 11, 2019
dawidwys pushed a commit that referenced this pull request Jan 11, 2019
tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants