Skip to content

Conversation

@fapaul
Copy link
Contributor

@fapaul fapaul commented Dec 7, 2021

What is the purpose of the change

This PR adds an option to disable programmatic configurations in a user program when running with the Application mode. In case, the option is enabled program changing the configuration will result in a failed job.
By default, this configuration is turned off to not break existing setups.

This subsumes #17995

Brief change log

  • Introduce a new exception for failures during the execution of the user jar
  • Allow the storing of failed jobs without a submission

Verifying this change

Added tests to verify the exception is thrown in the correct scenarios and the job result is retrievable after the exception was thrown.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs (ConfigOption) / JavaDocs / not documented)

@fapaul fapaul changed the title Flink 25206 [FLINK-25206] Add configuration option to disable configuration in user programs Dec 7, 2021
@flinkbot
Copy link
Collaborator

flinkbot commented Dec 7, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ba15885 (Tue Dec 07 13:31:33 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 7, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@AHeise AHeise left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

k first round of review. Please separate refactors into a seperate commit. It makes review unnecessarily hard.

}

@Override
public void submitFailedJob(JobStartupFailedException exception) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe not submit but rather register?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep the name to be consistent with submitJob because the lifecycle is pretty similar to submitting a job that immediately fails.

})
.exceptionally(
t -> {
final Optional<JobStartupFailedException> jobStartupFailedOpt =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right commit for the change? Maybe it should be 3 commits. I more thought that the scope of this commit is ending with PackageProgram...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the change is not that big anymore. Do you see it as a blocker?


private int jobCounter;

private final Collection<JobValidationError> errors;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik this field is unneeded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is set in the ctor with the forbidden configuration coming from the instantiation of the StreamEnvironment i.e. StreamExecutionEnvironment.getExecutionEnvironment(Configuration config)

if (allowConfigurations) {
return errors;
}
final MapDifference<String, String> diff =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is originalConfiguration not empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During the call of the parent ctor the StreamExecutionEnvironment translates the configuration into the equivalent sub configurations. [1]

[1]

@fapaul fapaul force-pushed the FLINK-25206 branch 3 times, most recently from 2cf1a66 to 96f31d5 Compare December 9, 2021 13:34
Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @fapaul, I think this is headed in the right direction 👍 I've added few comments, PTAL

resetContextEnvironment();
}

private List<String> collectNotAllowedConfigurations() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the approach with the configuration diffing here 🤔

Would it make sense to simply make the configurations objects exposed to the user immutable? (something along the lines of java.util.Collections#unmodifiableCollection)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we can do this because it would break the behavior of users modifying the configurations. In general, I agree 100% the current situation is definitely not good that users can mutate these objects and we have to handle it now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it actually make more sense? We simply don't want user to be able to mutate any configuration.

The only difference would be failing earlier and providing user with a full stack trace of the problematic call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be also a half-way option of letting user "mutate" the configuration as long as it doesn't change the default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea but I think the effort will be significant because we need to monkey patch all methods in the CheckpointConfig[1] and ExecutionConfig[2].
We deemed it as less maintainable.

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to rework the configuration management a bit, unfortunately, did not succeed. The problem is that some configurations you can set at the ExecutionConfig or CheckpointConfig do not have direct ConfigOption. All the configurations basically load a user class like some custom serializer.

We would need to consolidate the resolution of these classes first before unifying that all configurations are reflected through ConfigOption but I do not see this PR as a good point to rework the configuration management.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, until we get rid of Execution and Checkpoint config, this would be tricky to achieve.

}

@Override
public void submitFailedJob(FatalProgramInvocationException exception) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this method? We didn't really execute the job, so there is nothing to archive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that the user can receive the application result so we have to submit something containing the error message.

+ " (either successfully or as result of a failure). Has no effect for other deployment modes.");

public static final ConfigOption<Boolean> ALLOW_CLIENT_JOB_CONFIGURATIONS =
ConfigOptions.key("execution.allow-client-job-configurations")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execution.immutable-configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in Flink there is a general misunderstanding of where the configuration is coming from. I miss which configuration is immutable with immutable-configuration.

