[FLINK-38993][table] Move materialized table query change validation logic from converter to operation #27488
base: master
Conversation
liuyongvs left a comment
LGTM +1
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
        new AlterMaterializedTableChangeOperation(
                tableIdentifier,
                op.getTableChanges(),
                op.getCatalogMaterializedTable());
return operationExecutor.callExecutableOperation(
        handle, alterMaterializedTableChangeOperation);
Anyway, here and in other places it is just copied into an AlterMaterializedTableChangeOperation, since AlterMaterializedTableAsQueryOperation#execute fails with UnsupportedOperationException.
I am not sure why it was done like that initially. Do not change the logic, just make the code a bit more compact here.
                + "Unsupported table change detected: %s. ",
        op.getTableIdentifier(), tableChange));
    }
}
Since AlterMaterializedTableChangeOperation now contains the old table and a list of changes, there is no need to generate a new list of changes to compute the rolled-back table.
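A minimal sketch of this point, using stand-in record types rather than the actual Flink classes: because the operation carries the old table alongside the change list, the rolled-back table is simply the stored copy, and no inverse change list ever needs to be derived.

```java
import java.util.List;

public class RollbackSketch {
    // Stand-in for CatalogMaterializedTable; only the definition query matters here.
    record Table(String definitionQuery) {}

    // Stand-in for AlterMaterializedTableChangeOperation: it keeps the old table
    // next to the list of changes, so rolling back just returns the stored table
    // instead of inverting the applied changes.
    record ChangeOperation(Table oldTable, List<String> changes) {
        Table rollbackTable() {
            return oldTable;
        }
    }
}
```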
}

private void applyTableChanges(List<TableChange> tableChanges) {
    isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof ModifyDefinitionQuery);
Depending on whether it is a query change or not, the validation is slightly different.
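A self-contained sketch of the detection shown in the diff above, with stand-in marker types instead of Flink's actual TableChange and ModifyDefinitionQuery classes:

```java
import java.util.List;

public class QueryChangeDetection {
    // Stand-in marker types mirroring TableChange and ModifyDefinitionQuery.
    interface TableChange {}
    record ModifyDefinitionQuery(String definitionQuery) implements TableChange {}
    record SetOption(String key, String value) implements TableChange {}

    // Mirrors the anyMatch from the diff: the operation flags a query change
    // so that validation can branch accordingly.
    static boolean isQueryChange(List<TableChange> changes) {
        return changes.stream().anyMatch(c -> c instanceof ModifyDefinitionQuery);
    }
}
```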
 */
static ModifyDefinitionQuery modifyDefinitionQuery(String definitionQuery) {
    return new ModifyDefinitionQuery(definitionQuery);
static ModifyDefinitionQuery modifyDefinitionQuery(
Looks like it was missed when the original query was added.
// Used to build changes introduced by changed query like
// ALTER MATERIALIZED TABLE ... AS ...
public static List<TableChange> buildSchemaTableChanges(
This is the generation of table changes based on the query change.
In the case of table schema changes, the generation happens in SchemaConverter.
The reason SchemaConverter is not used here is that it works with AST objects.
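A rough, self-contained illustration of the idea (hypothetical helper, not the actual SchemaConverter or buildSchemaTableChanges): schema table changes can be derived by diffing the old and new query schemas, here modeled as column-name-to-type maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SchemaDiff {
    // Derive add/drop/modify changes by comparing the resolved columns of the
    // old definition query against those of the new one. Strings stand in for
    // Flink's TableChange objects to keep the sketch dependency-free.
    static List<String> diff(Map<String, String> oldCols, Map<String, String> newCols) {
        List<String> changes = new ArrayList<>();
        oldCols.forEach((name, type) -> {
            if (!newCols.containsKey(name)) changes.add("DROP " + name);
        });
        newCols.forEach((name, type) -> {
            if (!oldCols.containsKey(name)) changes.add("ADD " + name + " " + type);
            else if (!oldCols.get(name).equals(type)) changes.add("MODIFY " + name + " " + type);
        });
        return changes;
    }
}
```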
AHeise left a comment
First pass. Looks very good already. My main question is if we can restructure the change application to avoid the 100 if(change instanceof ...) checks.
.comment(oldTable.getComment())
.partitionKeys(oldTable.getPartitionKeys())
.options(oldTable.getOptions())
Does that mean that we currently don't support changing these fields?
(I'm fine if this is also the current behavior)
The only way a partition might be changed is
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH [PARTITION (key1=val1, key2=val2, ...)]
and this is handled by a completely different operation/converter pair: AlterMaterializedTableRefreshOperation and SqlAlterMaterializedTableRefreshConverter. I don't see any specific validations there, just conversion to the operation.
Other changes are impossible for now, yes.
...g/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
"(\n"
    + " `m` STRING METADATA VIRTUAL,\n"
    + " `calc` AS 'a' || 'b',\n"
    + " `calc` AS ['a' || 'b'],\n"
Why are these changes necessary?
Together with validation, the resolving moved to the operation level.
Square brackets mean an unresolved expression:
Lines 41 to 42 in ae45832:
// indicates that this is an unresolved expression consistent with unresolved data types
private static final String FORMAT = "[%s]";
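The quoted constant explains the bracket notation in the updated test expectation. A tiny sketch (the wrapping helper's name is hypothetical; only the FORMAT constant is quoted from the source):

```java
public class UnresolvedFormat {
    // Quoted from the referenced lines: indicates an unresolved expression,
    // consistent with how unresolved data types are rendered.
    private static final String FORMAT = "[%s]";

    // Hypothetical helper showing how an unresolved computed-column expression
    // would be rendered, e.g. `calc` AS ['a' || 'b'] in the updated test string.
    static String wrapUnresolved(String expression) {
        return String.format(FORMAT, expression);
    }
}
```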
List.of(
        TableChange.add(physical("b", DataTypes.INT())),
        TableChange.dropColumn("a"))),
TestSpec.of(
Do we also need tests for column type changes (e.g. widening)? If that's not supported yet, ignore this comment.
Added a test.
handleColumns(tableChange);
handleDistribution(tableChange);
handleWatermark(tableChange);
handleUniqueConstraint(tableChange);
handleRefreshStatus(tableChange);
handleRefreshHandler(tableChange);
handleModifyDefinitionQuery(tableChange);
We could return true to short-circuit the handlers on the first successful invocation.
Alternatively, we could build nested Handler classes that provide a list of supported change classes. Then we can have an index from type -> handler.
Going further, we could have just one handler per change class, where we use a generic type to capture the handled change and automatically extract the change class to build the index.
WDYT?
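The suggested restructuring could look roughly like this; it is a sketch with stand-in types (not the actual Flink TableChange hierarchy or the PR's handler methods), showing a type-to-handler index that replaces the chain of sequential handle calls with a single map lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ChangeDispatch {
    // Stand-in change hierarchy.
    interface TableChange {}
    record AddColumn(String name) implements TableChange {}
    record DropConstraint(String name) implements TableChange {}

    final List<String> log = new ArrayList<>();
    private final Map<Class<? extends TableChange>, Consumer<TableChange>> handlers =
            new HashMap<>();

    // One handler per change class; the generic type captures the handled
    // change and the class object builds the type -> handler index.
    <T extends TableChange> void register(Class<T> type, Consumer<T> handler) {
        handlers.put(type, change -> handler.accept(type.cast(change)));
    }

    ChangeDispatch() {
        register(AddColumn.class, c -> log.add("addColumn:" + c.name()));
        register(DropConstraint.class, c -> log.add("dropConstraint:" + c.name()));
    }

    // Single lookup replaces the sequence of instanceof checks.
    void apply(TableChange change) {
        Consumer<TableChange> handler = handlers.get(change.getClass());
        if (handler == null) {
            throw new UnsupportedOperationException("Unsupported table change: " + change);
        }
        handler.accept(change);
    }
}
```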
Probably I forgot to fix the if condition at the very beginning.
Fixed that.
Thanks 👍
primaryKeyName = null;
primaryKeyColumns = null;
Do you need to verify if the drop constraint is actually for that particular primary key? Or is this always the case?
Do we need to sequence the changes somehow? If I change the PK from pk1 to pk2, do I get a DropConstraint and an AddUniqueConstraint? If so, we need to drop first before re-adding, right?
Can this also happen for other changes?
Since changes is a list, do we already have the sequence? If so, please document that.
For now it is impossible to have several primary keys for one [materialized] table.
For that reason there is no name while dropping, just this command:
ALTER MATERIALIZED TABLE DROP PRIMARY KEY;
Maybe in the future we will have such cases.
Right now I don't know how I could tell Flink to do that just by changing the query operation (what I would put in the SQL...).
Anyway, I will add a comment about the ordering just in case.
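The ordering concern can be illustrated with a small stand-in sketch (hypothetical types, not Flink's actual constraint changes): because changes are applied strictly in list order, a primary-key swap must list DropConstraint before AddUniqueConstraint.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedChanges {
    interface Change {}
    record DropConstraint() implements Change {}
    record AddUniqueConstraint(List<String> columns) implements Change {}

    // Applies constraint changes in list order; the final primary key is
    // whatever the ordered sequence of drops and adds leaves behind.
    static List<String> applyPrimaryKey(List<Change> changes) {
        List<String> primaryKey = new ArrayList<>();
        for (Change change : changes) {
            if (change instanceof DropConstraint) {
                primaryKey.clear();
            } else if (change instanceof AddUniqueConstraint add) {
                primaryKey.addAll(add.columns());
            }
        }
        return primaryKey;
    }
}
```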
What is the purpose of the change
The issue is that right now the validation logic (the part not related to common syntax) is hard-coded in the converter.
On the other hand, converters do not allow extending them.
The idea is to move validation to the operation. Operations can be custom and can contain slightly relaxed validation logic.
Brief change log
Materialized table operations
Verifying this change
Mostly existing tests; however, some new tests were also added.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation