Skip to content

Conversation

@jerryjzhang
Copy link
Contributor

@jerryjzhang jerryjzhang commented Nov 4, 2020

What is the purpose of the change

Fetching first N rows is a special case of TopN, which currently is supported by the Rank operator. However, the full-fledged TopN implementation is heavyweight in that it involves sort key comparison and raw row buffering. This leads to large state and memory buffer that is not needed for the first-n-row case. This PR introduces a lightweight variant of TopN function to handle it more efficiently.

Brief change log

  • Add AppendOnlyFirstNFunction as a variant of AppendOnlyTopNFunction
  • Modify StreamExecRank to instantiate AppendOnlyFirstNFunciton when the first-n-row case is determined

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests inAppendOnlyFirstNFunctionTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 4, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 5b90fd3 (Sat Aug 28 12:14:32 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 4, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@jerryjzhang
Copy link
Contributor Author

@wuchong @godfreyhe would you mind take a look?

@JingsongLi
Copy link
Contributor

Hi @tragicjun, can you rebase latest master?

@JingsongLi JingsongLi self-requested a review May 24, 2021 10:28
@jerryjzhang
Copy link
Contributor Author

Hi @tragicjun, can you rebase latest master?

Hi @JingsongLi rebase latest master done, please take a look.

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the rebasing. @tragicjun Can you create a ITCase or point out which ITCase you want to solve?

*/
public class AppendOnlyFirstNFunction extends AbstractTopNFunction {

private static final long serialVersionUID = -889227691088906246L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serialVersionUID should be 1L at first, From the Flink code style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serialVersionUID is at the 2nd in all other classes, thus in consistent with them.

cacheSize);
if (sortFields.length == 1
&& inputType.getChildren().get(sortFields[0]).getTypeRoot()
== LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just TIMESTAMP_WITHOUT_TIME_ZONE? I don't see any proc-time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, forgot to push the latest commit.

@jerryjzhang jerryjzhang requested a review from JingsongLi May 31, 2021 12:27
@jerryjzhang
Copy link
Contributor Author

Thanks for the rebasing. @tragicjun Can you create a ITCase or point out which ITCase you want to solve?

Thanks for reviewing. I have added an ITCase as RankJsonPlanITCase.testFirstN(). Please let me know if any issues.

boolean isAppendOnlyFirstN = false;
if (sortFields.length == 1) {
LogicalType sortKeyType = inputType.getChildren().get(sortFields[0]);
if (sortKeyType instanceof LocalZonedTimestampType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeCheckUtils.isProcTime

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great, thx

@JingsongLi
Copy link
Contributor

@tragicjun still some compile failure.

@jerryjzhang
Copy link
Contributor Author

@tragicjun still some compile failure.

sorry, some format violation. Project build passed on my local, please review.

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tragicjun for the update.
Looks good to me, merging...

@JingsongLi JingsongLi merged commit 76a08c8 into apache:master Jun 3, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants