Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12823][datastream] PartitionTransformation supports DataExchan… #8721

Closed
wants to merge 1 commit into from

Conversation

ifndef-SleePy
Copy link
Contributor

…geMode property

What is the purpose of the change

  • Since StreamTransformation would also support batch runner in 1.9, the result partition type of StreamTransformation should not be hard-coded with PIPELINED_BOUNDED
  • We need to provide a way for upper level API upon StreamTransformation to configure the result partition type of edge

Brief change log

  • Expose a property DataExchangeMode of PartitionTransformation as an internal API of StreamTransformation
  • Pass the DataExchangeMode to StreamEdge
  • StreamingJobGraphGenerator chooses the appropriate result partition type based on DataExchangeMode of StreamEdge

Verifying this change

  • Add an unit test of StreamingJobGraphGeneratorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 13, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@aljoscha
Copy link
Contributor

I think we should not reuse the existing DataExchangeMode because it is a runtime class in the flink-runtime package. At some point we want to decouple the StreamTransformations and the DataStream API from runtime so we should not introduce more dependencies. I would suggest to add a new enum next to PartitionTransformation (in the same package) for this purpose, it should probably also be declared @PublicEvolving.

@ifndef-SleePy
Copy link
Contributor Author

ifndef-SleePy commented Jun 14, 2019

Hi @aljoscha
Thanks for feedback. Good point! I have added a new ShuffleMode instead of reusing DataExchangeMode.

@flinkbot attention @aljoscha

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very good now. I had a comment about a test and about adding null checks for the new shuffle mode.

@@ -428,7 +436,11 @@ private void addEdgeInternal(Integer upStreamVertexID,
}
}

StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
if (shuffleMode == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it happen that shuffleMode is null? We should add a null check in PartitionTransform and mark the field @Nonnull.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be null here. Because there might be no PartitionTransformation if user does not specify explicitly. And then partitioner and shuffleMode would be null here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense

@@ -69,6 +88,15 @@ public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<
return partitioner;
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like these redundant descriptions to much, this can be simplified to:

/**
 * Returns the {@link ShuffleMode} of this {@link PartitionTransformation}.
 */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -79,6 +107,6 @@ public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<

@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
throw new UnsupportedOperationException("Cannot set chaining strategy on Partition Transformation.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! 😅

@@ -133,8 +136,8 @@ public void testChainStartEndSetting() throws Exception {
public Integer map(Integer value) throws Exception {
return value;
}
})
.print();
}).setParallelism(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed? It seems to be an unrelated test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an unrelated change in a sense.
I found it when adding unit test. It's a potential unstable case since it depends on the StreamExecutionEnvironment.defaultLocalParallelism which must be not 1 in this case.
However I just found someone has fixed it yesterday. What a coincident! I can remove it from this PR.

@ifndef-SleePy
Copy link
Contributor Author

Hi @aljoscha
I have rebased master and addressed comments.

@aljoscha
Copy link
Contributor

Thanks! I think I can merge once Travis is green. 👌

@ifndef-SleePy
Copy link
Contributor Author

ifndef-SleePy commented Jun 17, 2019

Travis failed, there is an importing error in python module. I don't think it is caused by this PR. I will try to rebase master and trigger travis checking again.

@ifndef-SleePy
Copy link
Contributor Author

ifndef-SleePy commented Jun 17, 2019

Python module is fine but connectors module failed this time on Travis checking. It seems a travis relevant issue. I have filed a JIRA issue for this, https://issues.apache.org/jira/browse/FLINK-12866
Will trigger checking again through closing and reopening, hope it works.

@aljoscha
Copy link
Contributor

I merged this. Thanks for working on this with me! 👍

@aljoscha aljoscha closed this Jun 17, 2019
@ifndef-SleePy ifndef-SleePy deleted the FLINK-12823 branch June 17, 2019 13:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants