[FLINK-37901][table] Support to serde for StreamExecMLPredictTableFunction #26641
Conversation
Need to fix doc: #26630 (comment)
Thanks for the PR!
return true;
}

static TableException schemaNotMatching(
This is also almost identical to ContextResolvedTableJsonDeserializer. Move it to a util if you want.
The exception messages are different: one is for tables, the other is for models.
serializerProvider.defaultSerializeField(
        OPTIONS, resolvedCatalogModel.getOptions(), jsonGenerator);
For tables there's a try/catch; curious why there's no catch here?
Actually, I think the catalog table serde is a little odd here. The try/catch there is used to notify users that Flink doesn't support generating an exec plan for such a query; you can take a look at ExternalCatalogTable#getOptions. Models are much simpler here, so we don't need to complicate the case.
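For anyone reading along, here is a self-contained Jackson sketch of the try/catch pattern being discussed. This is not Flink's actual ResolvedCatalogTableJsonSerializer; the class names, field name, and error message are made up. The point is only that the write of the options field is guarded so an object whose options getter throws (as ExternalCatalogTable#getOptions does) surfaces a clear "plan cannot be generated" error.

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

public class OptionsSerdeSketch {

    // Stand-in for a resolved catalog object whose options may not be retrievable.
    static class CatalogObject {
        final Supplier<Map<String, String>> options;
        CatalogObject(Supplier<Map<String, String>> options) { this.options = options; }
    }

    static class OptionsSerializer extends StdSerializer<CatalogObject> {
        OptionsSerializer() { super(CatalogObject.class); }

        @Override
        public void serialize(CatalogObject value, JsonGenerator gen, SerializerProvider provider)
                throws IOException {
            gen.writeStartObject();
            try {
                // Guarded like the table serializer: the options getter may throw.
                provider.defaultSerializeField("options", value.options.get(), gen);
            } catch (Exception e) {
                throw new IOException(
                        "Cannot generate a compiled plan: the options of this object are not serializable.",
                        e);
            }
            gen.writeEndObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper =
                new ObjectMapper()
                        .registerModule(new SimpleModule().addSerializer(new OptionsSerializer()));
        System.out.println(
                mapper.writeValueAsString(new CatalogObject(() -> Map.of("provider", "openai"))));
        // A CatalogObject whose options supplier throws would surface the wrapped error instead.
    }
}
```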
asyncLookupOptions.asyncTimeout,
asyncLookupOptions.asyncBufferCapacity,
asyncLookupOptions.asyncOutputMode),
asyncOptions.asyncTimeout,
Why don't we need this data shuffling logic in createModelPredict? It looks like in the lookup exec node it's in the sync transformation creation.
I think we can add this later if users require it. UpsertMaterialize is a complicated optimization that only applies when the primary key of the source differs from the sink's primary key. What's more, it requires users to use the output of the model as part of the primary key.
Regardless of upsertMaterialize, it looks like we have upsert-key shuffling in async ordered mode; should it also apply in sync mode?
Nope.
- If the upsert keys are a subset of the lookup keys, we don't need to introduce a shuffle here, because the planner promises that data with the same upsert keys is located on the same subtask.
- If the downstream operator doesn't use the output of the predict function as upsert keys, we don't need determinism here.
After discussing with @lihaosky offline, I think it was a mistake to introduce a shuffle for async mode. We should only introduce a shuffle if we use upsert materialize.
InternalTypeInfo.of(getOutputType()),
inputTransformation.getParallelism(),
false);
} else if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
} else if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
I'm a bit confused about the shuffle logic. It looks like the shuffle is only done when upsertMaterialize is true in the lookup join exec node, and upsertMaterialize seems to always be false for lookup join. Why doesn't lookup join need to do it for CDC? Also, async is disabled when upsertMaterialize is set for lookup join.
This is a good question! Let me share some points here.
When materialization is used, the lookup operator stores the lookup results in its state. When an update-before or delete message arrives, the lookup operator searches for the result in its state; if the state contains it, it emits the result with the content from the state so that the output of the lookup join operator stays deterministic.
Why is a shuffle required for CDC mode?
The planner requires the lookup join operator to use keyed state, so that all messages with the same lookup keys are located on the same subtask. Currently, Flink requires a shuffle before a keyed stream.
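To illustrate that last point with a minimal DataStream sketch (purely illustrative, not planner code): keyed state is only available behind a keyBy(), i.e. a hash shuffle, which is why the planner has to insert one before any operator that materializes lookup results in keyed state.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateRequiresShuffle {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("k1", 1), Tuple2.of("k1", -1), Tuple2.of("k2", 2))
                // keyBy() is the hash shuffle: only after it can an operator use keyed state,
                // so records with the same key are guaranteed to land on the same subtask.
                .keyBy(t -> t.f0)
                .process(
                        new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
                            private transient ValueState<Integer> lastSeen;

                            @Override
                            public void processElement(
                                    Tuple2<String, Integer> value, Context ctx, Collector<String> out)
                                    throws Exception {
                                if (lastSeen == null) {
                                    // State is registered lazily here just to keep the sketch short.
                                    lastSeen =
                                            getRuntimeContext()
                                                    .getState(
                                                            new ValueStateDescriptor<>(
                                                                    "lastSeen", Integer.class));
                                }
                                Integer previous = lastSeen.value();
                                lastSeen.update(value.f1);
                                out.collect(value.f0 + ": " + previous + " -> " + value.f1);
                            }
                        })
                .print();
        env.execute("keyed-state-requires-shuffle");
    }
}
```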
And upsertMaterialize seems to be always false for lookup join.
In some cases, the planner will enable upsertMaterialize; take a look at StreamNonDeterministicUpdatePlanVisitor#visitLookupJoin.
First of all, users should set 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';
then the user's query should not use the primary key of the upstream operator as lookup keys, or ...
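For reference, a minimal sketch of switching that strategy on from the Table API; the SQL-client equivalent is the SET statement quoted above, and the class name here is made up:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableTryResolve {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Same effect as: SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';
        tEnv.getConfig().set("table.optimizer.non-deterministic-update.strategy", "TRY_RESOLVE");
    }
}
```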
async is disabled when upsertMaterialize for lookup join
Because the current AsyncWaitOperator is not friendly to CDC streams. For example, a +I message and a -D message may arrive at the operator at almost the same time and both enter the input queue, which means the async lookup join function processes them at the same time. If the lookup source changes frequently, the lookup results for the +I and -D messages can differ.
Hope my explanation answers your questions.
I think we should only support insert mode for ml_predict then (StreamNonDeterministicUpdatePlanVisitor should reject upsert mode for the ml_predict plan), since the ml_predict function itself isn't deterministic. A non-deterministic function can result in errors according to https://docs.confluent.io/cloud/current/flink/concepts/determinism.html. We can support CDC mode later by introducing configs users can use to tell us their model is deterministic. Created https://issues.apache.org/jira/browse/FLINK-37928 and https://issues.apache.org/jira/browse/FLINK-37929.
After discussing with @lihaosky, I think we can improve this feature in the next version. After all, correctness is the top priority.
Before merging #26630 (comment): I have fixed the problem in my local env and then pushed the code to the master branch.
All tests pass in my private CI pipeline: https://dev.azure.com/1059623455/Flink/_build/results?buildId=694&view=logs&j=0e31ee24-31a6-528c-a4bf-45cde9b2a14e
Merging...
What is the purpose of the change
Support serde for StreamExecMLPredictTableFunction.
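For context, a rough sketch of the flow that exercises this serde: compiling a query that uses ML_PREDICT into a persisted plan. The table, model, and column names as well as the exact ML_PREDICT invocation shown are assumptions, and the DDL for these objects is assumed to have been run already.

```java
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MlPredictCompilePlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // DDL for the source table, sink table, and model is assumed to exist in the catalog.
        CompiledPlan plan =
                tEnv.compilePlanSql(
                        "INSERT INTO sink "
                                + "SELECT * FROM ML_PREDICT(TABLE src, MODEL my_model, DESCRIPTOR(feature))");
        // The JSON plan should contain the StreamExecMLPredictTableFunction node made serializable by this PR.
        System.out.println(plan.asJsonString());
    }
}
```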
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation