[FLINK-11653][DataStream] Add configuration to enforce custom UID's o… #7747
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
 *
 * <p>Auto generated UID's are enabled by default.
 *
 * @see #enableAutoGeneratedUIDs()
Do we need to change @see #enableAutoGeneratedUIDs() -> @see {@link #enableAutoGeneratedUIDs()}?
I'm a little confused by what you're asking. @see automatically creates a link to the method, if that's what you mean.
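To illustrate the point about Javadoc tags: @see is a block tag that produces a "See Also" entry linking to the referenced member, while {@link} is an inline tag meant for use inside a sentence, so wrapping one in the other adds nothing. The sketch below uses a hypothetical UidSettings class (not part of Flink) whose method names merely mirror the ones discussed in this PR.

```java
/** Hypothetical settings holder, used only to illustrate the Javadoc tags. */
final class UidSettings {
    private boolean autoGeneratedUids = true;

    /**
     * Enables auto-generated UIDs.
     *
     * @see #disableAutoGeneratedUIDs()
     */
    UidSettings enableAutoGeneratedUIDs() {
        autoGeneratedUids = true;
        return this;
    }

    /**
     * Disables auto-generated UIDs. The inline form, as in
     * {@link #enableAutoGeneratedUIDs()}, is intended for running text.
     *
     * @see #enableAutoGeneratedUIDs()
     */
    UidSettings disableAutoGeneratedUIDs() {
        autoGeneratedUids = false;
        return this;
    }

    /** Returns whether auto-generated UIDs are currently enabled. */
    boolean hasAutoGeneratedUIDsEnabled() {
        return autoGeneratedUids;
    }
}
```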
@@ -305,4 +308,15 @@ public void invoke(Integer value) throws Exception {
}
}
}

@Test(expected = IllegalStateException.class)
public void testDisabledAutoUIDGenerator() {
How about adding a test that enables auto-generated UIDs?
@flinkbot approve consensus
I think we should get back to how we want to approach this issue.
There are two mechanisms for setting hashes:
- setUid - the current, recommended approach
- setUidHash - legacy mode, which works as a sort of fallback for cases where setUid cannot work
In the current version of this PR, we would force users to always use setUidHash, which is probably not what we want to do. I think ensuring that at least one of those is set will be extremely hard. There is no single obvious place where we can introduce that check.
My suggestion would be to enforce only setUid calls, as this is the recommended approach, whereas setUidHash, as far as I understand it, is just a last-resort method. What do you think? I would appreciate @aljoscha's opinion on that as well.
/**
 * Disable's auto-generated UID's forces users to manually specify UID's
 * on their datastreams using {@code DataStream#uid} or {@code DataStream#setUidHash}.
Maybe let's not link DataStream#setUidHash? It is a last-resort, hackish approach. What do you think?
 *
 * <p>It is highly recommended that user's specify UID's before deploying to
 * production since they are used to match state in savepoints to operators
 * in a job. Because auto-generated ID's are likely to change when modifying
 * in a job. Because auto-generated ID's are likely to change when modifying
 * in a job. Because auto-generated IDs are likely to change when modifying
 * @see #enableAutoGeneratedUIDs()
 * @see #disableAutoGeneratedUIDs()
 */
public boolean hasAutoGeneratedUIDsDisabled() {
public boolean hasAutoGeneratedUIDsDisabled() {
public boolean isAutoGeneratedUIDsDisabled() {
What do you think about inverting the check: isAutoGeneratedUIDsEnabled?
I was going for consistency with hasGenericTypesDisabled, but I don't have a strong preference here.
Thanks for the review @dawidwys, that is the correct understanding of
Force-pushed from ca3ff21 to 5f0bd6a
Force-pushed from 5f0bd6a to d36ec88
Force-pushed from d36ec88 to 3b3e225
Thanks @sjwiesman for the update. I've put some additional comments.
}

@Test
public void testDisabledAutoUIDWithManuallySetId() {
Shouldn't all DisabledAutoUID* tests fail currently? Each of the tests has operators without uids assigned (e.g. .map(new NoOpIntMap()), .addSink(new DiscardingSink<>())).
No, again because of operator chaining. That’s why the third test explicitly disables chaining in one place to check that condition.
@@ -348,6 +348,13 @@ private StreamConfig createJobVertex(
"Did you generate them before calling this method?");
}

if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
I don't think this is the best place to check it.
Why don't we move the check to org.apache.flink.streaming.api.graph.StreamGraphGenerator#transform? I think what we effectively want to enforce is that a StreamTransformation has either uid or userHash set, don't we?
Unfortunately we can’t do that. Flink only uses the uid for the first operator in each operator chain. Most users seem to understand that and only set UIDs in those places. This is the only place where we can check that the operator id assigned to each job vertex comes from a manually set uid, which is what we’re really after.
I tend to disagree. AFAIK, for state mapping purposes all OperatorIds are preserved, even those that are chained into a single operator chain (a single chained JobVertex). Otherwise it would be impossible to, e.g., restore a job with changed chaining, right?
Only the uid for the head operator in a chain is used to generate the OperatorId for a JobVertex, and I am not familiar with uids being used anywhere else to partition state within a single vertex. I believe it is impossible to restore a job with changed chaining, but I will double check.
I am pretty sure you can, and all operators are used to generate OperatorIds. See org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#createJobVertex:368-394.
Also see the ctor of JobVertex: it's true that the id of the head operator is used as primaryId, but all chained operator ids are passed as operatorIds, which is later used in e.g. org.apache.flink.runtime.checkpoint.StateAssignmentOperation#assignTaskStateToExecutionJobVertices to assign proper state(s).
Thanks for pointing this out, I had no idea you could restore with different chaining. I'll make the necessary updates.
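The outcome of this thread, checking every operator rather than only the head of each chain, can be sketched roughly as follows. This is a minimal, self-contained model and not Flink's actual code: OperatorNode and UidValidator are hypothetical names, and the uid and userHash fields only stand in for the two manual mechanisms discussed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one operator in the stream graph; not a Flink class. */
final class OperatorNode {
    final String name;
    final String uid;      // user-specified uid, or null if none was set
    final String userHash; // user-specified hash (fallback mechanism), or null

    OperatorNode(String name, String uid, String userHash) {
        this.name = name;
        this.uid = uid;
        this.userHash = userHash;
    }

    /** True if the user set either identifier by hand. */
    boolean hasManualId() {
        return uid != null || userHash != null;
    }
}

/** Hypothetical validator mirroring the check discussed in this thread. */
final class UidValidator {
    /**
     * Fails if auto-generated UIDs are disabled and any operator, chained or
     * not, lacks a manually assigned identifier.
     */
    static void validate(List<OperatorNode> operators, boolean autoGeneratedUidsEnabled) {
        if (autoGeneratedUidsEnabled) {
            return; // nothing to enforce
        }
        for (OperatorNode op : operators) {
            if (!op.hasManualId()) {
                throw new IllegalStateException(
                    "Auto-generated UIDs are disabled, but operator '"
                        + op.name + "' has no uid or user hash.");
            }
        }
    }
}
```

Because the loop visits every operator, a chain whose head has a uid but whose chained map does not is still rejected, which matches the observation that state is assigned per operator id and not per job vertex.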
/**
 * Disables auto-generated UIDs. Forces users to manually specify UIDs
 * on their datastreams using {@code DataStream#uid} or {@code DataStream#setUidHash}
Actually DataStream has neither uid nor setUidHash methods. I think it is enough to say that it forces users to manually specify UIDs.
@flinkbot approve architecture
@dawidwys That was a very good point about
I think it looks good now. Thank you @sjwiesman for your contribution.
Merging.
@flinkbot approve all
…n datastream
What is the purpose of the change
Current best practice when deploying Flink applications to production is to set a custom UID, using DataStream#uid, so jobs can resume from savepoints even if the job graph has been modified. Flink should contain a configuration option that allows users to fail submission if their program contains an operator without a custom UID, enforcing best practices similarly to #disableGenericTypes.
Brief change log
Adds a new configuration #disableAutoGeneratedUIDs which will fail job submission if there are any operators without a custom uid.
Verifying this change
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes
Documentation