[FLINK-28848][table-planner] Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint) #20482

Closed
wants to merge 8 commits into from

lincoln-lil
Contributor

What is the purpose of the change

Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint)
This is the main part of FLINK-28779, which implements FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems.
The join hint builds on [FLINK-28682][table-planner] support join hint in batch rules #20359, and adds a new hint 'LOOKUP', as described in FLIP-234.

Brief change log

  • Add new hint LOOKUP
  • Add related option class and validation
  • Add related tests for new classes
  • Adapt to the async operator with retry, and add a new RetryableLookupFunctionDelegator that wraps the user's sync lookup function to support retry

Verifying this change

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Collaborator

flinkbot commented Aug 7, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@lincoln-lil
Contributor Author

Rebased onto master and updated the PR.

partialCachingLookupProvider.getCache(),
partialCachingLookupProvider.createAsyncLookupFunction());
asyncLookupFunctions =
new CachingAsyncLookupFunction(
Contributor Author

This function should be refactored to create nested delegators around the user lookup function:

[CachingLookupFunction]
|
[RetryableLookupFunctionDelegator]
|
userLookupFunc

I'll update this
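
The nesting in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `SimpleLookup`, `RetryingLookup`, and `CachingLookup` are hypothetical stand-ins for the user lookup function, RetryableLookupFunctionDelegator, and CachingLookupFunction, and the fixed-delay retry and the "cache only non-empty results" rule are assumptions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a sync lookup function; not Flink's LookupFunction.
interface SimpleLookup {
    Collection<String> lookup(String key);
}

// Inner delegator: re-invokes the wrapped lookup while the result matches the retry predicate.
final class RetryingLookup implements SimpleLookup {
    private final SimpleLookup delegate;
    private final Predicate<Collection<String>> shouldRetry;
    private final int maxAttempts;
    private final long delayMillis;

    RetryingLookup(
            SimpleLookup delegate,
            Predicate<Collection<String>> shouldRetry,
            int maxAttempts,
            long delayMillis) {
        this.delegate = delegate;
        this.shouldRetry = shouldRetry;
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public Collection<String> lookup(String key) {
        Collection<String> result = delegate.lookup(key);
        // Assumption: a fixed delay between attempts, bounded by maxAttempts in total.
        for (int attempt = 1; attempt < maxAttempts && shouldRetry.test(result); attempt++) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            result = delegate.lookup(key);
        }
        return result;
    }
}

// Outer delegator: serves cached rows first, so retries only happen on a cache miss.
final class CachingLookup implements SimpleLookup {
    private final SimpleLookup delegate;
    private final Map<String, Collection<String>> cache = new HashMap<>();

    CachingLookup(SimpleLookup delegate) {
        this.delegate = delegate;
    }

    @Override
    public Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        Collection<String> result = delegate.lookup(key);
        // Assumption: only non-empty results are cached.
        if (!result.isEmpty()) {
            cache.put(key, result);
        }
        return result;
    }
}

final class NestedDelegatorsDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // The "external system" returns the row only from the third call onwards.
        SimpleLookup user =
                key -> ++calls[0] < 3
                        ? Collections.<String>emptyList()
                        : Collections.singletonList(key + "-row");
        SimpleLookup nested =
                new CachingLookup(new RetryingLookup(user, Collection::isEmpty, 5, 1L));
        System.out.println(nested.lookup("k") + " after " + calls[0] + " calls");
    }
}
```

Putting the cache outermost means a cache hit never reaches the retry layer, which matches the order shown in the diagram.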

Contributor

@godfreyhe godfreyhe left a comment

Thanks for the contribution, @lincoln-lil. I left some comments; could you also add some hint propagation tests?

LookupJoinHintSpec joinHintSpec,
boolean upsertMaterialize) {
// async & sync lookup candidates
Tuple2<UserDefinedFunction, UserDefinedFunction> lookupFunctions = new Tuple2<>();
Contributor

Introduce a new class to represent the functions; that would be clearer and more readable.

Contributor Author

ok, will add a private static class LookupFunctionCandidates
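
A rough sketch of what such a holder class could look like, replacing the anonymous Tuple2. Only the class name LookupFunctionCandidates comes from the comment above; the `Udf` interface is a hypothetical placeholder for UserDefinedFunction, and the accessor names and the fallback rule in `select` are assumptions for illustration.

```java
import java.util.Optional;

// Hypothetical placeholder for Flink's UserDefinedFunction; only here so the sketch compiles.
interface Udf {
    String name();
}

// Named holder for the sync and async lookup candidates, instead of Tuple2's f0/f1.
final class LookupFunctionCandidates {
    private final Udf syncLookupFunction;  // null when no sync lookup is available
    private final Udf asyncLookupFunction; // null when no async lookup is available

    LookupFunctionCandidates(Udf syncLookupFunction, Udf asyncLookupFunction) {
        if (syncLookupFunction == null && asyncLookupFunction == null) {
            throw new IllegalArgumentException("At least one lookup function is required");
        }
        this.syncLookupFunction = syncLookupFunction;
        this.asyncLookupFunction = asyncLookupFunction;
    }

    Optional<Udf> sync() {
        return Optional.ofNullable(syncLookupFunction);
    }

    Optional<Udf> async() {
        return Optional.ofNullable(asyncLookupFunction);
    }

    // Assumed rule: honor the preference when possible, otherwise fall back to the other candidate.
    Udf select(boolean preferAsync) {
        if (preferAsync) {
            return asyncLookupFunction != null ? asyncLookupFunction : syncLookupFunction;
        }
        return syncLookupFunction != null ? syncLookupFunction : asyncLookupFunction;
    }
}

final class LookupFunctionCandidatesDemo {
    public static void main(String[] args) {
        Udf sync = () -> "sync";
        Udf async = () -> "async";
        LookupFunctionCandidates candidates = new LookupFunctionCandidates(sync, async);
        System.out.println(candidates.select(true).name());
    }
}
```

Named accessors make call sites self-describing, which is the readability gain the review comment asks for.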

@@ -543,6 +554,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
util.verifyExecPlan(sql)
}

@Test
def testInvalidJoinHint(): Unit = {
Contributor

Please add some tests in which the hint refers to an inner table name.

Contributor Author

ok

@@ -179,6 +181,9 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
private final ChangelogMode inputChangelogMode;

@JsonProperty(FIELD_NAME_JOIN_HINT)
private final @Nullable LookupJoinHintSpec joinHintSpec;
Contributor

Ignore this property if joinHintSpec is null.

Contributor Author

ok

|JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
| ON T.a = D.id
|""".stripMargin,
"Invalid LOOKUP hint options, parsing error: Could not parse value 'yes' for key 'async'",
Contributor

The phrase "parsing error" may be misleading; just remove it.

Contributor Author

will change to "Invalid LOOKUP hint options: Could not..."
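
For illustration, a validation step that produces the revised message might look like the sketch below. The class and method names are hypothetical, the "missing key means sync" default is an assumption, and the full message text is assembled from the old test expectation and the prefix proposed above, so it may not match the final wording in the PR.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper; not the option class added in this PR.
final class LookupHintOptionSketch {
    private static final String ASYNC_KEY = "async";

    // Parses the 'async' hint option strictly: only 'true' or 'false' are accepted.
    static boolean parseAsync(Map<String, String> hintOptions) {
        String raw = hintOptions.get(ASYNC_KEY);
        if (raw == null) {
            return false; // assumption: a missing key means a sync lookup
        }
        if ("true".equalsIgnoreCase(raw)) {
            return true;
        }
        if ("false".equalsIgnoreCase(raw)) {
            return false;
        }
        throw new IllegalArgumentException(
                "Invalid LOOKUP hint options: Could not parse value '"
                        + raw
                        + "' for key '"
                        + ASYNC_KEY
                        + "'");
    }

    public static void main(String[] args) {
        try {
            parseAsync(Collections.singletonMap("async", "yes"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```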

private final Long asyncTimeout;

@JsonProperty(FIELD_NAME_RETRY_PREDICATE)
private final @Nullable String retryPredicate;
Contributor

Ignore null values: @JsonInclude(JsonInclude.Include.NON_NULL)

Contributor Author

@lincoln-lil lincoln-lil left a comment

@godfreyhe Thanks for reviewing this! I'll update the PR according to your comments.

@lincoln-lil lincoln-lil force-pushed the FLINK-28848 branch 3 times, most recently from 09cbf8a to 06da0fb Compare August 9, 2022 08:17
Contributor

@godfreyhe godfreyhe left a comment

LGTM

…elayed retry for lookup join (table alias unsupported in hint)

This is the main part of FLINK-28779 to implement FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems

…on type instead of one-level parent class comparison in LookupJoinCodeGenerator

This bug can be reproduced by AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry when caching is disabled in FLINK-28849

…retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator

It is hard to reproduce this in runtime tests, but occasionally happens in AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry of FLINK-28849. It's better to add a separate test in runtime.

@lincoln-lil
Contributor Author

@flinkbot run azure

@lincoln-lil
Contributor Author

@flinkbot run azure

@lincoln-lil
Contributor Author

lincoln-lil commented Aug 10, 2022

There are several merge conflicts with the latest master, so the merged PR needs to be verified completely; local verification is running now.

@lincoln-lil
Contributor Author

Local verification passed for the table modules (the CI pipeline was too slow).

@godfreyhe godfreyhe closed this in 8b25b96 Aug 10, 2022
godfreyhe pushed a commit that referenced this pull request Aug 10, 2022
…on type instead of one-level parent class comparison in LookupJoinCodeGenerator

This bug can be reproduced by AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry when caching is disabled in FLINK-28849

This closes #20482
godfreyhe pushed a commit that referenced this pull request Aug 10, 2022
…retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator

It is hard to reproduce this in runtime tests, but occasionally happens in AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry of FLINK-28849. It's better to add a separate test in runtime.

This closes #20482
godfreyhe pushed a commit that referenced this pull request Aug 10, 2022
…okup and add more tests

Disable retry on async because two problems need to be resolved first

This closes #20482
3 participants