
[FLINK-37901][table] Support serde for StreamExecMLPredictTableFunction #26641


Merged: 5 commits into apache:master on Jun 10, 2025

Conversation

@fsk119 (Member) commented Jun 5, 2025

What is the purpose of the change

Support serde for StreamExecMLPredictTableFunction, so that queries using ML_PREDICT can be written to and restored from a compiled exec plan.
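
For context, a minimal sketch of the kind of query this affects. This is a hypothetical example (it assumes the CREATE MODEL and ML_PREDICT syntax from FLIP-437, a placeholder model provider, and the datagen/blackhole connectors), not code from this PR; COMPILE PLAN is what persists the exec plan that the new serde has to handle.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MLPredictPlanSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        env.executeSql(
                "CREATE TABLE articles (id BIGINT, content STRING) WITH ('connector' = 'datagen')");
        env.executeSql(
                "CREATE TABLE labeled (id BIGINT, content STRING, label STRING) "
                        + "WITH ('connector' = 'blackhole')");
        // Hypothetical model: the provider and its options depend on the installed ModelProvider.
        env.executeSql(
                "CREATE MODEL sentiment_model INPUT (content STRING) OUTPUT (label STRING) "
                        + "WITH ('provider' = '...', 'endpoint' = '...')");

        // COMPILE PLAN persists the exec plan as JSON; serializing and restoring the
        // ML_PREDICT node inside that plan is what this PR's serde support enables.
        env.executeSql(
                "COMPILE PLAN '/tmp/ml_predict.json' FOR INSERT INTO labeled "
                        + "SELECT * FROM ML_PREDICT(TABLE articles, MODEL sentiment_model, DESCRIPTOR(content))");
    }
}
```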

Brief change log

  • Add serialization for all related classes (an illustrative sketch of the serde pattern follows this list).
  • Support creating a ModelProvider from the ContextResolvedModelSpec.
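
The exec-plan serde in the table planner is Jackson based. Below is a rough sketch of the serializer/deserializer pattern such classes follow; the ModelSpec class, its fields, and the JSON field names are hypothetical stand-ins, not the PR's actual classes.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

/** Hypothetical value class standing in for the model spec being (de)serialized. */
final class ModelSpec {
    final String identifier;
    final Map<String, String> options;

    ModelSpec(String identifier, Map<String, String> options) {
        this.identifier = identifier;
        this.options = options;
    }
}

final class ModelSpecJsonSerializer extends StdSerializer<ModelSpec> {
    ModelSpecJsonSerializer() {
        super(ModelSpec.class);
    }

    @Override
    public void serialize(ModelSpec spec, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        gen.writeStartObject();
        gen.writeStringField("identifier", spec.identifier);
        // Delegates to Jackson's registered serializer for the options map.
        provider.defaultSerializeField("options", spec.options, gen);
        gen.writeEndObject();
    }
}

final class ModelSpecJsonDeserializer extends StdDeserializer<ModelSpec> {
    ModelSpecJsonDeserializer() {
        super(ModelSpec.class);
    }

    @Override
    public ModelSpec deserialize(JsonParser parser, DeserializationContext ctx)
            throws IOException {
        JsonNode node = parser.readValueAsTree();
        Map<String, String> options = new HashMap<>();
        node.required("options")
                .fields()
                .forEachRemaining(e -> options.put(e.getKey(), e.getValue().asText()));
        return new ModelSpec(node.required("identifier").asText(), options);
    }
}
```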

Verifying this change

  • Added a ContextResolvedModel serde test.
  • Added MLPredictRestoreTest to verify generating and restoring from the exec plan.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Jun 5, 2025

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@lihaosky (Contributor) left a comment


Need to fix doc: #26630 (comment)

Thanks for the PR!

return true;
}

static TableException schemaNotMatching(
Contributor:

This is also almost identical to ContextResolvedTableJsonDeserializer. Move it to a util if you want.

Member Author:

The exception messages are different: one is for tables, the other is for models.

serializerProvider.defaultSerializeField(
OPTIONS, resolvedCatalogModel.getOptions(), jsonGenerator);
Contributor:

For tables there's a try/catch; curious why there's no catch here?

Member Author:

Actually, I think the catalog table serde is a little weird here. The try/catch there is used to notify users that Flink doesn't support generating an exec plan for such a query; take a look at ExternalCatalogTable#getOptions. Models are much simpler here, so we don't need to complicate the case.
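
To illustrate the contrast, a rough sketch (not the planner's actual code; the exception type, messages, and method names are placeholders, and it assumes ResolvedCatalogModel sits next to ResolvedCatalogTable in org.apache.flink.table.catalog):

```java
import java.io.IOException;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ResolvedCatalogModel;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;

final class OptionsSerdeSketch {
    /**
     * For catalog tables, getOptions() may throw (see ExternalCatalogTable#getOptions) to signal
     * that the table cannot be persisted in a compiled plan, so the table serializer wraps the
     * call and rethrows a targeted error.
     */
    static void serializeTableOptions(
            ResolvedCatalogTable table, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        try {
            provider.defaultSerializeField("options", table.getOptions(), gen);
        } catch (Exception e) {
            throw new TableException("The table cannot be serialized into a compiled plan.", e);
        }
    }

    /** Model options are a plain map, so the straightforward call is enough. */
    static void serializeModelOptions(
            ResolvedCatalogModel model, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        provider.defaultSerializeField("options", model.getOptions(), gen);
    }
}
```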

asyncLookupOptions.asyncTimeout,
asyncLookupOptions.asyncBufferCapacity,
asyncLookupOptions.asyncOutputMode),
asyncOptions.asyncTimeout,
Contributor:

Why don't we need this data shuffling logic in createModelPredict? It looks like, in the lookup exec node, it is part of the sync transformation creation.

Member Author:

I think we can add this later if users require it. UpsertMaterialize is a complicated optimization that only applies when the primary key of the source differs from the primary key of the sink. What's more, it requires that users use the output of the model as part of the primary key.

Contributor:

Regardless of upsertMaterialize, it looks like we have upsert-key shuffling in async ordered mode; should it also be in sync mode?

Member Author:

Nope.

  1. If the upsert keys are a subset of the lookup keys, we don't need to introduce a shuffle here, because the planner guarantees that records with the same upsert keys land on the same subtask.
  2. If the downstream operator doesn't use the output of the predict function as upsert keys, we don't need determinism here.

Member Author:

After discussing with @lihaosky offline, I think it was a mistake to introduce a shuffle for async mode. We should only introduce a shuffle if we use upsert materialize.

InternalTypeInfo.of(getOutputType()),
inputTransformation.getParallelism(),
false);
} else if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
} else if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
Contributor:

I'm a bit confused about the shuffle logic. It looks like the shuffle is only done when upsertMaterialize is true in the lookup join exec node, and upsertMaterialize seems to be always false for lookup join. Why doesn't lookup need to do it for CDC? Also, async is disabled when upsertMaterialize is set for lookup join.

@fsk119 (Member Author) commented Jun 9, 2025:

This is a good question! Let me share some points here.

When using materialize, the lookup operator stores the lookup results in its state. When an update-before or delete message arrives, the operator searches for the results in its state; if the state contains them, it emits the result using the content from the state, so that the output of the lookup join operator stays deterministic.
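
A conceptual sketch of that idea (not the real generated operator; the lookup call and row layout are placeholders): keep the last emitted lookup result per key in keyed state and answer -U/-D messages from it. Using keyed state is also why the shuffle discussed next is needed; the function below would run after a keyBy on the lookup keys.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/** Conceptual sketch only; the real operator is generated by the planner. */
class MaterializedLookupSketch extends KeyedProcessFunction<Row, Row, Row> {

    private transient ValueState<Row> lastEmitted;

    @Override
    public void processElement(Row input, Context ctx, Collector<Row> out) throws Exception {
        if (lastEmitted == null) {
            // Lazily obtain the keyed state handle to keep the sketch short.
            lastEmitted =
                    getRuntimeContext()
                            .getState(new ValueStateDescriptor<>("last-emitted", Row.class));
        }
        RowKind kind = input.getKind();
        if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
            Row enriched = lookup(input); // placeholder for querying the lookup source / model
            lastEmitted.update(enriched);
            out.collect(enriched);
        } else {
            // UPDATE_BEFORE / DELETE: emit what was stored instead of looking up again,
            // so the retraction matches the earlier +I/+U output even if the lookup
            // source has changed in the meantime.
            Row stored = lastEmitted.value();
            if (stored != null) {
                stored.setKind(kind);
                out.collect(stored);
                lastEmitted.clear();
            }
        }
    }

    private Row lookup(Row input) {
        return input; // stand-in; a real implementation would enrich the row here
    }
}
```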

Why a shuffle is required for CDC mode

The planner requires the lookup join operator to use keyed state to make sure all messages with the same lookup keys are located at the same subtask, and Flink currently requires a shuffle in front of a keyed stream.

And upsertMaterialize seems to be always false for lookup join.

In some cases, the planner will set upsertMaterialize; take a look at StreamNonDeterministicUpdatePlanVisitor#visitLookupJoin.

First of all, users should set 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE'.
Then the query should not use the primary key of the upstream operator as lookup keys, or ...
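
For reference, a minimal sketch of setting that option from the Table API (assuming TableConfig#set(String, String); it can equally be set with a SQL SET statement):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NonDeterministicUpdateConfigSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Ask the planner to try to resolve non-deterministic update problems,
        // which is a precondition for it to add upsertMaterialize as described above.
        env.getConfig().set("table.optimizer.non-deterministic-update.strategy", "TRY_RESOLVE");
    }
}
```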

async is disabled when upsertMaterialize for lookup join

Because the current AsyncWaitOperator is not friendly to CDC streams. For example, a +I message and a -D message can arrive at the operator at almost the same time and both enter the input queue, which means the async lookup join function processes them at the same time. If the lookup source changes frequently, it's possible that the outputs for the +I and -D messages differ.

I hope this explanation answers your questions.

Contributor:

I think we should only support insert mode for ML_PREDICT, then (StreamNonDeterministicUpdatePlanVisitor should reject upsert mode for the ML_PREDICT plan), since the ML_PREDICT function itself isn't deterministic. Non-deterministic functions can result in errors according to https://docs.confluent.io/cloud/current/flink/concepts/determinism.html. We can support CDC mode later by introducing configs users can use to tell us their model is deterministic. Created https://issues.apache.org/jira/browse/FLINK-37928 and https://issues.apache.org/jira/browse/FLINK-37929.

Member Author:

After discussing with @lihaosky, I think we can improve this feature in the next version. After all, correctness is the top priority.

@fsk119 (Member Author) commented Jun 9, 2025

Regarding #26630 (comment): I fixed the problem in my local environment before merging and then pushed the code to the master branch.

@fsk119 (Member Author) commented Jun 10, 2025

@fsk119 fsk119 merged commit 449fef0 into apache:master Jun 10, 2025