Skip to content

Conversation

matriv
Copy link
Contributor

@matriv matriv commented Jan 24, 2022

What is the purpose of the change

Add versioning to the ExecNodes to enable upgrading a plan to a next Flink version.

Brief change log

  • Introduce a new annotation ExecNodeMetadata on ExecNodes which is used to
    improve the serialiasation/de-serialiasation to/from JSON plan of ExecNodes and
    facilitate the upgrade of the pipeline, since every ExecNode has now also a version
    attached.

  • List all the JSON plan eligible ExecNodes in ExecNodeMetadataUtil and use a
    static list to register them in Jackson.

  • Annotate all those eligible ExecNodes with the new annotation and provide a
    name constructed by the class name using - separators. All versions are set now
    to 1.

  • Use an ExecNodeContext POJO which uses the uniqueId, name and version to
    serialize/de-serialise them in a JSON plan in the form of <id>_<exec-node-name>_<version>.

  • Fix issues with @JsonIgnoreProperties and @JsonIgnore, and opt for the class
    based annotation instead of the per field annotation of @JsonIgnore.

  • Update the test plans with the new JSON scheme derived by the changes.

Verifying this change

Adapted the test JSON plans with the new scheme.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no)
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d2b8689 (Mon Jan 24 14:37:40 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 24, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@matriv matriv force-pushed the FLINK-25387 branch 2 times, most recently from 90cd6d8 to c52f5b9 Compare January 25, 2022 15:17
@matriv matriv marked this pull request as ready for review January 25, 2022 15:21
@matriv
Copy link
Contributor Author

matriv commented Jan 25, 2022

FYI, the consumedOptions in the annotation is not populated yet, will be done with a followup commit, or followup PR.

Copy link
Contributor

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a first scan of the PR. I think overall it looks good.

I have one specific comment about the context field. Why have you opted for mixing together in the serialization the instance identifier and the type identifier (composed by name and version)? For me "type" identity and "instance" identity are very different concepts, and they definitely deserve different fields.

@matriv
Copy link
Contributor Author

matriv commented Jan 25, 2022

I have one specific comment about the context field. Why have you opted for mixing together in the serialization the instance identifier and the type identifier (composed by name and version)? For me "type" identity and "instance" identity are very different concepts, and they definitely deserve different fields.

Because, with the upgrade story, we can have an @ExecNodeMetadata annotation with the same name, on a subclass of a current ExecNode class, which does something new/different and defines a newer version. So we need the combination of name + version to uniquely identify the class when we lookup and rebuild the Java object graph from the JSON plan.

@ExecNodeMetadata(name="myNode", version=1)
public class MyNode {}

@ExecNodeMetadata(name="myNode", version=2)
public class MyNodeWithNewFunctionality extends MyNode {}

@slinkydeveloper
Copy link
Contributor

Because, with the upgrade story, we can have an @ExecNodeMetadata annotation with the same name, on a subclass of a current ExecNode class, which does something new/different and defines a newer version. So we need the combination of name + version to uniquely identify the class when we lookup and rebuild the Java object graph from the JSON plan.

I may have badly explained myself, but i'm not questioning the name + version tuple. That's ok and I get why we need it.

What I'm questioning is that in the same field you add id + name + version, and this seems wrong to me, because id is the "instance identifier", like a pointer to a specific node of the graph, while name + version is the "type identifier", which tells you which ExecNode class and version the node is. I don't think these two concepts (instance id and type id) should be in the same field, as they are logically very different things, and also because future tooling will have hard time parsing this JSON.

@matriv
Copy link
Contributor Author

matriv commented Jan 26, 2022

Because, with the upgrade story, we can have an @ExecNodeMetadata annotation with the same name, on a subclass of a current ExecNode class, which does something new/different and defines a newer version. So we need the combination of name + version to uniquely identify the class when we lookup and rebuild the Java object graph from the JSON plan.

I may have badly explained myself, but i'm not questioning the name + version tuple. That's ok and I get why we need it.

What I'm questioning is that in the same field you add id + name + version, and this seems wrong to me, because id is the "instance identifier", like a pointer to a specific node of the graph, while name + version is the "type identifier", which tells you which ExecNode class and version the node is. I don't think these two concepts (instance id and type id) should be in the same field, as they are logically very different things, and also because future tooling will have hard time parsing this JSON.

Thx for explaining! I see your point. The decision was more towards JSON and code simplification, so that this int id is part of the one liner context field, so that everything is handled in one place regarding the complete identification of a node. So we read the context as one POJO from JSON, and we construct a new context from the annotation plus a freshly incremented id for new nodes. Imho, I don't see a big issue in this approach, since we have javadocs describing the usage of those sub-fields of the context, and I don't see any problem for tooling, since it's a well defined string with 3 components, and a simple split("_") gives those independently.

@twalthr WDYT?

@matriv matriv force-pushed the FLINK-25387 branch 3 times, most recently from 59073c3 to 2f4e2b8 Compare January 27, 2022 09:18
@twalthr
Copy link
Contributor

twalthr commented Jan 27, 2022

I kind of agree with @slinkydeveloper. The FLIP declares the full string for naming the operators 13_stream-exec-sink-1_upsert-materializer and there you also see that I choose an underscore to separate instance id and type id and dash for the type id. It is fine to have all of those components in the Context, the context needs this to compute the uid for operators. However, one could argue that the instance id must not be part of the type id in the JSON plan. I could also imagine downstream tools would rather like to access a field then performing the splitting manually.

However, I don't have a strong opinion here. If it is too complicated implementation wise to make id a separate JSON field as it was before, I'm also fine with the current solution.

@matriv
Copy link
Contributor Author

matriv commented Jan 27, 2022

Having the id as before, doesn't add "complication", just adds another argument to all constructors, so I can still revert the and remove it from ExecNodeContext, just needs some more work, to fix all the nodes in the hierarchy.

On one hand, I think that accessing the id through the Context is nice, and we have everything in one place + the operator uid in the future. On the other hand, I agree that this way, we are mixing the id together with the mandatory info for the type.

I also don't really have a strong opinion, just a slight preference to have everything in the POJO, but please let me know.
If you both prefer a separate id, I'm happy to do it.

@twalthr @slinkydeveloper

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @matriv. I added some feedback. My biggest question is why StreamExecMultipleInput has no annotation?

metadata.add(annotation);
}

ExecNodeMetadatas annotations =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we skip the check above? isn't the multi annotation including the single one or vice versa?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the "wrapper" MultipleExecNodeMetadata we can allow multiple individual ExecNodeMetadata annotations or a single MultipleExecNodeMetadata, where the value is an array of ExecNodeMetadata, but apparently java allows to have 2 annotations as well, one MultipleExecNodeMetadata and one ExecNodeMetadata, so we're checking this as not allowed.

@twalthr
Copy link
Contributor

twalthr commented Jan 27, 2022

As I said, having the id in Context is fine and also no additional constructor argument is good. I'm just wondering if we can somehow have the id in the JSON plan directly. Can't we add it as a field in the base but fill the field with information from the Context in the base constructor?

@slinkydeveloper
Copy link
Contributor

I personally think we should have instance and type id splitted, even if this requires a bit more code on the POJO side.

@matriv matriv force-pushed the FLINK-25387 branch 3 times, most recently from e0137cf to 1c866a4 Compare January 28, 2022 16:35
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(value = MultipleExecNodeMetadata.class)
@PublicEvolving
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably yes, thx, @twalthr right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather say @Internal. It should only be used by us.

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@PublicEvolving
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental?

@matriv matriv force-pushed the FLINK-25387 branch 2 times, most recently from 158a4df to 037e444 Compare January 31, 2022 11:14
@matriv matriv requested a review from twalthr January 31, 2022 11:14
@matriv
Copy link
Contributor Author

matriv commented Jan 31, 2022

@twalthr @slinkydeveloper I think the PR is in a state for a final review.

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @matriv. I left my last set of comments. Should be good in the next iteration if @slinkydeveloper agrees?

* into the JSON plan.
*/
@JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.READ_ONLY, index = 1)
public ExecNodeContext getContextFromAnnotation() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be final and protected

public final class ExecNodeContext {

/** This is used to assign a unique ID to every ExecNode. */
private static Integer idCounter = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AtomicInteger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also thought of it, but I didn't change it and left it as a plain int, do you think it can be incremented in a concurrent context?

idCounter = 0;
}

private Integer id;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark @Nullable and final, quickly explain why nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not marking here as @Nullable but int the relevant ctor and added the javadoc there.


@JsonCreator
public ExecNodeContext(String value) {
String[] split = value.split("_");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain the format in the class JavaDoc.


/**
* Annotation to be used for {@link ExecNode}s to keep necessary metadata when
* serialising/deserializing them in a plan. It's used for internal bookkeeping across Flink
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serializing

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(value = MultipleExecNodeMetadata.class)
@Experimental
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Internal only used for our internal bookkeeping. we don't need to provide API stability here.

if (LOOKUP_MAP.containsKey(key)) {
throw new IllegalStateException(
String.format(
"Found duplicate ExecNode: %s. This is a bug, please contact developers.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug. Please file an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to remove this, I think it's ok to completely remove the phrase This is a bug....

* Returns the {@link ExecNodeMetadata} annotation of the class with the highest (most recent)
* {@link ExecNodeMetadata#version()}.
*/
@SuppressWarnings("rawtypes")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are quite a few suppressions. As mentioned in a comment before, I think we can get rid of a couple of them by correct declaration.

@matriv
Copy link
Contributor Author

matriv commented Feb 2, 2022

@twalthr addressed your comments, Thank you!

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @matriv. LGTM, please ping me when it is ready to be merged. Currently, it seems to be failing (might need another rebase?).

…NodeMetadata

- Introduce a new annotation `ExecNodeMetadata` on `ExecNode`s which is used to
improve the serialiasation/de-serialiasation to/from JSON plan of `ExecNode`s and
facilitate the upgrade of the pipeline, since every ExecNode has now also a version
attached.

- List all the JSON plan eligible `ExecNode`s in `ExecNodeMetadataUtil` and use a
 static list to register them in Jackson.

- Annotate all those eligible `ExecNode`s with the new annotation and provide a
name constructed by the class name using `-` separators. All versions are set now
to 1.

- Use an `ExecNodeContext` POJO which uses the uniqueId, name and version to
serialize/de-serialise them in a JSON plan in the form of `<id>_<exec-node-name>_<version>`.

- Fix issues with `@JsonIgnoreProperties` and `@JsonIgnore`, and opt for the class
based annotation instead of the per field annotation of `@JsonIgnore`.

- Update the test plans with the new JSON scheme derived by the changes.
@twalthr twalthr closed this in 05eecb0 Feb 2, 2022
snuyanzin pushed a commit to snuyanzin/flink that referenced this pull request Feb 2, 2022
…NodeMetadata

- Introduce a new annotation `ExecNodeMetadata` on `ExecNode`s which is used to
improve the serialization/deserialization to/from JSON plan of `ExecNode`s and
facilitate the upgrade of the pipeline, since every ExecNode has now also a version
attached.

- List all the JSON plan eligible `ExecNode`s in `ExecNodeMetadataUtil` and use a
 static list to register them in Jackson.

- Annotate all those eligible `ExecNode`s with the new annotation and provide a
name constructed by the class name using `-` separators. All versions are set now
to 1.

- Use an `ExecNodeContext` POJO which uses the uniqueId, name and version to
serialize/deserialize them in a JSON plan in the form of `<id>_<exec-node-name>_<version>`.

- Fix issues with `@JsonIgnoreProperties` and `@JsonIgnore`, and opt for the class
based annotation instead of the per field annotation of `@JsonIgnore`.

- Update the test plans with the new JSON scheme derived by the changes.

This closes apache#18479.
snuyanzin pushed a commit to snuyanzin/flink that referenced this pull request Feb 2, 2022
…NodeMetadata

- Introduce a new annotation `ExecNodeMetadata` on `ExecNode`s which is used to
improve the serialization/deserialization to/from JSON plan of `ExecNode`s and
facilitate the upgrade of the pipeline, since every ExecNode has now also a version
attached.

- List all the JSON plan eligible `ExecNode`s in `ExecNodeMetadataUtil` and use a
 static list to register them in Jackson.

- Annotate all those eligible `ExecNode`s with the new annotation and provide a
name constructed by the class name using `-` separators. All versions are set now
to 1.

- Use an `ExecNodeContext` POJO which uses the uniqueId, name and version to
serialize/deserialize them in a JSON plan in the form of `<id>_<exec-node-name>_<version>`.

- Fix issues with `@JsonIgnoreProperties` and `@JsonIgnore`, and opt for the class
based annotation instead of the per field annotation of `@JsonIgnore`.

- Update the test plans with the new JSON scheme derived by the changes.

This closes apache#18479.
@matriv matriv deleted the FLINK-25387 branch February 3, 2022 08:52
matriv added a commit to matriv/flink that referenced this pull request Feb 3, 2022
Follows: #d9d72ef142a2343f37b8b10ca6e04dc7f6ca086e
of: apache#18479
matriv added a commit to matriv/flink that referenced this pull request Feb 3, 2022
Follows: #d9d72ef142a2343f37b8b10ca6e04dc7f6ca086e
of: apache#18479
zentol pushed a commit that referenced this pull request Feb 3, 2022
Follows: #d9d72ef142a2343f37b8b10ca6e04dc7f6ca086e
of: #18479
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants