[FLINK-32001][table] Row-level update should support returning partial columns #22525
Conversation
Thanks for your contribution. I left some comments.
if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
    RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
    return getPhysicalRowType(schema, rowLevelUpdateSpec.getRequireColumnIndices());
} else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
    RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
    return getPhysicalRowType(
            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
I just wonder whether it's better to introduce a method like Optional&lt;RowType&gt; getConsumedType for this sink ability spec. Actually, we already have getProducedType in the source ability spec.
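A minimal sketch of how such a getConsumedType could sit on the spec interface. The simplified RowType stand-in and the class shapes here are illustrative assumptions, not the actual Flink API:

```java
import java.util.Optional;

public class ConsumedTypeSketch {
    // Simplified stand-in for Flink's RowType, for illustration only.
    record RowType(String... fieldNames) {}

    // Hypothetical addition to SinkAbilitySpec: specs that change the
    // consumed row type override this; all others keep the empty default.
    interface SinkAbilitySpec {
        default Optional<RowType> getConsumedType() {
            return Optional.empty();
        }
    }

    static class RowLevelUpdateSpec implements SinkAbilitySpec {
        private final RowType consumedType;

        RowLevelUpdateSpec(RowType consumedType) {
            this.consumedType = consumedType;
        }

        @Override
        public Optional<RowType> getConsumedType() {
            return Optional.of(consumedType);
        }
    }

    public static void main(String[] args) {
        SinkAbilitySpec plain = new SinkAbilitySpec() {};
        SinkAbilitySpec update = new RowLevelUpdateSpec(new RowType("id", "name"));
        System.out.println(plain.getConsumedType().isPresent());  // false
        System.out.println(update.getConsumedType().isPresent()); // true
    }
}
```

With an Optional-returning default, callers could replace the instanceof chain above with a single `spec.getConsumedType().map(...)` lookup.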
There are many places that use getProducedType in the source ability spec, but this seems to be the only place that would use something like Optional&lt;RowType&gt; getConsumedType. I think we can keep it local here to avoid over-designing too early, and consider exposing it in SinkAbilitySpec if we find it is needed in many places in the future.
Fine. We can modify this if needed in the future.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("RowLevelDelete")
public class RowLevelDeleteSpec implements SinkAbilitySpec {
    public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
            "requiredPhysicalColumnIndices";
ProjectPushDownSpec also contains an array to mark the projection. I think requiredPhysicalColumn is enough.
What do you mean by saying requiredPhysicalColumn is enough? AFAIC, ProjectPushDownSpec also contains an array to mark the projection, so we can also make RowLevelDeleteSpec contain an array to mark the required physical columns.
Oh, I meant it's better to rename it to requiredPhysicalColumn to align the naming. But it's fine to use requiredPhysicalColumnIndices.
@@ -78,6 +87,11 @@ public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
        return rowLevelDeleteMode;
    }

    @Nonnull
remove the annotation.
Why remove it? It's not bad to keep it. I think we can keep the annotation, just like we do for the other method.
Actually, the code style suggests it's not necessary to mark fields as not null. Besides, it would be very verbose if we marked every method's return type as not null [1].
Makes sense to me. I'll remove it.
public int[] getRequiredPhysicalColumnIndices() {
    return requiredPhysicalColumnIndices;
}

@Override
public boolean equals(Object o) {
Don't forget to modify the equals method.
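For a spec with an int[] field, equals must compare array contents with Arrays.equals rather than ==. A self-contained sketch with simplified fields (the real class carries an enum mode and more state):

```java
import java.util.Arrays;
import java.util.Objects;

public class RowLevelDeleteSpecSketch {
    private final String rowLevelDeleteMode; // simplified stand-in for the enum
    private final int[] requiredPhysicalColumnIndices;

    public RowLevelDeleteSpecSketch(String mode, int[] indices) {
        this.rowLevelDeleteMode = mode;
        this.requiredPhysicalColumnIndices = indices;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RowLevelDeleteSpecSketch)) {
            return false;
        }
        RowLevelDeleteSpecSketch that = (RowLevelDeleteSpecSketch) o;
        return Objects.equals(rowLevelDeleteMode, that.rowLevelDeleteMode)
                // Arrays.equals compares element-by-element; == would only
                // compare references and report two equal specs as different.
                && Arrays.equals(
                        requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices);
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with the array-aware equals.
        return 31 * Objects.hash(rowLevelDeleteMode)
                + Arrays.hashCode(requiredPhysicalColumnIndices);
    }

    public static void main(String[] args) {
        RowLevelDeleteSpecSketch a =
                new RowLevelDeleteSpecSketch("DELETED_ROWS", new int[] {0, 2});
        RowLevelDeleteSpecSketch b =
                new RowLevelDeleteSpecSketch("DELETED_ROWS", new int[] {0, 2});
        System.out.println(a.equals(b)); // true: array contents match
    }
}
```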
int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
if (fieldIndex == -1) {
    return new int[0];
}
I think we should try our best to validate all available pk columns.
Here, we do validate all available pk columns.
The logic tries to find all the pk columns in the ResolvedSchema; if one pk column can't be found in the ResolvedSchema, that means the pk columns are missing some columns, in which case we consider the table as having no pk columns.
Note: this case only happens in an update statement, where the required columns returned by the connector don't contain all pk columns.
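The all-or-nothing check described here can be sketched as follows. The method signature is a simplification; a list lookup stands in for getFieldIndex, which likewise returns -1 for a missing column:

```java
import java.util.Arrays;
import java.util.List;

public class PrimaryKeyIndexSketch {
    // Resolve each pk column to its index in the sink row type's field list.
    // If any pk column is missing from the written columns, treat the table
    // as having no usable primary key and return an empty array.
    static int[] resolvePrimaryKeyIndices(List<String> pkColumns, List<String> sinkFields) {
        int[] indices = new int[pkColumns.size()];
        for (int i = 0; i < pkColumns.size(); i++) {
            int fieldIndex = sinkFields.indexOf(pkColumns.get(i));
            if (fieldIndex == -1) {
                // One pk column is not among the written columns: no pk.
                return new int[0];
            }
            indices[i] = fieldIndex;
        }
        return indices;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("a", "b", "c");
        // All pk columns present: their positions are returned.
        System.out.println(Arrays.toString(
                resolvePrimaryKeyIndices(List.of("a", "c"), fields))); // [0, 2]
        // One pk column ("d") missing: fall back to "no primary key".
        System.out.println(Arrays.toString(
                resolvePrimaryKeyIndices(List.of("a", "d"), fields))); // []
    }
}
```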
I mean, why don't we return all available positions here?

uniqueConstraint.getColumns().stream()
        .mapToInt(sinkRowType::getFieldIndex)
        .filter(i -> i != -1)
        .toArray();

I find the indices are used as the shuffle key; in the update statement we don't need a shuffle?
We use primary keys as shuffle keys, but they are still primary keys, which may be used for other purposes in CommonExecSink. If not all primary key columns are available, we should consider the table as having no primary key, which matches the semantics of primary keys.
Btw, I have moved this logic to BatchExecSink to make everything clear, since only the update statement, which is only supported in batch mode, needs to consider it.
@luoyuxia thanks for fixing this! Overall looks good to me, and I recommend adding some more cases to cover partition columns, metadata columns (both virtual and non-virtual), and computed columns.
@flinkbot run azure
@@ -138,7 +181,7 @@ public void testMixDelete() throws Exception {
    @Test
    public void testStatementSetContainDeleteAndInsert() throws Exception {
        tEnv().executeSql(
-               "CREATE TABLE t (a int, b string, c double) WITH"
+               "CREATE TABLE t (a int , b string, c double) WITH"
nit: rm extra space
Agree. I created a separate Jira, FLINK-32117, to track it, as it seems like a separate issue to me. I expect to finish it in that Jira.
@lincoln-lil @fsk119 Thanks for your review. I have addressed your comments.
@luoyuxia It seems the e2e test 2 always fails; could you contact the release manager to see what happens?
Others have also encountered the problem, as reported in FLINK-32121. The cause is shown in FLINK-32123.
What is the purpose of the change
To make row-level update support returning partial columns. Without this PR, an ArrayIndexOutOfBoundsException may happen in the ConstraintEnforcer operator, just like FLINK-32001 reported.

Brief change log
Modify CommonExecSink so that the ConstraintEnforcer won't check columns that don't exist in the columns to be written.
Modify TestUpdateDeleteTableFactory so that it can delete / update rows with partial columns.

Verifying this change
Added tests in UpdateTableITCase#testPartialUpdate and DeleteTableITCase#testRowLevelDeleteWithPartitionColumn.

Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)

Documentation