
Update datasyncscan execution #324

Merged
yi-xmu merged 5 commits into main from update_datasyncscan_execution
Dec 31, 2025

Conversation


@yi-xmu yi-xmu commented Dec 25, 2025

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the link of issue using fixes eloqdb/tx_service#issue_id
  • Reference the link of RFC if exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • Refactor

    • Centralized slice coordination for range scans, improving cross-core batching, lifecycle handling, and readiness semantics.
    • More robust slice pin/unpin and end-of-batch handling with deterministic batch boundaries and pause/resume alignment.
    • New observability/control APIs for current slice, slice keys, batch/end state, unfinished-core tracking, and advancing/unpinning slices.
  • Chores

    • Reduced per-iteration parallel dispatch: scan work now targets a single randomized core to lower concurrent load and simplify scheduling.


@yi-xmu yi-xmu self-assigned this Dec 25, 2025

coderabbitai bot commented Dec 25, 2025

Walkthrough

Adds a nested SliceCoordinator to RangePartitionDataSyncScanCc, replaces per-core iterator state with per-core slice indices and coordinator-driven pinning/batching, rewrites template_cc_map scan/batching to use the coordinator, and changes several dispatch paths to enqueue scans to a single randomly selected core.

Changes

Cohort / File(s) Summary
Slice coordinator & request state
tx_service/include/cc/cc_request.h
Adds private SliceCoordinator, replaces per-core curr_slice_it_ with curr_slice_index_, changes slices_to_scan_ to std::vector<std::pair<TxKey,bool>>, adds many coordinator-driven APIs (CurrentSlice, CurrentSliceKey, MoveToNextSlice, TheBatchEnd, IsSlicePinned, SetUnfinishedCoreCnt, UnpinSlices, UpdateMinPausedSlice overloads, IsLastBatch, FixCurrentSliceIndex) and updates ctor/Reset/Finish/Abort flows to manage coordinator lifecycle.
Cross-core scan & batching logic
tx_service/include/cc/template_cc_map.h
Reworks range-scan and batching loops to use SliceCoordinator for pinning, readiness, start/end key derivation, batch-end detection, and coordinated MoveNextSlice/unpin semantics; centralizes finish/guardrail logic and replaces many per-core drain/pause checks with coordinator-driven behavior.
Dispatch target change (single random core)
tx_service/src/cc/local_cc_shards.cpp, tx_service/src/sk_generator.cpp
Replaces per-iteration enqueue-to-all-cores with a single low-priority enqueue to one randomly selected core (core_rand % n_cores) for DataSyncForRangePartition / delta-size scans and ScanAndEncodeIndex; preserves Wait/error paths but reduces immediate multi-core dispatch.

Sequence Diagram(s)

sequenceDiagram
    actor ScanLoop as ScanLoop
    participant SliceCoord as SliceCoordinator
    participant Store as Store/RangeState
    participant CoreSelector as CoreSelector
    participant CoreWorker as CoreWorker

    rect rgb(235,245,255)
      Note over ScanLoop,SliceCoord: Initialize coordinator with slices (is_continuous flag)
      ScanLoop->>SliceCoord: Prepare(slices_to_scan_, is_continuous)
      SliceCoord->>Store: register start indices / pinned=false
    end

    rect rgb(235,255,235)
      loop per batch iteration
        ScanLoop->>SliceCoord: IsReadyForScan(core_id)?
        alt ready
          SliceCoord-->>ScanLoop: (StartKey, EndKey) / pinned slice
          ScanLoop->>CoreSelector: choose random core (core_rand % n_cores)
          CoreSelector-->>CoreWorker: selected core id
          ScanLoop->>CoreWorker: Enqueue scan(StartKey, EndKey)
          CoreWorker-->>ScanLoop: complete / Wait
        else not ready
          SliceCoord-->>ScanLoop: not ready
        end
        ScanLoop->>SliceCoord: TheBatchEnd(core_id)?
        alt batch end
          SliceCoord-->>ScanLoop: true
          ScanLoop->>SliceCoord: MoveToNextSlice(core_id)
          SliceCoord->>Store: advance index / update pinned state
        end
      end
    end

    rect rgb(250,235,255)
      ScanLoop->>SliceCoord: IsLastBatch()?
      alt last batch
        ScanLoop->>SliceCoord: UnpinSlices()
        SliceCoord->>Store: clear pinned slices
      end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • liunyl

Poem

🐰 I hop through slices, pinning one by one,
Batching tiny carrots until the scan is done.
One lucky core runs fast, the others wait their turn,
When final batch is finished, I unpin and I learn. ✨

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
  • Description check — ⚠️ Warning: The PR description only contains an unchecked submission checklist with no actual implementation details, rationale, or context explaining what changes were made, why they were necessary, or what issues they address. Resolution: Add a comprehensive description explaining the changes (SliceCoordinator introduction, coordination refactoring, dispatch strategy modifications), the rationale for these changes, and reference related issues or RFC links.
  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 6.25%, below the required threshold of 80.00%. Resolution: Run @coderabbitai generate docstrings to improve docstring coverage.
  • Title check — ❓ Inconclusive: The title 'Update datasyncscan execution' is vague and generic, and doesn't convey the specific nature of the substantial changes (SliceCoordinator introduction, coordination model refactor, dispatch strategy changes). Resolution: Provide a more specific title that describes the main change, such as 'Introduce SliceCoordinator for datasyncscan slice management' or 'Refactor datasyncscan execution with centralized slice coordination'.


@yi-xmu yi-xmu linked an issue Dec 25, 2025 that may be closed by this pull request

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/include/cc/cc_request.h (1)

4179-4210: unfinished_cnt_ = 1 initialization creates a race condition with early error paths

Reset() hard-sets unfinished_cnt_ = 1, but the subsequent call to SetUnfinishedCoreCnt(core_cnt) is not guaranteed to execute before SetError() in Execute(). At lines 5200–5206 and 5223–5226 in template_cc_map.h, early error paths call SetError() before line 5229 where SetUnfinishedCoreCnt() is called. This causes:

  1. unfinished_cnt_ starts at 1 (after Reset())
  2. SetError() decrements it to 0 and immediately calls UnpinSlices() + notifies Wait()
  3. Caller's Wait() returns prematurely while cores have not been dispatched
  4. Data corruption or missing scans if the caller proceeds

This affects both sk_generator.cpp (line 360–365) and local_cc_shards.cpp (line 4045–4047) which rely on Wait() blocking until all cores finish.

Fix: Initialize unfinished_cnt_ to core_cnt_ in Reset() directly, or guard early SetError() calls to ensure SetUnfinishedCoreCnt() is called first.

Also, UnpinSlices() correctly centralizes slice unpinning and ensures exactly-once semantics when the last core finishes or errors.
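The recommended ordering can be sketched with a minimal counter type. This is a hypothetical stand-in (UnfinishedCoreCounter is not the real tx_service class): the core count is installed under the same mutex that Wait() and SetFinish() use, so a waiter can never observe a premature zero between Reset() and dispatch:

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of the fix the comment recommends: every read/write of
// the unfinished-core counter happens under one mutex, and the counter is set
// to the real core count before any core can report completion.
class UnfinishedCoreCounter
{
public:
    void SetUnfinishedCoreCnt(std::uint16_t core_cnt)
    {
        std::lock_guard<std::mutex> lk(mux_);
        unfinished_cnt_ = core_cnt;
    }

    // Called once per finishing (or erroring) core; true for the last one,
    // which is the only caller allowed to unpin slices.
    bool SetFinish()
    {
        std::lock_guard<std::mutex> lk(mux_);
        if (--unfinished_cnt_ == 0)
        {
            cv_.notify_all();
            return true;
        }
        return false;
    }

    void Wait()
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return unfinished_cnt_ == 0; });
    }

    std::uint16_t Remaining()
    {
        std::lock_guard<std::mutex> lk(mux_);
        return unfinished_cnt_;
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    std::uint16_t unfinished_cnt_{0};
};
```

With this shape, an early SetError() path that runs before SetUnfinishedCoreCnt() is still a logic bug, but at least the counter itself is race-free; the comment's stronger fix is to seed the counter with core_cnt_ directly in Reset().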

🧹 Nitpick comments (3)
tx_service/src/cc/local_cc_shards.cpp (1)

4070-4075: Random single-core dispatch relies on RangePartitionDataSyncScanCc to fan out correctly

Switching from per-core enqueue to picking a single shard with butil::fast_rand() % cc_shards_.size() looks intentional given the comment that the “first core” will further dispatch to remaining cores. This should reduce queue pressure while preserving parallelism, assuming RangePartitionDataSyncScanCc indeed owns the cross-core fan-out. Two minor points to double‑check:

  • Ensure the header that declares butil::fast_rand() is included somewhere in the translation unit (directly or transitively), otherwise this will fail to compile.
  • Confirm that RangePartitionDataSyncScanCc is only expected to be enqueued on one shard per iteration now; if it still assumes multi-enqueue from all cores, this change could silently reduce scan coverage/concurrency.

If both assumptions hold, the change looks sound.

tx_service/include/cc/cc_request.h (1)

4041-4092: SliceCoordinator wiring and slices_to_scan_ population look correct

Initializing slice_coordinator_ with export_base_table_item_ and &slices_to_scan_, reserving slices_to_scan_ capacity from old_slices_delta_size, and using GetShallowCopy() to avoid mutating the source map all look sound and consistent with the intended ownership model. curr_slice_index_ is correctly initialised per core.

If you want a tiny readability tweak later, a ranged for over the map would be clearer than std::for_each, but that’s cosmetic only.

tx_service/include/cc/template_cc_map.h (1)

5244-5416: SliceCoordinator initialization and multi‑core dispatch: behavior looks consistent, but a few invariants are worth double‑checking

The new RangePartitionDataSyncScanCc::Execute flow using slice_coordinator_ (pinning via pin_range_slice, PreparedSliceCnt, UpdateBatchEnd, MoveToNextSlice, and per‑core PausePos / IsDrained) is generally coherent and matches the intended design of coordinated, batched slice scanning across cores.

A few points to explicitly verify in tests:

  1. Single‑core ownership of batch preparation

    • if (!req.slice_coordinator_.IsReadyForScan()) { ... SetReadyForScan(); ... Enqueue(...,&req); } assumes this block is only ever run once per batch (on the initial core) before other cores see the request. Given Enqueue is used for cross‑core dispatch, please confirm there is no path where a non‑initiator core can execute Execute on the same req before SetReadyForScan() is set, which would cause duplicate pinning / slice‑state corruption.
  2. PinRangeSlice status handling vs. subsequent use of new_slice_id

    • In pin_range_slice, RangeSliceOpStatus::NotOwner is treated as succ = true after an internal assert("Dead branch"). Downstream, the call site always does:
      • req.slice_coordinator_.StorePinnedSlice(new_slice_id);
      • const TemplateStoreSlice<KeyT> *slice = static_cast<const TemplateStoreSlice<KeyT> *>(new_slice_id.Slice());
    • Please confirm that in the NotOwner case new_slice_id.Slice() is guaranteed to be non‑null and safe to dereference; otherwise this path should probably be treated as a failure (or short‑circuited before accessing the slice) even in non‑debug builds.
  3. find_non_empty_slice invariants when the map/slices are empty

    • When all slices in the batch are empty (or CCM is empty), find_non_empty_slice can legitimately return key_it == slice_end_it, and slice_end_key coming from the last slice. The later assert(key_it != slice_end_it || req.TheBatchEnd(shard_->core_id_)); relies on TheBatchEnd() being true in this case. That seems reasonable, but it’s worth validating with a unit/integration test where a batch contains only empty slices to ensure TheBatchEnd and IsLastBatch stay consistent and that the resume logic using PausePos works as expected.
  4. Pause/resume semantics across batches

    • no_more_data is computed as (key_it == slice_end_it) && req.IsLastBatch(); and PausePos is updated with either the next key or an empty TxKey. Combined with the TheBatchEnd(core_id) condition in the final if:
      • if (is_scan_mem_full || no_more_data || accumulated >= scan_batch_size || req.TheBatchEnd(core_id)) { req.SetFinish(core_id_); ... }
    • Please double‑check that for non‑last batches we always leave enough state (PausePos, current slice index, coordinator state) for the next batch to resume correctly and that IsDrained(core_id) transitions only when the final batch truly has no more slices for that core.
  5. Cross‑core dispatch helper choice

    • Here we dispatch with shard_->Enqueue(shard_->LocalCoreId(), core_id, &req);, while many other multi‑core flows in this file use local_shards_.EnqueueCcRequest(...). If Enqueue is a newer convenience wrapper for cross‑core CC dispatch that preserves all the invariants (UnfinishedCoreCnt, per‑core state ordering), that’s fine; otherwise it might be safer to align with the existing pattern. Please confirm the intended API here.

If all of the above invariants hold under tests (especially with empty‑slice batches and split‑slice scenarios), the refactored coordination logic looks solid and easier to reason about than the previous ad‑hoc per‑core slice tracking.

Also applies to: 5421-5432, 5436-5717, 5719-5724

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4943d80 and b5982a7.

📒 Files selected for processing (4)
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/src/sk_generator.cpp
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
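For illustration only, the mapping that learning describes can be written as a small free function (hypothetical name, not the real EnqueueCcRequest):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical illustration of the distribution rule: mask shard_code into a
// fixed 0-1023 range first, then modulo by the live core count. The mask keeps
// the pre-modulo domain stable regardless of how wide shard_code is.
inline std::size_t ShardCodeToCore(std::uint32_t shard_code, std::size_t n_cores)
{
    return (shard_code & 0x3FF) % n_cores;
}
```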
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • tx_service/src/sk_generator.cpp
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • tx_service/src/sk_generator.cpp
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/template_cc_map.h
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Applied to files:

  • tx_service/src/sk_generator.cpp
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/template_cc_map.h
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Applied to files:

  • tx_service/src/cc/local_cc_shards.cpp
🧬 Code graph analysis (1)
tx_service/src/cc/local_cc_shards.cpp (1)
tx_service/include/cc/local_cc_shards.h (1)
  • EnqueueLowPriorityCcRequestToShard (413-419)
🔇 Additional comments (4)
tx_service/src/sk_generator.cpp (1)

358-362: The cross-core dispatch coordination is properly implemented. The handler for RangePartitionDataSyncScanCc (executed on the first core) pins the slices via req.PinSlices() and then explicitly dispatches to remaining cores through a loop in template_cc_map.h (lines 3660-3671), ensuring parallel scanning after the slice is pinned. This pattern is consistently applied across normal scans, range partition data sync scans, and remote scans.

tx_service/src/cc/local_cc_shards.cpp (1)

3836-3839: Extra debug logging for delta-size scan is reasonable

The DLOG adds useful context (range ID, table, last/data sync timestamps) and is guarded by debug logging, so it should not impact release performance. No functional concerns here.

tx_service/include/cc/template_cc_map.h (2)

623-636: Additional DLOG on cc_page last_dirty_commit_ts looks safe

The new debug log when updating cc_page->last_dirty_commit_ts_ is side‑effect free and gated behind DLOG, so it won’t affect release behavior; it should be helpful when diagnosing data‑sync / delta‑size inconsistencies.


7968-8001: New diagnostic logging for inconsistent NeedCkpt vs last_dirty_commit_ts_ is helpful

The added DLOG in ScanSliceDeltaSizeCcForRangePartition when ccp->last_dirty_commit_ts_ <= req.LastDataSyncTs() but cce->NeedCkpt() is still true is a good safeguard: it makes the invariant enforced by the following assert(!cce->NeedCkpt()); observable, without changing behavior in release builds.

No functional issues spotted here.

Comment on lines +4245 to +4255
DLOG(INFO) << "SetFinish on core " << core_id << " unfinished_cnt_ "
<< unfinished_cnt_;
if (export_base_table_item_ && !pause_pos_[core_id].second)
{
// Only not drained on this core, should set the paused key.
UpdateMinPausedSlice(&pause_pos_[core_id].first);
}
else if (!export_base_table_item_)
{
UpdateMinPausedSlice(curr_slice_index_[core_id]);
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Search for TxKey definition and operator< semantics
rg -n 'struct TxKey|class TxKey' --type cpp --type h -A 5

Repository: eloqdata/tx_service

Length of output: 2069


🏁 Script executed:

# Search for UpdateMinPausedSlice implementation and usage
rg -n 'UpdateMinPausedSlice' --type cpp --type h -B 2 -A 5



🏁 Script executed:

# Search for SliceCoordinator class definition
rg -n 'struct SliceCoordinator|class SliceCoordinator' --type cpp --type h -A 15



🏁 Script executed:

# Search for FixCurrentSliceIndex implementation
rg -n 'FixCurrentSliceIndex' --type cpp --type h -B 2 -A 10



🏁 Script executed:

# Get the full FixCurrentSliceIndex implementation
sed -n '4307,4330p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Check TxKey default constructor and operator< details
sed -n '289,350p' tx_service/include/tx_key.h



🏁 Script executed:

# Look for operator< in TxKey
rg -n 'operator<' tx_service/include/tx_key.h -A 3



🏁 Script executed:

# Check how min_paused_slice_index_ is used and if it's reset
rg -n 'min_paused_slice_index_' tx_service/include/cc/cc_request.h -B 2 -A 2



🏁 Script executed:

# Check how pause_pos_ is set and its relationship to pinned_slices_
rg -n 'pause_pos_\[' tx_service/include/cc/cc_request.h -B 3 -A 1 | head -60



🏁 Script executed:

# Check how pinned_slices_ is populated
rg -n 'pinned_slices_' tx_service/include/cc/cc_request.h -B 2 -A 2 | head -80



🏁 Script executed:

# Check MoveNextSlice to understand min_paused_slice_index_ flow
sed -n '4524,4560p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Check how pause_pos_ is initialized and where it comes from
rg -n 'pause_pos_' tx_service/include/cc/cc_request.h | head -20



🏁 Script executed:

# Get initialization of pause_pos_
rg -n 'pause_pos_\(' tx_service/include/cc/cc_request.h -B 2 -A 2



🏁 Script executed:

# Check the context around SetFinish to see when UpdateMinPausedSlice is actually called
sed -n '4240,4260p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Check what calls FixCurrentSliceIndex and under what conditions
rg -n 'FixCurrentSliceIndex' tx_service/include/cc/ tx_service/src/cc/ --type cpp --type h -B 5 -A 2



🏁 Script executed:

# Examine the full initialization context around line 4086 to understand pause_pos_ lifecycle
sed -n '4080,4090p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Check if there are any guards or contracts that ensure pause_pos_ is always within pinned_slices_
rg -n 'pause_pos_' tx_service/include/cc/template_cc_map.h -B 5 -A 5 | head -80



🏁 Script executed:

# Look for any documentation or comments about the invariants of pause_pos_ vs pinned_slices_
sed -n '4590,4630p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Let's check if there are bounds checks we might have missed in the while loop
sed -n '4307,4322p' tx_service/include/cc/cc_request.h



🏁 Script executed:

# Check the context where pause_pos_ is set during scanning to understand the invariant
sed -n '5695,5710p' tx_service/include/cc/template_cc_map.h



Bounds-checking vulnerability in FixCurrentSliceIndex requires immediate hardening

The overall refactor (per-core curr_slice_index_, FixCurrentSliceIndex, CurrentSlice*, TheBatchEnd, IsSlicePinned, and SliceCoordinator) centralises slice pinning and pause/batch state effectively. However, one critical issue and one semantic concern need attention:

  1. FixCurrentSliceIndex out-of-bounds vulnerability
    The loop walks pinned_slices_ without bounds checking:

    while (curr_slice->EndTxKey() < pause_pos_[core_id].first)
    {
        ++curr_slice_idx;
        curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
    }

    If pause_pos_[core_id].first is set to a key beyond the last pinned slice, this reads past the vector boundary. The paused key is set independently during scanning and there is no local invariant enforcing it falls within the pinned slice range. Add a bounds guard:

    while (curr_slice_idx + 1 < slice_coordinator_.pinned_slices_.size() &&
           curr_slice->EndTxKey() < pause_pos_[core_id].first)
  2. Non-continuous min_paused_slice_index_ semantics
    min_paused_slice_index_ is initialized to 0 but UpdateMinPausedSlice(size_t slice_index) only updates when slice_index < min_paused_slice_index_. This means a call with index 0 on the first batch will not update it. If the intent is to track the minimum paused slice index across cores for the next batch, the initialization logic should be reconsidered—either initialize to std::numeric_limits<size_t>::max() or adjust the update condition.
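The second point can be demonstrated in isolation. Below is a sketch of the sentinel-based alternative (hypothetical type, not the real coordinator): initialized to "no index yet", the first observed slice index — including 0 — is always recorded:

```cpp
#include <cstddef>
#include <limits>

// Hypothetical min-tracker: with a max() sentinel, any valid index (including
// 0) updates the minimum on first sight; with an init of 0, index 0 would be
// indistinguishable from "already minimal" and larger indices never update.
struct MinPausedSliceTracker
{
    std::size_t min_index = std::numeric_limits<std::size_t>::max();

    void Update(std::size_t slice_index)
    {
        if (slice_index < min_index)
        {
            min_index = slice_index;
        }
    }

    bool HasValue() const
    {
        return min_index != std::numeric_limits<std::size_t>::max();
    }
};
```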

Comment on lines +359 to +361
// The scan slice request is dispatched to the first core. The first
// core tries to pin the slice if necessary and if succeeds, further
// dispatches the request to remaining cores for parallel scans.

⚠️ Potential issue | 🟡 Minor

Comment doesn't match the implementation.

The comment states the request is "dispatched to the first core," but Line 362 actually dispatches to a randomly selected core using core_rand % core_cnt.

🔎 Proposed fix
-        // The scan slice request is dispatched to the first core. The first
-        // core tries to pin the slice if necessary and if succeeds, further
-        // dispatches the request to remaining cores for parallel scans.
+        // The scan slice request is dispatched to a randomly selected core.
+        // That core tries to pin the slice if necessary and if succeeds,
+        // further dispatches the request to remaining cores for parallel scans.

@yi-xmu yi-xmu requested a review from liunyl December 26, 2025 10:50

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/include/cc/cc_request.h (1)

4178-4210: Guard unfinished_cnt_ updates to avoid data races

unfinished_cnt_ is read/modified under mux_ in Wait(), SetError(), and SetFinish(), but SetUnfinishedCoreCnt() writes it without taking the same mutex. This creates a data race and formally undefined behaviour if SetUnfinishedCoreCnt() runs concurrently with any of these methods.

Wrap the assignment in the same mutex:

-    void SetUnfinishedCoreCnt(uint16_t core_cnt)
-    {
-        unfinished_cnt_ = core_cnt;
+    void SetUnfinishedCoreCnt(uint16_t core_cnt)
+    {
+        std::lock_guard<std::mutex> lk(mux_);
+        unfinished_cnt_ = core_cnt;
♻️ Duplicate comments (2)
tx_service/include/cc/cc_request.h (2)

4304-4318: Add bounds checks in FixCurrentSliceIndex to prevent out-of-bounds on pinned_slices_

FixCurrentSliceIndex walks slice_coordinator_.pinned_slices_ with an unbounded while:

while (curr_slice->EndTxKey() < pause_pos_[core_id].first)
{
    ++curr_slice_idx;
    curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
}

If pause_pos_[core_id].first is beyond the last pinned slice’s end key, this will read past pinned_slices_ and invoke undefined behaviour. There is no local invariant here that guarantees the pause key is ≤ last pinned slice end key.

Add a size guard and handle the empty-vector case defensively.

Suggested hardening of `FixCurrentSliceIndex`
     void FixCurrentSliceIndex(uint16_t core_id)
     {
         assert(export_base_table_item_);
         if (pause_pos_[core_id].first.KeyPtr() != nullptr)
         {
-            size_t curr_slice_idx = 0;
-            StoreSlice *curr_slice =
-                slice_coordinator_.pinned_slices_[curr_slice_idx];
-            while (curr_slice->EndTxKey() < pause_pos_[core_id].first)
-            {
-                ++curr_slice_idx;
-                curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
-            }
-            curr_slice_index_[core_id] = curr_slice_idx;
+            size_t curr_slice_idx = 0;
+            auto &pinned = slice_coordinator_.pinned_slices_;
+            if (pinned.empty())
+            {
+                // Nothing pinned – nothing to fix; bail out defensively.
+                return;
+            }
+
+            StoreSlice *curr_slice = pinned[curr_slice_idx];
+            while (curr_slice_idx + 1 < pinned.size() &&
+                   curr_slice->EndTxKey() < pause_pos_[core_id].first)
+            {
+                ++curr_slice_idx;
+                curr_slice = pinned[curr_slice_idx];
+            }
+            curr_slice_index_[core_id] = curr_slice_idx;
         }
     }

4386-4407: min_paused_slice_index_ initialization breaks minimum-tracking semantics and isn’t reset across runs

In non-continuous mode, SliceCoordinator does:

if (is_continuous_) {
    min_paused_key_ = TxKey();
} else {
    min_paused_slice_index_ = 0;
}

and Reset() does not reinitialize the union. UpdateMinPausedSlice(size_t slice_index) only updates when slice_index < min_paused_slice_index_. With an initial value of 0, the first and all subsequent calls with valid indices (≥0) will never update the minimum; across Reset() calls the old value is also preserved. This means min_paused_slice_index_ can get stuck at 0 and no longer reflects the true minimum paused slice index across cores/batches, which affects StartKey(), MoveNextSlice() and UpdateBatchEnd() semantics. This is the same issue previously called out and it still exists in this revision.

A more robust pattern is to initialize and reset to a sentinel “no index yet” value (e.g. std::numeric_limits<size_t>::max()) and then relax the logic accordingly.

Suggested fix for non-continuous mode initialization/reset
     struct SliceCoordinator
     {
         static constexpr uint16_t MaxBatchSliceCount = 512;
@@
         SliceCoordinator(bool is_continuous,
                          std::vector<std::pair<TxKey, bool>> *slices_keys)
             : is_continuous_(is_continuous), slices_keys_ptr_(slices_keys)
         {
             pinned_slices_.reserve(MaxBatchSliceCount);
             if (is_continuous_)
             {
-                min_paused_key_ = TxKey();
+                min_paused_key_ = TxKey();
             }
             else
             {
-                min_paused_slice_index_ = 0;
+                min_paused_slice_index_ =
+                    std::numeric_limits<size_t>::max();
             }
             batch_end_slice_index_ = 0;
         }
@@
         void Reset()
         {
             first_slice_id_.Reset();
             pinned_slices_.clear();
             prepared_slice_cnt_ = 0;
             ready_for_scan_.store(false, std::memory_order_relaxed);
             batch_end_slice_index_ = 0;
             is_end_slice_ = false;
+
+            if (is_continuous_)
+            {
+                min_paused_key_ = TxKey();
+            }
+            else
+            {
+                min_paused_slice_index_ =
+                    std::numeric_limits<size_t>::max();
+            }
         }
@@
     void UpdateMinPausedSlice(size_t slice_index)
     {
         assert(!export_base_table_item_);
         if (slice_index < slice_coordinator_.min_paused_slice_index_)
         {
             slice_coordinator_.min_paused_slice_index_ = slice_index;
         }
     }

Also applies to: 4418-4546

🧹 Nitpick comments (6)
tx_service/include/cc/template_cc_map.h (4)

5237-5305: Status handling in pin_range_slice could be made more explicit

The pin_range_slice lambda (Lines 5237–5305) is generally consistent with other PinRangeSlice call sites, but two aspects are worth tightening:

  • Line 5283 (RangeSliceOpStatus::NotOwner) both triggers assert("Dead branch") and sets succ = true. Note that assert("Dead branch") can never fire: a string literal decays to a non-null pointer, which is always truthy, so even debug builds sail past it, and in non-assert builds the code silently treats a “NotOwner” slice as success. If this state is truly impossible, consider using assert(false && "Dead branch") without also marking success; otherwise, treat it as a real error (e.g., log and propagate PIN_RANGE_SLICE_FAILED) so mis-routing during failover/split doesn’t get masked.
  • Line 5241 hard-codes prefetch_size = 32. If the desired prefetch depth is already encoded elsewhere (e.g., on req or in a config), it would be cleaner to thread that through rather than baking in a magic constant here.

5321-5414: Guard assumptions around StoreRangePtr and MoveNextSlice semantics

The new helpers around slice preparation and coordination look structurally sound but rely on a couple of non-obvious invariants:

  • Line 5323 assumes req.StoreRangePtr() is non-null and that FindSlice(slice_key) never returns nullptr. If there is any path where the store range has not been pinned/set before this executes, this will dereference null. Adding an assert(req.StoreRangePtr() != nullptr); (and optionally asserting the FindSlice result) would make the contract explicit.
  • Lines 5336–5341 only populate req_end_key when req.export_base_table_item_ is true; for the index path it remains nullptr, yet it is still passed to MoveNextSlice<KeyT>(slice_end_key, req_end_key) on Line 5389. Please double-check that MoveNextSlice is defined to accept a nullptr end key (and treat it as unbounded); otherwise, this can skew batch-end calculation.
  • On Lines 5345–5353, prepared_slice_cnt is derived from slice_coordinator_.PreparedSliceCnt() and then incremented for each loop iteration, regardless of whether pin_range_slice eventually succeeds. On the RangeSliceOpStatus::Retry path, pin_range_slice enqueues the request and returns succ = false, while prepared_slice_cnt has already been incremented. Updating PreparedSliceCnt with this value (Lines 5371–5373) means the coordinator might count a slice as “prepared” even though the pin hasn’t actually been performed yet. This is subtle; please verify the coordinator’s semantics so slices are neither skipped nor double-counted across retries.

5424-5488: find_non_empty_slice relies on strong invariants for CurrentSlice

The new find_non_empty_slice helper (Lines 5426–5488) is a nice way to centralize discovery of the next non-empty slice, but it assumes:

  • Line 5464: req.CurrentSlice(shard_->core_id_) always returns a valid StoreSlice* whenever req.TheBatchEnd(...) is false. If CurrentSlice can ever be null (e.g., coordinator misalignment or range metadata races), this will crash before the TheBatchEnd check. A defensive assert(store_slice != nullptr); here (and/or earlier) would document the invariant.
  • Line 5438–5453: the lambda asserts that if search_key is beyond curr_start_key, it must still be strictly less than the slice’s end key derived via req.StoreRangePtr()->FindSlice(curr_start_tx_key). This depends on PausePos and CurrentSliceKey always being in sync with the underlying store-range layout (no concurrent range split/move that invalidates the mapping). If range splits can occur while a data-sync scan is in progress, it’s worth confirming this assertion cannot fire in production.

Functionally the control flow (advancing slices until it != end_it or batch end) looks correct for both empty and non-empty slices; the main concern is making the hidden assumptions explicit.


5510-5524: Scan loop pause/resume and finish conditions are subtle; consider clarifying/validating with tests

The main per-core scan loop and finish logic tie together several moving parts:

  • Lines 5510–5511: initial key_it/slice_end_it are obtained from find_non_empty_slice, and Line 5524 asserts key_it != slice_end_it || req.TheBatchEnd(core). Later, after the loop, Line 5636 reasserts the same invariant.
  • Lines 5617–5627: when a slice is exhausted, you clear slice_pinned, advance with MoveToNextSlice, and, if not at TheBatchEnd, call find_non_empty_slice again to continue scanning the next non-empty slice.
  • Lines 5653–5656: SetFinish(core) is triggered when any of these are true: heap full, no_more_data (only when IsLastBatch() and key_it == slice_end_it), accumulated_scan_cnt_ >= scan_batch_size_, or TheBatchEnd(core).

This appears consistent, but the interaction between:

  • no_more_data vs TheBatchEnd (per-batch vs overall),
  • PausePos (Line 5638–5647),
  • and IsDrained checked at entry (Lines 5417–5422),

is intricate enough that regressions are easy to introduce.

I’d recommend:

  • Adding a brief comment near the no_more_data computation (Lines 5639–5643) explaining the distinction between “end of current batch of slices” (TheBatchEnd) and “end of the entire scan” (IsLastBatch).
  • Validating with targeted tests covering: multi-batch scans over several slices (including empty slices), resume from PausePos mid-range, and the path where scan_heap_is_full_ forces early exit with remaining data.

Also applies to: 5617-5636, 5653-5656

tx_service/src/cc/local_cc_shards.cpp (1)

4066-4071: Randomized ‘first core’ selection looks fine; consider aligning the comment and factoring out core count

The change to route scan_cc to a randomly chosen shard each iteration is compatible with the “first core coordinates and fans out” contract, but the comment still reads as if there were a fixed “first” core. To reduce confusion for future readers, explicitly mention that this core is now chosen at random. Optionally, cache const auto core_count = cc_shards_.size(); outside the loop and reuse it for the modulo to avoid re-reading the vector size in a hot path.

tx_service/include/cc/cc_request.h (1)

4587-4592: Update slices_to_scan_ comment to match new “pinned slice” semantics

slices_to_scan_’s bool is now used as a per-slice state flag (set by SliceCoordinator::SlicePinned() and read by IsSlicePinned()), but the comment still says “mark if the slice need to be split”, which is misleading under the new coordinator model.

Consider updating the comment (or renaming the flag) to reflect that it tracks pinned/active slices for the current batch, not “need-split” state.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b5982a7 and 13803e9.

📒 Files selected for processing (3)
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • tx_service/include/cc/cc_request.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • tx_service/include/cc/cc_request.h
  • tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Applied to files:

  • tx_service/include/cc/cc_request.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Applied to files:

  • tx_service/src/cc/local_cc_shards.cpp
🧬 Code graph analysis (1)
tx_service/src/cc/local_cc_shards.cpp (1)
tx_service/include/cc/local_cc_shards.h (1)
  • EnqueueLowPriorityCcRequestToShard (413-419)
🔇 Additional comments (1)
tx_service/include/cc/cc_request.h (1)

4055-4091: Slice list construction and per-core state look reasonable

The constructor’s initialization of slice_coordinator_, slices_to_scan_, and per-core vectors (data_sync_vec_, archive_vec_, mv_base_idx_vec_, pause_pos_, curr_slice_index_, counters) is coherent and consistent with the new coordinator-based design; no issues from a correctness perspective here.

@yi-xmu yi-xmu force-pushed the update_datasyncscan_execution branch from 13803e9 to b91e939 Compare December 30, 2025 03:57
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/include/cc/template_cc_map.h (1)

5247-5316: Guard RangeSliceOpStatus::NotOwner in pin_range_slice before treating the slice as pinned

In pin_range_slice, the NotOwner case sets succ = true and returns whatever RangeSliceId the underlying PinRangeSlice call produced. A few lines later, the caller unconditionally does:

  • req.slice_coordinator_.StorePinnedSlice(new_slice_id);
  • auto *slice = static_cast<const TemplateStoreSlice<KeyT> *>(new_slice_id.Slice());
  • const KeyT *slice_end_key = slice->EndKey();

(Line 5391–5396)

If PinRangeSlice represents NotOwner with an invalid/empty RangeSliceId (e.g., Slice() == nullptr), this will be UB / a hard crash. Other call sites in this file (e.g., ScanSliceDeltaSizeCcForRangePartition) explicitly avoid touching slice_id on NotOwner, which suggests it’s not safe to use.

Consider treating NotOwner as “no usable slice” and skipping pin-related work for that slice rather than flowing it through as a successfully pinned slice. One possible shape:

  • Return an “empty” RangeSliceId from pin_range_slice on NotOwner.
  • In the caller, detect new_slice_id.Slice() == nullptr and skip SlicePinned/StorePinnedSlice and MoveNextSlice(slice_end_key, ...) for that iteration.

Example tweak:

Proposed defensive handling for `NotOwner` and caller
@@
-        auto pin_range_slice =
-            [this, &req, &next_slice_func](
-                const KeyT &search_key) -> std::pair<RangeSliceId, bool>
+        auto pin_range_slice =
+            [this, &req, &next_slice_func](
+                const KeyT &search_key) -> std::pair<RangeSliceId, bool>
         {
@@
             switch (pin_status)
             {
@@
             case RangeSliceOpStatus::NotOwner:
             {
-                assert("Dead branch");
-                // The recovered cc entry does not belong to this ng
-                // anymore. This will happen if ng failover after a
-                // range split just finished but before checkpointer
-                // is able to truncate the log. In this case the log
-                // records of the data that now falls on another ng
-                // will still be replayed on the old ng on recover.
-                // Skip the cc entry and remove it at the end.
-                succ = true;
+                assert("Dead branch");
+                // Slice is no longer owned by this node-group; treat it as
+                // having no usable RangeSliceId so downstream logic can skip it.
+                slice_id = RangeSliceId{};
+                succ = true;
                 break;
             }
@@
             return {slice_id, succ};
         };
@@
-                // Execute the pinslice operation.
-                auto [new_slice_id, succ] = pin_range_slice(*start_key);
-                if (!succ)
-                {
-                    req.slice_coordinator_.UpdatePreparedSliceCnt(
-                        prepared_slice_cnt);
-                    return false;
-                }
+                // Execute the pinslice operation.
+                auto [new_slice_id, succ] = pin_range_slice(*start_key);
+                if (!succ)
+                {
+                    req.slice_coordinator_.UpdatePreparedSliceCnt(
+                        prepared_slice_cnt);
+                    return false;
+                }
+
+                // `NotOwner` (or similar) may yield an "empty" RangeSliceId;
+                // in that case just advance to the next slice without pinning.
+                if (new_slice_id.Slice() == nullptr)
+                {
+                    req.slice_coordinator_.MoveNextSlice<KeyT>();
+                    continue;
+                }
♻️ Duplicate comments (1)
tx_service/include/cc/cc_request.h (1)

4291-4305: Guard FixCurrentSliceIndex against out-of-bounds on pinned_slices_.

FixCurrentSliceIndex walks slice_coordinator_.pinned_slices_ without any bounds checks. If pause_pos_[core_id].first is greater than the last pinned slice’s end key (or pinned_slices_ is empty), this will index past the end of the vector and trigger undefined behaviour.

Please add an empty-check and cap the loop at pinned_slices_.size().

Proposed fix
     void FixCurrentSliceIndex(uint16_t core_id)
     {
         assert(export_base_table_item_);
-        if (pause_pos_[core_id].first.KeyPtr() != nullptr)
-        {
-            size_t curr_slice_idx = 0;
-            StoreSlice *curr_slice =
-                slice_coordinator_.pinned_slices_[curr_slice_idx];
-            while (curr_slice->EndTxKey() < pause_pos_[core_id].first)
-            {
-                ++curr_slice_idx;
-                curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
-            }
-            curr_slice_index_[core_id] = curr_slice_idx;
-        }
+        if (pause_pos_[core_id].first.KeyPtr() == nullptr)
+        {
+            return;
+        }
+
+        auto &pinned = slice_coordinator_.pinned_slices_;
+        if (pinned.empty())
+        {
+            return;
+        }
+
+        size_t curr_slice_idx = 0;
+        StoreSlice *curr_slice = pinned[curr_slice_idx];
+        while (curr_slice_idx + 1 < pinned.size() &&
+               curr_slice->EndTxKey() < pause_pos_[core_id].first)
+        {
+            ++curr_slice_idx;
+            curr_slice = pinned[curr_slice_idx];
+        }
+        curr_slice_index_[core_id] = curr_slice_idx;
     }
🧹 Nitpick comments (2)
tx_service/src/cc/local_cc_shards.cpp (1)

4067-4072: Random single-core dispatch is reasonable; clarify coordinator intent and dependency

The new logic on Line [4067]–Line [4072] to pick a random core as the “first”/coordinator core and enqueue RangePartitionDataSyncScanCc only there looks aligned with the coordinator comment and should work fine as long as RangePartitionDataSyncScanCc is truly agnostic to which shard is the coordinator.

Two small follow-ups:

  1. Clarify naming/comment & avoid magic modulo in-place

    Renaming the variable and comment to make the “coordinator” role explicit improves readability and avoids re-doing the modulo inline:

    Proposed readability tweak

    -    uint32_t core_rand = butil::fast_rand();
    -    // The scan slice request is dispatched to the first core. The first
    -    // core tries to pin the slice if necessary and if succeeds, further
    -    // dispatches the request to remaining cores for parallel scans.
    -    EnqueueLowPriorityCcRequestToShard(core_rand % cc_shards_.size(),
    -                                       &scan_cc);
    +    const size_t coordinator_core_idx =
    +        static_cast<size_t>(butil::fast_rand()) % cc_shards_.size();
    +    // Dispatch the scan slice request to a randomly chosen coordinator
    +    // core. That core tries to pin the slice if necessary and, on success,
    +    // further dispatches the request to remaining cores for parallel scans.
    +    EnqueueLowPriorityCcRequestToShard(coordinator_core_idx, &scan_cc);

  2. Verify header and coordinator assumptions

    • Ensure the translation unit includes the header that declares butil::fast_rand explicitly (rather than relying on a transitive include), so this doesn’t become fragile if upstream headers change.
    • Double-check that RangePartitionDataSyncScanCc no longer assumes coordinator core 0 and is safe to be enqueued from any shard index, since we now randomize the coordinator.
tx_service/include/cc/template_cc_map.h (1)

5331-5424: Slice-coordinator batch preparation looks sane; consider making StoreRangePtr invariants explicit

The new helpers around slice preparation:

  • check_split_slice using TemplateStoreRange<KeyT>::FindSlice and PostCkptSize() to decide which slices actually need pinning.
  • The req.slice_coordinator_ loop that advances via StartKey/MoveNextSlice, pins slices via pin_range_slice, stores RangeSliceIds, calls UpdateBatchEnd(), and then broadcasts the request to all cores once SetReadyForScan() is set.

all line up with how other parts of this file reason about range slices and post‑ckpt sizes, and the control flow for PreparedSliceCnt, IsEndSlice, and SetUnfinishedCoreCnt is consistent with existing multi‑core scan patterns.

Two small points you may want to tighten up:

  • check_split_slice and CurrentSliceKey/CurrentSlice assume req.StoreRangePtr() is non‑null and that the range remains pinned for the whole scan. Other code paths here make the same assumption, but it’s implicit. Adding an early assert(req.StoreRangePtr() != nullptr); near the top of Execute(RangePartitionDataSyncScanCc &req) would document that contract and fail fast if a future caller violates it.
  • check_split_slice repeatedly fetches TemplateStoreRange<KeyT> *range_ptr and does a FindSlice for every candidate slice start. If you ever see this show up in profiles, hoisting range_ptr out of the lambda (capturing it by reference) would avoid redundant pointer casts / lookups.

Functionally this block looks correct; these are minor robustness / clarity tweaks.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 13803e9 and b91e939.

📒 Files selected for processing (4)
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/src/sk_generator.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • tx_service/src/sk_generator.cpp
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • tx_service/src/cc/local_cc_shards.cpp
🧬 Code graph analysis (1)
tx_service/include/cc/template_cc_map.h (1)
tx_service/include/range_record.h (1)
  • `` (338-814)
🔇 Additional comments (2)
tx_service/include/cc/template_cc_map.h (1)

5434-5671: Non‑empty slice discovery and per‑core scan loop are consistent, but rely on coordinator invariants

The new per‑core scanning path:

  • Uses req.PausePos(core_id) to resume from a per‑core pause key and find_non_empty_slice to advance CurrentSlice(core_id) until it finds a slice whose [start, end) actually has keys in TemplateCcMap.
  • Derives (key_it, slice_end_it, slice_end_key) with deduce_iterator, reusing the same boundary semantics used elsewhere (start at first key ≥ slice start, stop at first key ≥ slice end).
  • Maintains slice_pinned = req.IsSlicePinned(core_id) and export_persisted_key_only = !export_base_table_item_ && slice_pinned, so that need_export = (commit_ts <= data_sync_ts) && (slice_pinned || cce->NeedCkpt()) behaves per the comment: split/pinned slices export all persisted keys, unsplit slices only export entries that still need ckpt.
  • On each slice boundary (key_it == slice_end_it), calls MoveToNextSlice(core_id) and recomputes (key_it, slice_end_it, slice_end_key) via find_non_empty_slice, while resetting slice_pinned/export_persisted_key_only for the new slice.
  • Terminates a batch when either memory is full, scan_batch_size_ is reached, or req.TheBatchEnd(core_id) is hit, and only marks no_more_data when key_it == slice_end_it && req.IsLastBatch().

This is coherent with the rest of the scan/datasync machinery in this file, but it assumes:

  • CurrentSlice(core_id) and CurrentSliceKey(core_id) have been correctly initialised by the coordinator before any core reaches this block, and
  • TheBatchEnd(core_id)/IsLastBatch() reflect the same batch/slice set that was used to prepare and pin slices in the earlier coordinator block.

Given how subtle this state machine is, it would be good to exercise it with tests that cover:

  • Batches where all slices in the batch are empty on a core (so find_non_empty_slice walks to TheBatchEnd and you exit the main loop immediately).
  • Resume‑from‑pause scenarios (PausePos non‑empty) crossing slice boundaries.
  • Mixes of pinned and unpinned slices, ensuring export_persisted_key_only toggles as intended and no_more_data only flips true on the final batch.

From the perspective of this file the logic is internally consistent; the remaining risk is ensuring RangePartitionDataSyncScanCc’s coordinator invariants hold across the whole pipeline.

tx_service/include/cc/cc_request.h (1)

4165-4196: The lifecycle management of unfinished_cnt_ is actually safe and does not require changes.

The concern about premature completion and negative counters cannot occur in practice:

  1. Reset() flow is correct: When Reset() sets unfinished_cnt_ = 1 and resets slice_coordinator_.ready_for_scan_ = false, the next invocation always goes through Execute().

  2. Execute() always calls SetUnfinishedCoreCnt() before dispatch: The code checks !req.slice_coordinator_.IsReadyForScan() and only proceeds to multi-core dispatch after calling SetUnfinishedCoreCnt(shard_->core_cnt_) at line 5414 in template_cc_map.h. There is no code path that dispatches work to multiple cores without first updating the counter.

  3. All reuse paths verified:

    • local_cc_shards.cpp (4 Reset calls): All follow the pattern of Reset → Wait → EnqueueLowPriorityCcRequestToShard → Execute
    • sk_generator.cpp (2 Reset calls): Same pattern
    • Early returns in Execute (schema mismatch, pin failure) call SetError() and exit without reaching dispatch code
  4. SetFinish/SetError are safe: Both methods decrement unfinished_cnt_ and properly check for zero before unpinning and notifying.

The design is correct but somewhat implicit—it relies on Execute() being the sole entry point for dispatch and always calling SetUnfinishedCoreCnt() when ready_for_scan_ is false. While the suggestion to move this responsibility into Reset() would improve clarity and make the API more robust against future misuse, the current implementation does not have the bug described.

Likely an incorrect or invalid review comment.

// Execute the pinslice operation.
auto [new_slice_id, succ] = pin_range_slice(*start_key);
if (!succ)
{
Contributor

make sure to yield during the slice pin. This could take a while if you need to pin 512 slices.

Collaborator Author

If a load slice operation occurs, it will automatically yield.

@@ -5254,7 +5245,7 @@ class TemplateCcMap : public CcMap
}

auto pin_range_slice =
Contributor

add a comprehensive comment at the beginning of Execute() to explain the workflow of this request. This request is too complicated and hard to follow.

for (; prepared_slice_cnt <
RangePartitionDataSyncScanCc::SliceCoordinator::
MaxBatchSliceCount &&
!req.slice_coordinator_.IsEndSlice();
Contributor

can we have an estimate on how many slices we need to pin?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
tx_service/include/cc/cc_request.h (2)

4353-4394: SliceCoordinator pause tracking and slices_to_scan_ flag semantics are still fragile

Two related points here, echoing earlier feedback:

  1. min_paused_key_ and min_paused_slice_index_ initialization vs update rules

    • Continuous mode initializes min_paused_key_ to a default-constructed TxKey, and UpdateMinPausedSlice(const TxKey *key) only updates when *key < min_paused_key_. If the default key is not explicitly treated as “+∞” by operator<, the first real paused key may never be recorded, leaving min_paused_key_ at its default and StartKey<KeyT>() returning nullptr.
    • Non‑continuous mode initializes min_paused_slice_index_ to 0, and UpdateMinPausedSlice(size_t) only updates when slice_index < min_paused_slice_index_. Without a separate “uninitialized” sentinel, a first paused index of > 0 won’t ever be reflected in min_paused_slice_index_, which is then used by StartKey<KeyT>() and UpdateBatchEnd().

    If the “prepare slices” phase guarantees these fields are pre‑initialized to correct minima before any UpdateMinPausedSlice calls, documenting that invariant (and/or adding assertions around it) would make this much less brittle. Otherwise, consider explicit uninitialized handling (e.g., checking min_paused_key_.KeyPtr() == nullptr before comparing, or using a dedicated sentinel for the index case).

  2. slices_to_scan_ comment doesn’t match actual usage
    slices_to_scan_ is documented as “bool is used to mark if the slice need to be split”, but in this struct it’s manipulated purely as a “pinned or not” flag (SliceCoordinator::SlicePinned() and IsSlicePinned()). If “need to split” is no longer the intended meaning here, the comment should be updated to avoid confusion for future readers.

Also applies to: 4405-4533, 4575-4579
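
A minimal sketch of the explicit-uninitialized handling suggested above for the non-continuous index case, using a hypothetical stand-in struct (the real SliceCoordinator members differ; `kUnset` is an assumption, not a name from the codebase):

```cpp
#include <cstddef>
#include <limits>

// Hypothetical stand-in for the coordinator's paused-index tracking.
// kUnset plays the role of "+infinity", so the first paused index is
// always recorded even when it is greater than zero.
struct PausedSliceTracker
{
    static constexpr std::size_t kUnset =
        std::numeric_limits<std::size_t>::max();

    std::size_t min_paused_slice_index_{kUnset};

    void UpdateMinPausedSlice(std::size_t slice_index)
    {
        // The first real index always wins; afterwards only strictly
        // smaller indices replace it.
        if (slice_index < min_paused_slice_index_)
        {
            min_paused_slice_index_ = slice_index;
        }
    }

    bool HasPausedSlice() const
    {
        return min_paused_slice_index_ != kUnset;
    }
};
```

With the sentinel, initializing to 0 can never mask a first paused index greater than 0, and StartKey-style consumers can check HasPausedSlice() instead of comparing against an ambiguous default.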


4291-4305: FixCurrentSliceIndex can walk past pinned_slices_ bounds

The while-loop advances curr_slice_idx over slice_coordinator_.pinned_slices_ without checking curr_slice_idx against pinned_slices_.size(). If pause_pos_[core_id].first is greater than the end key of the last pinned slice, this will read past the vector boundary and invoke UB.

Also, if pinned_slices_ is ever empty, directly indexing [0] is immediately invalid.

Consider hardening this as follows:

  • Early‑out if pinned_slices_ is empty.
  • Bound the loop using curr_slice_idx + 1 < pinned_slices_.size() and stop at the last slice even if pause_pos_ lies beyond its end.

For example:

Proposed fix
void FixCurrentSliceIndex(uint16_t core_id)
{
    assert(export_base_table_item_);
    if (pause_pos_[core_id].first.KeyPtr() != nullptr &&
        !slice_coordinator_.pinned_slices_.empty())
    {
        size_t curr_slice_idx = 0;
        StoreSlice *curr_slice =
            slice_coordinator_.pinned_slices_[curr_slice_idx];
        while (curr_slice_idx + 1 < slice_coordinator_.pinned_slices_.size() &&
               curr_slice->EndTxKey() < pause_pos_[core_id].first)
        {
            ++curr_slice_idx;
            curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
        }
        curr_slice_index_[core_id] = curr_slice_idx;
    }
}
tx_service/include/cc/template_cc_map.h (1)

5332-5510: Slice pinning/preparation phase: behavior looks sound; consider tightening a couple of edge cases

Overall the new pin_range_slice + coordinator-driven preparation logic is consistent with the two‑phase design and correctly handles Retry vs BlockedOnLoad vs hard errors, but a few details are worth tightening:

  • check_split_slice assumes req.StoreRangePtr() is non-null and that FindSlice(slice_key) always succeeds. That's true for the normal checkpoint path, but if this request is ever constructed without a pinned TemplateStoreRange<KeyT>, the static_cast and subsequent dereferences invoke undefined behavior. Adding an assert(req.StoreRangePtr() != nullptr); before the cast would make this assumption explicit and safer.
  • In pin_range_slice, the RangeSliceOpStatus::NotOwner branch is marked as “Dead branch” but still sets succ = true. That means the caller will treat the operation as successful and push the returned RangeSliceId into slice_coordinator_.pinned_slices_, even though ownership is wrong. If this branch ever fires in release builds (e.g., a rare failover window), that RangeSliceId may be invalid for later CurrentSlice()/LastPinnedSlice() usages. It would be safer either to:
    • Treat NotOwner like an error (set PIN_RANGE_SLICE_FAILED and succ = false), or
    • Return succ = false and ensure the caller skips storing the slice id in that case.
  • The preparation loop pins up to MaxBatchSliceCount slices in one go. With prefetch_size = 32 this is probably fine, but if MaxBatchSliceCount is ever raised again, you might want to periodically yield (e.g., via EnqueueLowPriorityCcRequest) instead of doing all pins in one tight loop on the TxProcessor thread.

Functionally this block looks correct; these changes would mostly harden edge cases and clarify intent.
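
A sketch of the safer NotOwner handling, under the assumption that pin results can be classified up front (`ClassifyPinStatus` and `PinOutcome` are hypothetical helpers; only the status names come from the review):

```cpp
// Status names taken from the review; the enum itself is a hypothetical
// mirror of the codebase's RangeSliceOpStatus.
enum class RangeSliceOpStatus
{
    Successful,
    Retry,
    BlockedOnLoad,
    NotOwner,
    Error
};

struct PinOutcome
{
    bool succ;       // store the returned RangeSliceId only when true
    bool retriable;  // re-enqueue the request and pin again later
};

// Classify a pin result so that NotOwner is treated as a hard failure
// (i.e. set PIN_RANGE_SLICE_FAILED) rather than a success.
PinOutcome ClassifyPinStatus(RangeSliceOpStatus status)
{
    switch (status)
    {
    case RangeSliceOpStatus::Successful:
        return {true, false};
    case RangeSliceOpStatus::Retry:
    case RangeSliceOpStatus::BlockedOnLoad:
        return {false, true};
    default:
        // NotOwner and Error: never push the slice id into pinned_slices_.
        return {false, false};
    }
}
```

This keeps the "dead branch" from silently producing a pinned slice id on a rare failover window.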

🧹 Nitpick comments (4)
tx_service/include/cc/cc_request.h (3)

4042-4078: Constructor: guard old_slices_delta_size and clarify slices_to_scan_ bool semantics

The wiring of slice_coordinator_ and per-core curr_slice_index_ looks coherent. However:

  • In the !export_base_table_item_ branch you unconditionally dereference old_slices_delta_size; given it defaults to nullptr, it’s worth asserting (or otherwise enforcing) that it’s non-null on this path to avoid accidental UB later.
  • slices_to_scan_ is now used downstream as a (TxKey, pinned) container; consider renaming or at least documenting the .second semantic here to keep it in sync with how IsSlicePinned and SliceCoordinator::SlicePinned actually use it.

4167-4196: Reset now relies on SetUnfinishedCoreCnt – verify all call sites

Reset() sets unfinished_cnt_ = 1 and then expects SetUnfinishedCoreCnt(core_cnt) to be called before the next multi-core scan. That’s a behavioral change from initializing to core_cnt_ directly.

If any path forgets to call SetUnfinishedCoreCnt, Wait() could either unblock prematurely or block forever. Please double‑check the TemplateCcMap callers and consider a brief comment here documenting the required call order.

Resetting curr_slice_index_ (for export_base_table_item_) and slice_coordinator_.Reset() is otherwise consistent.
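
The Reset/SetUnfinishedCoreCnt/SetFinish contract described above can be modeled with a toy countdown (`CoreCountdown` is a hypothetical stand-in, not the real request class):

```cpp
#include <cassert>
#include <cstddef>

// Toy model of the unfinished-core bookkeeping: Reset() arms the counter
// at 1, so SetUnfinishedCoreCnt must run with the real core count before
// any core calls SetFinish(), or the count unblocks early or underflows.
struct CoreCountdown
{
    std::size_t unfinished_cnt_{1};
    bool done_{false};

    void Reset()
    {
        unfinished_cnt_ = 1;
        done_ = false;
    }

    void SetUnfinishedCoreCnt(std::size_t core_cnt)
    {
        unfinished_cnt_ = core_cnt;
    }

    void SetFinish()
    {
        // A second SetFinish from the same core would underflow here.
        assert(unfinished_cnt_ > 0);
        if (--unfinished_cnt_ == 0)
        {
            done_ = true;  // the real request unpins slices and notifies waiters
        }
    }
};
```

A brief comment at Reset() documenting this required call order (plus the assert above) would catch any path that forgets SetUnfinishedCoreCnt.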


4308-4346: Accessor helpers over curr_slice_index_ look correct; consider asserting store_range_

The new helpers (CurrentSlice, CurrentSliceKey, MoveToNextSlice, TheBatchEnd, IsSlicePinned) are consistent with the design:

  • In continuous mode they index into slice_coordinator_.pinned_slices_.
  • In non‑continuous mode they use slices_to_scan_ and batch_end_slice_index_.
  • IsSlicePinned is effectively “always true” in continuous mode, which matches all‑pinned semantics.

One small robustness improvement: in CurrentSlice’s non‑continuous branch, it would be safer to assert store_range_ != nullptr before calling store_range_->FindSlice(...), since a null store_range_ would currently crash.

tx_service/include/cc/template_cc_map.h (1)

5512-5752: Scan loop, pause/resume, and termination semantics: mostly solid; add a couple of safety checks

The reworked per‑core scan logic around PausePos, find_non_empty_slice, and the termination condition is coherent and matches the new coordinator model, but a few points deserve attention:

  • In find_non_empty_slice, the non‑migration path calls req.CurrentSliceKey(shard_->core_id_) and immediately does curr_start_tx_key.GetKey<KeyT>() and dereferences it. This assumes the per‑core current-slice key is always initialized and non-null before the first scan on that core. If there is any path where curr_slice_index_[core_id] is unset, this becomes undefined behavior. A defensive assert(curr_start_tx_key.KeyPtr() != nullptr); (or early return) would make the invariant explicit.

  • The assert(key_it != slice_end_it || req.TheBatchEnd(shard_->core_id_)); after the first find_non_empty_slice call is good, but if all slices in a batch are empty and it is not the last batch, TheBatchEnd() must reliably return true or you’ll hit this assert. It would be useful to double‑check that slice_coordinator_.MoveNextSlice/UpdateBatchEnd guarantee TheBatchEnd() in that case.

  • The termination condition:

    bool no_more_data = (key_it == slice_end_it) && req.IsLastBatch();
    ...
    if (is_scan_mem_full || no_more_data ||
        req.accumulated_scan_cnt_[core] >= req.scan_batch_size_ ||
        req.TheBatchEnd(core)) {
        req.SetFinish(core);
        return false;
    } else {
        shard_->EnqueueLowPriorityCcRequest(&req);
        return false;
    }

    implies that SetFinish(core_id_) is called at most once per core per RangePartitionDataSyncScanCc, and any further progress for that core must come from a new request. That matches the usual pattern for these Cc requests, but it puts a lot of meaning into IsLastBatch() and TheBatchEnd(). Please verify that higher‑level driver code never re‑invokes this same request on a core after SetFinish(core_id_) has been called, otherwise unfinished_cnt_ book‑keeping could underflow.

These are mostly invariants and safety checks; the core pause/resume and slice‑advance logic reads correctly.
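
The quoted termination condition can be factored into one predicate to make the four stop reasons explicit (a sketch with assumed parameter names, not the real identifiers):

```cpp
#include <cstddef>

// The four stop reasons from the quoted snippet, factored into one
// predicate: heap full, all data drained, per-core quota reached, or
// the current slice batch exhausted.
bool ShouldFinishCore(bool is_scan_mem_full,
                      bool at_slice_end,
                      bool is_last_batch,
                      std::size_t accumulated_scan_cnt,
                      std::size_t scan_batch_size,
                      bool the_batch_end)
{
    bool no_more_data = at_slice_end && is_last_batch;
    return is_scan_mem_full || no_more_data ||
           accumulated_scan_cnt >= scan_batch_size || the_batch_end;
}
```

Reaching the end of a slice in a non-final batch is the only case that falls through to re-enqueueing, which is exactly why IsLastBatch() and TheBatchEnd() carry so much weight in the original condition.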

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b91e939 and 39494dc.

📒 Files selected for processing (2)
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/template_cc_map.h
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/include/cc/cc_request.h
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
  • tx_service/include/cc/cc_request.h
🧬 Code graph analysis (1)
tx_service/include/cc/template_cc_map.h (2)
tx_service/include/range_record.h (2)
  • KeyT (646-649)
  • `` (338-814)
tx_service/include/cc/ccm_scanner.h (1)
  • Iterator (678-704)
🔇 Additional comments (1)
tx_service/include/cc/cc_request.h (1)

4198-4214: Good centralization of finish/error handling; confirm paused-slice bookkeeping invariants

Routing both error and normal completion through SetError/SetFinish and unconditionally calling UnpinSlices() when unfinished_cnt_ reaches 0 is a solid improvement and should prevent leaked pins across batches.

The new SetFinish(size_t core_id) logic also updates the minimal paused position:

  • Continuous (export_base_table_item_) mode: uses pause_pos_[core_id].first via UpdateMinPausedSlice(const TxKey*).
  • Non‑continuous mode: uses curr_slice_index_[core_id] via UpdateMinPausedSlice(size_t).

This assumes that, at the moment SetFinish is called, each core’s pause_pos_/curr_slice_index_ is already pointing at “the first unscanned slice/key” and that every participating core calls SetFinish exactly once. Please confirm those invariants in the TemplateCcMap paths, or add assertions/comments here to make the contract explicit.

Also applies to: 4228-4246

Comment on lines +5186 to +5270
/*
* RangePartitionDataSyncScanCc Workflow:
* ======================================
*
* This function implements a two-phase execution model for
* range-partitioned table data synchronization scanning with two distinct
* execution modes:
*
* Execution Modes:
* ----------------
* 1. Checkpoint Mode (export_base_table_item_ = false):
* - Used during normal checkpoint operations
* - Exports only dirty data (keys that need checkpointing)
* - The `req.slices_to_scan_` vector determines which slices need to be
* scanned.
* - Only pins slices that require splitting(PostCkptSize <= upper_bound)
*
* 2. Data Migration Mode (export_base_table_item_ = true):
* - Used during range split operations or index creation
* - Exports ALL primary key data in the specified range
* - Processes slices sequentially (idx + 1)
* - Pins all slices in the target range for complete data export
*
* Two-Phase Execution:
* --------------------
* Phase 1 - Slice Preparation (Single Core):
* - Executed by the initial core that receives the request
* - Prepares up to MaxBatchSliceCount (128) slices for scanning
* - For each slice:
* * In checkpoint mode: Checks if slice needs splitting (PostCkptSize >
* upper_bound)
* - If not, skips pinning and only scans dirty keys
* - If yes, pins the slice for complete export
* * In migration mode: Always pins the slice
* - Stores pinned slice IDs in `slice_coordinator_.pinned_slices_`
* - Sets ready_for_scan flag when preparation completes
* - Distributes the request to all other cores for parallel execution
*
* Phase 2 - Parallel Data Export (All Cores):
* - All cores (including the initial one) execute in parallel
* - Each core maintains its own:
* * Current slice index (curr_slice_index_[core_id])
* * Pause position (pause_pos_[core_id])
* * Accumulated scan/flush counters
* - For each core's execution:
* 1. Finds the first non-empty slice to scan (from pause position or
* start key)
* 2. Iterates through keys in the slice using ForwardScanStart()
* 3. For each key:
* - Checks if export is needed (CommitTs <= data_sync_ts_)
* - In checkpoint mode: Also checks if slice is pinned or
* cce->NeedCkpt()
* - Calls ExportForCkpt() to export the key/record data
* - Handles MVCC archive record recycling if enabled
* 4. Processes up to DataSyncScanBatchSize (32) pages per round
* 5. Stops when:
* - Reaches scan_batch_size_ limit
* - Scan heap is full
* - Reaches end of current batch slices
* - All data is drained (IsLastBatch())
* 6. Updates pause position for resume on next execution
* 7. Re-enqueues itself if more work remains
*
* Key Data Structures:
* --------------------
* - slice_coordinator_: Coordinates slice preparation and tracks pinned
* slices
* - pause_pos_[core_id]: Tracks where each core paused for resumption
* - data_sync_vec_[core_id]: Accumulates flush records per core
* - archive_vec_[core_id]: Accumulates archive records for MVCC cleanup
*
* Termination:
* ------------
* - When all cores finish (unfinished_cnt_ == 0):
* * All pinned slices are unpinned
* * Condition variable is notified to wake waiting threads
* - Each core calls SetFinish() when its portion is complete
*
* Error Handling:
* ---------------
* - Schema version mismatches: Returns false with
* REQUESTED_TABLE_SCHEMA_MISMATCH
* - Pin slice failures: Returns false, request may be retried
* - Heap full: Sets scan_heap_is_full_ flag, pauses and waits for cleanup
*/

⚠️ Potential issue | 🟡 Minor

Fix minor inaccuracies in the RangePartitionDataSyncScanCc workflow comment

The new high-level comment is very helpful, but there are a couple of inaccuracies that can be misleading when debugging this path:

  • In the “Checkpoint Mode” section you say “Only pins slices that require splitting (PostCkptSize <= upper_bound)”, while the code in next_slice_func/check_split_slice pins only when PostCkptSize() > StoreSlice::slice_upper_bound. The inequality sign should be corrected to match the implementation.
  • Under “Error Handling”, the bullet “Pin slice failures: Returns false, request may be retried” doesn’t distinguish between retriable statuses (e.g. Retry, BlockedOnLoad) and hard failures where you set PIN_RANGE_SLICE_FAILED and will not retry. Clarifying that here would make the behavior easier to reason about.

Updating these lines to reflect the actual conditions will keep the documentation aligned with the execution semantics.

🤖 Prompt for AI Agents
In tx_service/include/cc/template_cc_map.h around lines 5186 to 5270, update the
RangePartitionDataSyncScanCc workflow comment: change the Checkpoint Mode
pinning condition to match the code (slice is pinned when PostCkptSize() >
StoreSlice::slice_upper_bound, not <=), and clarify Error Handling to
distinguish retriable pin failures (e.g. Retry, BlockedOnLoad) from
non-retriable failures that set PIN_RANGE_SLICE_FAILED and will not retry; make
both edits concise so the prose matches the implementation semantics.

@yi-xmu yi-xmu merged commit 42f15b9 into main Dec 31, 2025
4 checks passed
This was referenced Jan 9, 2026

Development

Successfully merging this pull request may close these issues.

[Feature]: Refine the execution of RangePartitionDataSyncScanCc request

2 participants