Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add spilling capability for multi_cast_local_exchange operator #47982

Merged

Conversation

silverbullet233
Copy link
Contributor

@silverbullet233 silverbullet233 commented Jul 8, 2024

Why I'm doing:

When CTEs in a query are reused, the same data is sent to multiple downstream through the MultiCastLocalExchange operator. In the current implementation, all unconsumed data is cached in memory, depending on the gap between the fastest and slowest consumers, resulting in uncontrollable memory usage.

In this scenario, we cannot control memory by restricting the producer's writes, as there may be dependencies between downstream CTEs, such as when the hash join build side and probe side appear simultaneously, the probe side needs to wait for the build side to complete. If restrictions are imposed on the write side, it will cause queries to be stuck forever due to mutual waiting.

What I'm doing:

In this PR, I introduced a new implementation of MultiCastLocalExchanger called SpillableMultiCastLocalExchanger that can control the memory usage of operators by spilling data to disk.

SpillableMultiCastLocalExchanger is implemented based on MemLimitedChunkQueue.

MemLimitedChunkQueue is an MPMC queue that accepts multiple producers writing data from different threads and supports multiple consumers consuming the same data, similar to a message queue like Kafka. It controls memory usage by spilling data to disk, while also considering the efficiency of spilling io. as we know, too many small io can seriously affect performance.
MemLimitedChunkQueue organizes data internally in the form of a block linked list. The nodes in a linked list are called Block, which contain multiple Cells, each Cell contains a Chunk.

Every time chunk is pushed, a Cell is created and added to the Block at the end of the linked list. When the size of the Block exceeds a certain limit, a new Block is generated and added to the end of the linked list.

Block is the smallest unit handling by spill io task. when writing, if the data in memory exceeds a certain size, a flush io task will be submitted to flush the oldest block to the disk.

when consumers consume data and find that the block they want to read is not in memory, they will submit a load io task to load the corresponding block back into memory.

Through this approach, we can ensure that both producers and consumers can function properly while controlling memory usage.

Test

I constructed a query based on the tpch 100g dataset for testing.

set pipeline_dop=8;
select l_shipdate,count(distinct l_orderkey),count(distinct l_linenumber),count(distinct l_partkey),count(distinct l_suppkey),count(distinct l_quantity),count(distinct l_extendedprice),count(distinct l_discount),count(distinct l_tax),count(distinct l_returnflag),count(distinct l_linestatus) from lineitem group by l_shipdate;

baseline doesn't change other session variables.
for testing, only enable force spill on multi_cast_local_exchange operator.

set enable_spill=true;
set spill_mode='force';
set spillable_operator_mask=32;

here is some metrics from query profile

MULTI_CAST_LOCAL_EXCHANG_SINK
baseline

          UniqueMetrics:
             - ExchangerPeakBufferRowSize: 551.312M (551311886)
             - ExchangerPeakMemoryUsage: 31.848 GB

with spillable_multi_cast_local_exchange

          UniqueMetrics:
             - ExchangerPeakBufferRowSize: 3.473M (3473408)
             - ExchangerPeakMemoryUsage: 205.394 MB
             - FlushIOBytes: 16.275 GB
             - FlushIOCount: 4.079K (4079)
             - FlushIOTime: 18s58ms
             - ReadIOBytes: 16.036 GB
             - ReadIOCount: 3.982K (3982)
             - ReadIOTime: 10s871ms

QueryPeakMemoryUsage and QueryTime
baseline: 78.354 GB, 48s952ms
with spillable_multi_cast_local_exchange:67.408 GB, 1m27s

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
@silverbullet233 silverbullet233 changed the title [Ignore][WIP]ignore me [Enhancement] add spilling capability for multi_cast_local_exchange operator Jul 12, 2024
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
satanson
satanson previously approved these changes Jul 17, 2024
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
satanson
satanson previously approved these changes Jul 22, 2024
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
satanson
satanson previously approved these changes Jul 25, 2024
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Copy link

sonarcloud bot commented Jul 25, 2024

@silverbullet233 silverbullet233 enabled auto-merge (squash) July 25, 2024 06:13
Copy link

[FE Incremental Coverage Report]

pass : 0 / 0 (0%)

Copy link

[BE Incremental Coverage Report]

pass : 417 / 455 (91.65%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 be/src/exec/pipeline/exchange/spillable_multi_cast_local_exchange.cpp 32 37 86.49% [43, 48, 93, 94, 95]
🔵 be/src/exec/pipeline/exchange/mem_limited_chunk_queue.h 37 43 86.05% [172, 173, 174, 176, 177, 178]
🔵 be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp 331 358 92.46% [51, 144, 145, 166, 171, 182, 235, 236, 291, 292, 293, 325, 326, 327, 328, 329, 349, 374, 471, 472, 543, 553, 554, 555, 556, 557, 558]
🔵 be/src/util/bit_mask.h 9 9 100.00% []
🔵 be/src/exec/pipeline/exchange/multi_cast_local_exchange.h 2 2 100.00% []
🔵 be/src/runtime/runtime_state.h 1 1 100.00% []
🔵 be/src/exec/data_sink.cpp 5 5 100.00% []

@silverbullet233 silverbullet233 merged commit ca962a5 into StarRocks:main Jul 25, 2024
52 of 54 checks passed
@silverbullet233 silverbullet233 deleted the mcast_local_exchange_limit branch July 25, 2024 07:13
dujijun007 pushed a commit to dujijun007/starrocks that referenced this pull request Jul 29, 2024
…perator (StarRocks#47982)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
@silverbullet233
Copy link
Contributor Author

@mergify backport branch-3.3

Copy link
Contributor

mergify bot commented Sep 14, 2024

backport branch-3.3

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Sep 14, 2024
…perator (#47982)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
(cherry picked from commit ca962a5)
silverbullet233 added a commit to silverbullet233/starrocks that referenced this pull request Sep 18, 2024
…perator (StarRocks#47982)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
(cherry picked from commit ca962a5)
silverbullet233 added a commit that referenced this pull request Sep 18, 2024
…perator (#47982)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
(cherry picked from commit ca962a5)
silverbullet233 added a commit that referenced this pull request Sep 19, 2024
…perator (#47982)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
(cherry picked from commit ca962a5)
wanpengfei-git pushed a commit that referenced this pull request Sep 19, 2024
…perator (backport #47982) (#51037)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Co-authored-by: eyes_on_me <nopainnofame@sina.com>
Co-authored-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants