[FLINK-11879] Add validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput #8742

Merged
merged 3 commits into from
Aug 19, 2019

sunhaibotb
Contributor

What is the purpose of the change

This pull request adds the following validators for the use of InputSelectable, BoundedOneInput and BoundedMultiInput.

  • Rejects jobs containing operators that implement InputSelectable when checkpointing is enabled.
  • Rejects jobs containing operators that implement BoundedOneInput or BoundedMultiInput when checkpointing is enabled.
  • Rejects jobs containing operators that implement InputSelectable when credit-based flow control is disabled.

Brief change log

  • In StreamingJobGraphGenerator, add a validator that rejects operators which implement InputSelectable, BoundedOneInput or BoundedMultiInput when checkpointing is enabled.
  • In StreamTask, add a validator that rejects operators which implement InputSelectable when credit-based mode is disabled.
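
The rules listed in this description can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the nested interfaces are stand-ins that only borrow the names of the Flink interfaces, the operator classes and the `validate` / `isRejected` helpers are hypothetical, and the two checks are folded into one method here even though the change log places them in StreamingJobGraphGenerator and StreamTask respectively.

```java
final class OperatorValidationSketch {

    // Stand-ins named after the Flink interfaces; these are NOT the real Flink types.
    interface InputSelectable {}
    interface BoundedOneInput {}
    interface BoundedMultiInput {}

    // Hypothetical operator classes used only to exercise the checks.
    static final class PlainOperator {}
    static final class SelectiveOperator implements InputSelectable {}
    static final class BoundedOperator implements BoundedOneInput {}

    /** Throws if the operator class is not supported under the given configuration. */
    static void validate(Class<?> operatorClass, boolean checkpointingEnabled, boolean creditBasedEnabled) {
        // Class-level checks: no operator instance has to be created.
        boolean selective = InputSelectable.class.isAssignableFrom(operatorClass);
        boolean bounded = BoundedOneInput.class.isAssignableFrom(operatorClass)
                || BoundedMultiInput.class.isAssignableFrom(operatorClass);

        if (checkpointingEnabled && (selective || bounded)) {
            throw new UnsupportedOperationException(
                    "Checkpointing is not supported for operator " + operatorClass.getName());
        }
        if (!creditBasedEnabled && selective) {
            throw new UnsupportedOperationException(
                    "Selective reading requires credit-based flow control: " + operatorClass.getName());
        }
    }

    /** Convenience wrapper that reports a rejection as a boolean instead of an exception. */
    static boolean isRejected(Class<?> operatorClass, boolean checkpointingEnabled, boolean creditBasedEnabled) {
        try {
            validate(operatorClass, checkpointingEnabled, creditBasedEnabled);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

For example, `OperatorValidationSketch.isRejected(OperatorValidationSketch.SelectiveOperator.class, true, true)` reports a rejection, while the same operator passes when checkpointing is off and credit-based flow control is on.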

Verifying this change

This change added tests and can be verified as follows:

  • Added test that validates the correctness of the new validator in StreamingJobGraphGenerator.
  • Added test that validates the correctness of the new validator in StreamTask.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Jun 14, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 77727c7 (Fri Aug 23 10:21:13 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Contributor

@1u0 1u0 left a comment

Hi @sunhaibotb, thank you for your PR.
I have left some comments, after a quick read.

@@ -337,6 +338,9 @@ public final void invoke() throws Exception {
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

// check environment for selective reading
checkSelectiveReadingEnv();
Contributor

Are there any reasons to have it here at job execution time? Is it possible to validate it in JM side before submitting tasks to TMs?

Contributor Author

If we validate on the JM side, there are two ways to determine whether a job contains a selective-reading operator: mark it with a boolean field in JobGraph when the JobGraph is generated, or deserialize StreamOperatorFactory from StreamConfig. I don't think the second way is good, because the JM currently does not need to deserialize StreamConfig, and doing so would increase the time cost. Since the default value of taskmanager.network.credit-model is true, this check is only a safeguard and normally does not throw. In addition, taskmanager.network.credit-model is a deprecated option, and the validation will no longer be needed once it is removed. So I think it's easier to add the validation here and remove it in the future.

Contributor

My main suspicion was that you may not have the same environment configuration on the JM side.

But regarding the OperatorChain.hasSelectiveReadingOperator() implementation, I think delegating it to the JM should not add more overhead. You already have

Class<?> operatorClass = operatorFactory.getStreamOperatorClass();
InputSelectable.class.isAssignableFrom(operatorClass);

check in StreamingJobGraphGenerator.

Contributor Author

@sunhaibotb sunhaibotb Jul 9, 2019

I'm not sure whether you mean the first way I described. If so, I don't think it's necessary to add a boolean field to the job graph for checking logic that will be removed in the future. If not, I assume you mean the second way, where StreamOperatorFactory needs to be deserialized from StreamConfig on the JM side, which would increase the cost (please note that the JM currently does not need to deserialize StreamConfig). If it is neither, I don't understand what you are proposing.

Contributor

My understanding was that StreamingJobGraphGenerator.createJobGraph() runs on the JM side only. But it looks like it is also called in the sql-client, where you may not have the same environment configuration.

taskmanager.network.credit-model is an option that is deprecated, and the validation will not be needed after it is removed. So I think it's easier to add validation here and to remove it in the future.

I'm fine with this.


if (f instanceof WithMasterCheckpointHook) {
hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
}
}

if (cfg.isCheckpointingEnabled() && operatorFactory != null) {
Contributor

I'd move this check (for all nodes) into a separate function and invoke the new function earlier (in StreamingJobGraphGenerator.createJobGraph() call or even before StreamingJobGraphGenerator is used).

StreamGraph.getJobGraph() already has a validation check before creating a JobGraph. Imo, it's a better place to carry such validation for now (excluding the case for InputSelectable and network credit-based flow control).

Contributor Author

@sunhaibotb sunhaibotb Jul 9, 2019

The existing validation you are referring to (shown below) permits forced checkpointing for iterative streaming jobs, which means that no exception is thrown in that case. In this PR, the added check needs to account for this case; otherwise IterateITCase#testWithCheckPointing will fail.

if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) 

Contributor

I don't mean to change this logic.
Currently you are overloading configureCheckpointing() with logic that can be done independently (validation) here.
This code would be better if it's extracted into a separate method and better if called before all other things in StreamingJobGraphGenerator.createJobGraph() are run ("fail early").

As mentioned before, StreamGraph.getJobGraph() already has some validation check - you can unify them together with the new check.

The only missing validation case is InputSelectable with network credit-based flow control. But it seems fine to keep that as it is now.

Contributor

I agree with @1u0. This method is definitely already too big; if we want to add new functionality here, it must be decomposed into smaller pieces. I also agree that this check would be better performed earlier, maybe in some private void StreamingJobGraphGenerator#validateJobGraph() method (together with the existing validation that happens in StreamGraph#getJobGraph).

With this validateJobGraph(), we could either call it pre-emptively as the first thing in createJobGraph() or expect the user to call it manually before createJobGraph().
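
The "fail early" arrangement suggested in this thread could look roughly like the sketch below. It is an illustration under stated assumptions rather than the StreamingJobGraphGenerator code: the generator class, its vertex list, the `rejectedBeforeBuilding` helper and the nested InputSelectable stand-in are all hypothetical. The point it shows is that validation runs as the first step of `createJobGraph()`, so nothing is built when a job is rejected.

```java
import java.util.ArrayList;
import java.util.List;

final class FailEarlyGeneratorSketch {

    // Stand-in named after the Flink interface; NOT the real Flink type.
    interface InputSelectable {}

    // Hypothetical operator classes used only for demonstration.
    static final class PlainOperator {}
    static final class SelectiveOperator implements InputSelectable {}

    private final boolean checkpointingEnabled;
    private final Class<?>[] operatorClasses;
    private final List<String> builtVertices = new ArrayList<>();

    FailEarlyGeneratorSketch(boolean checkpointingEnabled, Class<?>... operatorClasses) {
        this.checkpointingEnabled = checkpointingEnabled;
        this.operatorClasses = operatorClasses;
    }

    /** Validates first, then does the (here trivial) graph construction. */
    List<String> createJobGraph() {
        preValidate();
        for (Class<?> operatorClass : operatorClasses) {
            builtVertices.add(operatorClass.getSimpleName());
        }
        return builtVertices;
    }

    /** Rejects unsupported operators before any vertex is created. */
    private void preValidate() {
        if (!checkpointingEnabled) {
            return;
        }
        for (Class<?> operatorClass : operatorClasses) {
            if (InputSelectable.class.isAssignableFrom(operatorClass)) {
                throw new UnsupportedOperationException(
                        "Checkpointing is currently not supported for operators that implement "
                                + "InputSelectable: " + operatorClass.getName());
            }
        }
    }

    int builtVertexCount() {
        return builtVertices.size();
    }

    /** Returns true if generation was rejected and no vertex had been built at that point. */
    static boolean rejectedBeforeBuilding(boolean checkpointingEnabled, Class<?>... operatorClasses) {
        FailEarlyGeneratorSketch generator =
                new FailEarlyGeneratorSketch(checkpointingEnabled, operatorClasses);
        try {
            generator.createJobGraph();
            return false;
        } catch (UnsupportedOperationException e) {
            return generator.builtVertexCount() == 0;
        }
    }
}
```

Keeping the validation in its own private method also keeps the construction code free of checks, which is the decomposition the reviewers ask for.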

@flinkbot
Collaborator

flinkbot commented Jul 8, 2019

CI report for commit 9ab0b7d: CANCELED Build

@flinkbot
Collaborator

flinkbot commented Jul 9, 2019

CI report for commit 3f57c8f: FAILURE Build


@sunhaibotb
Contributor Author

Thanks for reviewing @pnowojski @1u0 .
The code has been updated. Because of a conflict with the latest master branch, I rebased onto the latest master and force-pushed. You can just look at StreamingJobGraphGenerator and StreamGraph.

@flinkbot
Collaborator

flinkbot commented Jul 12, 2019

CI report:

@@ -178,6 +182,36 @@ private JobGraph createJobGraph() {
return jobGraph;
}

@SuppressWarnings("deprecation")
private void prevalidate() {
Contributor

Nit: preValidate() or just validate().

|| BoundedMultiInput.class.isAssignableFrom(operatorClass)) {

throw new UnsupportedOperationException(
"Checkpointing currently does not supported the operator that implements" +
Contributor

I suggest changing it to "Checkpointing is currently not supported for operators that implement ".

new TestAnyModeReadingStreamOperator("test operator"))
.print();

StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
Contributor

What about refactoring this test by extracting the construction of a sample job graph into a helper method (with the operator under test as a parameter), and also splitting the test into three separate tests?

If they run separately, you can also use a simpler structure:

@Test(expected = UnsupportedOperationException.class)
public void testValidationForOperator...() throws Exception {
    // setup graph:
    ...
    StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    fail("unreachable");
}


@Override
public Class<? extends StreamOperator> getStreamOperatorClass() {
return generatedClass.getClass(Thread.currentThread().getContextClassLoader());
Contributor

Is it possible that this code is triggered in a setting where the context class loader is not the user-code class loader?

Contributor Author

I will add a ClassLoader parameter to this method.
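
A minimal sketch of the idea, assuming a lookup by class name: the caller passes the class loader explicitly instead of the method reading the thread's context class loader. The class `OperatorClassLookupSketch` and its methods are hypothetical, and `Class.forName` merely stands in for `generatedClass.getClass(...)`, whose internals are not shown in this thread.

```java
final class OperatorClassLookupSketch {

    private final String className;

    OperatorClassLookupSketch(String className) {
        this.className = className;
    }

    /** Resolves the class with the class loader supplied by the caller. */
    Class<?> getOperatorClass(ClassLoader classLoader) {
        try {
            // 'false' means: load the class but do not run its static initializers.
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load operator class " + className, e);
        }
    }

    /** Variant that relies on the thread's context class loader, as the original diff did. */
    Class<?> getOperatorClass() {
        return getOperatorClass(Thread.currentThread().getContextClassLoader());
    }
}
```

With the explicit parameter, the caller decides which class loader is the user-code class loader, so the result no longer depends on how the calling thread happens to be configured.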

@sunhaibotb
Contributor Author

The code has been updated, and please review it again. Thanks @1u0

@1u0
Contributor

1u0 commented Jul 12, 2019

LGTM, modulo the still-open question:

generatedClass.getClass(Thread.currentThread().getContextClassLoader());

Is it possible that this code is triggered in a setting where the context class loader is not the user-code class loader?

Contributor

@pnowojski pnowojski left a comment

Thanks for reviewing @1u0 :)

I was thinking about this getContextClassLoader question before, @1u0, and as far as I understand this should be fine. Operators, including all of the user's code, should be loaded and used only in the scope of one class loader: the user's class loader. Please correct me if I'm wrong (@sunhaibotb, this might be worth double-checking), but I think we are setting the context class loader correctly to the user's class loader.

I've left one smaller comment; besides that, LGTM, and I can merge it once that is resolved.

}

@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
Contributor

This method is inconsistent with the isOperatorSelectiveReading() API, which just uses getContextClassLoader(). Please make them consistent and use the same mechanism: either pass classLoader in both places or use getContextClassLoader() in both.

Contributor Author

Because a general method getStreamOperatorClass() is added, isOperatorSelectiveReading() is no longer needed and can be dropped. The plan is to drop isOperatorSelectiveReading() in the PR for the JIRA issue (https://issues.apache.org/jira/browse/FLINK-13051), so I had made no changes to keep them consistent. But the best thing is to keep them consistent for now, whether or not isOperatorSelectiveReading() is dropped later. I will fix it.

@sunhaibotb
Contributor Author

The code has been updated. Because of a conflict with the latest master branch, I rebased onto the latest master and force-pushed.

A new problem is that batch and streaming share some operators, some of which may need to implement Bounded[Multi|One]Input in batch mode to do something in the endInput() method, such as ContinuousFileReaderOperator (please look at issue#13376). But if these operators implement Bounded[Multi|One]Input, streaming jobs will not be able to use them because of the validation in this PR.

After re-checking the code related to the checkpoint handler, I think there is no need to add the validator for the use of Bounded[Multi|One]Input, because once an EndOfPartition event arrives, the current checkpointing mechanism will not produce a successful checkpoint after that. That is, not handling the endInput state in checkpointing does not cause correctness problems. What do you think, @pnowojski?

(In the latest code of this PR, I've removed the validation on Bounded[Multi|One]Input. If I'm wrong, I'll add it back.)

Contributor

@pnowojski pnowojski left a comment

I think this change LGTM and sorry for the delay @sunhaibotb.

I can merge it once the conflict is resolved and the tests are passing. @sunhaibotb, could you ping me once this is done?

@sunhaibotb
Contributor Author

sunhaibotb commented Aug 19, 2019

The conflict has been resolved and the tests pass. Thank you for reviewing, @pnowojski.

@pnowojski
Contributor

Thanks @sunhaibotb . Merging.

@pnowojski pnowojski merged commit 2f324f8 into apache:master Aug 19, 2019
@sunhaibotb sunhaibotb deleted the FLINK-11879 branch August 20, 2019 02:54
5 participants