Walkthrough

This pull request updates the …

Changes
Estimated code review effort: 🎯 1 (Trivial) | ⏱️ ~2 minutes

Possibly related PRs
Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
📜 Recent review details

Configuration used: Organization UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
Actionable comments posted: 1
🧹 Nitpick comments (3)
tx_service/include/cc/template_cc_map.h (1)

5254-5734: Strengthen invariants around slice lookup and resume logic in RangePartitionDataSyncScanCc

The new slice-coordinator flow (pinning via `pin_range_slice`, `check_split_slice`, `find_non_empty_slice`, and `PausePos`-based resume) looks coherent, but a few assumptions are implicit and worth making explicit:

- `check_split_slice` and the lambda in `find_non_empty_slice` assume `req.StoreRangePtr()` and `range_ptr->FindSlice(...)` always return non-null and that the returned slice has a meaningful `PostCkptSize()`/`EndTxKey()`. If this ever breaks (e.g., range map changes, miswired request), you'll hit UB before any guard trips. Adding defensive `assert(req.StoreRangePtr() != nullptr)` and `assert(slice != nullptr)` near these usages would make failures deterministic and easier to debug rather than silently corrupting scan state.
- `find_non_empty_slice` and the main scan loop rely on `req.CurrentSlice(core_id_)`/`CurrentSliceKey(core_id_)` being initialized consistently with `slice_coordinator_` before `IsReadyForScan()` flips. If someone later changes the coordinator or the dispatch path, it would be easy to introduce a mismatch that only shows up under specific ranges/cores. A brief comment at the top of `Execute(RangePartitionDataSyncScanCc &req)` documenting this invariant (who initializes `CurrentSlice*` and when) would help future maintainers.
- The `no_more_data`/`PausePos` interplay assumes that when `key_it == slice_end_it` and `req.TheBatchEnd(core_id_)` is true but `IsLastBatch()` is false, the key returned by `key_it->first` is a safe "resume from here" key for the next batch. That seems consistent with the `deduce_iterator` contract, but this is subtle enough that a targeted test (multi-core, multiple slice batches, with both pinned and non-pinned slices) is advisable.

These are mostly robustness and maintainability concerns; behavior as written looks correct under the intended invariants.
tx_service/include/cc/cc_request.h (2)

4294-4361: Harden slice index helpers against out‑of‑range access

`FixCurrentSliceIndex`, `CurrentSlice`, `CurrentSliceKey`, and `IsSlicePinned` all assume that `curr_slice_index_[core_id]` is within the bounds of `pinned_slices_` (continuous mode) or `slices_to_scan_` (non‑continuous mode), and that `pinned_slices_` is non‑empty when `pause_pos_[core_id].first` is set. If any upstream invariant is ever violated (e.g., pause key beyond the last pinned slice, or zero pinned slices), these will step past the end of the container.

Since this is on a fairly subtle code path, it's worth adding defensive asserts to catch violations early in debug builds (without changing release behavior). For example:
Proposed assertions to guard slice indices
```diff
 void FixCurrentSliceIndex(uint16_t core_id)
 {
     assert(export_base_table_item_);
     if (pause_pos_[core_id].first.KeyPtr() != nullptr)
     {
-        size_t curr_slice_idx = 0;
-        StoreSlice *curr_slice =
-            slice_coordinator_.pinned_slices_[curr_slice_idx];
-        while (curr_slice->EndTxKey() < pause_pos_[core_id].first)
-        {
-            ++curr_slice_idx;
-            curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
-        }
+        assert(!slice_coordinator_.pinned_slices_.empty());
+        size_t curr_slice_idx = 0;
+        StoreSlice *curr_slice =
+            slice_coordinator_.pinned_slices_[curr_slice_idx];
+        while (curr_slice->EndTxKey() < pause_pos_[core_id].first &&
+               curr_slice_idx + 1 < slice_coordinator_.pinned_slices_.size())
+        {
+            ++curr_slice_idx;
+            curr_slice = slice_coordinator_.pinned_slices_[curr_slice_idx];
+        }
+        // Ensure we didn't walk past the pinned slice set.
+        assert(curr_slice_idx < slice_coordinator_.pinned_slices_.size());
         curr_slice_index_[core_id] = curr_slice_idx;
     }
 }

 StoreSlice *CurrentSlice(uint16_t core_id) const
 {
     size_t curr_slice_idx = curr_slice_index_[core_id];
@@
-    if (export_base_table_item_)
-    {
-        assert(curr_slice_idx < slice_coordinator_.pinned_slices_.size());
-        return slice_coordinator_.pinned_slices_.at(curr_slice_idx);
-    }
+    if (export_base_table_item_)
+    {
+        assert(curr_slice_idx < slice_coordinator_.pinned_slices_.size());
+        return slice_coordinator_.pinned_slices_.at(curr_slice_idx);
+    }
@@
-    size_t curr_slice_index = curr_slice_index_[core_id];
-    return slices_to_scan_[curr_slice_index].first;
+    size_t curr_slice_index = curr_slice_index_[core_id];
+    assert(curr_slice_index < slices_to_scan_.size());
+    return slices_to_scan_[curr_slice_index].first;
@@
 bool IsSlicePinned(uint16_t core_id) const
 {
-    return export_base_table_item_
-               ? true
-               : slices_to_scan_[curr_slice_index_[core_id]].second;
+    if (export_base_table_item_)
+    {
+        return true;
+    }
+    size_t idx = curr_slice_index_[core_id];
+    assert(idx < slices_to_scan_.size());
+    return slices_to_scan_[idx].second;
 }
```
4387-4411: Clarify initialization and updates of min‑paused slice state in SliceCoordinator

`SliceCoordinator`'s continuous mode tracks progress via `min_paused_key_`, and non‑continuous mode via `min_paused_slice_index_`. Two small robustness improvements would make this less brittle:

- In `UpdateMinPausedSlice(const TxKey *key)`, you compare `*key` against `min_paused_key_` without checking whether `min_paused_key_` is still the initial "null" sentinel. If `TxKey::operator<` assumes non‑null `KeyPtr()`, this can be problematic. Consider explicitly handling the uninitialized case:

Suggested guard for initial min_paused_key_
```diff
 void UpdateMinPausedSlice(const TxKey *key)
 {
     assert(key->KeyPtr() != nullptr);
@@
-    assert(export_base_table_item_);
-    if (*key < slice_coordinator_.min_paused_key_)
-    {
-        slice_coordinator_.min_paused_key_ =
-            std::move(key->GetShallowCopy());
-    }
+    assert(export_base_table_item_);
+    if (slice_coordinator_.min_paused_key_.KeyPtr() == nullptr ||
+        *key < slice_coordinator_.min_paused_key_)
+    {
+        slice_coordinator_.min_paused_key_ =
+            std::move(key->GetShallowCopy());
+    }
 }
```
- For non‑continuous mode, `min_paused_slice_index_` is default‑initialized once in the `SliceCoordinator` ctor and then updated via `MoveNextSlice`/`UpdateMinPausedSlice(size_t)`. `Reset()` intentionally preserves it across batches (so scans can resume), but this is subtle. A brief comment on that intent near `Reset()` or the union declaration would help future maintainers avoid "resetting it back to 0" and breaking resume semantics.

These are small changes but make the slice‑coordination state more obviously correct and easier to reason about.
Also applies to: 4422-4563
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)

- log_service
- tx_service/include/cc/cc_request.h
- tx_service/include/cc/template_cc_map.h
- tx_service/src/cc/local_cc_shards.cpp
- tx_service/src/sk_generator.cpp
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
- tx_service/src/sk_generator.cpp
- tx_service/src/cc/local_cc_shards.cpp
- tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Applied to files:
- tx_service/src/sk_generator.cpp
- tx_service/src/cc/local_cc_shards.cpp
- tx_service/include/cc/template_cc_map.h
- tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.
Applied to files:
- tx_service/src/sk_generator.cpp
- tx_service/src/cc/local_cc_shards.cpp
- tx_service/include/cc/template_cc_map.h
- tx_service/include/cc/cc_request.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
🧬 Code graph analysis (2)
tx_service/src/cc/local_cc_shards.cpp (1)
tx_service/include/cc/local_cc_shards.h (1)
EnqueueLowPriorityCcRequestToShard(413-419)
tx_service/include/cc/template_cc_map.h (1)
tx_service/include/cc/local_cc_shards.h (1)
void(2079-2095)
🔇 Additional comments (6)
log_service (1)

1-1: Verify the submodule commit update and compatibility with tx_service changes.

This PR updates the log_service submodule commit reference. To ensure correctness and compatibility, please verify:

- What changes are included in the new commit `7d7e6dcb39a820de909e8e0a3e82cdce2ab403ad`?
- Do these changes align with the PR objectives (pinslice updates, debug logging, scan fixes)?
- Are there any breaking changes or new dependencies introduced in the log_service update?
- Have the corresponding changes in tx_service been tested with this new log_service version?

You may want to inspect the log_service repository directly or run tests (`./mtr --suite=mono_main,mono_multi,mono_basic` as mentioned in the PR checklist) to confirm compatibility.

tx_service/src/cc/local_cc_shards.cpp (2)
3837-3841: Debug logging for slice-delta scan parameters looks good

The added `DLOG` gives useful context (range id, table name, last/data sync ts) at the start of the delta-size scan without affecting release performance. No issues here.
4071-4076: Single-core dispatch of RangePartitionDataSyncScanCc relies on new coordinator semantics

Routing each `RangePartitionDataSyncScanCc` batch through a single randomly chosen shard (`core_rand % cc_shards_.size()`) instead of broadcasting to all shards is consistent with the comment that the "first core" will pin slices and then fan out work to remaining cores. The rest of the function still aggregates per-core results via `accumulated_flush_data_size_`, `DataSyncVec(i)`, `ArchiveVec(i)`, and `IsDrained(i)`, so this change assumes:

- `RangePartitionDataSyncScanCc` now internally coordinates multi-core scanning starting from the chosen shard, and
- it remains safe for only one shard to receive the top-level enqueue per iteration.

Two follow-ups to double-check:
1. **Header for `butil::fast_rand()`**

   Ensure this TU (or `cc/local_cc_shards.h`) includes the correct header for `butil::fast_rand` (typically `butil/fast_rand.h`), otherwise this will fail to compile.

   Optional include tweak if needed:

   ```diff
    #include <bthread/mutex.h>
    #include <butil/time.h>
   +#include <butil/fast_rand.h>
   ```
2. **Behavioral verification**

   Given the complexity of `DataSyncForRangePartition` and the new SliceCoordinator-based flow, it's worth re-running the range-partition DataSync tests (including split-range and pause/resume cases) to confirm:

   - all cores still participate as intended (no regression to single-core scanning), and
   - `scan_data_drained`/`IsDrained(i)` semantics are unchanged.

tx_service/include/cc/template_cc_map.h (2)

623-637: PostWriteCc last-dirty-TS logging looks safe

The new `DLOG(INFO)` for `cc_page->last_dirty_commit_ts_` is correctly guarded by the `commit_ts > cc_page->last_dirty_commit_ts_` check and uses only already-initialized local state. No functional or concurrency impact beyond extra debug noise in non-release builds.

5796-5811: Additional diagnostics in ScanSliceDeltaSizeCcForRangePartition are reasonable

The new `DLOG(INFO)` when `ccp->last_dirty_commit_ts_ <= req.LastDataSyncTs()` but `cce->NeedCkpt()` is still true is a good sanity check and doesn't affect behavior (the assert remains the real guard). Since this is DLOG-only, it won't impact release performance and should help catch state inconsistencies during debugging.

tx_service/include/cc/cc_request.h (1)

4011-4080: RangePartitionDataSyncScanCc slice coordination refactor looks consistent with existing patterns

The new `slice_coordinator_`, per-core `curr_slice_index_`, and the updated `Reset`/`SetError`/`SetFinish`/`UnpinSlices` wiring mirror the patterns used elsewhere (e.g., `ScanSliceCc` / data‑sync requests) and keep ownership and unpinning localized to the request with proper `mux_` protection around `unfinished_cnt_`. The constructor's initialization and per‑core bookkeeping (including `scan_heap_is_full_` reuse) look coherent with the intended multi‑core data‑sync scan lifecycle.

Also applies to: 4164-4215, 4228-4251, 4367-4385, 4595-4610
tx_service/src/sk_generator.cpp (outdated)

```cpp
uint32_t core_rand = butil::fast_rand();
// The scan slice request is dispatched to the first core. The first
// core tries to pin the slice if necessary and if succeeds, further
// dispatches the request to remaining cores for parallel scans.
cc_shards->EnqueueToCcShard(core_rand % core_cnt, &scan_req);
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Search for SliceCoordinator definition
echo "=== Searching for SliceCoordinator ==="
fd -type f -name "*.h" -o -name "*.cpp" | xargs rg -l "SliceCoordinator" | head -20

# Search for RangePartitionDataSyncScanCc definition
echo "=== Searching for RangePartitionDataSyncScanCc ==="
rg -n "class RangePartitionDataSyncScanCc|struct RangePartitionDataSyncScanCc" --type=cpp --type=cc

# Search for ScanSliceCc references
echo "=== Searching for ScanSliceCc ==="
rg -n "class ScanSliceCc|struct ScanSliceCc" --type=cpp --type=cc

# Search for EnqueueToCcShard implementation
echo "=== Searching for EnqueueToCcShard ==="
rg -n "EnqueueToCcShard" --type=cpp --type=cc --type=h -A 3 | head -50
```

Repository: eloqdata/tx_service

Length of output: 569
🏁 Script executed:

```shell
#!/bin/bash
# Look at the actual file to understand the dispatch context
echo "=== Content around line 357-362 in sk_generator.cpp ==="
find . -name sk_generator.cpp -type f | head -1 | xargs cat -n | sed -n '350,370p'

# Search for the Wait() call and its implementation
echo "=== Searching for Wait() implementation context ==="
rg -n "Wait\(\)" --type=cpp --type=cc -B 2 -A 2 | grep -A 10 -B 10 "363"
```

Repository: eloqdata/tx_service

Length of output: 1184
🏁 Script executed:

```shell
#!/bin/bash
# Broader search for parallel scan coordination logic
echo "=== Search for coordination patterns in cc_request.h ==="
find . -name cc_request.h -type f | head -1 | xargs cat -n | head -100

# Search for how single core dispatch to multiple cores works
echo "=== Search for core dispatch patterns ==="
rg -n "first.*core|randomly.*core|dispatch.*remaining.*core" --type=cpp --type=h -C 3
```

Repository: eloqdata/tx_service

Length of output: 21285
🏁 Script executed:

```shell
#!/bin/bash
# Find RangePartitionDataSyncScanCc and understand its structure
echo "=== Finding RangePartitionDataSyncScanCc definition ==="
rg -n "RangePartitionDataSyncScanCc" --type=cpp --type=h | head -20

# Look for the actual implementation of the scan request handling
echo "=== Searching for scan slice handling ==="
rg -n "ScanSliceCc|RangePartitionDataSyncScanCc" include/cc/cc_request.h -A 10 | head -50
```

Repository: eloqdata/tx_service

Length of output: 1720
🏁 Script executed:

```shell
#!/bin/bash
# Find SliceCoordinator definition in RangePartitionDataSyncScanCc
echo "=== SliceCoordinator in RangePartitionDataSyncScanCc ==="
rg -n "struct SliceCoordinator|class SliceCoordinator" --type=h -A 20 | head -80

# Look for unfinished_core_cnt mechanism
echo "=== Unfinished core count mechanism ==="
rg -n "unfinished.*core|SetUnfinishedCoreCnt" --type=h -B 2 -A 2 | head -50

# Search for how remaining cores are dispatched to
echo "=== Core dispatch in execute function ==="
rg -n "for.*core_id|EnqueueCcRequest.*core" --type=h -C 2 | head -60
```

Repository: eloqdata/tx_service

Length of output: 9109
🏁 Script executed:

```shell
#!/bin/bash
# Get RangePartitionDataSyncScanCc definition from cc_request.h
echo "=== RangePartitionDataSyncScanCc definition ==="
find . -name "cc_request.h" -type f | xargs sed -n '3990,4100p'
```

Repository: eloqdata/tx_service

Length of output: 4034
🏁 Script executed:

```shell
#!/bin/bash
# Search for the Execute function for RangePartitionDataSyncScanCc in template_cc_map.h
echo "=== Execute for RangePartitionDataSyncScanCc ==="
find . -name "template_cc_map.h" -type f | xargs sed -n '5192,5250p'

# Look for the dispatch to remaining cores
echo "=== Dispatch to remaining cores ==="
find . -name "template_cc_map.h" -type f | xargs sed -n '3660,3700p'
```

Repository: eloqdata/tx_service

Length of output: 3851
🏁 Script executed:

```shell
#!/bin/bash
# Search for SliceCoordinator specifically in template_cc_map.h around line 5365
echo "=== SliceCoordinator context ==="
find . -name "template_cc_map.h" -type f | xargs sed -n '5350,5420p'

# Search for the Wait() implementation in cc_request.h
echo "=== Wait() method ==="
rg -n "void Wait\(\)|bool Wait\(\)" --type=h -A 15 | head -50
```

Repository: eloqdata/tx_service

Length of output: 6505
**The SliceCoordinator mechanism correctly handles parallel scan coordination.**

The single random core dispatch is intentional and works as designed. The selected core pins the slice(s) via `SliceCoordinator`, then re-dispatches the same request to all remaining cores for parallel scanning (see template_cc_map.h:3671-3677). The `unfinished_cnt_` atomic counter is set to the total core count before dispatch and decremented by each core upon completion. When `unfinished_cnt_` reaches zero, `Wait()` unblocks, ensuring all parallel work completes successfully.

However, the comment at lines 359-361 says "dispatched to the first core" but the code uses `core_rand % core_cnt`, selecting a random core. Consider clarifying the comment to say "dispatched to a randomly selected core" for accuracy.
🤖 Prompt for AI Agents
In tx_service/src/sk_generator.cpp around lines 357 to 362, the explanatory
comment incorrectly says "dispatched to the first core" though the code
dispatches to a randomly selected core via core_rand % core_cnt; update the
comment to state that the scan slice request is dispatched to a randomly
selected core which will pin the slice(s) then re-dispatch to remaining cores
for parallel scans, and keep the rest of the explanation about unfinished_cnt_
and Wait() behavior unchanged.
Actionable comments posted: 1
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- core/src/storage_init.cpp
- store_handler/eloq_data_store_service/rocksdb_config.cpp
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
🔇 Additional comments (1)
store_handler/eloq_data_store_service/rocksdb_config.cpp (1)
282-283: LGTM! Proper fix for multiple definition issue.

Changing from `DECLARE_string` to `DEFINE_string` correctly establishes this file as the definition site for AWS credential flags when `DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3` is defined. This pairs with the corresponding `DECLARE_string` in storage_init.cpp to resolve the multiple definition linker error mentioned in the PR.
Here are some reminders before you submit the pull request:

- fixes eloqdb/tx_service#issue_id
- ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit
No user-facing changes in this release.