Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan #22593

Merged
merged 4 commits into from
Jun 5, 2023

Conversation

LadyForest
Copy link
Contributor

What is the purpose of the change

This PR is a subtask of FLIP-292 to enable operator-level state TTL configuration via CompiledPlan.

Brief change log

Introduce StateMetadata to all ExecNodes that translate to stateful operators, and changes the way how #translateToPlanInternal get the state retention time.

The affected ExecNode list

    StreamExecChangelogNormalize
    StreamExecDeduplicate
    StreamExecGlobalGroupAggregate
    StreamExecGroupAggregate
    StreamExecIncrementalGroupAggregate
    StreamExecJoin
    StreamExecLimit
    StreamExecLookupJoin
    StreamExecRank
    StreamExecSink
    StreamExecSortLimit

Verifying this change

Since we have upgraded some ExecNodes to version 2, we have to test the following 3 parts:

  1. The plan serialized using version 1 can be deserialized using the current version. This can be verified by TransformationsTest#testUidFlink1_15
  2. The plan with the current version SerDe work as expected. This can be verified by all tests under package org.apache.flink.table.planner.plan.nodes.exec.stream
  3. The way by modifying the JSON content to change state TTL works as expected. This can be verified by ITCase ConfigureOperatorLevelStateTtlJsonITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduces a new feature? yes
  • If yes, how is the feature documented? FLINK-31957

@flinkbot
Copy link
Collaborator

flinkbot commented May 16, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -71,20 +75,36 @@
producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
@ExecNodeMetadata(
name = "stream-exec-changelog-normalize",
version = 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to introduce a new version ? Because the topology and state layout do not change,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to introduce a new version ? Because the topology and state layout do not change,

I also hesitated for a long time on this point. Introducing StateMetadata did not actually change the state layout, but the FLIP-190 design document provides an example of upgrading the exec node version after adding a member variable.

The JSON node of ExecNode A gets an additional property in Flink 1.16:

Before:

{
some-prop: 42
}
After:

{
some-prop: 42,
some-flag: false
}

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}
New plans will use the new ExecNode version 2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the doc of the method minPlanVersion

Needs to be updated when the JSON for the {@link ExecNode} changes: e.g. after adding an
attribute to the JSON spec of the ExecNode.

I think we need introduce a new version since the json for the node changes.

Copy link
Contributor

@twalthr twalthr Jul 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@godfreyhe @luoyuxia @LadyForest If the testing infrastructure would be already in place, I would agree with increasing the version. But we are not testing different versions of ExecNode yet. So increasing the annotation has no effect.

I just noticed that you bumped the ExecNode version of a couple of ExecNodes for the new state metadata properties in CompiledPlan. Your change actually allows having null for the stateMetadataList , I'm wondering if we should not increase the version for 1.18. We can stay at the old ExecNode version as 1.18 is able to consume 1.17 plans and state.
The reason why noticed that is due to FLINK-32613 where I added an option to the wrong annotation during rebase and no test failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @twalthr, during the implementation of FLIP-292, I did notice that the current testing for compiled plans is not comprehensive enough, such as FLINK-31884, FLINK-31917, and FLINK-32219. FLIP-292 added ExecNodeVersionUpgradeSerdeTest and TransformationsTest#testUidFlink1_15 to test the deserialization of old plans using the new version, but it may not be sufficient. After reconsideration, I think we can keep the original exec node version until we have a more comprehensive testing framework. I will open a PR and hope you can help review the code. Thank you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @LadyForest, sounds good to me. Thanks for reverting the version bump. Btw FLINK-31917 seems pretty serious. Are we still able to deserialize 1.15 literals with this change? Also the SARG values cannot be null anymore? I guess that should be fine. Ping me in Slack, I can help with a review.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still able to deserialize 1.15 literals with this change?

I suppose so. I adapted the test TransformationsTest#testUidFlink1_15 to verify the change. The test deserialized the serialized plan using Flink v1.15, where numbers are serialized as string literals.

Also the SARG values cannot be null anymore?

Sorry, I'm a bit confused on this point. Did you find any incompatibilities? AFAIK, the changes made on RexNodeJsonSerializer does not aim to change the processing logic for SARG. You can check RexNodeJsonSerdeTest and SargJsonPlanITCase for more details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @twalthr, it appears that modifying an already approved FLIP requires a new vote. Considering that feature cut for 1.18 is only a few days away, do you have any suggestions for a quick fix?

@LadyForest LadyForest requested a review from godfreyhe May 23, 2023 06:32
Copy link
Contributor

@luoyuxia luoyuxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LadyForest Thanks for contribution. I left some comments. PTAL.

@@ -71,20 +75,36 @@
producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
@ExecNodeMetadata(
name = "stream-exec-changelog-normalize",
version = 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the doc of the method minPlanVersion

Needs to be updated when the JSON for the {@link ExecNode} changes: e.g. after adding an
attribute to the JSON spec of the ExecNode.

I think we need introduce a new version since the json for the node changes.

JsonNode target = JsonTestUtils.readFromString(json);
JsonTestUtils.setExecNodeStateMetadata(
target, "stream-exec-deduplicate", 0, 6000L);
JsonTestUtils.setExecNodeStateMetadata(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether it'll be better to change it 8000L.
The reason is that if we remove this code line, the test can still passs so that we can't make sure set state for stream-exec-group-aggregate also make difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting out this line means that the TTL of the deduplication node will take the default value of 0, which means that the status will never expire. However, the actual effect is the same as setting a TTL of 6 or 8 seconds.

I think this case demonstrates the significance of supporting operator-level TTL configuration. That is to say, as long as we ensure that the data calculation results remain unchanged, we do not need to unnecessarily extend the state TTL for the sake of a long-period state of a certain operator.

Let me add more description to the case.

Copy link
Contributor

@luoyuxia luoyuxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants