-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction #17605
Conversation
…nicity on the sort key field of UpdatableTopNFunction
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 10ca9cf (Fri Oct 29 12:11:53 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
Some tests failed, I'll fix it. |
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lincoln-lil for your contribution, left some comments.
private final boolean lenient = true; | ||
|
||
// data converter for logging only. | ||
private final DataStructureConverter rowConverter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will break compatibility. We can just introduce two transient fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! transient is better.
testHarness.processElement(binaryRecord(INSERT, "f", 1: JInt, 70: JInt)) | ||
|
||
testHarness.processElement(binaryRecord(UPDATE_AFTER, "b", 1: JInt, 10: JInt)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just left one line is OK
currentRow = rowKeyMap.get(rowKey).row; | ||
if (oldRank <= currentRank) { | ||
if (currentRank == oldRank) { | ||
checkArgument(0 == sortKeyComparator.compare(curSortKey, oldSortKey)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this true? It might be that the oldSortKey
is unique and after the change it does not exist anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a wrong check here, and it's unnecessary after think it over, so I'll remove it.
@flinkbot run azure |
if (oldRank <= currentRank) { | ||
if (currentRank == oldRank) { | ||
collectUpdateBefore(out, oldRow.row, oldRank); | ||
} else { | ||
collectUpdateBefore(out, prevRow, currentRank); | ||
collectUpdateAfter(out, prevRow, currentRank - 1); | ||
if (currentRank == newRank) { | ||
collectUpdateAfter(out, newRow, currentRank); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if newSortKey > oldSortKey
but oldRank == currentRank
? The update after message will be lost. Please also add a test about this after fixing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that a more proper solution to this is not to send any message if oldRank == currentRank
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! will add case to cover it.
while (iterator.hasNext() && currentRank <= newRank) { | ||
Map.Entry<RowData, Collection<RowData>> entry = iterator.next(); | ||
Collection<RowData> rowKeys = entry.getValue(); | ||
Iterator<RowData> rowKeyIter = rowKeys.iterator(); | ||
while (rowKeyIter.hasNext()) { | ||
RowData rowKey = rowKeyIter.next(); | ||
currentRank += 1; | ||
currentRow = rowKeyMap.get(rowKey).row; | ||
if (oldRank <= currentRank) { | ||
if (currentRank == oldRank) { | ||
collectUpdateBefore(out, oldRow.row, oldRank); | ||
} else { | ||
collectUpdateBefore(out, prevRow, currentRank); | ||
collectUpdateAfter(out, prevRow, currentRank - 1); | ||
if (currentRank == newRank) { | ||
collectUpdateAfter(out, newRow, currentRank); | ||
} | ||
} | ||
} | ||
prevRow = currentRow; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of algorithm seems awkward to me. Consider modifying it to:
while (iterator.hasNext() && currentRank < newRank) {
// ...
while (rowKeyIter.hasNext()) {
// ...
if (oldRank <= currentRank) {
collectUpdateBefore(out, currentRow, currentRank + 1);
collectUpdateAfter(out, currentRow, currentRank);
}
}
}
collectUpdateBefore(out, oldRow.row, oldRank);
collectUpdateAfter(out, newRow, newRank);
so that there is no prevRow
thingy. It is misleading to see a prevRow
and a currentRank
sending within the same message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool! It's more simpler and readable. A small change is to emit the UB of old row before the following changes.
@@ -220,4 +221,74 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) { | |||
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) | |||
testHarness.close() | |||
} | |||
|
|||
@Test | |||
def testUpdateRankWithRowNumber(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more test scenarios.
- Calculate top 5 but there are more than 5 candidates.
- Sort key drops but ranking does not change.
- Sort key drops but does not drop to the last ranking.
- Calculate top 5, 7 candidates, previous rank 3 drops to rank 6 (but it is still "rank 5").
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review, I'll update it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Let's wait for the CI to pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
…s monotonicity on the sort key field of UpdatableTopNFunction This closes apache#17605
…s monotonicity on the sort key field of UpdatableTopNFunction This closes #17605
…s monotonicity on the sort key field of UpdatableTopNFunction This closes apache#17605
What is the purpose of the change
Fix the exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction
Verifying this change
This change is already covered by RankHarnessTest
Does this pull request potentially affect one of the following parts:
Documentation