errorMessages.addAll(collectNotAllowedConfigurations());
if (!errorMessages.isEmpty()) {
// HACK: We shortcut the StreamGraph to jobgraph translation because we already
// know that the job needs to fail and can derive the jobId.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job fails before being submitted, there is no need to generate jobId here. Also wouldn't this break with multiple job submission (we support that in non-ha setups)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right in the case of multiple executes we will only capture the first one #17995 (comment) .

I would see exception handling for multiple jobs out of scope for this PR because therefore we probably have to rethink how the Application mode interacts with the cluster components.

return false;
}
CheckpointConfig that = (CheckpointConfig) o;
return checkpointInterval == that.checkpointInterval
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems fragile, when the new field will be added, this could not get updated. Could we do the comparison of the serialized form (original vs user) instead? I'd expect the serialized form to be stable within the same JVM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to using serialization to check whether something has changed.

* <p>The job will transition to FAILED state, and it will not be recovered.
*/
@Internal
public class FatalProgramInvocationException extends ProgramInvocationException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a new exception type? As far as I can tell this does the same thing as ProgramInvocationException (we don't recover from that one either -> so it's also "fatal").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also PTAL at the UnsuccessfulExecutionException, that seems to be related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fatal might not be the right word. WDYT about HandledProgramInvocationException? I do not really want to make all ProgramInvocationException causing the program to go to a failed state.

I would see the normal ProgramInvocationException as a reason the JM shuts down and HA might recover and I want to introduce a new exception allowing a transition to a failed state.

@fapaul
Copy link
Contributor Author

fapaul commented Dec 15, 2021

@dmvk @AHeise thanks for your fruitful feedback. I tried addressing most of your comments and left responses for the rest. PTAL

@fapaul
Copy link
Contributor Author

fapaul commented Jan 25, 2022

@dmvk I have based this feature now on FLINK-25715. Please have another look.

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @fapaul, this seems to be headed in a right direction 👍. My biggest concern is about diffing of the configuration, which doesn't seem to behave correctly.

Comment on lines +221 to +225
conf.toMap()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage
.ofConfigurationKeyAndValue(k, v)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would throwing an exception here right away make more sense? It could point user to the exact location where the problem is.

Also it would avoid passing the errors into the stream environment, which could simplify the change-set a bit.

eg.

Suggested change
conf.toMap()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage
.ofConfigurationKeyAndValue(k, v)));
throw new MutatedConfigurationException("Supplying a custom configuration for the stream environment is not allowed, because the client-side configuration is disabled.");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I like that all errors are collected and users immediately see what they are not allowed to set. Otherwise, it might take multiple submissions until they have seen all errors. Regarding the code complexity, it seems okay to me since we are only adding to the StreamContextEnvironment. In fact, I am a bit surprised that the StreamContextEnvironment is marked as @PublicEvolving not sure when users will ever interact with it.

resetContextEnvironment();
}

private List<String> collectNotAllowedConfigurations() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, until we get rid of Execution and Checkpoint config, this would be tricky to achieve.

}
final MapDifference<String, String> diff =
Maps.difference(originalConfiguration.toMap(), configuration.toMap());
diff.entriesOnlyOnRight()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem correct, it only covers cases where we add new options to the config. I think we need to cover all three cases:

  1. Config option has been removed (left side only)
  2. Config option has changed (diff)
  3. Config option has been added (right side)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I also update the other cases.

@fapaul
Copy link
Contributor Author

fapaul commented Feb 1, 2022

@dmvk I updated the diffing and addressed your comments.

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for update @fapaul, LGTM overall 👍 Could you please add a simple unit test for StreamContextEnviroment that ensures we've properly covered different violation scenarios (CheckpointConfig, ExecutionConfig, add / remove / edit config options)?

… configurable

Add configuration option to disable configuration in user jars. The
submission will fail instantly before the job creation.
@fapaul
Copy link
Contributor Author

fapaul commented Feb 2, 2022

@dmvk I have added a unit test to verify the different violation scenarios.

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍 Thanks for the PR, good job ;)

@fapaul fapaul merged commit 4953732 into apache:master Feb 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants