
Merge unfull flush tasks #241

Merged
liunyl merged 2 commits into main from merge_flush_task on Nov 26, 2025

Conversation

@liunyl (Contributor) commented Nov 26, 2025

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the link of issue using fixes eloqdb/tx_service#issue_id
  • Reference the link of RFC if exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • Refactor
    • Improved flush/task queue: FIFO ordering, task coalescing to reduce work, and backpressure with wait/notify — improves throughput and stability.
  • New Features
    • Added a shard cleanup request to reclaim memory proactively.
    • Track ongoing data-sync operations to better coordinate checkpoints (increment/decrement during sync lifetime).
  • Bug Fixes
    • Ensure waiting threads are properly notified after dequeue/merge to avoid stalls.


@coderabbitai bot commented Nov 26, 2025

Walkthrough

Refactors flush-task queuing to use a deque with merge-before-enqueue and FIFO dequeue, adds FlushDataTask::MergeFrom, introduces DataSyncStatus lifetime counters that update Checkpointer ongoing-data-sync count, adds ShardCleanCc implementation, and removes waiting-checkpoint accessors and some old ShardCleanCc declarations.

Changes

  • LocalCcShards data structure (include/cc/local_cc_shards.h): switched pending_flush_work_ from std::vector<std::unique_ptr<FlushDataTask>> to std::deque<std::unique_ptr<FlushDataTask>>; added #include <deque>.
  • Flush task merge API (include/data_sync_task.h): added bool MergeFrom(std::unique_ptr<FlushDataTask> &&other) to FlushDataTask (deadlock-safe dual-mutex locking, size-limit validation, move-merge by table, pending-size update).
  • Enqueue/dequeue & sync behavior (src/cc/local_cc_shards.cpp): reworked enqueue with unique_lock, attempts MergeFrom into the last task, applies backpressure by waiting when the queue is full, dequeues FIFO via pop_front, and adjusts notify/wake semantics.
  • DataSyncStatus lifetime counter (include/data_sync_task.h, src/data_sync_task.cpp): declared/defined a DataSyncStatus constructor and destructor that call Checkpointer::IncrementOngoingDataSyncCnt/DecrementOngoingDataSyncCnt to track ongoing data-sync operations.
  • Checkpointer API (include/checkpointer.h): added IncrementOngoingDataSyncCnt(), DecrementOngoingDataSyncCnt(), IsOngoingDataSync() const, and a private std::atomic<uint64_t> ongoing_data_sync_cnt_{0}.
  • Shard clean request, new implementation (include/cc/cc_req_misc.h, src/cc/cc_req_misc.cpp): added the txservice::ShardCleanCc declaration and a ShardCleanCc::Execute(CcShard&) implementation that performs shard-heap cleaning, may re-enqueue itself, and can trigger checkpoint notification.
  • Removed old ShardCleanCc and waiting-ckpt accessors (include/cc/cc_request.h, include/cc/cc_shard.h, src/cc/cc_shard.cpp, include/cc/range_cc_map.h): deleted the previous ShardCleanCc declaration, removed the CcShard::SetWaitingCkpt/IsWaitingCkpt declarations and definitions, and removed explicit SetWaitingCkpt(false) calls in range post-commit paths.
  • Minor includes/headers (src/data_sync_task.cpp): added the DataSyncStatus ctor/dtor and removed now-unused includes (string_view, unordered_map, unordered_set).

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant LocalCcShards
    participant PendingQueue as pending_flush_work_
    participant LastTask as LastTask
    participant FlushWorker

    rect rgb(240,255,240)
    note right of LocalCcShards: Enqueue (new flow)
    Caller->>LocalCcShards: AddFlushTaskEntry(newTask)
    LocalCcShards->>LocalCcShards: lock(queue_mutex)
    alt queue non-empty
        LocalCcShards->>LastTask: LastTask.MergeFrom(newTask)
        alt merge succeeds
            LastTask-->>LocalCcShards: merged (no enqueue)
            LocalCcShards->>Caller: return
        else merge fails
            LocalCcShards->>LocalCcShards: wait if queue full (cond_var)
            LocalCcShards->>PendingQueue: push_back(newTask)
            LocalCcShards->>FlushWorker: notify_one
        end
    else queue empty
        LocalCcShards->>PendingQueue: push_back(newTask)
        LocalCcShards->>FlushWorker: notify_one
    end
    end

    rect rgb(255,245,240)
    note right of FlushWorker: Dequeue (FIFO)
    FlushWorker->>PendingQueue: pop_front()
    FlushWorker->>FlushWorker: process task
    FlushWorker->>LocalCcShards: notify_all (wake enqueuers)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Areas needing focused review:
    • Dual-mutex locking order and deadlock safety in FlushDataTask::MergeFrom
    • Correctness of moved entries and pending_flush_size_ accounting
    • Backpressure wait/notify correctness to avoid missed wakeups or deadlocks
    • Interaction of DataSyncStatus ctor/dtor with Checkpointer counters and lifetime semantics
    • Consequences of removing SetWaitingCkpt/IsWaitingCkpt on call sites and checkpoint logic

Poem

🐇 I nudged a deque into the burrowed queue,
Merged tiny tasks until they felt like new,
Front-first I nibble oldest crumbs with care,
Backpressure hums — I pause, then clear the air,
A rabbit's hop of tidy flushes — hooray! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
  • Description check: ⚠️ Warning. The PR description only contains an unchecked reminder checklist with no actual description of changes, rationale, testing details, or issue references. Resolution: add a substantive description explaining what the PR changes, why it is needed, the testing approach, and relevant issue/RFC links before the checklist.
  • Docstring coverage: ⚠️ Warning. Docstring coverage is 5.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (1 passed)
  • Title check: ✅ Passed. The title "Merge unfull flush tasks" directly describes the main implementation change: merging incomplete flush tasks together rather than enqueueing them separately.


@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/cc/local_cc_shards.cpp (3)

5296-5324: Merge + backpressure logic looks correct; consider minor cleanups and reuse

The merge-then-enqueue pattern with capacity check is sound: worker_lk correctly guards pending_flush_work_, merge is attempted only against the tail (newest) task, and backpressure via while (size >= worker_num_) cv.wait(...) will throttle producers safely until a worker pops work and FlushData notifies.

Two small follow-ups:

  • The merge/backpressure block in AddFlushTaskEntry and FlushCurrentFlushBuffer is duplicated almost verbatim; factoring it into a small helper (e.g. EnqueueFlushDataTask(std::unique_ptr<FlushDataTask>)) would reduce risk of future drift.
  • This relies on flush_data_worker_ctx_.worker_num_ > 0; if that’s not already enforced at construction time, it’s worth asserting or guarding once to avoid a theoretical infinite-wait scenario.

If you’d like to double‑check the worker count assumption across the codebase, you can run a quick search for flush_data_worker_ctx_ construction and ensure worker_num_ is always initialized to a positive value.

Also applies to: 5329-5357


5360-5369: FIFO dequeue change is consistent; notify_all may be stronger than necessary

Switching FlushData to take from front()/pop_front() is consistent with the new deque backing and gives proper FIFO semantics; notifying waiters after the pop is also correct so producers blocked on a full queue can proceed.

For performance, you could consider notify_one() instead of notify_all() here, since only one producer (or worker) actually needs to wake when a single slot is freed, but that’s an optimization rather than a correctness issue.
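The wait/notify pattern under discussion is, in isolation, a bounded FIFO queue. A minimal standalone sketch (capacity stands in for worker_num_, and the element type, class, and method names are illustrative, not the actual LocalCcShards API): using one condition variable per direction makes notify_one() sufficient, which is the optimization the comment suggests.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Bounded FIFO queue: producers block while the queue is at capacity,
// the consumer pops from the front and wakes a blocked producer.
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void Push(int v)
    {
        std::unique_lock<std::mutex> lk(mux_);
        // Backpressure: wait until a slot frees up.
        not_full_.wait(lk, [this] { return queue_.size() < capacity_; });
        queue_.push_back(v);
        not_empty_.notify_one();
    }

    int Pop()
    {
        std::unique_lock<std::mutex> lk(mux_);
        not_empty_.wait(lk, [this] { return !queue_.empty(); });
        int v = queue_.front();  // FIFO: oldest element first
        queue_.pop_front();
        // notify_one suffices: exactly one producer can use the freed slot.
        not_full_.notify_one();
        return v;
    }

private:
    std::size_t capacity_;
    std::mutex mux_;
    std::deque<int> queue_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};
```

The PR uses a single condition variable for both roles, which is where notify_all() guards against waking only the wrong class of waiter.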


5564-5587: Unreachable merge branch in wait_for predicate can be removed or rethought

Inside the FlushDataWorker wait_for predicate, the leading:

if (!pending_flush_work_.empty() ||
    flush_data_worker_ctx_.status_ == WorkerStatus::Terminated) {
    return true;
}

guarantees that whenever the later block runs, pending_flush_work_ is empty. The inner:

if (!pending_flush_work_.empty()) {
    auto &last_task = pending_flush_work_.back();
    if (last_task->MergeFrom(std::move(flush_data_task))) {
        return true;
    }
}

is therefore dead code and will never execute. Since the comment below already assumes the queue is empty, this looks like leftover logic from copy‑pasting the enqueue helper.

Either remove that inner if entirely, or, if you intended to support merging here as well, relax the early if (!pending_flush_work_.empty()) return true; and move the merge logic into a shared helper to avoid subtle divergence.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7e38a11 and 7ca6922.

📒 Files selected for processing (3)
  • include/cc/local_cc_shards.h (2 hunks)
  • include/data_sync_task.h (1 hunks)
  • src/cc/local_cc_shards.cpp (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
📚 Learning (2025-10-09, thweetkomputer, PR 150), applied to: include/cc/local_cc_shards.h, src/cc/local_cc_shards.cpp
📚 Learning (2025-11-11, lzxddz, PR 199), applied to: src/cc/local_cc_shards.cpp
🔇 Additional comments (3)
include/cc/local_cc_shards.h (2)

29-29: LGTM: Include added for deque support.

The include is necessary for the type change of pending_flush_work_ to std::deque.


2364-2364: No contiguous storage assumptions or pointer arithmetic detected — deque change is safe and appropriate.

Verification of pending_flush_work_ usage in src/cc/local_cc_shards.cpp confirms:

  • All operations (empty(), back(), front(), pop_front(), emplace_back(), size()) are standard deque operations
  • No .data() calls or pointer arithmetic requiring contiguous memory
  • The FIFO pattern with front()/pop_front() is exactly what deque optimizes for (O(1) operations)
  • Merge-before-enqueue pattern using back() also benefits from deque (O(1) access and append)

The switch from std::vector to std::deque is not only safe but the ideal container for this queue pattern.
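The merge-at-back / consume-from-front pattern the review verifies can be shown in miniature (the merge rule and names below are illustrative only; the real code merges FlushDataTask objects via MergeFrom):

```cpp
#include <cassert>
#include <deque>

// FIFO queue on std::deque: push_back, back(), front(), and pop_front()
// are all O(1); a std::vector would pay O(n) to erase its front element.
std::deque<int> q;

void Enqueue(int task)
{
    // Try to coalesce into the newest task (illustrative rule: merge
    // equal values); otherwise append as a new task at the back.
    if (!q.empty() && q.back() == task)
    {
        return;  // "merged" into the tail, nothing enqueued
    }
    q.push_back(task);
}

int DequeueFifo()
{
    int t = q.front();  // oldest task first
    q.pop_front();
    return t;
}
```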

include/data_sync_task.h (1)

313-359: LGTM: Well-implemented merge function with proper synchronization.

The MergeFrom implementation correctly:

  • Uses address-ordered locking to prevent deadlock
  • Validates size constraints before merging
  • Employs move semantics for efficiency
  • Clears the source task after merge

The merge logic properly groups entries by table name and updates the pending size atomically under lock.
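The address-ordered locking idea can be sketched in isolation. This is a hedged illustration, not the real FlushDataTask: the actual method takes std::unique_ptr<FlushDataTask>&&, and SmallTask, kSizeLimit, and the int entries are stand-ins.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <iterator>
#include <mutex>
#include <utility>
#include <vector>

// Minimal stand-in for a flush task: a mutex-guarded batch of entries
// with a soft size cap.
struct SmallTask
{
    static constexpr std::size_t kSizeLimit = 8;

    std::mutex mux_;
    std::vector<int> entries_;

    // Merge `other` into *this. Locks both mutexes in address order so
    // that concurrent a.MergeFrom(b) / b.MergeFrom(a) calls cannot
    // deadlock; returns false if the result would exceed the cap.
    bool MergeFrom(SmallTask &other)
    {
        std::mutex *first = &mux_;
        std::mutex *second = &other.mux_;
        if (std::less<std::mutex *>{}(second, first))
        {
            std::swap(first, second);
        }
        std::lock_guard<std::mutex> lk1(*first);
        std::lock_guard<std::mutex> lk2(*second);

        if (entries_.size() + other.entries_.size() > kSizeLimit)
        {
            return false;  // caller enqueues `other` as a new task instead
        }
        entries_.insert(entries_.end(),
                        std::make_move_iterator(other.entries_.begin()),
                        std::make_move_iterator(other.entries_.end()));
        other.entries_.clear();  // source is emptied, mirroring the PR
        return true;
    }
};
```

The size check doubles as backpressure: a failed merge falls through to the normal enqueue path rather than growing a task without bound.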

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
include/cc/cc_req_misc.h (1)

1080-1093: New ShardCleanCc request type looks correct.

The struct properly inherits from CcRequestBase and provides the Execute override. Deleting the move constructor prevents accidental moves of queued request objects.

Minor note: free_count_ is initialized to 0 both in the constructor initializer list (line 1083) and the member declaration (line 1092). The member declaration initialization is redundant but harmless.

Consider removing the redundant inline initialization:

 private:
-    size_t free_count_{0};
+    size_t free_count_;
 };
src/data_sync_task.cpp (1)

41-54: Add null-safety checks for defensive programming, but this is not a critical issue.

The DataSyncStatus RAII pattern is sound. However, the chain Sharder::Instance().GetCheckpointer() theoretically could fail if local_shards_ or tx_service_ are nullptr.

In practice, this is highly unlikely because DataSyncStatus is constructed only after system initialization in established code paths (Checkpointer, RemoteCC, TxIndex, SnapshotManager, LocalCC). All six construction sites confirm they execute in fully initialized contexts.

The suggestion to add null checks is valid defensive programming but represents an optional hardening improvement rather than a critical fix, as the codebase pattern elsewhere does not include such checks for guaranteed-initialized components.

include/checkpointer.h (1)

131-145: Defend against accidental counter underflow in ongoing_data_sync_cnt_.

The counter API is fine, but DecrementOngoingDataSyncCnt() will underflow silently if it’s ever called when the count is already 0, which would make IsOngoingDataSync() permanently return true. Adding a simple debug check on the previous value would catch misuse early:

void DecrementOngoingDataSyncCnt()
{
-    ongoing_data_sync_cnt_.fetch_sub(1, std::memory_order_relaxed);
+    auto prev = ongoing_data_sync_cnt_.fetch_sub(1, std::memory_order_relaxed);
+    assert(prev > 0);
}

This keeps release behavior unchanged while surfacing mismatched increments/decrements in testing.

Also applies to: 165-165

src/cc/local_cc_shards.cpp (2)

5316-5337: Deduplicate merge/backpressure logic between FlushCurrentFlushBuffer and AddFlushTaskEntry.

FlushCurrentFlushBuffer() reimplements the same merge-then-wait-for-capacity pattern as AddFlushTaskEntry(). This is correct but increases the risk of the two paths diverging over time.

Consider extracting a small helper like EnqueueFlushDataTaskWithMerge(std::unique_ptr<FlushDataTask>) that encapsulates:

  • Attempt merge into pending_flush_work_.back().
  • Capacity wait on pending_flush_work_.size() >= worker_num_.
  • Final enqueue + cv_.notify_one().

Both callers could then share identical behavior.


5550-5569: Unreachable merge attempt in the 10s “stuck flush” path.

Inside the wait_for predicate, you now try to merge a forced flush_data_task into pending_flush_work_.back() if the queue is non-empty. However, earlier in the same lambda you return immediately when !pending_flush_work_.empty() is true, and the whole predicate runs under flush_worker_lk, so this block is never reached in practice.

You can simplify and clarify this branch by dropping the merge attempt and just enqueuing:

if (flush_data_task != nullptr)
{
-    if (!pending_flush_work_.empty()) {
-        auto &last_task = pending_flush_work_.back();
-        if (last_task->MergeFrom(std::move(flush_data_task))) {
-            return true;
-        }
-    }
-
-    // Add as new task. We just checked that pending_flush_work_ is empty,
+    // Add as new task. pending_flush_work_ is known empty under this predicate.
     pending_flush_work_.emplace_back(std::move(flush_data_task));
     return true;
}

This avoids dead code and keeps the “unstick DDL” behavior easier to reason about.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ca6922 and 513196c.

📒 Files selected for processing (11)
  • include/cc/cc_req_misc.h (1 hunks)
  • include/cc/cc_request.h (0 hunks)
  • include/cc/cc_shard.h (0 hunks)
  • include/cc/local_cc_shards.h (2 hunks)
  • include/cc/range_cc_map.h (0 hunks)
  • include/checkpointer.h (2 hunks)
  • include/data_sync_task.h (2 hunks)
  • src/cc/cc_req_misc.cpp (1 hunks)
  • src/cc/cc_shard.cpp (0 hunks)
  • src/cc/local_cc_shards.cpp (3 hunks)
  • src/data_sync_task.cpp (1 hunks)
💤 Files with no reviewable changes (4)
  • src/cc/cc_shard.cpp
  • include/cc/range_cc_map.h
  • include/cc/cc_request.h
  • include/cc/cc_shard.h
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
📚 Learning (2025-11-11, lzxddz, PR 199), applied to: include/cc/cc_req_misc.h, src/data_sync_task.cpp, include/checkpointer.h, src/cc/local_cc_shards.cpp, src/cc/cc_req_misc.cpp
📚 Learning (2025-10-09, thweetkomputer, PR 150), applied to: include/cc/cc_req_misc.h, include/cc/local_cc_shards.h, src/data_sync_task.cpp, include/checkpointer.h, src/cc/local_cc_shards.cpp, src/cc/cc_req_misc.cpp
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • include/cc/cc_req_misc.h
  • include/checkpointer.h
  • src/cc/cc_req_misc.cpp
🧬 Code graph analysis (2)
include/data_sync_task.h (1)
src/data_sync_task.cpp (2)
  • DataSyncStatus (41-49)
  • DataSyncStatus (51-54)
src/cc/cc_req_misc.cpp (3)
include/cc/cc_req_misc.h (1)
  • Execute (857-869)
include/cc/cc_request.h (15)
  • Execute (133-264)
  • Execute (2482-2544)
  • Execute (3059-3101)
  • Execute (3510-3559)
  • Execute (3784-3825)
  • Execute (4116-4157)
  • Execute (4402-4438)
  • Execute (4473-4497)
  • Execute (4536-4548)
  • Execute (4568-4579)
  • Execute (4651-4786)
  • Execute (5298-5330)
  • Execute (5383-5389)
  • Execute (5552-5563)
  • Execute (5599-5629)
include/cc/reader_writer_cntl.h (1)
  • ccs (49-51)
🔇 Additional comments (6)
include/data_sync_task.h (2)

52-54: Constructor/destructor separation looks correct.

The constructor and destructor declarations are properly separated, with implementations in src/data_sync_task.cpp that manage the ongoing data-sync counter via IncrementOngoingDataSyncCnt() and DecrementOngoingDataSyncCnt(). This RAII pattern cleanly tracks data-sync lifecycle.
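The RAII pattern described here can be illustrated with a plain atomic standing in for the Checkpointer counter (DataSyncScope, g_ongoing_data_sync, and IsOngoingDataSync below are hypothetical names for this sketch, not the real API):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Stand-in for Checkpointer's ongoing_data_sync_cnt_.
std::atomic<uint64_t> g_ongoing_data_sync{0};

// RAII guard: the counter tracks the guard's lifetime, so every exit
// path (early return, exception) decrements exactly once.
struct DataSyncScope
{
    DataSyncScope()
    {
        g_ongoing_data_sync.fetch_add(1, std::memory_order_relaxed);
    }
    ~DataSyncScope()
    {
        g_ongoing_data_sync.fetch_sub(1, std::memory_order_relaxed);
    }
    // Non-copyable: a copy would decrement twice for one increment.
    DataSyncScope(const DataSyncScope &) = delete;
    DataSyncScope &operator=(const DataSyncScope &) = delete;
};

bool IsOngoingDataSync()
{
    return g_ongoing_data_sync.load(std::memory_order_relaxed) != 0;
}
```

Deleting the copy operations is what keeps the increment/decrement pairing exact; the real DataSyncStatus achieves the same pairing through its ctor/dtor.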


317-356: Well-designed merge operation with proper deadlock prevention.

The address-based mutex ordering (lines 320-325) correctly prevents deadlock when two tasks try to merge with each other. The size check before merging provides backpressure. Using std::make_move_iterator for efficient entry transfer is appropriate.

One minor note: the method signature accepts std::unique_ptr<FlushDataTask>&& but doesn't transfer ownership—it just empties the source. This is fine as the caller retains control, but a comment clarifying this behavior could help future maintainers.

include/cc/local_cc_shards.h (2)

29-29: Appropriate include for deque usage.


2354-2354: Good choice of container for FIFO queue semantics.

Changing from std::vector to std::deque enables efficient pop_front() operations for processing flush tasks in order. This supports the merge-before-enqueue pattern where tasks are coalesced at the back and dequeued from the front.

src/cc/cc_req_misc.cpp (1)

1376-1447: Well-structured shard cleaning implementation.

The Execute method properly handles memory pressure with a clear state machine:

  1. If heap is full and needs cleaning → clean, then either yield for more cleaning or abort waiting requests
  2. If heap is not full → dequeue waiting requests and finish

The integration with IsOngoingDataSync() (line 1411) ensures checkpointing isn't triggered during active data-sync operations, which aligns with the PR's lifecycle counter additions.

The return value semantics are correct: true means the request is finished (recycle it), false means re-enqueued or continuing.
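The state machine and return-value semantics described above can be sketched generically. Every name here (ExecuteShardClean, CleanResult, the callback parameters) is an illustrative stand-in for the real CcShard/ShardCleanCc machinery, under the stated assumption that one pass either finishes or asks to be re-enqueued:

```cpp
#include <cassert>
#include <functional>

// Outcome of one Execute() pass: Finished -> recycle the request,
// Requeue -> yield and run another cleaning pass later.
enum class CleanResult { Finished, Requeue };

CleanResult ExecuteShardClean(bool heap_full,
                              bool can_free_more,
                              bool ongoing_data_sync,
                              const std::function<void()> &free_some,
                              const std::function<void()> &drain_waiters,
                              const std::function<void()> &notify_checkpoint)
{
    if (heap_full)
    {
        free_some();  // reclaim a batch of shard-heap memory
        if (can_free_more)
        {
            return CleanResult::Requeue;  // yield, continue next pass
        }
        if (!ongoing_data_sync)
        {
            // Nothing left to free locally and no sync in flight:
            // ask the checkpointer to reclaim via a checkpoint.
            notify_checkpoint();
        }
        return CleanResult::Finished;
    }
    drain_waiters();  // heap has room again: wake blocked requests
    return CleanResult::Finished;
}
```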

src/cc/local_cc_shards.cpp (1)

5345-5352: FIFO dequeue and notify_all correctly complement the new producer backpressure.

Switching to front()/pop_front() makes the flush queue FIFO, which matches the “merge-into-back, consume-from-front” semantics and improves fairness. The added cv_.notify_all() after popping ensures producers blocked on the capacity check are reliably woken when space becomes available.

This change looks sound with the existing locking discipline around flush_data_worker_ctx_.mux_.

Comment on lines +5283 to +5304
    std::unique_lock<std::mutex> worker_lk(flush_data_worker_ctx_.mux_);

    // Try to merge with the last task if queue is not empty
    if (!pending_flush_work_.empty())
    {
        auto &last_task = pending_flush_work_.back();
        if (last_task->MergeFrom(std::move(flush_data_task)))
        {
            // Merge successful, task was merged into last_task
            flush_data_worker_ctx_.cv_.notify_one();
            return;
        }
    }

    // Could not merge, wait if queue is full
    while (pending_flush_work_.size() >=
           static_cast<size_t>(flush_data_worker_ctx_.worker_num_))
    {
        flush_data_worker_ctx_.cv_.wait(worker_lk);
    }

    // Add as new task

⚠️ Potential issue | 🟡 Minor

Backpressure loop may hang if producers run after flush workers terminate.

The merge + capacity check is reasonable, but the while (pending_flush_work_.size() >= worker_num_) loop has no awareness of flush_data_worker_ctx_.status_. If Terminate() can race with producers still calling AddFlushTaskEntry, this loop could wait forever because no worker will ever pop and signal again.

Consider guarding the wait with the worker status, e.g.:

while (pending_flush_work_.size() >=
       static_cast<size_t>(flush_data_worker_ctx_.worker_num_) &&
       flush_data_worker_ctx_.status_ == WorkerStatus::Active) {
    flush_data_worker_ctx_.cv_.wait(worker_lk);
}

// Optionally bail out early if status_ != Active.

This keeps backpressure behavior while avoiding shutdown-time hangs.

🤖 Prompt for AI Agents
In src/cc/local_cc_shards.cpp around lines 5283 to 5304, the backpressure wait
loop can hang during shutdown because it waits solely on
pending_flush_work_.size() vs worker_num_ and doesn't observe
flush_data_worker_ctx_.status_; update the loop to also check that
flush_data_worker_ctx_.status_ == WorkerStatus::Active before waiting and after
waking so producers can stop waiting if workers are terminating, and if status_
!= Active bail out early (return or skip adding the task) to avoid deadlock.

@coderabbitai bot left a comment
Actionable comments posted: 1

♻️ Duplicate comments (1)
src/cc/local_cc_shards.cpp (1)

5281-5337: Backpressure wait in flush enqueue still ignores worker status; possible shutdown hang

Both AddFlushTaskEntry and FlushCurrentFlushBuffer now do:

  • Try to MergeFrom into the last pending task.
  • Otherwise, block in a loop while pending_flush_work_.size() >= worker_num_, waiting only on cv_.

These loops still have no awareness of flush_data_worker_ctx_.status_. If Terminate() can race with producers still calling these methods (e.g., a data‑sync worker enqueues while flush workers are being torn down), a producer can end up waiting forever once all flush workers have exited and no one will ever pop / signal again.

Please double‑check lifecycle guarantees around Terminate() and enqueue call sites. If there is any path where enqueue can happen after flush_data_worker_ctx_.Terminate(), this needs guarding (e.g., conditionally breaking the loop when status_ != Active and failing/short‑circuiting the corresponding DataSyncTask instead of blocking).

🧹 Nitpick comments (2)
include/cc/cc_req_misc.h (1)

1079-1092: LGTM! ShardCleanCc struct is well-structured.

The new ShardCleanCc request type follows the established pattern for CcRequestBase-derived classes. The explicit deletion of the move constructor is appropriate for request objects.

Optional refinements:

  1. Remove redundant initialization: free_count_ is initialized both in the constructor's initializer list (line 1082) and via default member initializer (line 1091). You can remove the constructor initializer list since the default member initializer already sets it to 0.

  2. Consider explicitly deleting the copy constructor: Since the move constructor is deleted and request objects typically shouldn't be copied, consider adding ShardCleanCc(const ShardCleanCc &) = delete; for consistency and clarity.

Apply this diff if you'd like to address these optional refinements:

-    ShardCleanCc() : free_count_(0)
+    ShardCleanCc() = default;
+
+    ShardCleanCc(const ShardCleanCc &) = delete;
+    ShardCleanCc &operator=(const ShardCleanCc &) = delete;
src/cc/local_cc_shards.cpp (1)

5548-5567: Unreachable merge branch in FlushDataWorker’s stuck-buffer path

Inside the wait_for predicate in FlushDataWorker, the MoveFlushData(true) branch is only reached when both:

  • pending_flush_work_.empty() is true, and
  • flush_data_worker_ctx_.status_ == WorkerStatus::Active.

Because flush_worker_lk is held for the entire predicate, no other thread can modify pending_flush_work_ between that check and the later if (!pending_flush_work_.empty()) { ... MergeFrom ... }. That condition is therefore always false in this context, so the MergeFrom attempt is effectively dead code here.

You can simplify this block to just enqueue the forced flush_data_task directly, which makes intent clearer and avoids misleading comments:

-                        if (flush_data_task != nullptr)
-                        {
-                            // Try to merge with the last task if queue is not
-                            // empty Note: flush_worker_lk is already held here
-                            // (inside condition variable predicate)
-                            if (!pending_flush_work_.empty())
-                            {
-                                auto &last_task = pending_flush_work_.back();
-                                if (last_task->MergeFrom(
-                                        std::move(flush_data_task)))
-                                {
-                                    // Merge successful, task was merged into
-                                    // last_task
-                                    return true;
-                                }
-                            }
-
-                            // Add as new task. We just checked that
-                            // pending_flush_work_ is empty,
-                            pending_flush_work_.emplace_back(
-                                std::move(flush_data_task));
-                            return true;
-                        }
+                        if (flush_data_task != nullptr)
+                        {
+                            // Queue is known to be empty here; just enqueue.
+                            pending_flush_work_.emplace_back(
+                                std::move(flush_data_task));
+                            return true;
+                        }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 513196c and 686f586.

📒 Files selected for processing (11)
  • include/cc/cc_req_misc.h (1 hunks)
  • include/cc/cc_request.h (0 hunks)
  • include/cc/cc_shard.h (0 hunks)
  • include/cc/local_cc_shards.h (2 hunks)
  • include/cc/range_cc_map.h (0 hunks)
  • include/checkpointer.h (2 hunks)
  • include/data_sync_task.h (2 hunks)
  • src/cc/cc_req_misc.cpp (1 hunks)
  • src/cc/cc_shard.cpp (0 hunks)
  • src/cc/local_cc_shards.cpp (3 hunks)
  • src/data_sync_task.cpp (1 hunks)
💤 Files with no reviewable changes (4)
  • include/cc/range_cc_map.h
  • src/cc/cc_shard.cpp
  • include/cc/cc_request.h
  • include/cc/cc_shard.h
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/data_sync_task.cpp
  • src/cc/cc_req_misc.cpp
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • include/cc/cc_req_misc.h
  • src/cc/local_cc_shards.cpp
  • include/cc/local_cc_shards.h
  • include/checkpointer.h
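As a hedged illustration of the distribution scheme that learning describes (`CoreIndex` is a hypothetical name, not the real method):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Sketch of the two-step distribution: mask the shard code into a fixed
// 0-1023 range, then fold that onto the actual core count. The 0x3FF mask
// keeps the pre-modulo range consistent regardless of how shard codes are
// composed upstream, and is deliberately separate from partition ID math.
inline std::size_t CoreIndex(uint32_t shard_code, std::size_t core_count)
{
    return (shard_code & 0x3FF) % core_count;
}
```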
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • include/cc/cc_req_misc.h
  • src/cc/local_cc_shards.cpp
  • include/cc/local_cc_shards.h
  • include/checkpointer.h
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • include/cc/cc_req_misc.h
  • include/checkpointer.h
🧬 Code graph analysis (1)
include/data_sync_task.h (1)
src/data_sync_task.cpp (2)
  • DataSyncStatus (41-49)
  • DataSyncStatus (51-54)
🔇 Additional comments (4)
include/checkpointer.h (1)

131-165: LGTM! Clean atomic counter implementation for tracking ongoing data sync operations.

The atomic counter with relaxed memory ordering is appropriate for tracking concurrent data sync operations. The increment/decrement pattern is properly paired through DataSyncStatus constructor/destructor lifecycle management.
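A minimal sketch of that RAII pairing, with hypothetical `Checkpointer` and `DataSyncScope` names standing in for the real `Checkpointer`/`DataSyncStatus` types:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical minimal checkpointer exposing the counter the review
// describes: incremented for the lifetime of each data-sync operation.
struct Checkpointer
{
    std::atomic<uint64_t> ongoing_data_sync_cnt{0};

    uint64_t OngoingDataSyncCount() const
    {
        return ongoing_data_sync_cnt.load(std::memory_order_relaxed);
    }
};

// RAII guard: the constructor registers the sync, the destructor
// deregisters it, so increments and decrements stay paired even on
// early returns or exceptions.
class DataSyncScope
{
public:
    explicit DataSyncScope(Checkpointer &ckpt) : ckpt_(ckpt)
    {
        ckpt_.ongoing_data_sync_cnt.fetch_add(1, std::memory_order_relaxed);
    }
    ~DataSyncScope()
    {
        ckpt_.ongoing_data_sync_cnt.fetch_sub(1, std::memory_order_relaxed);
    }
    DataSyncScope(const DataSyncScope &) = delete;
    DataSyncScope &operator=(const DataSyncScope &) = delete;

private:
    Checkpointer &ckpt_;
};
```

Relaxed ordering is sufficient here because the counter is only a statistic for coordination, not a synchronization point for other data.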

include/data_sync_task.h (1)

52-54: LGTM! Constructor/destructor now properly manage checkpointer counter lifecycle.

Moving to out-of-line definitions enables the DataSyncStatus lifecycle to increment/decrement the checkpointer's ongoing data sync counter via the RAII pattern.

include/cc/local_cc_shards.h (1)

29-29: LGTM! Deque is the appropriate container for FIFO flush work queue.

Changing from std::vector to std::deque provides efficient O(1) operations for both enqueue (push_back) and dequeue (pop_front), which aligns with the FIFO semantics introduced by the flush task merge-before-enqueue pattern.

Also applies to: 2354-2354

src/cc/local_cc_shards.cpp (1)

5343-5350: FIFO dequeue and capacity notification look correct

Switching to front()/pop_front() and issuing cv_.notify_all() after popping aligns with the new deque‑based FIFO semantics and correctly wakes threads blocked on the capacity wait in the enqueue paths.
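The wait/notify backpressure the comment describes can be sketched as a bounded FIFO; `BoundedQueue` below is a hypothetical illustration using standard mutexes, not the tx_service implementation:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Bounded FIFO: producers block while the queue is full, and each
// pop_front() notifies so a blocked producer can make progress.
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void Push(T item)
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(item));
        cv_.notify_all();  // wake consumers waiting on non-empty
    }

    T Pop()
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        cv_.notify_all();  // wake producers blocked on the capacity check
        return item;
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    std::deque<T> queue_;
    std::size_t capacity_;
};
```

Notifying after the pop is the detail the review calls out: without it, a producer parked on the capacity predicate would never re-check it.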

Comment on lines +317 to +356
bool MergeFrom(std::unique_ptr<FlushDataTask> &&other)
{
    // Lock both mutexes in consistent order (by address) to avoid deadlock
    bthread::Mutex *m1 = &flush_task_entries_mux_;
    bthread::Mutex *m2 = &other->flush_task_entries_mux_;
    if (m1 > m2)
    {
        std::swap(m1, m2);
    }

    std::lock_guard<bthread::Mutex> lk1(*m1);
    std::lock_guard<bthread::Mutex> lk2(*m2);

    // Check if merge would exceed max size
    if (pending_flush_size_ + other->pending_flush_size_ >
        max_pending_flush_size_)
    {
        return false;
    }

    // Merge entries by table name
    for (auto &[table_name, entries] : other->flush_task_entries_)
    {
        auto table_flush_entries_it =
            flush_task_entries_.try_emplace(table_name);
        auto &target_entries = table_flush_entries_it.first->second;
        target_entries.insert(target_entries.end(),
                              std::make_move_iterator(entries.begin()),
                              std::make_move_iterator(entries.end()));
    }

    // Update size
    pending_flush_size_ += other->pending_flush_size_;

    // Clear the other task
    other->pending_flush_size_ = 0;
    other->flush_task_entries_.clear();

    return true;
}
🛠️ Refactor suggestion | 🟠 Major

Add defensive check against self-merge to prevent potential deadlock.

The address-ordered locking pattern correctly avoids deadlock when merging two different FlushDataTask objects. However, if this == other.get(), both mutex pointers would be identical, causing a double-lock deadlock when creating lk1 and lk2.

While the type signature (unique_ptr<FlushDataTask> &&) makes this unlikely, add a defensive check for robustness:

 bool MergeFrom(std::unique_ptr<FlushDataTask> &&other)
 {
+    // Prevent self-merge
+    if (this == other.get())
+    {
+        return false;
+    }
+
     // Lock both mutexes in consistent order (by address) to avoid deadlock
     bthread::Mutex *m1 = &flush_task_entries_mux_;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool MergeFrom(std::unique_ptr<FlushDataTask> &&other)
{
    // Prevent self-merge
    if (this == other.get())
    {
        return false;
    }

    // Lock both mutexes in consistent order (by address) to avoid deadlock
    bthread::Mutex *m1 = &flush_task_entries_mux_;
    bthread::Mutex *m2 = &other->flush_task_entries_mux_;
    if (m1 > m2)
    {
        std::swap(m1, m2);
    }

    std::lock_guard<bthread::Mutex> lk1(*m1);
    std::lock_guard<bthread::Mutex> lk2(*m2);

    // Check if merge would exceed max size
    if (pending_flush_size_ + other->pending_flush_size_ >
        max_pending_flush_size_)
    {
        return false;
    }

    // Merge entries by table name
    for (auto &[table_name, entries] : other->flush_task_entries_)
    {
        auto table_flush_entries_it =
            flush_task_entries_.try_emplace(table_name);
        auto &target_entries = table_flush_entries_it.first->second;
        target_entries.insert(target_entries.end(),
                              std::make_move_iterator(entries.begin()),
                              std::make_move_iterator(entries.end()));
    }

    // Update size
    pending_flush_size_ += other->pending_flush_size_;

    // Clear the other task
    other->pending_flush_size_ = 0;
    other->flush_task_entries_.clear();

    return true;
}
🤖 Prompt for AI Agents
In include/data_sync_task.h around lines 317 to 356, add a defensive self-merge
check at the top of MergeFrom: if this == other.get(), return false
immediately, before computing the mutex pointers or locking. This prevents
attempting to lock the same mutex twice and avoids the potential deadlock when
a caller accidentally passes the same FlushDataTask in the unique_ptr.
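The address-ordered locking idiom at issue generalizes beyond FlushDataTask. A self-contained sketch with standard mutexes, where `Account` and `Transfer` are hypothetical types used only to demonstrate the pattern plus the self-check:

```cpp
#include <cassert>
#include <mutex>
#include <utility>

// Hypothetical account type guarded by its own mutex.
struct Account
{
    std::mutex mux;
    int balance = 0;
};

// Lock both mutexes in a consistent global order (by address) so two
// concurrent transfers in opposite directions cannot deadlock.
bool Transfer(Account &from, Account &to, int amount)
{
    if (&from == &to)
    {
        // Self-transfer: locking the same mutex twice would deadlock.
        return false;
    }
    std::mutex *m1 = &from.mux;
    std::mutex *m2 = &to.mux;
    if (m1 > m2)
    {
        std::swap(m1, m2);
    }
    std::lock_guard<std::mutex> lk1(*m1);
    std::lock_guard<std::mutex> lk2(*m2);

    if (from.balance < amount)
    {
        return false;
    }
    from.balance -= amount;
    to.balance += amount;
    return true;
}
```

Since C++17, `std::scoped_lock lk(from.mux, to.mux);` acquires both mutexes deadlock-free and can replace the manual address ordering, but the self-check is still required: passing the same mutex twice remains undefined behavior.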

@liunyl liunyl merged commit 84ac1ef into main Nov 26, 2025
4 checks passed
@liunyl liunyl deleted the merge_flush_task branch November 26, 2025 09:41