[FLINK-5290] Ensure backwards compatibility of the hashes used to gen… #2966
…erate JobVertexIds
Stephan, Aljoscha: I can take a look at this tomorrow or next week.
}
}

private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
I don't really feel comfortable with duplicating this code. I think it's very likely that we'll change it in the future, and then we'll probably forget to change it here too. Maybe abstract it away behind an interface.
Interestingly, for me this was actually a reason to duplicate the code: if we change the implementation in StreamingJobGraphGenerator, we still want this implementation to produce the same hashes and need to provide a new StreamGraphHasher altogether. What do you think?
You're right. We should probably not have this for the default hasher and only have it in the legacy hasher, then. WDYT?
On a more general note, I actually think the whole mechanism of hashing to assign state is very fragile and hard to maintain in the face of changes. In the medium to long term, we should think about an alternative. In particular, when we want to remove the current chaining, backwards compatibility will become a nightmare.
For the time being, the current default will (or might) also become legacy at some point. I wonder if it makes sense to call this thing "default" after all. Maybe these hashers should just be versioned, with the latest version implicitly being the default. Actually, I was wondering whether the "right" approach is to always copy the code for each version. Inheriting from some default will break the legacy hasher if somebody touches the default before turning the current default into a legacy version. Tricky.
Yep, that's probably what we need.
@aljoscha I duplicated the code in different hasher versions and moved the old version to a migration package.
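For readers following the thread, the versioning idea discussed above could look roughly like the sketch below. It is not the code from this PR: `Node`, `Hasher`, `HasherV1`, `HasherV2`, `latest()` and `legacy()` are hypothetical names, and the hash function is a toy stand-in for the real algorithm. The only points it tries to show are that each version carries its own private copy of the logic, and that the newest version acts as the implicit default.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: every name in here is hypothetical, not a Flink class. */
final class HasherVersionsSketch {

    /** Simplified stand-in for a node of the stream graph. */
    static final class Node {
        final int id;
        final String operatorName;
        final int parallelism;

        Node(int id, String operatorName, int parallelism) {
            this.id = id;
            this.operatorName = operatorName;
            this.parallelism = parallelism;
        }
    }

    /** Assumed interface shape: one hash string per node id. */
    interface Hasher {
        Map<Integer, String> hash(List<Node> nodes);
    }

    /** Frozen version: parallelism is part of the hash input. */
    static final class HasherV1 implements Hasher {
        @Override
        public Map<Integer, String> hash(List<Node> nodes) {
            Map<Integer, String> result = new LinkedHashMap<>();
            for (Node n : nodes) {
                result.put(n.id, toHex(n.operatorName + "#" + n.parallelism));
            }
            return result;
        }

        // Private copy on purpose: no helper is shared between versions.
        private static String toHex(String input) {
            return Integer.toHexString(input.hashCode());
        }
    }

    /** Newer version: parallelism is no longer part of the hash input. */
    static final class HasherV2 implements Hasher {
        @Override
        public Map<Integer, String> hash(List<Node> nodes) {
            Map<Integer, String> result = new LinkedHashMap<>();
            for (Node n : nodes) {
                result.put(n.id, toHex(n.operatorName));
            }
            return result;
        }

        // Duplicated deliberately, so editing V2 can never change V1.
        private static String toHex(String input) {
            return Integer.toHexString(input.hashCode());
        }
    }

    /** The newest version is the implicit default. */
    static Hasher latest() {
        return new HasherV2();
    }

    /** Older versions, kept only to compute alternative hashes. */
    static List<Hasher> legacy() {
        return Arrays.<Hasher>asList(new HasherV1());
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node(1, "map", 2));
        System.out.println("default: " + latest().hash(nodes));
        for (Hasher h : legacy()) {
            System.out.println("legacy:  " + h.hash(nodes));
        }
    }
}
```

The duplication of `toHex` is the design choice under discussion: with inheritance or a shared helper, a later change to the newest version could silently alter the hashes produced by an older one.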
I think the changes look good now, minus some comments I had about Javadoc formatting.
/**
 * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with
 * future Flink versions.
 * <p>
This should be
...
future Flink versions.

<p>DO NOT MODIFY THIS CLASS
i.e. a blank line is missing, and the <p> can be on the same line; it doesn't have to be, though.
 * Returns a map with a hash for each {@link StreamNode} of the {@link
 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
 * identify nodes across job submissions if they didn't change.
 * <p>
duplicate <p> and no blank line. This appears several times in this PR.
Thanks, I fixed those.
2f4e5f6 to 4dc67bf
@StefanRRichter Thanks for your work! 👍 I merged this. Could you please close the Jira issue and this PR?
The way in which hashes for JobVertexIds are generated changed between Flink 1.1 and 1.2 (parallelism was considered for the hash in 1.1). We need to be backwards compatible with the old JobVertexId generation so that we can still assign state from old savepoints.
This PR introduces backwards compatibility for hashes by considering alternative hashes through different StreamGraphHasher implementations.
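To illustrate how alternative hashes can help when restoring from an old savepoint, here is a minimal sketch. It is an assumption about the general shape of the lookup, not the implementation in this PR: `Vertex`, `matches` and `assignState` are hypothetical names, and state is reduced to a plain string. Each vertex carries its current ID plus the IDs that older hashers would have produced, and state stored under any of those IDs is assigned to that vertex.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: hypothetical names, not Flink's restore code. */
final class AlternativeIdSketch {

    /** A vertex with its current ID and IDs computed by older hashers. */
    static final class Vertex {
        final String primaryId;
        final List<String> alternativeIds;

        Vertex(String primaryId, List<String> alternativeIds) {
            this.primaryId = primaryId;
            this.alternativeIds = alternativeIds;
        }

        /** True if the savepoint ID equals the current or any legacy ID. */
        boolean matches(String savepointId) {
            return primaryId.equals(savepointId) || alternativeIds.contains(savepointId);
        }
    }

    /**
     * Maps savepoint state (keyed by the ID it was written under) onto the
     * current vertices. Entries that match no vertex are left unassigned.
     */
    static Map<String, String> assignState(
            Map<String, String> savepointState, List<Vertex> vertices) {
        Map<String, String> assigned = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            for (Vertex v : vertices) {
                if (v.matches(entry.getKey())) {
                    assigned.put(v.primaryId, entry.getValue());
                    break;
                }
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // "old-id" stands for the ID an older hasher produced for this vertex.
        List<Vertex> vertices =
                Arrays.asList(new Vertex("new-id", Arrays.asList("old-id")));
        Map<String, String> savepoint =
                Collections.singletonMap("old-id", "count=7");
        System.out.println(assignState(savepoint, vertices));
    }
}
```

In this sketch a savepoint written under the old ID is still picked up by the vertex that now has a new ID, which is the behaviour the description asks for.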