feat: implement EloqStore backup snapshots and rename partition filter #378
Conversation
Caution: Review failed. The pull request is closed.

Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough: Renames `InitializePrewarmFilter` → `InitializePartitionFilter` across the factory header, factory implementation, and call site; implements a blocking, synchronous snapshot workflow in `EloqStoreDataStore::CreateSnapshotForBackup`; updates the eloqstore submodule pointer.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as Caller
    participant DataStore as EloqStoreDataStore
    participant Exec as ExecSyncQueue
    participant GlobalArchive as GlobalArchiveRequest
    participant Guard as RequestGuard
    Caller->>DataStore: CreateSnapshotForBackup(timestamp, req)
    DataStore->>GlobalArchive: construct request with timestamp
    DataStore->>Exec: enqueue & block on request
    Exec->>GlobalArchive: execute request
    GlobalArchive->>Guard: complete with KvError
    Guard->>DataStore: notify completion / pass KvError
    DataStore->>DataStore: map KvError → DataStoreError
    DataStore->>Caller: SetFinish / return result
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
store_handler/eloq_data_store_service/eloq_store_data_store.cpp
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 587-644: Replace the unbounded wait in SnapshotSyncData's usage
with a bounded wait_for: in the function creating SnapshotSyncData and queuing
global_archive_req (symbols: SnapshotSyncData, sync_data, callback,
eloq_store_service_->ExecAsyn), change the wait loop that calls
sync_data->cv_.wait(lk) to use sync_data->cv_.wait_for(lk, timeout) (choose a
reasonable timeout, e.g. a few seconds) and handle the timed-out case by
treating it as a snapshot failure—set sync_data->error_ or a local flag to
indicate timeout, break the wait loop, and ensure req->SetFinish returns
CREATE_SNAPSHOT_ERROR with an appropriate message when the wait_for expires so
the request doesn't block indefinitely and accounting is released.
🧹 Nitpick comments (1)
store_handler/eloq_data_store_service/data_store_factory.h (1)
114-119: Update the comment to match the renamed API. The doc still references “prewarm filter” and a `prewarm_filter` param, but the API now uses `InitializePartitionFilter` with `bucket_ids`. Please align the brief/param names to avoid confusion.
```cpp
PoolableGuard req_guard(req);

struct SnapshotSyncData
{
    bthread::Mutex mtx_;
    bthread::ConditionVariable cv_;
    bool finished_{false};
    ::eloqstore::KvError error_{::eloqstore::KvError::NoError};
};

auto sync_data = std::make_shared<SnapshotSyncData>();
::eloqstore::GlobalArchiveRequest global_archive_req;
global_archive_req.SetSnapshotTimestamp(req->GetBackupTs());

auto callback = [sync_data](::eloqstore::KvRequest *kv_req)
{
    ::eloqstore::KvError err = kv_req->Error();
    std::unique_lock<bthread::Mutex> lk(sync_data->mtx_);
    sync_data->error_ = err;
    sync_data->finished_ = true;
    sync_data->cv_.notify_one();
};

bool queued =
    eloq_store_service_->ExecAsyn(&global_archive_req, 0, callback);
if (!queued)
{
    req->SetFinish(::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR,
                   "Failed to queue snapshot request");
    return;
}

std::unique_lock<bthread::Mutex> lk(sync_data->mtx_);
while (!sync_data->finished_)
{
    sync_data->cv_.wait(lk);
}

::EloqDS::remote::DataStoreError ds_error;
std::string error_msg;
switch (sync_data->error_)
{
case ::eloqstore::KvError::NoError:
    ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR;
    break;
case ::eloqstore::KvError::NotRunning:
    ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN;
    error_msg = "EloqStore not running";
    break;
default:
    ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR;
    error_msg = "Snapshot failed with error code: " +
                std::to_string(static_cast<int>(sync_data->error_));
    break;
}

req->SetFinish(ds_error, error_msg);
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Look for existing timeout patterns / GlobalArchiveRequest usage to align behavior
rg -n "GlobalArchiveRequest|CreateSnapshotForBackup|snapshot.*timeout" -g'*.{h,cc,cpp}'
```

Repository: eloqdata/tx_service
Length of output: 10393

🏁 Script executed:

```bash
#!/bin/bash
# Search for ConditionVariable::wait_for usage patterns
rg -n "ConditionVariable::wait_for|cv_\.wait_for|\.wait_for.*Mutex" -g'*.{h,cc,cpp}'
```

Repository: eloqdata/tx_service
Length of output: 1737

🏁 Script executed:

```bash
#!/bin/bash
# Search for other async callback patterns with timeouts
rg -n "ExecAsyn|ExecAsync" -g'*.{h,cc,cpp}' -A 10 | head -100
```

Repository: eloqdata/tx_service
Length of output: 9826
Add a bounded wait to avoid indefinite blocking on snapshot completion.

The wait loop at line 620 uses `cv_.wait()` without a timeout. This pattern is inconsistent with the rest of the codebase, which uses `wait_for()` with explicit timeouts for similar async callback scenarios. If the callback never fires (shutdown, internal failure, lost callback), this call will block indefinitely and keep the write-request accounting elevated. Use `wait_for()` with a reasonable timeout and return a snapshot error when exceeded.
Force-pushed 802ee13 to dd9abaf
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
store_handler/eloq_data_store_service/data_store_factory.h (1)
114-121: Update the documentation to reflect the rename. The Doxygen comment still references "prewarm filter" and has an incorrect `@param prewarm_filter` that doesn't match any actual parameter. The parameters are `ng_id` and `bucket_ids`.

📝 Proposed documentation fix

```diff
 /**
- * @brief Set the prewarm filter function for eloqstore
- * @param prewarm_filter The prewarm filter function
+ * @brief Initialize the partition filter for eloqstore
+ * @param ng_id The node group ID
+ * @param bucket_ids Set of bucket IDs to include in the filter
  */
 virtual void InitializePartitionFilter(
     uint32_t ng_id, std::unordered_set<uint16_t> &&bucket_ids)
 {
 }
```
Force-pushed 8fd3b43 to 4f98be5
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 589-611: The callback captures stack-local SnapshotSyncData
(sync_data) and ExecAsyn is passed a pointer to stack-local global_archive_req,
which can cause use-after-free/dangling-pointer when the function returns;
change sync_data to a std::shared_ptr<SnapshotSyncData> and capture it by value
in the lambda, and allocate global_archive_req on the heap (or use the existing
EloqStoreOperationData pool pattern used by Read/BatchWriteRecords) so ExecAsyn
receives a pointer to a stable object; update any waiting logic and accesses
from sync_data.<member> to sync_data-> and ensure proper ownership/lifetime
semantics so the callback can safely run after the caller returns.
♻️ Duplicate comments (1)
store_handler/eloq_data_store_service/eloq_store_data_store.cpp (1)
619-623: Unbounded wait without timeout — already flagged in prior reviews. This wait loop lacks a timeout, which can cause indefinite blocking if the callback is never invoked. Prior review comments have already identified this issue and recommended using `wait_for()` with a bounded timeout.
```cpp
struct SnapshotSyncData
{
    bthread::Mutex mtx_;
    bthread::ConditionVariable cv_;
    bool finished_{false};
    ::eloqstore::KvError error_{::eloqstore::KvError::NoError};
};

SnapshotSyncData sync_data;
::eloqstore::GlobalArchiveRequest global_archive_req;
global_archive_req.SetSnapshotTimestamp(req->GetBackupTs());

auto callback = [&sync_data](::eloqstore::KvRequest *kv_req)
{
    ::eloqstore::KvError err = kv_req->Error();
    std::unique_lock<bthread::Mutex> lk(sync_data.mtx_);
    sync_data.error_ = err;
    sync_data.finished_ = true;
    sync_data.cv_.notify_one();
};

bool queued =
    eloq_store_service_->ExecAsyn(&global_archive_req, 0, callback);
```
Lifetime hazard: stack-local objects captured by async callback.

Both `sync_data` and `global_archive_req` are stack-local, while the callback captures `sync_data` by reference and `ExecAsyn` receives a pointer to `global_archive_req`. This is currently safe only because the indefinite wait prevents the function from returning before the callback fires.

However, when the timeout fix (flagged in prior reviews) is applied, the function may return while the callback is still pending, causing:

- Use-after-free on `sync_data` when the callback accesses `sync_data.mtx_`, `sync_data.error_`, etc.
- Dangling pointer in `kv_req` pointing to destroyed `global_archive_req`

Consider using `std::shared_ptr<SnapshotSyncData>` captured by value in the callback, and either heap-allocating `global_archive_req` or using the pool-based `EloqStoreOperationData` pattern consistent with other async operations in this file (e.g., `Read`, `BatchWriteRecords`).
🔧 Suggested approach using shared_ptr

```diff
-    struct SnapshotSyncData
-    {
-        bthread::Mutex mtx_;
-        bthread::ConditionVariable cv_;
-        bool finished_{false};
-        ::eloqstore::KvError error_{::eloqstore::KvError::NoError};
-    };
-
-    SnapshotSyncData sync_data;
+    struct SnapshotSyncData
+    {
+        bthread::Mutex mtx_;
+        bthread::ConditionVariable cv_;
+        bool finished_{false};
+        ::eloqstore::KvError error_{::eloqstore::KvError::NoError};
+        ::eloqstore::GlobalArchiveRequest req_;
+    };
+
+    auto sync_data = std::make_shared<SnapshotSyncData>();
+    sync_data->req_.SetSnapshotTimestamp(req->GetBackupTs());
-    ::eloqstore::GlobalArchiveRequest global_archive_req;
-    global_archive_req.SetSnapshotTimestamp(req->GetBackupTs());
-    auto callback = [&sync_data](::eloqstore::KvRequest *kv_req)
+    auto callback = [sync_data](::eloqstore::KvRequest *kv_req)
     {
         ::eloqstore::KvError err = kv_req->Error();
-        std::unique_lock<bthread::Mutex> lk(sync_data.mtx_);
-        sync_data.error_ = err;
-        sync_data.finished_ = true;
-        sync_data.cv_.notify_one();
+        std::unique_lock<bthread::Mutex> lk(sync_data->mtx_);
+        sync_data->error_ = err;
+        sync_data->finished_ = true;
+        sync_data->cv_.notify_one();
     };

     bool queued =
-        eloq_store_service_->ExecAsyn(&global_archive_req, 0, callback);
+        eloq_store_service_->ExecAsyn(&sync_data->req_, 0, callback);
```

Then update the wait loop and error handling to use `sync_data->` instead of `sync_data.`.
Force-pushed 4f98be5 to 3f4aef5
Implement CreateSnapshotForBackup in EloqStoreDataStore using GlobalArchiveRequest for consistent snapshot creation. Rename InitializePrewarmFilter to InitializePartitionFilter to reflect generalized partition filtering beyond prewarming.
Force-pushed 3f4aef5 to 1940600
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
```cpp
#include "eloq_store_data_store.h"

#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
```
Unused bthread includes added to file
Low Severity
The includes <bthread/condition_variable.h> and <bthread/mutex.h> were added but are not used anywhere in eloq_store_data_store.cpp. The new CreateSnapshotForBackup implementation uses synchronous ExecSync without any bthread synchronization primitives. These unused includes add clutter and may slightly impact compilation time.
```cpp
    break;
}

req->SetFinish(ds_error, error_msg);
```
Backup files never populated in snapshot response
Medium Severity
The CreateSnapshotForBackup implementation never calls AddBackupFile() to populate the backup files list in the response. The RocksDB Cloud implementation shows the expected pattern: call req->AddBackupFile(...) before SetFinish() to inform callers what was created. Callers that pass a backup_files vector (as shown in DataStoreServiceClient::CreateSnapshotForBackup) will receive an empty list even on success, which could cause downstream issues for backup management or restore operations.
Here are some reminders before you submit the pull request
fixes eloqdb/tx_service#issue_id

./mtr --suite=mono_main,mono_multi,mono_basic

Note
- Implements `CreateSnapshotForBackup` in `EloqStoreDataStore` using `GlobalArchiveRequest`, with mapped error handling (`NO_ERROR`, `DB_NOT_OPEN`, `CREATE_SNAPSHOT_ERROR`).
- Renames `InitializePrewarmFilter` to `InitializePartitionFilter` in `DataStoreFactory`, updates the call site in `DataStoreService::OpenDataStore`, and sets `partition_filter` in `EloqStoreDataStoreFactory` to gate tables by computed `bucket_id` (range vs hash partition handling).

Written by Cursor Bugbot for commit 1940600. This will update automatically on new commits.
Summary by CodeRabbit
- Bug Fixes
- Refactor
- Chores