Walkthrough

The PR removes the prewarm lifecycle and related APIs, adds snapshot and standby-checkpoint event handlers (OnSnapshotReceived, OnUpdateStandbyCkptTs, OnShutdown), refactors cloud-mode detection and data-store interfaces (IsCloudMode, ReloadDataFromCloud), adds single-node detection, and propagates a data-store-write flag through checkpoint broadcasts.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Primary as Primary Node
    participant Checkpointer
    participant DataSyncStatus
    participant StandbyService as Standby Service
    participant StandbyNode as Standby Node
    participant SnapshotMgr as Snapshot Manager
    participant DataStoreHandler
    participant DataStore
    Primary->>Checkpointer: flush completes (with persisted KV)
    Checkpointer->>DataSyncStatus: MarkDataStoreWrite()
    Checkpointer->>StandbyService: BrocastPrimaryCkptTs(..., has_data_store_write=true)
    StandbyService->>StandbyNode: send UpdateStandbyCkptTs(has_data_store_write)
    StandbyNode->>DataStoreHandler: OnUpdateStandbyCkptTs(ng_id, term)
    DataStoreHandler->>DataStore: ReloadDataFromCloud(term)
    DataStore-->>DataStoreHandler: reload result
    SnapshotMgr->>StandbyService: snapshot ready
    StandbyService->>Primary: OnSnapshotSyncedRequest(ng_id, snapshot_path)
    Primary->>DataStoreHandler: OnSnapshotReceived(ng_id, bucket_ids, snapshot_path)
    DataStoreHandler->>DataStore: open/reload snapshot
```
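The gating shown in the diagram — a standby reloads from cloud only for checkpoints that actually wrote to the data store, and only for the current term — can be sketched in a few lines. The struct and helper below are illustrative stand-ins, not the real tx_service types:

```cpp
#include <cstdint>

// Illustrative sketch (not the actual tx_service API): the primary sets
// has_data_store_write only when the checkpoint flushed KV data, and a
// standby acts on the broadcast only when that flag is set and the term
// still matches its view of the current primary term.
struct CkptBroadcast
{
    uint32_t ng_id;
    int64_t term;
    bool has_data_store_write;
};

// Returns true when the standby should trigger ReloadDataFromCloud(term).
bool ShouldReloadFromCloud(const CkptBroadcast &msg, int64_t current_term)
{
    return msg.has_data_store_write && msg.term == current_term;
}
```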
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Force-pushed from fae35f1 to 6d82100 (Compare)
- Implement DataStoreService::OnSnapshotReceived and add func DataStoreHandler::StandbyReloadData
- Implement api to call datastore reload data from cloud
- Update DataStoreServiceClient::IsSharedStorage()
- Complete the EloqStoreDataStore::ReloadDataFromCloud
- For single node, always start eloqstore with term 0
- Add arg in DataSyncStatus to identify whether data has been flushed to the datastore in this checkpoint period
- Refactor OnSnapshotSyncedRequest: add ng_id.
Force-pushed from 9add740 to c955b90 (Compare)
```cmake
${ELOQ_STORE_SOURCE_DIR}/src/tasks/task.cpp
${ELOQ_STORE_SOURCE_DIR}/src/tasks/task_manager.cpp
${ELOQ_STORE_SOURCE_DIR}/src/tasks/write_task.cpp
${ELOQ_STORE_SOURCE_DIR}/src/tasks/reopen_task.cpp
```
already have, remove
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
store_handler/data_store_service_client.cpp (1)
3835-3859: ⚠️ Potential issue | 🔴 Critical

Guard snapshot apply with standby-term validation before reload.

`OnSnapshotReceived` applies/reloads snapshot data without checking whether `req->standby_node_term()` is still current. A delayed/stale callback can reload old data into the shard. The RocksDB path already guards this (store_handler/rocksdb_handler.cpp, Line 3528 onward).

🔧 Suggested fix
```diff
 bool DataStoreServiceClient::OnSnapshotReceived(
     const txservice::remote::OnSnapshotSyncedRequest *req)
 {
-    // TODO(lzx): implement this for eloqstore data store.
-    if (data_store_service_ != nullptr)
-    {
-        uint32_t ng_id = req->ng_id();
+    if (req == nullptr)
+    {
+        LOG(ERROR) << "OnSnapshotReceived got null request";
+        return false;
+    }
+
+    if (txservice::Sharder::Instance().CandidateStandbyNodeTerm() !=
+        req->standby_node_term())
+    {
+        LOG(WARNING) << "Ignore stale snapshot callback. standby_term="
+                     << req->standby_node_term();
+        return false;
+    }
+
+    if (data_store_service_ != nullptr)
+    {
+        uint32_t ng_id = req->ng_id();
         std::unordered_set<uint16_t> bucket_ids;
         for (auto &[bucket_id, bucket_info] : bucket_infos_)
         {
             if (bucket_info->BucketOwner() == ng_id)
             {
                 bucket_ids.insert(bucket_id);
             }
         }
         int64_t term =
             txservice::PrimaryTermFromStandbyTerm(req->standby_node_term());
         data_store_service_->OnSnapshotReceived(
             ng_id, term, std::move(bucket_ids), req->snapshot_path());
         return true;
     }
-
-    return true;
+    return false;
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/data_store_service_client.cpp` around lines 3835 - 3859, In DataStoreServiceClient::OnSnapshotReceived, guard the snapshot apply by validating req->standby_node_term() is still current before calling data_store_service_->OnSnapshotReceived: compute term using txservice::PrimaryTermFromStandbyTerm(req->standby_node_term()) (as already done), obtain the current standby/primary term from the local state (the same source used by RocksDB handler), compare them and only proceed to build bucket_ids and call data_store_service_->OnSnapshotReceived(ng_id, term, std::move(bucket_ids), req->snapshot_path()) if the terms match; otherwise skip applying the snapshot to avoid reloading stale data.
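The guard this comment asks for reduces to a term comparison before any apply. A minimal sketch, assuming the current standby term is readable from local state (the real code would consult txservice::Sharder::Instance().CandidateStandbyNodeTerm(), and apply_fn stands in for the actual snapshot-apply call):

```cpp
#include <cstdint>
#include <utility>

// Sketch only: a stale callback carries a standby term that no longer
// matches the node's current standby term and must be dropped, not applied.
template <typename ApplyFn>
bool ApplySnapshotIfCurrent(int64_t req_standby_term,
                            int64_t current_standby_term,
                            ApplyFn &&apply_fn)
{
    if (req_standby_term != current_standby_term)
    {
        return false;  // stale delivery: skip to avoid reloading old data
    }
    std::forward<ApplyFn>(apply_fn)();
    return true;
}
```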
🧹 Nitpick comments (2)
store_handler/eloq_data_store_service/eloq_store_data_store.h (1)
99-105: Consider adding null-check to `ScanDeleteOperationData::Clear()` for consistency.

`EloqStoreOperationData::Clear()` now guards against null `data_store_request_ptr_`, but `ScanDeleteOperationData::Clear()` (lines 99-105) does not. If `ScanDeleteOperationData::Clear()` can be called when `data_store_request_ptr_` is null, this could cause a null dereference.

♻️ Suggested fix
```diff
     void Clear() override
     {
+        if (data_store_request_ptr_)
+        {
-        data_store_request_ptr_->Clear();
-        data_store_request_ptr_->Free();
-        data_store_request_ptr_ = nullptr;
+            data_store_request_ptr_->Clear();
+            data_store_request_ptr_->Free();
+            data_store_request_ptr_ = nullptr;
+        }
         entries_.clear();
     }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/eloq_data_store_service/eloq_store_data_store.h` around lines 99 - 105, ScanDeleteOperationData::Clear currently calls data_store_request_ptr_->Clear() and ->Free() without checking for null; add the same null-guard used in EloqStoreOperationData::Clear by testing if (data_store_request_ptr_) before invoking Clear() and Free(), then set data_store_request_ptr_ = nullptr and call entries_.clear(); ensure you only call Clear/Free when data_store_request_ptr_ is non-null to avoid a potential null dereference.

store_handler/eloq_data_store_service/eloq_store_data_store.cpp (1)

882-883: Remove stale TODO in an already-implemented path.

The TODO now conflicts with the actual implementation and can mislead future maintenance.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp` around lines 882 - 883, Remove the stale TODO comment in the EloqStoreDataStore::ReloadDataFromCloud implementation: delete the line "// TODO(lzx): implement this for eloqstore data store." that sits immediately before the LOG(INFO) << "EloqStoreDataStore::ReloadDataFromCloud, term: " << term; so the code no longer claims the path is unimplemented while it logs the reload action.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3861-3869: The method
DataStoreServiceClient::OnUpdateStandbyCkptTs currently always returns true even
when data_store_service_ is null, hiding failures; change it to return false
when data_store_service_ == nullptr and only return success when the update was
actually invoked: check data_store_service_ and if null return false, otherwise
call data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term) and return its
boolean result if that method returns a bool, or return true only after
successfully invoking it; update callers/tests if they rely on the previous
always-true behavior.
In `@store_handler/eloq_data_store_service/build_eloq_store.cmake`:
- Line 127: ELOQ_STORE_SOURCES contains a duplicated entry for
src/tasks/reopen_task.cpp; open build_eloq_store.cmake, find the
ELOQ_STORE_SOURCES list and remove the redundant occurrence of reopen_task.cpp
(the duplicate at the later line), leaving only the single original entry so the
list is deduplicated and the build generator/toolchain won’t hit edge-case
issues.
In `@store_handler/eloq_data_store_service/data_store_service_config.cpp`:
- Around line 210-215: The public Topology::GetShardCount() (and its caller
DataStoreServiceClusterManager::GetShardCount()) should not assert(false);
instead mark the methods deprecated and leave them safe to call: remove the
assert(false) from Topology::GetShardCount(), add a [[deprecated("...")]]
attribute to the declarations/definitions of Topology::GetShardCount and
DataStoreServiceClusterManager::GetShardCount, and have Topology::GetShardCount
return a valid value (e.g., return GetAllShards().size() or shard_count_) so
future callers won’t crash while the API is being phased out.
In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2259-2269: The OnSnapshotReceived function currently aborts the
process for local data store mode via LOG(FATAL); instead change this branch to
fail gracefully by returning an error status or logging an error without
crashing: in the OnSnapshotReceived implementation (referencing
OnSnapshotReceived, data_store_factory_, data_shards_) replace the LOG(FATAL)
path with a non-fatal handling that logs the missing implementation (use
LOG(ERROR) or VLOG) and returns a controlled failure (e.g., status code, bool,
or exception consistent with surrounding API) so callers can handle the error
until local snapshot loading is implemented; ensure any resources are left
consistent and add a TODO comment pointing to the future local-mode snapshot
replacement logic.
- Around line 2280-2286: The call sites invoking
ds_ref.data_store_->ReloadDataFromCloud(term) (e.g., in OnSnapshotReceived) must
be guarded: first null-check ds_ref.data_store_ before dereferencing, and only
call ReloadDataFromCloud when the shard’s state is stable (check ds_ref.state
equals ReadOnly or ReadWrite) to avoid triggering reloads while the shard is
transitioning (e.g., Starting); apply the same null- and state-guard to the
other reload site noted (around the other ReloadDataFromCloud call).
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 924-926: In OnReLoaded replace the incorrect address-of logging
(&global_reopen_req) with the actual request pointer or its contents: use
DLOG(INFO) << "... reopen request: " << global_reopen_req (or, if you want the
object's fields, check for null and log *global_reopen_req or a suitable
toString()/DebugString() method). Update the DLOG call in
EloqStoreDataStore::OnReLoaded to avoid taking the address of the local pointer
variable and instead emit the pointer value or serialized request for correct
diagnostics.
In `@store_handler/eloq_data_store_service/rocksdb_data_store_common.h`:
- Around line 286-290: Change the default log-only implementation of
RocksDBDataStoreCommon::ReloadDataFromCloud(int64_t term) to a pure virtual
declaration so derived classes must implement it; replace the existing body that
logs the not-implemented message with a pure-virtual signature (e.g., virtual
void ReloadDataFromCloud(int64_t term) = 0;) and then update any derived classes
missing an override to implement the method (or explicitly provide a no-op) to
satisfy the new abstract requirement.
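The refactor suggested above is the standard abstract-interface move: once the base method is pure virtual, every concrete store must either implement a real reload or declare an explicit, documented no-op. A compilable sketch with simplified stand-in classes (the real hierarchy lives in rocksdb_data_store_common.h; these names are illustrative):

```cpp
#include <cstdint>

// Base with the pure-virtual signature the review proposes, replacing the
// previous log-only default body.
class DataStoreSketch
{
public:
    virtual ~DataStoreSketch() = default;
    virtual void ReloadDataFromCloud(int64_t term) = 0;
};

class CloudStoreSketch : public DataStoreSketch
{
public:
    void ReloadDataFromCloud(int64_t term) override { last_reload_term_ = term; }
    int64_t last_reload_term_ = -1;
};

class LocalStoreSketch : public DataStoreSketch
{
public:
    // Explicit no-op: a local-only store has no cloud copy to reload from.
    void ReloadDataFromCloud(int64_t /*term*/) override {}
};
```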
---
Outside diff comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3835-3859: In DataStoreServiceClient::OnSnapshotReceived, guard
the snapshot apply by validating req->standby_node_term() is still current
before calling data_store_service_->OnSnapshotReceived: compute term using
txservice::PrimaryTermFromStandbyTerm(req->standby_node_term()) (as already
done), obtain the current standby/primary term from the local state (the same
source used by RocksDB handler), compare them and only proceed to build
bucket_ids and call data_store_service_->OnSnapshotReceived(ng_id, term,
std::move(bucket_ids), req->snapshot_path()) if the terms match; otherwise skip
applying the snapshot to avoid reloading stale data.
---
Nitpick comments:
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 882-883: Remove the stale TODO comment in the
EloqStoreDataStore::ReloadDataFromCloud implementation: delete the line "//
TODO(lzx): implement this for eloqstore data store." that sits immediately
before the LOG(INFO) << "EloqStoreDataStore::ReloadDataFromCloud, term: " <<
term; so the code no longer claims the path is unimplemented while it logs the
reload action.
In `@store_handler/eloq_data_store_service/eloq_store_data_store.h`:
- Around line 99-105: ScanDeleteOperationData::Clear currently calls
data_store_request_ptr_->Clear() and ->Free() without checking for null; add the
same null-guard used in EloqStoreOperationData::Clear by testing if
(data_store_request_ptr_) before invoking Clear() and Free(), then set
data_store_request_ptr_ = nullptr and call entries_.clear(); ensure you only
call Clear/Free when data_store_request_ptr_ is non-null to avoid a potential
null dereference.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (27)
- store_handler/data_store_service_client.cpp
- store_handler/data_store_service_client.h
- store_handler/eloq_data_store_service/build_eloq_store.cmake
- store_handler/eloq_data_store_service/data_store.h
- store_handler/eloq_data_store_service/data_store_factory.h
- store_handler/eloq_data_store_service/data_store_service.cpp
- store_handler/eloq_data_store_service/data_store_service.h
- store_handler/eloq_data_store_service/data_store_service_config.cpp
- store_handler/eloq_data_store_service/data_store_service_config.h
- store_handler/eloq_data_store_service/eloq_store_data_store.cpp
- store_handler/eloq_data_store_service/eloq_store_data_store.h
- store_handler/eloq_data_store_service/eloq_store_data_store_factory.h
- store_handler/eloq_data_store_service/rocksdb_cloud_data_store_factory.h
- store_handler/eloq_data_store_service/rocksdb_data_store_common.h
- store_handler/eloq_data_store_service/rocksdb_data_store_factory.h
- tx_service/include/cc/object_cc_map.h
- tx_service/include/data_sync_task.h
- tx_service/include/proto/cc_request.proto
- tx_service/include/standby.h
- tx_service/include/store/data_store_handler.h
- tx_service/src/cc/local_cc_shards.cpp
- tx_service/src/checkpointer.cpp
- tx_service/src/data_sync_task.cpp
- tx_service/src/fault/cc_node.cpp
- tx_service/src/remote/cc_node_service.cpp
- tx_service/src/standby.cpp
- tx_service/src/store/snapshot_manager.cpp
```cpp
bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id,
                                                   int64_t ng_term)
{
    if (data_store_service_ != nullptr)
    {
        data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term);
    }
    return true;
}
```
Don’t report success when standby checkpoint update is skipped.
OnUpdateStandbyCkptTs always returns true, even when data_store_service_ == nullptr and no update is sent. This hides failures from callers and weakens retry/error handling.
🔧 Suggested fix

```diff
 bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id,
                                                    int64_t ng_term)
 {
-    if (data_store_service_ != nullptr)
-    {
-        data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term);
-    }
-    return true;
+    if (data_store_service_ == nullptr)
+    {
+        LOG(WARNING) << "OnUpdateStandbyCkptTs skipped: data_store_service_ is null";
+        return false;
+    }
+    data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term);
+    return true;
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/data_store_service_client.cpp` around lines 3861 - 3869, The
method DataStoreServiceClient::OnUpdateStandbyCkptTs currently always returns
true even when data_store_service_ is null, hiding failures; change it to return
false when data_store_service_ == nullptr and only return success when the
update was actually invoked: check data_store_service_ and if null return false,
otherwise call data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term) and
return its boolean result if that method returns a bool, or return true only
after successfully invoking it; update callers/tests if they rely on the
previous always-true behavior.
```cpp
int Topology::GetShardCount() const
{
    // deprecated, use GetAllShards().size() instead
    assert(false);
    return shard_count_;
}
```
🧩 Analysis chain
🏁 Scripts executed (all against repository eloqdata/tx_service; the byte length of each output is noted per command):

```shell
cd store_handler/eloq_data_store_service && wc -l data_store_service_config.cpp          # output length: 97
cd store_handler/eloq_data_store_service && sed -n '210,230p' data_store_service_config.cpp  # output length: 563
cd store_handler/eloq_data_store_service && sed -n '520,530p' data_store_service_config.cpp  # output length: 333
rg -n "GetShardCount\s*\(" --type cpp store_handler/eloq_data_store_service/             # output length: 612
rg -rn "GetShardCount\s*\(" --type cpp                                                   # output length: 746
rg -n "GetShardCount" --type cpp --type h -B 2 -A 2 | head -100                          # output length: 3403
rg -rn "\.GetShardCount\(\)|->GetShardCount\(\)" --type cpp --type h                     # output length: 237
cd tx_service/src/remote && sed -n '1375,1385p' remote_cc_request.cpp                    # output length: 511
rg -n "cluster_manager.*GetShardCount\(\)|manager.*GetShardCount\(\)|dss_manager.*GetShardCount\(\)" --type cpp --type h  # output length: 45
grep -r "GetShardCount" --include="*.cpp" --include="*.h" tx_service/ | grep -v "test" | grep -v "^Binary"  # output length: 320
cd tx_service/src/remote && sed -n '1370,1390p' remote_cc_request.cpp | head -30         # output length: 897
cd tx_service/include/cc && grep -n "class.*CCRequest" cc_request.h | head -5            # output length: 45
cd tx_service/include/cc && sed -n '2740,2755p' cc_request.h                             # output length: 401
rg -rn "DataStoreServiceClusterManager" --type cpp --type h | grep -i "getshardcount"    # output length: 159
rg -rn "manager\s*->\s*GetShardCount\(\)|manager\s*\.\s*GetShardCount\(\)" --type cpp --type h  # output length: 45
cd store_handler && grep -rn "GetShardCount" --include="*.cpp" --include="*.h"           # output length: 542
rg -rn "DataStoreServiceClusterManager" --type cpp --type h | head -20                   # output length: 1821
rg -n "DataStoreServiceClusterManager" store_handler/eloq_data_store_service/data_store_service_config.h | head -10  # output length: 349
cd store_handler && rg -n "\.GetShardCount\(\)" --type cpp --type h -B 3                 # output length: 444
cd store_handler && rg -n "GetShardCount" --type h data_store_service_client.h           # output length: 45
cd store_handler && grep -rn "\.GetShardCount()" --include="*.cpp" --include="*.h" .     # output length: 160
```
Deprecation via assert(false) is problematic even though currently unused.
The assert(false) will crash any future callers in debug builds. While no external callers of DataStoreServiceClusterManager::GetShardCount() exist in the codebase today, the method is public (declared in the header at line 399) and the deprecation approach invites runtime crashes rather than compile-time warnings.
The pattern of using assert(false) for a deprecated public method is poor: it creates a time bomb for any future code that calls it. Consider either:
- Removing both methods (`DataStoreServiceClusterManager::GetShardCount()` and `Topology::GetShardCount()`) if truly deprecated
- Using compiler deprecation attributes (e.g., `[[deprecated]]`) instead of assert
- Keeping only the deprecation comment without the assert if removal is premature
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/eloq_data_store_service/data_store_service_config.cpp` around
lines 210 - 215, The public Topology::GetShardCount() (and its caller
DataStoreServiceClusterManager::GetShardCount()) should not assert(false);
instead mark the methods deprecated and leave them safe to call: remove the
assert(false) from Topology::GetShardCount(), add a [[deprecated("...")]]
attribute to the declarations/definitions of Topology::GetShardCount and
DataStoreServiceClusterManager::GetShardCount, and have Topology::GetShardCount
return a valid value (e.g., return GetAllShards().size() or shard_count_) so
future callers won’t crash while the API is being phased out.
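The attribute-based alternative described above looks like the following. This is a hedged sketch: the class and member names mirror the comment but are simplified stand-ins for the real Topology:

```cpp
#include <cstddef>
#include <vector>

// Sketch: the deprecated accessor keeps returning a correct value, and any
// caller gets a compile-time warning instead of a debug-build crash.
class TopologySketch
{
public:
    const std::vector<int> &GetAllShards() const { return shards_; }

    [[deprecated("use GetAllShards().size() instead")]]
    std::size_t GetShardCount() const
    {
        return GetAllShards().size();  // safe value, no assert(false)
    }

    std::vector<int> shards_{10, 11, 12};
};
```

Callers that still use the old method compile with a warning, which surfaces every remaining call site without crashing anything at runtime.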
```cpp
    const std::string &snapshot_path)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
    auto &ds_ref = data_shards_.at(shard_id);

    if (!data_store_factory_->IsCloudMode())
    {
        // TODO(lzx): Handle the case of local data store mode. (replace the
        // data with the snapshot)
        LOG(FATAL)
            << "OnSnapshotReceived not implemented for local data store mode";
```
Avoid aborting the process for local snapshot handling.
LOG(FATAL) here turns an unimplemented branch into a service crash path. This should fail gracefully and preserve process availability.
💡 Safer fallback until local-mode snapshot load is implemented
```diff
-        LOG(FATAL)
-            << "OnSnapshotReceived not implemented for local data store mode";
+        LOG(ERROR)
+            << "OnSnapshotReceived local mode is not implemented yet, shard "
+            << shard_id << ", snapshot_path: " << snapshot_path;
+        return;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines
2259 - 2269, The OnSnapshotReceived function currently aborts the process for
local data store mode via LOG(FATAL); instead change this branch to fail
gracefully by returning an error status or logging an error without crashing: in
the OnSnapshotReceived implementation (referencing OnSnapshotReceived,
data_store_factory_, data_shards_) replace the LOG(FATAL) path with a non-fatal
handling that logs the missing implementation (use LOG(ERROR) or VLOG) and
returns a controlled failure (e.g., status code, bool, or exception consistent
with surrounding API) so callers can handle the error until local snapshot
loading is implemented; ensure any resources are left consistent and add a TODO
comment pointing to the future local-mode snapshot replacement logic.
```cpp
    else
    {
        LOG(INFO)
            << "OnSnapshotReceived, reload data from cloud for DSS shard "
            << shard_id << " and term " << term;
        ds_ref.data_store_->ReloadDataFromCloud(term);
        return;
```
Guard cloud reload calls to stable shard states.
Both call sites can invoke ReloadDataFromCloud() while the shard is transitioning (e.g., Starting). Gate reloads to ReadOnly/ReadWrite and null-check data_store_ before dereference.
🔒 Suggested state/pointer guard
```diff
-    else
+    else if (ds_ref.shard_status_.load(std::memory_order_acquire) ==
+                 DSShardStatus::ReadOnly ||
+             ds_ref.shard_status_.load(std::memory_order_acquire) ==
+                 DSShardStatus::ReadWrite)
     {
         LOG(INFO)
             << "OnSnapshotReceived, reload data from cloud for DSS shard "
             << shard_id << " and term " << term;
+        if (ds_ref.data_store_ == nullptr)
+        {
+            LOG(ERROR) << "OnSnapshotReceived skipped: data_store_ is null for shard "
+                       << shard_id;
+            return;
+        }
         ds_ref.data_store_->ReloadDataFromCloud(term);
         return;
     }
+    else
+    {
+        LOG(INFO) << "OnSnapshotReceived skipped while shard is transitioning, shard "
+                  << shard_id;
+    }
```

```diff
-    if (ds_ref.data_store_ != nullptr &&
-        ds_ref.shard_status_.load() != DSShardStatus::Closed)
+    auto shard_status = ds_ref.shard_status_.load(std::memory_order_acquire);
+    if ((shard_status == DSShardStatus::ReadOnly ||
+         shard_status == DSShardStatus::ReadWrite) &&
+        ds_ref.data_store_ != nullptr)
```

Also applies to: 2298-2305
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines
2280 - 2286, The call sites invoking
ds_ref.data_store_->ReloadDataFromCloud(term) (e.g., in OnSnapshotReceived) must
be guarded: first null-check ds_ref.data_store_ before dereferencing, and only
call ReloadDataFromCloud when the shard’s state is stable (check ds_ref.state
equals ReadOnly or ReadWrite) to avoid triggering reloads while the shard is
transitioning (e.g., Starting); apply the same null- and state-guard to the
other reload site noted (around the other ReloadDataFromCloud call).
```cpp
int Topology::GetShardCount() const
{
    // deprecated, use GetAllShards().size() instead
    assert(false);
```
if deprecated, why not just delete
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
store_handler/data_store_service_client.cpp (1)
3835-3859: ⚠️ Potential issue | 🟠 Major

Don’t return success when snapshot sync is skipped (Lines 3838-3859).

`OnSnapshotReceived` reports `true` even when `data_store_service_` is null and no snapshot is applied, which masks failure and weakens retry/error handling.

🔧 Suggested fix
```diff
 bool DataStoreServiceClient::OnSnapshotReceived(
     const txservice::remote::OnSnapshotSyncedRequest *req)
 {
 #ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
-    if (data_store_service_ != nullptr)
+    if (req == nullptr || data_store_service_ == nullptr)
+    {
+        LOG(WARNING) << "OnSnapshotReceived skipped: "
+                     << (req == nullptr ? "request is null"
+                                        : "data_store_service_ is null");
+        return false;
+    }
+    if (data_store_service_ != nullptr)
     {
         uint32_t ng_id = req->ng_id();
         std::unordered_set<uint16_t> bucket_ids;
@@
         data_store_service_->OnSnapshotReceived(
             ng_id, term, std::move(bucket_ids), req->snapshot_path());
         return true;
     }
 #endif
-    return true;
+    return false;
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/data_store_service_client.cpp` around lines 3835 - 3859, The method DataStoreServiceClient::OnSnapshotReceived currently always returns true even when data_store_service_ is null and no snapshot is applied; change the control flow so that if data_store_service_ is nullptr the function returns false (indicating the snapshot was not handled) instead of true, and only return true after successfully calling data_store_service_->OnSnapshotReceived with the computed ng_id, term (from txservice::PrimaryTermFromStandbyTerm(req->standby_node_term())), and the moved bucket_ids set built from bucket_infos_; ensure you still return true on success and false when skipping/applying fails.
🧹 Nitpick comments (1)
store_handler/data_store_service_client.cpp (1)
3831-3833: Define explicit shutdown behavior for `OnShutdown()` (Lines 3831-3833).

A no-op shutdown hook is easy to forget and can hide resource/lifecycle gaps. Either perform minimal cleanup here (for example, close bound data stores) or add a clear invariant comment explaining why no cleanup is required.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/data_store_service_client.cpp` around lines 3831 - 3833, The OnShutdown() method in class DataStoreServiceClient is currently a no-op; either implement explicit minimal cleanup (e.g., close or flush bound data stores, release network/IPC handles, cancel timers, and set a shutdown flag) inside DataStoreServiceClient::OnShutdown(), or add a clear invariant comment inside the method explaining why no cleanup is required (e.g., lifetime managed elsewhere, all resources are RAII and already closed, and this method is never called after resource release). Reference the DataStoreServiceClient class and OnShutdown() to locate where to add the logic or the explanatory comment.
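One shape a non-trivial OnShutdown() could take is an idempotent guard flag, so the first call performs cleanup exactly once and later calls (or post-shutdown work) can check the flag. The class and member names below are illustrative, not the actual DataStoreServiceClient fields:

```cpp
#include <atomic>

// Sketch: record the shutdown transition once, so cleanup never runs twice
// and other methods can refuse work after shutdown.
class ClientShutdownSketch
{
public:
    void OnShutdown()
    {
        // exchange() returns the previous value, so only the first caller
        // enters the cleanup branch.
        if (!shut_down_.exchange(true))
        {
            // e.g. close bound data stores, cancel timers, drop handles here
            ++cleanup_runs_;
        }
    }

    bool IsShutDown() const { return shut_down_.load(); }

    int cleanup_runs_ = 0;

private:
    std::atomic<bool> shut_down_{false};
};
```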
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3822-3823: The call to Connect() during the follower transition is
unchecked, so failures leave follower initialization partially applied; update
the follower transition code that calls Connect() to capture its return/result,
handle errors by logging via the existing logger (include the error) and
propagate failure (return an error status or throw) instead of continuing, and
ensure any partial state changes are rolled back or marked as not-initialized
when Connect() fails; reference the Connect() call and the follower
initialization/transition path to locate and fix the flow.
---
Outside diff comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3835-3859: The method DataStoreServiceClient::OnSnapshotReceived
currently always returns true even when data_store_service_ is null and no
snapshot is applied; change the control flow so that if data_store_service_ is
nullptr the function returns false (indicating the snapshot was not handled)
instead of true, and only return true after successfully calling
data_store_service_->OnSnapshotReceived with the computed ng_id, term (from
txservice::PrimaryTermFromStandbyTerm(req->standby_node_term())), and the moved
bucket_ids set built from bucket_infos_; ensure you still return true on success
and false when skipping/applying fails.
---
Nitpick comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3831-3833: The OnShutdown() method in class DataStoreServiceClient
is currently a no-op; either implement explicit minimal cleanup (e.g., close or
flush bound data stores, release network/IPC handles, cancel timers, and set a
shutdown flag) inside DataStoreServiceClient::OnShutdown(), or add a clear
invariant comment inside the method explaining why no cleanup is required (e.g.,
lifetime managed elsewhere, all resources are RAII and already closed, and this
method is never called after resource release). Reference the
DataStoreServiceClient class and OnShutdown() to locate where to add the logic
or the explanatory comment.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- store_handler/data_store_service_client.cpp
- tx_service/src/fault/cc_node.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
- tx_service/src/fault/cc_node.cpp
```cpp
    Connect();
}
```
Handle Connect() failure in follower transition (Line 3822).
Connect() can fail, but this path ignores the result, leaving follower initialization partially applied without a failure signal/log.
🔧 Suggested fix
- Connect();
+ if (!Connect())
+ {
+ LOG(ERROR) << "OnStartFollowing failed to connect data store, ng_id="
+ << ng_id << ", term=" << term;
+ return;
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-    Connect();
-}
+    if (!Connect())
+    {
+        LOG(ERROR) << "OnStartFollowing failed to connect data store, ng_id="
+                   << ng_id << ", term=" << term;
+        return;
+    }
+}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/data_store_service_client.cpp` around lines 3822 - 3823, The
call to Connect() during the follower transition is unchecked, so failures leave
follower initialization partially applied; update the follower transition code
that calls Connect() to capture its return/result, handle errors by logging via
the existing logger (include the error) and propagate failure (return an error
status or throw) instead of continuing, and ensure any partial state changes are
rolled back or marked as not-initialized when Connect() fails; reference the
Connect() call and the follower initialization/transition path to locate and fix
the flow.
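If the transition is allowed to retry transient failures before giving up, a bounded retry with backoff is the usual shape. A sketch assuming `Connect()` returns `bool`; the `ConnectWithRetry` helper and its parameters are illustrative, not part of the client:

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Retry a connect callback up to max_attempts with linear backoff.
// Returns true as soon as one attempt succeeds, false if all fail.
bool ConnectWithRetry(const std::function<bool()> &connect,
                      int max_attempts,
                      std::chrono::milliseconds backoff)
{
    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        if (connect())
        {
            return true;
        }
        if (attempt < max_attempts)
        {
            // Back off a little longer after each failed attempt.
            std::this_thread::sleep_for(backoff * attempt);
        }
    }
    return false;
}
```

The caller would log and return (or mark the follower not-initialized) when this reports false, which preserves the failure signal the review asks for.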
Actionable comments posted: 1
♻️ Duplicate comments (3)
store_handler/eloq_data_store_service/eloq_store_data_store.cpp (1)
924-926: ⚠️ Potential issue | 🟡 Minor

Fix pointer logging in OnReLoaded.

Line [926] logs `&global_reopen_req`, which is the address of the local pointer variable, not the request pointer value.

Suggested fix

```diff
 DLOG(INFO) << "===============EloqStoreDataStore::OnReLoaded, reopen request: "
-           << &global_reopen_req << " from EloqStore";
+           << global_reopen_req << " from EloqStore";
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp` around lines 924 - 926, The log in EloqStoreDataStore::OnReLoaded is printing the address of the local pointer variable (&global_reopen_req) instead of the actual request pointer; change the DLOG call to log global_reopen_req (or its contents) rather than &global_reopen_req so the logged value is the request pointer (or a meaningful representation), e.g., use global_reopen_req or format its fields when calling DLOG in OnReLoaded.

store_handler/eloq_data_store_service/data_store_service.cpp (2)
2264-2269: ⚠️ Potential issue | 🟠 Major

Do not abort the process for unimplemented local snapshot handling.

Line [2268] uses LOG(FATAL), which crashes the service for a currently unimplemented branch. Return gracefully instead.

Suggested fix

```diff
-        LOG(FATAL)
-            << "OnSnapshotReceived not implemented for local data store mode";
+        LOG(ERROR)
+            << "OnSnapshotReceived local mode is not implemented yet, shard "
+            << shard_id << ", snapshot_path: " << snapshot_path;
+        return;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines 2264 - 2269, The OnSnapshotReceived path currently aborts the process when data_store_factory_->IsCloudMode() is false; replace the LOG(FATAL) crash with graceful handling: remove the LOG(FATAL) call in the non-cloud branch, log a warning or info (e.g., via LOG(WARNING)) that local snapshot handling is unimplemented, preserve the TODO comment, and return early from OnSnapshotReceived so the service does not terminate; ensure the change is made in the branch guarded by data_store_factory_->IsCloudMode().
2283-2294: ⚠️ Potential issue | 🟠 Major

Guard cloud reload calls to stable shard states and bound the wait loop.

`ReloadDataFromCloud()` is invoked without a strict stable-state guard, and the `Starting` wait loop is unbounded. This can block indefinitely and trigger reload during transitions.

Suggested fix

```diff
-    while (ds_ref.shard_status_.load(std::memory_order_acquire) ==
-           DSShardStatus::Starting)
+    constexpr int kMaxWaitMs = 5000;
+    int waited_ms = 0;
+    while (ds_ref.shard_status_.load(std::memory_order_acquire) ==
+               DSShardStatus::Starting &&
+           waited_ms < kMaxWaitMs)
     {
         bthread_usleep(1000);
-        LOG(INFO) << "OnSnapshotReceived, data store is starting, "
-                     "waiting for data store to be ready";
+        waited_ms += 1;
     }
-    LOG(INFO)
-        << "OnSnapshotReceived, reload data from cloud for DSS shard "
-        << shard_id << " and term " << term;
-    ds_ref.data_store_->ReloadDataFromCloud(term);
+    auto status = ds_ref.shard_status_.load(std::memory_order_acquire);
+    if ((status == DSShardStatus::ReadOnly ||
+         status == DSShardStatus::ReadWrite) &&
+        ds_ref.data_store_ != nullptr)
+    {
+        LOG(INFO) << "OnSnapshotReceived, reload data from cloud for DSS shard "
+                  << shard_id << " and term " << term;
+        ds_ref.data_store_->ReloadDataFromCloud(term);
+    }
+    else
+    {
+        LOG(WARNING) << "OnSnapshotReceived skipped reload for shard "
+                     << shard_id << ", status: "
+                     << static_cast<int>(status);
+    }
     return;
```

```diff
-    if (ds_ref.data_store_ != nullptr &&
-        ds_ref.shard_status_.load() != DSShardStatus::Closed)
+    auto status = ds_ref.shard_status_.load(std::memory_order_acquire);
+    if ((status == DSShardStatus::ReadOnly ||
+         status == DSShardStatus::ReadWrite) &&
+        ds_ref.data_store_ != nullptr)
     {
         LOG(INFO)
             << "StandbySyncAndReloadData reload data from cloud for DSS shard "
             << shard_id;
         ds_ref.data_store_->ReloadDataFromCloud(term);
```

Also applies to: 2307-2314

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines 2283 - 2294, The current OnSnapshotReceived path calls ds_ref.data_store_->ReloadDataFromCloud(term) without ensuring the shard is in a stable running/ready state and uses an unbounded wait loop on ds_ref.shard_status_ == DSShardStatus::Starting; change this by replacing the infinite wait with a bounded wait (e.g., max iterations or timeout using bthread_usleep) and only call ReloadDataFromCloud when the shard_status_ is a stable state such as DSShardStatus::Running or DSShardStatus::Ready (check your enum for the exact stable value), and if the timeout expires log and abort the reload; apply the same guarded, timed-wait pattern to the other occurrence around lines 2307-2314 that also uses ds_ref.shard_status_ and ReloadDataFromCloud.
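The bounded-wait idea in both suggestions can be isolated into a small helper: poll the atomic status under a deadline and report whether a reload-safe state was reached. The `ShardStatus` enum and `WaitForStableShard` below are simplified stand-ins for `DSShardStatus` and the shard state machine, not the real types:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

enum class ShardStatus { Starting, ReadOnly, ReadWrite, Closed };

// Wait until the shard leaves Starting, or the deadline expires.
// Returns true only if a stable, reload-safe state was reached.
bool WaitForStableShard(const std::atomic<ShardStatus> &status,
                        std::chrono::milliseconds max_wait)
{
    auto deadline = std::chrono::steady_clock::now() + max_wait;
    while (status.load(std::memory_order_acquire) == ShardStatus::Starting)
    {
        if (std::chrono::steady_clock::now() >= deadline)
        {
            return false;  // timed out while still Starting
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // Re-read once after leaving the loop: only ReadOnly/ReadWrite
    // are safe states for a cloud reload; Closed must be rejected.
    ShardStatus s = status.load(std::memory_order_acquire);
    return s == ShardStatus::ReadOnly || s == ShardStatus::ReadWrite;
}
```

Calling `ReloadDataFromCloud` only when this returns true gives both call sites the same guard instead of two slightly different inline checks.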
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 886-889: EloqStoreDataStore::ReloadDataFromCloud currently uses
LOG(FATAL) on a term mismatch which aborts the process; change this to a
non-fatal failure path: replace the LOG(FATAL) usage with a safer log level
(e.g., LOG(ERROR) or LOG(WARNING)) that includes the same context (expected term
and eloq_store_service_->Term()) and then return from ReloadDataFromCloud so the
reload fails gracefully without crashing the process.
---
Duplicate comments:
In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2264-2269: The OnSnapshotReceived path currently aborts the
process when data_store_factory_->IsCloudMode() is false; replace the LOG(FATAL)
crash with graceful handling: remove the LOG(FATAL) call in the non-cloud
branch, log a warning or info (e.g., via LOG(WARNING)) that local snapshot
handling is unimplemented, preserve the TODO comment, and return early from
OnSnapshotReceived so the service does not terminate; ensure the change is made
in the branch guarded by data_store_factory_->IsCloudMode().
- Around line 2283-2294: The current OnSnapshotReceived path calls
ds_ref.data_store_->ReloadDataFromCloud(term) without ensuring the shard is in a
stable running/ready state and uses an unbounded wait loop on
ds_ref.shard_status_ == DSShardStatus::Starting; change this by replacing the
infinite wait with a bounded wait (e.g., max iterations or timeout using
bthread_usleep) and only call ReloadDataFromCloud when the shard_status_ is a
stable state such as DSShardStatus::Running or DSShardStatus::Ready (check your
enum for the exact stable value), and if the timeout expires log and abort the
reload; apply the same guarded, timed-wait pattern to the other occurrence
around lines 2307-2314 that also uses ds_ref.shard_status_ and
ReloadDataFromCloud.
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 924-926: The log in EloqStoreDataStore::OnReLoaded is printing the
address of the local pointer variable (&global_reopen_req) instead of the actual
request pointer; change the DLOG call to log global_reopen_req (or its contents)
rather than &global_reopen_req so the logged value is the request pointer (or a
meaningful representation), e.g., use global_reopen_req or format its fields
when calling DLOG in OnReLoaded.
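The bug the last prompt describes is easy to reproduce with plain iostreams: streaming `&req` prints where the local pointer variable itself lives, while streaming `req` prints the pointee's address. The helper below is purely illustrative:

```cpp
#include <sstream>
#include <string>
#include <utility>

// Format both interpretations of a pointer for comparison.
// 'req' stands in for a request pointer like global_reopen_req.
std::pair<std::string, std::string> FormatPointerAndItsAddress(int *req)
{
    std::ostringstream value_of_ptr;   // what the fix logs
    value_of_ptr << req;               // address of the pointee
    std::ostringstream addr_of_local;  // what the buggy line logs
    addr_of_local << &req;             // address of the parameter itself
    return {value_of_ptr.str(), addr_of_local.str()};
}
```

The second string changes between calls (it tracks the stack frame), which is why the original `&global_reopen_req` log carried no useful information about the request.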
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f9a1fb7a-feb8-4c11-a076-ade3a0bf63f2
📒 Files selected for processing (2)
- store_handler/eloq_data_store_service/data_store_service.cpp
- store_handler/eloq_data_store_service/eloq_store_data_store.cpp
```cpp
LOG(FATAL) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, "
              "expected: "
           << term << ", actual: " << eloq_store_service_->Term();
return;
```
Avoid crashing the process on term mismatch in reload path.
Line [886] uses LOG(FATAL), which turns a stale/late reload event into a full process abort. This should fail gracefully and return.
Suggested fix

```diff
-    LOG(FATAL) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, "
-                  "expected: "
-               << term << ", actual: " << eloq_store_service_->Term();
+    LOG(ERROR) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, "
+                  "expected: "
+               << term << ", actual: " << eloq_store_service_->Term();
     return;
```

📝 Committable suggestion
```diff
-LOG(FATAL) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, "
-              "expected: "
-           << term << ", actual: " << eloq_store_service_->Term();
-return;
+LOG(ERROR) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, "
+              "expected: "
+           << term << ", actual: " << eloq_store_service_->Term();
+return;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp` around lines
886 - 889, EloqStoreDataStore::ReloadDataFromCloud currently uses LOG(FATAL) on
a term mismatch which aborts the process; change this to a non-fatal failure
path: replace the LOG(FATAL) usage with a safer log level (e.g., LOG(ERROR) or
LOG(WARNING)) that includes the same context (expected term and
eloq_store_service_->Term()) and then return from ReloadDataFromCloud so the
reload fails gracefully without crashing the process.
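The non-fatal shape the prompt asks for is: validate the term, report the mismatch, and signal failure to the caller instead of aborting. A minimal sketch with glog swapped for a boolean return plus a stderr message; `ReloadGuard` is an illustrative stand-in, not the real EloqStoreDataStore:

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for the reload entry point: rejects a
// stale/mismatched term instead of tearing the process down.
class ReloadGuard
{
public:
    explicit ReloadGuard(int64_t current_term) : current_term_(current_term)
    {
    }

    bool ReloadDataFromCloud(int64_t term)
    {
        if (term != current_term_)
        {
            // Non-fatal: a late reload event is reported and dropped.
            std::cerr << "ReloadDataFromCloud term mismatch, expected: "
                      << term << ", actual: " << current_term_ << "\n";
            return false;
        }
        // ... real reload work would happen here ...
        return true;
    }

private:
    int64_t current_term_;
};
```

Returning a status (instead of `void` plus `LOG(FATAL)`) also lets the snapshot handler decide whether a failed reload should be retried or just logged.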
Here are some reminders before you submit the pull request
- fixes eloqdb/tx_service#issue_id.
- ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit
New Features
Bug Fixes & Improvements