[FLINK-30665][table] Planner supports row-level update #21698
@lincoln-lil The PR is ready for review. Could you please help review when you're free?
… a new separate SqlToOperationConverterTestUtils to avoid SqlToOperationConverterTest is too long to make check-style fail
lincoln-lil
left a comment
@luoyuxia thanks for contributing this! Looks good to me overall, I've left some comments.
import java.util.Map;

/** Utils for {@link org.apache.flink.table.planner.operations.SqlToOperationConverterTest} . */
public class SqlToOperationConverterTestUtils {
Since SqlToOperationConverterTest has exceeded the maximum line limit, we might as well split it by SQL type (DDL, DML, and other commands): extract a SqlToOperationConverterTestBase to hold all the helper methods, and split the tests into three classes: SqlDDLToOperationConverterTest, SqlDMLToOperationConverterTest, and SqlCommandToOperationConverterTest. WDYT?
A good suggestion.
RelRoot updateRelational = flinkPlanner.rel(sqlUpdate);
// get target sink table
LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
List<String> targetTablePath = tableModify.getTable().getQualifiedName();
nit: use UnresolvedIdentifier directly to keep consistent with the delete operation
/**
 * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
 * update mode & updated columns to/from JSON, but also can update existing data for {@link
'update mode & updated columns' -> 'update mode & columns'
List<Column> updatedColumns = getUpdatedColumns(tableModify, resolvedSchema);
SupportsRowLevelUpdate.RowLevelUpdateInfo updateInfo =
        supportsRowLevelUpdate.applyRowLevelUpdate(
                updatedColumns, RowLevelModificationContextUtils.getScanContext());
nit: add a local variable for RowLevelModificationContextUtils.getScanContext()
        .withDescription("The name for the required columns in row-level update");

private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE =
        ConfigOptions.key("only_require_updated_columns_for_update")
'only_require_updated_columns_for_update' -> 'only-require-updated-columns-for-update'
        .booleanType()
        .defaultValue(false)
        .withDescription(
                "Whether to only require the updated columns for update statement. ");
add additional comments: ', require all columns by default'
}

@Test
public void testUpdateWithOnlyRequireUpdatedCols() {
Add a new case that does not contain all columns in the update clause, e.g., a source table with more columns here: 'TABLE t (f0, f1, ..., a int, b string, c double, ...)'
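To make the requested case concrete, here is a minimal sketch of what such a test would be checking. It is not Flink code: `RequiredColumnsSketch` and `requiredColumns` are hypothetical names, and columns are modeled as plain strings. The assumption is that, with the option enabled, the sink asks only for the columns named in the update clause, and otherwise for every column of the table.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: picks the columns a sink would require. */
final class RequiredColumnsSketch {

    /**
     * Returns the updated columns (kept in table order) when the flag is set,
     * otherwise a copy of every column of the table.
     */
    static List<String> requiredColumns(
            List<String> tableColumns, List<String> updatedColumns, boolean onlyRequireUpdated) {
        if (!onlyRequireUpdated) {
            return new ArrayList<>(tableColumns);
        }
        List<String> required = new ArrayList<>();
        for (String column : tableColumns) {
            if (updatedColumns.contains(column)) {
                required.add(column);
            }
        }
        return required;
    }

    public static void main(String[] args) {
        // A table with more columns than the update clause touches.
        List<String> table = List.of("f0", "f1", "a", "b", "c");
        // Models: UPDATE t SET b = ..., c = ...
        List<String> updated = List.of("b", "c");
        System.out.println(requiredColumns(table, updated, true));
        System.out.println(requiredColumns(table, updated, false));
    }
}
```

A table whose updated columns are a strict subset of its schema is the interesting input, because only then do the two settings of the flag produce different results.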
…nts for row-level update
@lincoln-lil Thanks for reviewing. I appended two commits to address your comments. The first one addresses the comments on row-level update; the second one splits the test
…three different test classes to avoid it exceed the maximum row limit
lincoln-lil
left a comment
@luoyuxia thanks for your update! I have a minor comment on the test splitting: although there is no strict guideline on how to classify the various SQL commands, it is clearly inappropriate to classify commands such as show, load, and use as DDL or DML. Should we adjust it to follow MySQL's definition of DDL and DML, and put the remaining ones into sqlOtherOperation (calling it sqlCommand would probably also cause ambiguity)?
Btw, since 'begin statement set' is used to assist multiple dml statements, we can also put it into dml test, WDYT?
Agree to it. The reason I put
Agree to it
@lincoln-lil Thanks for reviewing. I have now moved the tests for the statements that belong to neither DDL nor DML to
@flinkbot run azure |
lincoln-lil
left a comment
LGTM
What is the purpose of the change
Make the planner support row-level update.
Brief change log
Convert the SqlUpdate to a LogicalTableModify, and then rewrite the LogicalTableModify to a RelNode that inserts the rows into the sink.
If the update mode is ALL_ROWS, assuming the filter is a, convert the update expression to IF(a, xx, yyy).
The LogicalTableScan is changed to add extra meta columns to the underlying table. Then create a Project node that only selects the required columns.
If the update mode is UPDATED_ROWS, add an operator in CommonExecSink that sets the RowKind to RowKind#UPDATE_AFTER.
Move the method SqlToOperationConverterTest#prepareTable to a new class so that SqlToOperationConverterTest does not grow too long to pass check-style.
Verifying this change
SqlToOperationConverterTest#testUpdate verifies that an update statement is converted to a SinkModifyOperation.
RowLevelDeleteUpdate verifies the plan for row-level update.
UpdateTableITCase verifies that updates can be performed normally.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
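The two update modes named in the change log can be illustrated with a small, self-contained sketch. It models the semantics only, not the planner rewrite itself: `RowLevelUpdateSketch`, `Row`, `Kind`, `allRows` and `updatedRows` are all hypothetical names, and a row is reduced to an id and a single updatable column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Toy model of the two row-level update modes; none of these types are Flink classes. */
final class RowLevelUpdateSketch {

    enum Kind { INSERT, UPDATE_AFTER }

    /** A toy row: a kind tag, an integer id, and one updatable string column. */
    static final class Row {
        final Kind kind;
        final int id;
        final String name;

        Row(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return kind + "(" + id + ", " + name + ")";
        }
    }

    /** ALL_ROWS: every row is written back, with name = IF(filter, newName, name). */
    static List<Row> allRows(
            List<Row> table, Predicate<Row> filter, UnaryOperator<String> newName) {
        List<Row> out = new ArrayList<>();
        for (Row row : table) {
            String name = filter.test(row) ? newName.apply(row.name) : row.name;
            out.add(new Row(row.kind, row.id, name));
        }
        return out;
    }

    /** UPDATED_ROWS: only rows matching the filter are emitted, tagged UPDATE_AFTER. */
    static List<Row> updatedRows(
            List<Row> table, Predicate<Row> filter, UnaryOperator<String> newName) {
        List<Row> out = new ArrayList<>();
        for (Row row : table) {
            if (filter.test(row)) {
                out.add(new Row(Kind.UPDATE_AFTER, row.id, newName.apply(row.name)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> table = List.of(new Row(Kind.INSERT, 1, "x"), new Row(Kind.INSERT, 2, "y"));
        // Models: UPDATE t SET name = UPPER(name) WHERE id = 2
        System.out.println(allRows(table, r -> r.id == 2, String::toUpperCase));
        System.out.println(updatedRows(table, r -> r.id == 2, String::toUpperCase));
    }
}
```

In the ALL_ROWS case the filter moves into the projected expression, so untouched rows pass through unchanged; in the UPDATED_ROWS case the filter stays a filter and only the changed rows reach the sink.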