@lincoln-lil
Contributor

What is the purpose of the change

As the issue shows, a lookup join can be optimized when a left join (and possibly a full outer join in the future) has a filter on the left input in its join condition. This PR achieves that by adding a pre-filter to the lookup join operator.

Brief change log

  • add a new FilterCondition to the codegen part
  • add pre-filter (via codegen) for lookup join operator
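
To make the intent of the pre-filter concrete, here is a minimal, dependency-free sketch of a left outer lookup join that skips the lookup whenever a left-only predicate is false. It is an illustration under stated assumptions, not the operator added by this PR: the class `PreFilterLookupJoinSketch`, the `int[]{id, amount}` row layout, and the in-memory map standing in for the dimension table are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical illustration only; this is not Flink's lookup join runner. */
class PreFilterLookupJoinSketch {

    /** Counts the lookups actually issued, so the saving is observable. */
    static int lookupCalls = 0;

    /** Stand-in for the (potentially expensive) dimension table lookup. */
    static List<String> lookup(Map<Integer, List<String>> dim, int key) {
        lookupCalls++;
        return dim.getOrDefault(key, Collections.<String>emptyList());
    }

    /**
     * Left outer lookup join. Left rows are int[]{id, amount}; output rows are "id,amount,right".
     * preFilter models a join-condition predicate that references only left-side fields.
     */
    static List<String> leftJoin(
            List<int[]> left, Map<Integer, List<String>> dim, Predicate<int[]> preFilter) {
        List<String> out = new ArrayList<>();
        for (int[] row : left) {
            // If the left-only predicate is false, no right row can satisfy the join
            // condition, so the lookup is skipped entirely.
            List<String> matches =
                    preFilter.test(row) ? lookup(dim, row[0]) : Collections.<String>emptyList();
            if (matches.isEmpty()) {
                // Left join semantics: keep the left row and pad the right side with null.
                out.add(row[0] + "," + row[1] + ",null");
            } else {
                for (String m : matches) {
                    out.add(row[0] + "," + row[1] + "," + m);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> dim = new HashMap<>();
        dim.put(1, Arrays.asList("a"));
        dim.put(2, Arrays.asList("b"));
        List<int[]> left = Arrays.asList(new int[] {1, 5}, new int[] {2, 50});
        System.out.println(leftJoin(left, dim, r -> r[1] > 10));
        System.out.println("lookups issued: " + lookupCalls);
    }
}
```

In this sketch a row that fails the pre-filter still appears in the output, padded with null, which is why the optimization applies to left joins: the row must be emitted either way, and only the lookup is avoided.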

Verifying this change

  • json plan test (LookupJoinJsonPlanTest)
  • lookup join operator tests (LookupJoinHarnessTest, KeyedLookupJoinHarnessTest, AsyncLookupJoinHarnessTest)
  • lookup join itcase (LookupJoinITCase)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Collaborator

flinkbot commented Aug 29, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor

@swuferhong swuferhong left a comment

Hi @lincoln-lil. Thanks for your contribution, I left some comments.

/** Describes a generated {@link FilterCondition}. */
public class GeneratedFilterCondition extends GeneratedFunction<FilterCondition> {

private static final long serialVersionUID = 1L;
Contributor

Change serialVersionUID to 2L, as the other GeneratedXXX classes do.

Contributor Author

Per the Flink code guidelines, every new serializable class should start its serialVersionUID at 1L. The existing GeneratedXXX classes use 2L because they have since been changed and their UIDs were increased.
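
The convention described in this reply can be illustrated with a small, self-contained sketch. The two nested classes are hypothetical stand-ins (they are not Flink classes); the only library call used is `java.io.ObjectStreamClass.lookup`, which reports the UID a class declares.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical illustration of the serialVersionUID convention discussed in this thread. */
class SerialVersionUidSketch {

    /** A newly introduced serializable class: its UID starts at 1L. */
    static final class NewlyAddedFunction implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** An older class whose serialized form changed once, so its UID was increased to 2L. */
    static final class ChangedFunction implements Serializable {
        private static final long serialVersionUID = 2L;
    }

    /** Returns the serialVersionUID that Java serialization will use for the class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(uidOf(NewlyAddedFunction.class));
        System.out.println(uidOf(ChangedFunction.class));
    }
}
```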

Comment on lines +181 to +182
@JsonProperty(FIELD_NAME_PRE_FILTER_CONDITION)
@JsonInclude(JsonInclude.Include.NON_NULL)
Contributor

Why do we need to add this JsonInclude? Please add an ITCase and a UT case to cover it.

Contributor Author

The pre-filter condition is a nullable attribute, so we can omit it from the JSON plan without affecting serialization or deserialization. This is already covered by LookupJoinJsonPlanTest; I agree with you about adding another case to LookupJoinJsonPlanITCase.

Comment on lines +103 to +104
finalPreFilterCondition.orNull,
finalRemainingCondition.orNull,
Contributor

Why do finalPreFilterCondition and finalRemainingCondition both use orNull, while in CommonExecLookupJoin only preFilterCondition is annotated with JsonInclude NON_NULL and remainingJoinCondition is not?

Contributor Author

For compatibility, I didn't change the original finalRemainingCondition (that is, I didn't also add a NON_NULL JSON annotation to it), but the newly added one (finalPreFilterCondition) can be tagged safely.

val remainingCondition: Option[RexNode] = getRemainingJoinCondition(
// split remaining condition into pre-filter(used to filter the left input before lookup) and
// remaining parts(used to filter the joined records)
val (finalPreFilterCondition, finalRemainingCondition) = splitRemainingJoinCondition(
Contributor

Maybe we can rename this method. splitRemainingJoinCondition reads as if it splits the finalRemainingCondition, rather than splitting the condition into pre-filter and remaining parts.

Contributor Author

How about simplifying it to splitJoinCondition?
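
For readers following this thread, the split being named here can be sketched roughly as follows. This is an assumption about the general idea, not the planner's actual Scala implementation: the `Conjunct` class, its `usesLeft`/`usesRight` flags, and the string expressions are hypothetical stand-ins for RexNode conjunctions and their input references.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of splitting a join condition; not the planner's code. */
class SplitJoinConditionSketch {

    /** One AND-ed term of the join condition, tagged with the inputs it references. */
    static final class Conjunct {
        final String expr;
        final boolean usesLeft;
        final boolean usesRight;

        Conjunct(String expr, boolean usesLeft, boolean usesRight) {
            this.expr = expr;
            this.usesLeft = usesLeft;
            this.usesRight = usesRight;
        }
    }

    /** Returns a two-element list: index 0 is the pre-filter, index 1 the remaining condition. */
    static List<List<String>> splitJoinCondition(List<Conjunct> conjuncts) {
        List<String> preFilter = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (c.usesLeft && !c.usesRight) {
                // References only the left input: can be evaluated before the lookup.
                preFilter.add(c.expr);
            } else {
                // Everything else must be evaluated on the joined record.
                remaining.add(c.expr);
            }
        }
        return Arrays.asList(preFilter, remaining);
    }

    public static void main(String[] args) {
        List<Conjunct> condition = Arrays.asList(
                new Conjunct("o.amount > 10", true, false),
                new Conjunct("d.status = 'ok'", false, true),
                new Conjunct("o.ts < d.expire", true, true));
        System.out.println(splitJoinCondition(condition));
    }
}
```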

Contributor Author

@lincoln-lil lincoln-lil left a comment

@swuferhong thanks for reviewing this! I've updated the PR according to your comments.

Contributor

@lsyldliu lsyldliu left a comment

@lincoln-lil Thanks for your contribution; the changes look good to me overall. I just left one minor comment. In addition, it would make sense to add some tests covering batch mode.


/** remaining join condition except pre-filter & equi-conditions except lookup keys. */
@JsonProperty(FIELD_NAME_REMAINING_JOIN_CONDITION)
private final @Nullable RexNode remainingJoinCondition;
Contributor

Given the context, could this field also get the JsonInclude annotation?

Contributor Author

The main reason for not adding the NON_NULL JSON annotation is compatibility with older versions of the serialized JSON plan.

Contributor

@swuferhong swuferhong left a comment

LGTM +1

Contributor Author

@lincoln-lil lincoln-lil left a comment

@lsyldliu thanks for your comments! Regarding testing: the existing batch case LookupJoinITCase#testLeftJoinTemporalTableWithLocalPredicate already covers the new pre-filter condition path. Batch and streaming share the same codegen and runtime operator, and there is no JSON plan in batch mode, so I didn't add more cases for batch.


@lincoln-lil
Contributor Author

reorg the commits before merging.

@lincoln-lil lincoln-lil merged commit 360b97a into apache:master Aug 31, 2023
@lincoln-lil lincoln-lil deleted the FLINK-18445 branch August 31, 2023 01:04
4 participants