[FLINK-2398][api-breaking] Introduce StreamGraphGenerator #988
Why isn't rebalance implied when the two operators don't have the same parallelism and partitioning is not defined? If you don't specify the partitioning (which defaults to forward), it means that you don't specifically care; in this case an implied rebalance is the most natural thing. It's arguable that we should give an error if the user specifically says forward, but otherwise I think this just hurts the developers.
If I understand correctly, this also changes the semantics that we execute programs without sinks, and also topology branches which don't end in sinks. I personally don't like the fact that each branch in the processing graph needs to end in a sink; it is rather artificial.
* This represents a feedback point in a topology. The type of the feedback elements need not match
* the type of the upstream {@code StreamTransformation} because the only allowed operations
* after a {@code CoFeedbackTransformation} are
* {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
TwoInputTransformation TwoInputTransformations
what's the issue? 😄
I am not sure that I understand this correctly: If a non parallel source is used does the user need to call |
About rebalance()/forward(): yes, when the parallelism differs it throws an exception now. Previously, when a user did not specify a partitioning strategy, forward was assumed. This was accepted even across a change of parallelism, which led to either the degenerate case of only one downstream instance receiving elements (1 to n parallelism) or one or several downstream instances receiving skewed numbers of elements (m to n, where m > n). I think we can document forward as the default for n -> n parallelism and rebalance as the default for n -> m parallelism and change the behavior. About the dangling operators: also true. I think before it was more of an implementation artifact because the stream graph was basically being built from the sources. Now it is built from the sinks. I see that this can be good behavior and I can adapt the current code if we agree on this.
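To make the two failure cases above concrete, here is a small stand-alone simulation. It is not Flink code: `forward` and `rebalance` are hypothetical helpers, and the rule "upstream subtask u always sends to downstream subtask u * n / m" is only an assumed, simplified model of forwarding across a parallelism change.

```java
import java.util.Arrays;

// Stand-alone sketch, not Flink code. All names here are hypothetical.
class PartitioningSketch {

    // Assumed model of "forward": every upstream subtask sends all of its
    // elements to one fixed downstream subtask.
    static int[] forward(int upstream, int downstream, int elementsPerSubtask) {
        int[] received = new int[downstream];
        for (int u = 0; u < upstream; u++) {
            int target = (int) ((long) u * downstream / upstream);
            received[target] += elementsPerSubtask;
        }
        return received;
    }

    // Round-robin "rebalance": every upstream subtask cycles over all
    // downstream subtasks, one element at a time.
    static int[] rebalance(int upstream, int downstream, int elementsPerSubtask) {
        int[] received = new int[downstream];
        for (int u = 0; u < upstream; u++) {
            for (int e = 0; e < elementsPerSubtask; e++) {
                received[(u + e) % downstream]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // 1 -> 4: only one downstream subtask ever receives input.
        System.out.println(Arrays.toString(forward(1, 4, 100)));   // [100, 0, 0, 0]
        // 3 -> 2: one downstream subtask receives twice as much as the other.
        System.out.println(Arrays.toString(forward(3, 2, 100)));   // [200, 100]
        // Round-robin spreads the same load evenly in both cases.
        System.out.println(Arrays.toString(rebalance(1, 4, 100))); // [25, 25, 25, 25]
        System.out.println(Arrays.toString(rebalance(3, 2, 100))); // [150, 150]
    }
}
```

Under this model the 1 to n case starves all but one downstream subtask and the m to n case produces skew, while round-robin distribution avoids both.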
It would be good to get some feedback from the others as well, but in general my arguments are the following:
+1 for rebalancing automatically between operators of different DOP. The batch API does the same. But it should really be "rebalance", not a form of forward that typically creates skew. I am somewhat indifferent to the sink-or-no-sink question. The batch API requires sinks strictly, and it makes total sense there. For streaming, it may be different. If we require sinks, we should have a simple function |
Yes, I think the automatic rebalance is good. My approach of throwing the exception was just the easiest way of dealing with the previously faulty behavior. I think people coming from Storm are also used to just having operations that are executed even if you don't have a sink. So maybe we should keep that.
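The rule being converged on in this thread could be sketched roughly as follows. This is an illustration only: the `Partitioning` enum and the `resolve` method are hypothetical names, not part of the Flink API, and the error for an explicit forward with mismatched parallelism is taken from the suggestion in the first comment rather than from the final implementation.

```java
// Stand-alone sketch of the proposed defaulting rule. Hypothetical names only.
class DefaultPartitioningSketch {

    enum Partitioning { FORWARD, REBALANCE }

    // requested == null models "the user did not specify a partitioning".
    static Partitioning resolve(Partitioning requested, int upstream, int downstream) {
        if (requested == null) {
            // Default: forward for n -> n, rebalance for n -> m.
            return upstream == downstream ? Partitioning.FORWARD : Partitioning.REBALANCE;
        }
        if (requested == Partitioning.FORWARD && upstream != downstream) {
            // Assumption: an explicit forward across differing parallelism is rejected.
            throw new UnsupportedOperationException(
                "Forward partitioning requires equal parallelism, got "
                    + upstream + " -> " + downstream);
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 4, 4)); // FORWARD
        System.out.println(resolve(null, 1, 4)); // REBALANCE
    }
}
```

Keeping the error only for the explicit case means users who do not care about partitioning never see it, which is the concern raised at the start of the conversation.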
Sorry for the lack of activity. I'm currently on vacation and will pick this up again when I'm back, next week. |
I changed it to execute dangling operators now. There is, however, a strange "feature" that won't work anymore. This code works on master: https://gist.github.com/aljoscha/bbe74309a31a16ca8413. It swallows the exception that results from not being able to determine the output type of the generic map. Then, when With this PR this won't work anymore. Upon This behavior is problematic since the TestStreamEnvironment is reused for several streaming tests. Tests fail in seemingly unconnected parts of the code because dangling operators without type information still linger in the execution environment. I mentioned this here: https://issues.apache.org/jira/browse/FLINK-2508 I have a quick fix for this, for now. I think, however, that the streaming tests need to be consolidated and the streaming environments also need to be refactored a bit. (The same goes for the batch execution environments, because they should probably be reused in large parts for streaming.)
I think this is good now. I adapted the Streaming Tests to always use |
What I did is basically adding every operator to the list of "sinks". When Are there any objections to merging this now? |
I also updated the documentation to reflect the changes in shipping strategies/partitioning. |
I think this looks reasonable. |
This decouples the building of the StreamGraph from the API methods. Before, the methods would build the StreamGraph as they go. Now the API methods build a hierarchy of StreamTransformation nodes. From these a StreamGraph is generated upon execution.

This also introduces some API breaking changes:
- The result of methods that create sinks is now DataStreamSink instead of DataStream
- Iterations cannot have feedback edges with differing parallelism
- "Preserve partitioning" is not the default for feedback edges. The previous option for this is removed.
- You can close an iteration several times, no need for a union.
- Strict checking of whether partitioning and parallelism work together. I.e. if upstream and downstream parallelism don't match it is not legal to have Forward partitioning anymore. This was not very transparent: When you went from low parallelism to high dop some downstream operators would never get any input. When you went from high parallelism to low dop you would get skew in the downstream operators because all elements that would be forwarded to an operator that is not "there" go to another operator. This requires insertion of global() or rebalance() in some places. For example with most sources which have parallelism one.

This also makes StreamExecutionEnvironment.execute() behave consistently across different execution environments (local, remote ...): The list of operators to be executed is cleared after execute is called.
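The general shape of this design (API calls only record nodes, the graph is produced at execution time, and the recorded list is cleared afterwards) can be sketched in a few lines. This is a toy model, not the actual StreamGraphGenerator: `Node`, `Env`, `add` and `execute` are hypothetical names, and registering every node, including ones that no sink depends on, is an assumption based on the discussion of dangling operators in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Toy model of deferred graph generation. All names are hypothetical.
class TransformationSketch {

    // A transformation only knows its name and its inputs.
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static final class Env {
        private final List<Node> registered = new ArrayList<>();

        // API methods only record a node; nothing is translated yet.
        Node add(String name, Node... inputs) {
            Node node = new Node(name, inputs);
            registered.add(node);
            return node;
        }

        // Translation happens here: walk from every registered node back to
        // its inputs, emitting each node once, inputs before consumers.
        List<String> execute() {
            Set<Node> seen = Collections.newSetFromMap(new IdentityHashMap<Node, Boolean>());
            List<String> order = new ArrayList<>();
            for (Node node : registered) {
                visit(node, seen, order);
            }
            registered.clear(); // the list is cleared after execute
            return order;
        }

        private static void visit(Node node, Set<Node> seen, List<String> order) {
            if (!seen.add(node)) {
                return; // already translated via another path
            }
            for (Node input : node.inputs) {
                visit(input, seen, order);
            }
            order.add(node.name);
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        Node source = env.add("source");
        Node map = env.add("map", source);
        env.add("sink", map);
        env.add("dangling-map", source); // no sink depends on this node
        System.out.println(env.execute()); // [source, map, sink, dangling-map]
        System.out.println(env.execute()); // []
    }
}
```

Because the shared source is visited only once and the list is cleared on every call, a second `execute()` starts from an empty environment regardless of where the program runs.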
Manually merged |
* Fix a scala example which is using a wrong variable
* Remove partitioning descriptions
* partitioning parameters are already removed from IterativeStream#closeWith/DataStream#iterate
* apache#988
* apache#4655

* Fix a scala example which is using a wrong variable
* Remove partitioning descriptions
* partitioning parameters are already removed from IterativeStream#closeWith/DataStream#iterate
* apache#988
* apache#4655

This closes apache#5249.
This decouples the building of the StreamGraph from the API methods.
Before, the methods would build the StreamGraph as they go. Now the API
methods build a hierarchy of StreamTransformation nodes. From these a
StreamGraph is generated upon execution.

This also introduces some API breaking changes:

- The result of methods that create sinks is now DataStreamSink instead
  of DataStream
- Iterations cannot have feedback edges with differing parallelism
- "Preserve partitioning" is not the default for feedback edges. The
  previous option for this is removed.
- You can close an iteration several times, no need for a union.
- Strict checking of whether partitioning and parallelism work
  together. I.e. if upstream and downstream parallelism don't match it
  is not legal to have Forward partitioning anymore. This was not very
  transparent: When you went from low parallelism to high dop some
  downstream operators would never get any input. When you went from high
  parallelism to low dop you would get skew in the downstream operators
  because all elements that would be forwarded to an operator that is not
  "there" go to another operator. This requires insertion of global()
  or rebalance() in some places. For example with most sources which
  have parallelism one.

This is from the Javadoc of StreamTransformation, it describes quite well how it works:
I still have to fix the Scala examples, but you can already comment on the overall idea and implementation.