[FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism #20324
7fa8139 to
236db88
Compare
Rebased onto the latest master branch.
godfreyhe
left a comment
Thanks for the contribution, I left some comments
 */
@Internal
public abstract class ListenableCollector<T> extends TableFunctionCollector<T> {
    private CollectListener<T> collectListener;
we should mark it as @Nullable
 * TableFunctionCollector} which combines left and right into a JoinedRowData.
 */
- public static final class TestingFetcherCollector extends TableFunctionCollector {
+ public static final class TestingFetcherCollector extends ListenableCollector {
The type should be ListenableCollector<RowData> ?
List<Integer> refKeys =
        allLookupKeys.entrySet().stream()
                .filter(
Could we filter the FieldRefLookupKeys directly with filter(key -> key.getValue() instanceof LookupJoinUtil.FieldRefLookupKey)?
yes
        new KeyedProcessOperator<>(keyedLookupJoinWrapper);

List<Integer> refKeys =
        allLookupKeys.entrySet().stream()
Using allLookupKeys.values() avoids the call to key.getValue().
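The two suggestions above (filter on the key type, iterate over values() instead of entrySet()) can be sketched as follows. This is a minimal illustration, not the code from the PR: the LookupKey, FieldRefLookupKey and ConstantLookupKey types below are simplified, hypothetical stand-ins for the classes in LookupJoinUtil, and the `index` and `literal` fields are assumed for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for LookupJoinUtil.LookupKey and its subclasses;
// the real Flink classes are not reproduced here.
final class LookupKeyFilterSketch {
    interface LookupKey {}

    static final class FieldRefLookupKey implements LookupKey {
        final int index; // assumed: index of the referenced field in the input row

        FieldRefLookupKey(int index) {
            this.index = index;
        }
    }

    static final class ConstantLookupKey implements LookupKey {
        final Object literal; // assumed: constant value used as the lookup key

        ConstantLookupKey(Object literal) {
            this.literal = literal;
        }
    }

    /** Collects the input field indices of all field-reference lookup keys. */
    static List<Integer> refKeys(Map<Integer, LookupKey> allLookupKeys) {
        // Streaming over values() removes the need for entry.getValue().
        return allLookupKeys.values().stream()
                .filter(key -> key instanceof FieldRefLookupKey)
                .map(key -> ((FieldRefLookupKey) key).index)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, LookupKey> keys = new LinkedHashMap<>();
        keys.put(0, new FieldRefLookupKey(2));
        keys.put(1, new ConstantLookupKey("US"));
        keys.put(2, new FieldRefLookupKey(5));
        System.out.println(refKeys(keys)); // prints [2, 5]
    }
}
```

Constant keys are dropped by the filter, so only keys that reference an input field contribute an index.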
OneInputTransformation<RowData, RowData> transform =
        ExecNodeUtil.createOneInputTransformation(
                partitionedTransform,
                createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, config),
We should define another operator name for this Transformation, because LOOKUP_JOIN_TRANSFORMATION is already used for the join Transformation.
ok
}

@Test
def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {
Do we have an IT case to verify the change? This PR aims to support sync LookupJoin.
This case covers the change: the legacy source provides both sync and async functions, so it can fall back to the sync lookup function with state.
I've added more cases in LookupJoinITCase
236db88 to
2232c0d
Compare
lincoln-lil
left a comment
@godfreyhe Thank you for reviewing this! I've addressed your comments and updated the PR.
f8cf4e2 to
3329be5
Compare
}

@Test
public void testAggAndAllConstantLookupKeyWithTryResolveMode() {
This method should be moved to LookupJoinTest
ok
542a420 to
e257c65
Compare
lincoln-lil
left a comment
@godfreyhe I've rebased onto master to resolve the conflicts and added more cases according to your comments. Thanks again!
godfreyhe
left a comment
Thanks for the update, I left some comments
    return transform;
}

private LogicalType getLookupKeyLogicalType(
this method can be removed
Yes, this method, newly introduced from master, is unused. I'll remove it.
public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";

public static final String LOOKUP_JOIN_WITH_STATE_TRANSFORMATION = "lookup-join-with-state";
how about lookup-join-with-materialize ?
changed to 'lookup-join-materialize', omit the word 'with'
                ? (RowType) toLogicalType(temporalTableOutputType.get())
                : tableSourceRowType;
GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
        getProjectionRowTypeOnTemporalTable(relBuilder);
We can call the getProjectionRowTypeOnTemporalTable method inside the if (projectionOnTemporalTable != null) branch, and the implementation of getProjectionRowTypeOnTemporalTable can then be simplified.
There is also some duplicated code in the createAsyncLookupJoin method.
done
public abstract class ListenableCollector<T> extends TableFunctionCollector<T> {
    @Nullable private CollectListener<T> collectListener;

    public void setCollectListener(CollectListener<T> collectListener) {
nit: @Nullable CollectListener<T> collectListener
- RowData right = (RowData) record;
+ RowData right = record;
  getCollectListener()
          .ifPresent(listener -> ((CollectListener) listener).onCollect(record));
The cast is not needed here.
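The review points on ListenableCollector (a nullable listener field, a typed collector, no cast at the call site) fit together roughly as in the sketch below. It is a simplified illustration under stated assumptions: the real class extends Flink's TableFunctionCollector, which is omitted here, and the `@Nullable` annotation is shown only as a comment so the example compiles with the JDK alone. ListCollector is a hypothetical subclass added for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified stand-in: the real ListenableCollector extends Flink's
// TableFunctionCollector, which is left out of this sketch.
final class ListenableCollectorSketch {
    /** Callback notified for each collected record. */
    interface CollectListener<T> {
        void onCollect(T record);
    }

    abstract static class ListenableCollector<T> {
        // Stays null until a listener is registered; this is the field the
        // review asks to annotate with @Nullable.
        private CollectListener<T> collectListener;

        void setCollectListener(/* @Nullable */ CollectListener<T> collectListener) {
            this.collectListener = collectListener;
        }

        Optional<CollectListener<T>> getCollectListener() {
            return Optional.ofNullable(collectListener);
        }

        abstract void collect(T record);
    }

    /** Hypothetical subclass that buffers records in a list. */
    static final class ListCollector extends ListenableCollector<String> {
        final List<String> out = new ArrayList<>();

        @Override
        void collect(String record) {
            // The listener is already typed by the class parameter, so no cast is needed.
            getCollectListener().ifPresent(listener -> listener.onCollect(record));
            out.add(record);
        }
    }

    public static void main(String[] args) {
        ListCollector collector = new ListCollector();
        List<String> seen = new ArrayList<>();
        collector.collect("a"); // no listener registered yet
        collector.setCollectListener(seen::add);
        collector.collect("b");
        System.out.println(seen + " " + collector.out); // prints [b] [a, b]
    }
}
```

Parameterizing the subclass (here with String, in the PR with RowData) is what lets the compiler drop the raw-type cast.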
fd845d9 to
c502d15
Compare
godfreyhe
left a comment
Thanks for the update, LGTM. Please rebase onto master; the failing test has been fixed.
…ync mode only) with state to eliminate non-deterministic result
…no for ListenableCollector
2d453dc to
50f6719
Compare
Rebased onto the latest master to get rid of the failed YARN IT case.
@lincoln-lil run azure
The remaining failure is unrelated: it is an ES sink case, see https://issues.apache.org/jira/browse/FLINK-28877.
I will merge it.
…ync mode only) with state to eliminate non-deterministic result This closes apache#20324
What is the purpose of the change
This is a follow-up implementation of FLINK-28570 that introduces a new lookup join operator (sync mode only) with state to eliminate the non-determinism.
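To illustrate why state can remove the non-determinism, here is a minimal sketch under a stated assumption: the operator remembers the looked-up row per key when an insert arrives and reuses that stored row for the matching retraction, instead of querying the external table again. That behavior is an assumption made for illustration, not a description of the actual KeyedLookupJoinWrapper. All names are hypothetical, a HashMap stands in for Flink keyed state, and rows are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not the operator from the PR.
final class StatefulLookupJoinSketch {
    private final Function<String, String> lookup; // external table; its content may change over time
    private final Map<String, String> state = new HashMap<>(); // stand-in for keyed state
    final List<String> output = new ArrayList<>();

    StatefulLookupJoinSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    /** Insert: query the external table and remember what was joined. */
    void processInsert(String key) {
        String right = lookup.apply(key);
        state.put(key, right);
        output.add("+I(" + key + "," + right + ")");
    }

    /** Delete: retract with the remembered row rather than a fresh lookup. */
    void processDelete(String key) {
        String right = state.remove(key);
        output.add("-D(" + key + "," + right + ")");
    }

    public static void main(String[] args) {
        Map<String, String> dim = new HashMap<>();
        dim.put("k1", "v1");
        StatefulLookupJoinSketch join = new StatefulLookupJoinSketch(dim::get);
        join.processInsert("k1");
        dim.put("k1", "v2"); // the external table changes between insert and delete
        join.processDelete("k1");
        System.out.println(join.output); // prints [+I(k1,v1), -D(k1,v1)]
    }
}
```

Without the stored row, the delete would look up `v2` and emit a retraction that never matches the earlier insert, which is the kind of inconsistency a stateless lookup cannot avoid.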
Brief change log
Verifying this change
Covered by the newly added KeyedLookupJoinHarnessTest and the existing LookupJoinITCase and AsyncLookupJoinITCase.
Does this pull request potentially affect one of the following parts:
Documentation