[FLINK-19694][table] Support Upsert ChangelogMode for ScanTableSource #13721
Hi @leonardBang, could you help review this?
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit a62fac3 (Wed Oct 21 10:28:42 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
d5e77b2 to b03cd2b
I have rebased the branch to resolve conflicts. I'd appreciate it if you could have a look, @leonardBang.
Thanks @wuchong for the contribution, I left some comments
import scala.collection.JavaConversions._

/**
* Stream physical RelNode which materializes an upsert stream where where each data record
* Stream physical RelNode which materializes an upsert stream where where each data record
* Stream physical RelNode which materializes an upsert stream where each record
false, // inputInsertOnly
rowSerializer,
// disable state ttl, the upsert materialize should keep all state to have data integrity
// we can enable state ttl if this is really needed in some cases
This looks like it conflicts with the global TTL setting. I understand the motivation, but it may confuse users.
|)
""".stripMargin)
thrown.expect(classOf[UnsupportedOperationException])
thrown.expectMessage(
The test failed, please update the exception message.
@@ -307,7 +307,7 @@ Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUn
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[single])
+- Calc(select=[IS NOT NULL(m) AS $f0])
- +- GroupAggregate(select=[MIN(i) AS m])
+ +- GroupAggregate(select=[MIN_RETRACT(i) AS m])
Why does the function change for an existing test?
This is a bug in the previous FlinkRelMdModifiedMonotonicity: the need-retraction inference was not correct.
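To illustrate why the choice between MIN and MIN_RETRACT matters, here is a minimal sketch, not Flink's aggregate implementation. It assumes a hypothetical `RetractableMinSketch` class: a plain running minimum cannot undo a value once the input retracts it, so a retract-aware variant has to remember how often each value was seen.

```java
import java.util.TreeMap;

// Hypothetical illustration only; the class and method names are not Flink APIs.
final class RetractableMinSketch {
    // Sorted map from value to the number of times it is currently present.
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    // Called for INSERT / UPDATE_AFTER rows.
    void accumulate(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    // Called for UPDATE_BEFORE / DELETE rows: remove one occurrence of the value.
    void retract(int value) {
        counts.computeIfPresent(value, (v, c) -> c == 1 ? null : c - 1);
    }

    // Current minimum, or null when no value is present.
    Integer min() {
        return counts.isEmpty() ? null : counts.firstKey();
    }
}
```

If the planner wrongly infers that no retraction is needed, the cheaper non-retract function is chosen and a retracted minimum could never be replaced by the next smallest value.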
Thanks for the contribution @wuchong , I left a few comments
if (!schema.getPrimaryKey().isPresent()) {
throw new TableException(
String.format(
"Table '%s' produces a changelog stream contains UPDATE_AFTER no UPDATE_BEFORE, " +
"which contains only UPDATE_AFTER, no UPDATE_BEFORE."?
.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)

new StreamExecUpsertMaterialize(
If the primary key fields are not required, we should add a Calc here to reduce output fields.
Why do we have to add a Calc here? The transformed tree has the same output row type as the original Scan node. If the primary key fields are never used in the following nodes, there should already be a Calc after the Scan node. So I think we shouldn't add a Calc.
: +- Calc(select=[amount, currency, PROCTIME() AS proctime], changelogMode=[I])
: +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA,D])
+- UpsertMaterialize(key=[currency], changelogMode=[I,UB,UA,D])
Does UpsertMaterialize produce UB messages?
It depends on whether the following nodes require UB messages.
In this case, the temporal join currently always requires UB, therefore UpsertMaterialize has to emit UB.
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
columns != null && util.Arrays.equals(columns.toArray, rel.uniqueKeys)
Are the values of rel.uniqueKeys in order?
Good point!
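The concern raised above can be sketched as follows. This is an illustration under assumptions, not the fix that was applied in this PR: `java.util.BitSet` stands in for Calcite's `ImmutableBitSet`, and the class and method names are hypothetical. An array comparison is order-sensitive, so it only works if both sides list the key indices in the same order; comparing the indices as sets avoids that dependency.

```java
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical helper; names are for illustration only.
final class UniqueKeyComparisonSketch {
    // Order-sensitive: true only when both arrays hold the same indices in the same order.
    static boolean sameKeysOrdered(int[] columns, int[] uniqueKeys) {
        return Arrays.equals(columns, uniqueKeys);
    }

    // Order-insensitive: compares the two arrays as sets of column indices.
    static boolean sameKeysAsSet(int[] columns, int[] uniqueKeys) {
        return toBitSet(columns).equals(toBitSet(uniqueKeys));
    }

    private static BitSet toBitSet(int[] indices) {
        BitSet bits = new BitSet();
        for (int i : indices) {
            bits.set(i);
        }
        return bits;
    }
}
```

With a declared key of columns (2, 0), the ordered comparison against the indices 0, 2 fails, while the set comparison succeeds.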
After discussing with @leonardBang , I renamed |
Hi @godfreyhe, I have added the plan test as we discussed offline, and I did find a bug. I'd appreciate it if you could have another look.
Thanks @wuchong for the update, LGTM +1
The build passed in my own Azure pipeline: https://dev.azure.com/imjark/Flink/_build/results?buildId=308&view=results
LGTM overall, I left a few minor comments
}

@Test
def testGetUniqueKeysOnStreamExecUpsertMaterialize(): Unit = {
rename to testGetUniqueKeysOnStreamExecChangelogNormalize
@@ -691,6 +691,18 @@ class FlinkRelMdHandlerTestBase {
(calcOfFirstRow, calcOfLastRow)
}

protected lazy val streamUpsertMaterialize = {
rename to streamChangelogNormalize
}

@Test
def testGetRelMonotonicityOnUpsertMaterialize(): Unit = {
rename to testGetRelMonotonicityOnChangelogNormalize
@@ -275,6 +275,15 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
assertFalse(mq.areColumnsUnique(streamDeduplicateLastRow, ImmutableBitSet.of(0, 1, 2)))
}

@Test
def testAreColumnsUniqueCountOnStreamExecUpsertMaterialize(): Unit = {
rename to testAreColumnsUniqueCountOnStreamExecChangelogNormalize
@@ -1140,48 +1140,4 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

@Test
def testJoinOnChangelogSource(): Unit = {
Why is this test removed?
This has been moved to ChangelogSourceITCase#testRegularJoin to reuse the tests.
…canTableSource in planner This closes apache#13721
…w introduced StreamExecUpsertMaterialize node This closes apache#13721
…canTableSource in runtime This closes apache#13721
Thanks for reviewing, @godfreyhe. I have rebased and squashed the commits, and I hope I have addressed all the comments. I will merge this pull request once the build passes.
…canTableSource in planner This closes #13721
…w introduced StreamExecUpsertMaterialize node This closes #13721
What is the purpose of the change
Support the [UPDATE_AFTER, DELETE] ChangelogMode, which indicates that the source emits only UPDATE_AFTER and DELETE messages at runtime (e.g. an upsert source). The planner adds a materialization operator when the ChangelogMode of the source is [UPDATE_AFTER, DELETE].
The materialization operator materializes the upsert stream and generates a changelog stream with full change messages. In the physical operator, we use state to know whether a key is seen for the first time. The operator produces INSERT rows, or additionally generates UPDATE_BEFORE rows for the previous image, or produces DELETE rows with all columns filled with values.
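The behavior described above can be sketched roughly as follows. This is a simplified model under assumptions, not the operator added in this PR: the class `UpsertMaterializeSketch`, its `Change` type, and the `emitUpdateBefore` flag are hypothetical, rows are reduced to a string key and a non-null string value, and a plain `HashMap` stands in for keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of upsert materialization; not a Flink API.
final class UpsertMaterializeSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + "," + value + ")";
        }
    }

    // Stand-in for keyed state: the last value seen per primary key (values assumed non-null).
    private final Map<String, String> lastValueByKey = new HashMap<>();
    // Assumption: set by the caller depending on whether downstream nodes need UPDATE_BEFORE.
    private final boolean emitUpdateBefore;

    UpsertMaterializeSketch(boolean emitUpdateBefore) {
        this.emitUpdateBefore = emitUpdateBefore;
    }

    // Handles an UPDATE_AFTER message from the upsert source.
    List<Change> upsert(String key, String value) {
        List<Change> out = new ArrayList<>();
        String previous = lastValueByKey.put(key, value);
        if (previous == null) {
            // First time this key is seen: emit an INSERT.
            out.add(new Change(Kind.INSERT, key, value));
        } else {
            if (emitUpdateBefore) {
                // Retract the previous image before emitting the new one.
                out.add(new Change(Kind.UPDATE_BEFORE, key, previous));
            }
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        return out;
    }

    // Handles a DELETE message that may carry only the key.
    List<Change> delete(String key) {
        List<Change> out = new ArrayList<>();
        String previous = lastValueByKey.remove(key);
        if (previous != null) {
            // Fill the DELETE row with the stored value so all columns are present.
            out.add(new Change(Kind.DELETE, key, previous));
        }
        return out;
    }
}
```

For example, two upserts on the same key followed by a delete yield an INSERT, then an UPDATE_BEFORE and UPDATE_AFTER pair, then a DELETE carrying the last stored value.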
Brief change log
StreamExecUpsertMaterialize when the scan is an upsert source.
FlinkRelMdModifiedMonotonicity.
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation