Skip to content

Conversation

@wuchong
Copy link
Member

@wuchong wuchong commented Oct 15, 2020

What is the purpose of the change

Implements the cumulative window assigner and supports cumulative window aggregate in WindowOperator.

Brief change log

  • Add CumulativeWindowAssigner
  • Add cumulative window builder method in WindowOperatorBuilder

Verifying this change

  • Add CumulativeWindowAssignerTest to test CumulativeWindowAssigner.
  • Add harness tests for cumulative window aggregate in WindowOperatorTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@wuchong
Copy link
Member Author

wuchong commented Oct 15, 2020

cc @danny0405

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 15, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 2cd041a (Fri May 28 07:11:56 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 15, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

this.offset = offset;
this.isEventTime = isEventTime;
this.paneSize = step;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If paneSize always equals step, keep only step is enough, because we always have limitation size must be an integral multiple of step.

}

this.size = size;
this.step = step;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support size argument that is not 1 day ? Say 2 days or 3 days ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should name it the maxSize because the window size is actually increasing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can support 2 days max size.

assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

testHarness.processWatermark(new Watermark(5999));
expectedOutput.addAll(doubleRecord(isTableAggregate, insertRecord("key2", 1L, 1L, 3000L, 6000L, 5999L)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may also need to add a test case for the late arrive data.

@wuchong
Copy link
Member Author

wuchong commented Oct 20, 2020

@danny0405 please have a look again.

* @return The time policy.
*/
public static CumulativeWindowAssigner of(Duration size, Duration step) {
return new CumulativeWindowAssigner(size.toMillis(), step.toMillis(), 0, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size -> maxSize

@wuchong
Copy link
Member Author

wuchong commented Oct 21, 2020

The failed Azure is the unstable e2e tests. This change shouldn't affect e2e part.

Merging...

@wuchong wuchong merged commit 9423cf1 into apache:master Oct 21, 2020
@wuchong wuchong deleted the cumulative-agg-runtime branch October 21, 2020 02:40
@HuangZhenQiu
Copy link
Contributor

@danny0405 @wuchong
As I remember, there was a FLIP talking about the cumulative window. But I can't find it right now. Would you please point me to the design doc? We have some similar requirements internally, so want to align more on the functionality with you guys.

@wuchong
Copy link
Member Author

wuchong commented Jan 22, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants