New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24651][iteration] Add bounded all-round iteration #13
Conversation
eda820a
to
d17dcb1
Compare
d17dcb1
to
49c658f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @gaoyunhaii for opening the pr. I have a little question about building the criteria sub-topology.
In the pr you let the "Head" and "Tail" connect together by introducing a fake operator. Maybe we could the "Head" connect to the "Criteria" operator, which returning by the developer. I think it could avoid introducing a fake operator. Correct me if I miss something.
Thanks
flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java
Outdated
Show resolved
Hide resolved
4ba3606
to
5a8880b
Compare
...n/src/test/java/org/apache/flink/iteration/itcases/BoundedAllRoundStreamIterationITCase.java
Outdated
Show resolved
Hide resolved
|
||
@Test(timeout = 60000) | ||
public void testSyncVariableOnlyBoundedIteration() throws Exception { | ||
try (MiniCluster miniCluster = new MiniCluster(createMiniClusterConfiguration(2, 2))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could use a same minicluster for all the test to reduce the time used in the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly I extract the process of minicluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. But I think you could create a jira to track this improvement.
*/ | ||
public class BoundedAllRoundStreamIterationITCase { | ||
|
||
private static BlockingQueue<OutputRecord<Integer>> result = new LinkedBlockingQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field could be final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also change this test to SharedObject
flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java
Outdated
Show resolved
Hide resolved
|
||
@Test | ||
public void testTerminationCriteria() throws Exception { | ||
try (MiniCluster miniCluster = new MiniCluster(createMiniClusterConfiguration(2, 2))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above
5a8880b
to
efd697d
Compare
Very thanks @guoweiM for the review! will merge~ |
This PR adds the support of the bounded all-round iteration.