[WIP] support flush #23129

Open
wants to merge 2 commits into
base: master

@Hebella Hebella commented Aug 3, 2023

| Parameter | Type | Description |
| --- | --- | --- |
| execution.max-flush-interval | long | Maximum interval between two flush operations, in milliseconds |
| dataNum | long | Number of input records |

Example usage

$ ./bin/flink run examples/streaming/AllowedLatency.jar --execution.max-flush-interval 2000 --dataNum 10000000

Benchmark

With Heap State Backend

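| Parameter | Value |
| --- | --- |
| Number of records | 100000000 |
| Number of distinct keys | 2000000 |
| Max flush interval | 5000 ms |

| Flush | Throughput (records/ms) | Memory visits |
| --- | --- | --- |
| Enabled | 4967 | 121324416 |
| Disabled | 2390 | 100000000 |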

When flush is enabled, the operator buffers input records until a flush event is triggered, then emits a single record for each distinct key in the buffer. Without flush, the operator emits one output record for every input record, so downstream operators have more records to process.
In the benchmark above, the run with flush enabled makes about 21% more memory visits (121324416 vs. 100000000) but reaches roughly twice the throughput (4967 vs. 2390 records/ms), because aggregating by key before emission reduces the number of output records.
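
The buffering behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions: `FlushBufferSketch`, its methods, and the use of a sum as the aggregate are hypothetical and are not the operator or API added by this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not the operator added by this PR and not a Flink API.
final class FlushBufferSketch {
    // Partial sums for the keys seen since the last flush.
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    // Running totals, standing in for keyed state in the state backend.
    private final Map<String, Long> state = new LinkedHashMap<>();
    // Number of updates made against `state`.
    private long stateVisits = 0;

    // Buffer one input record; nothing is emitted and state is not touched.
    void process(String key, long value) {
        buffer.merge(key, value, Long::sum);
    }

    // Fold the buffer into state and emit one "key=total" record per buffered key.
    List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            long total = state.merge(e.getKey(), e.getValue(), Long::sum);
            stateVisits++;
            out.add(e.getKey() + "=" + total);
        }
        buffer.clear();
        return out;
    }

    long stateVisits() {
        return stateVisits;
    }

    public static void main(String[] args) {
        FlushBufferSketch op = new FlushBufferSketch();
        op.process("a", 1);
        op.process("b", 2);
        op.process("a", 3);
        // Three input records, two distinct keys: two output records.
        System.out.println(op.flush()); // prints [a=4, b=2]
    }
}
```

In this sketch three input records produce two output records and two state updates; a real operator would trigger `flush()` from a timer or flush event rather than from the caller.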

Number of Distinct Keys

| Number of records | Number of distinct keys | Average key count | Max flush interval | RocksDB visits | Throughput (records/ms) |
| --- | --- | --- | --- | --- | --- |
| 1e7 | 1e7 | / | / | 10000000 | 206 |
| 1e7 | 1e7 | 1 | 1000 ms | 10000000 | 150 |
| 1e8 | 2e6 | 3 | 1000 ms | 26000012 | 589 |
| 1e8 | 1e6 | 8 | 1000 ms | 11000010 | 1701 |
| 1e6 | 1e6 | / | / | 1000000 | 175 |
| 1e6 | 1e6 | 1 | 1000 ms | 1000000 | 168 |
| 5e7 | 1e6 | 8 | 500 ms | 6000005 | 1528 |
| 1e8 | 5e5 | 16 | 500 ms | 6000011 | 2785 |

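Flush pays off when the number of distinct keys is small relative to the number of records, so that each key receives several records per flush interval. The gain comes mainly from fewer visits to the state backend: with 1e8 records over 5e5 keys, RocksDB is visited 6000011 times instead of once per record, and throughput reaches 2785 records/ms. When every key is distinct (average key count 1), flush saves no visits and throughput is slightly lower than in the rows marked "/", which appear to be the runs without flush (150 vs. 206 records/ms and 168 vs. 175 records/ms).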
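
A rough model of why fewer distinct keys means fewer state-backend visits, written as a small Java sketch. It assumes one visit per input record without flush (which matches the no-flush rows in the table) and one visit per distinct key per flush with flush enabled. The class `StateVisitModel` and the flush count of 11 are assumptions chosen for illustration; the PR does not report how many flushes occurred.

```java
// Hypothetical back-of-the-envelope model; the flush count is an assumption, not a measurement.
final class StateVisitModel {
    // Without flush: one state-backend visit per input record.
    static long visitsWithoutFlush(long records) {
        return records;
    }

    // With flush: one visit per distinct key per flush,
    // assuming every key appears in every flush interval.
    static long visitsWithFlush(long distinctKeys, long flushes) {
        return distinctKeys * flushes;
    }

    public static void main(String[] args) {
        long records = 100_000_000L;    // 1e8 records, as in the table
        long distinctKeys = 1_000_000L; // 1e6 distinct keys, as in the table
        long assumedFlushes = 11;       // assumption: not reported in the PR

        System.out.println(visitsWithoutFlush(records));                   // prints 100000000
        System.out.println(visitsWithFlush(distinctKeys, assumedFlushes)); // prints 11000000
    }
}
```

Under these assumptions the model gives 11000000 visits for the 1e8-record, 1e6-key row, close to the 11000010 reported there. The number of visits scales with the number of distinct keys rather than with the number of records.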

Checkpoint

| Number of distinct keys | Max flush interval | Checkpoint interval | Average duration | Max duration |
| --- | --- | --- | --- | --- |
| 1e6 | 1000 ms | 10000 ms | 67 ms | 100 ms |
| 1e6 | / | 10000 ms | 86 ms | 161 ms |


flinkbot commented Aug 3, 2023

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@Hebella force-pushed the feat/flush branch 2 times, most recently from 2715643 to c85316d (August 9, 2023 02:24)