feat: move HTTP work (curl) from shard threads to dedicated threads #295
thweetkomputer merged 3 commits into main
Walkthrough
Adds a `CloudStorageService` and integrates it across `EloqStore`, `CloudStoreMgr`, `ObjectStore`, `AsyncHttpManager`, shards, and file-GC; introduces cloud concurrency/thread options, moves HTTP submission to service-managed worker queues, and adds cloud-slot coordination and shard-aware task routing.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant CloudMgr as CloudStoreMgr
participant ObjStore as ObjectStore
participant CloudSvc as CloudStorageService
participant HttpMgr as AsyncHttpManager
Client->>CloudMgr: AcquireCloudSlot(task)
CloudMgr->>CloudMgr: allocate/check inflight slots
Client->>ObjStore: SubmitTask(task, owner_shard)
ObjStore->>CloudSvc: Submit(store, task)
CloudSvc->>CloudSvc: enqueue job to worker queue (pending_jobs)
CloudSvc->>CloudSvc: RunWorker -> dequeue job
CloudSvc->>ObjStore: request StartHttpRequest(task)
ObjStore->>HttpMgr: SubmitRequest(task)
HttpMgr->>HttpMgr: RunHttpWork / perform HTTP
HttpMgr->>CloudSvc: OnTaskFinished(task)
CloudSvc->>CloudMgr: NotifyTaskFinished -> EnqueueCloudReadyTask(task)
CloudMgr->>CloudMgr: ProcessCloudReadyTasks(shard) -> ReleaseCloudSlot
CloudMgr->>Client: Finish/continue original KvTask
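The slot bookkeeping at the start and end of the diagram (`AcquireCloudSlot` and `ReleaseCloudSlot`) can be pictured with a small counter plus a waiting list. This is only a sketch under assumptions: `SlotLimiter`, its integer task ids, and the `-1` return value are hypothetical, and the real `CloudStoreMgr` presumably suspends and resumes the calling task rather than returning a flag.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for the in-flight slot accounting in CloudStoreMgr.
class SlotLimiter {
public:
    explicit SlotLimiter(std::size_t max_slots) : max_slots_(max_slots) {}

    // Grants a slot immediately when one is free; otherwise parks the task id.
    bool Acquire(int task_id) {
        if (inflight_ < max_slots_) {
            ++inflight_;
            return true;
        }
        waiting_.push_back(task_id);
        return false;
    }

    // Gives the slot back. If a task is parked, the slot is handed straight
    // to it and its id is returned; otherwise the slot is freed and -1 is returned.
    int Release() {
        assert(inflight_ > 0);
        if (!waiting_.empty()) {
            int next = waiting_.front();
            waiting_.pop_front();
            return next;  // inflight_ is unchanged: the slot changed owners
        }
        --inflight_;
        return -1;
    }

    std::size_t Inflight() const { return inflight_; }

private:
    std::size_t max_slots_;
    std::size_t inflight_ = 0;
    std::deque<int> waiting_;
};
```

With `max_slots` equal to zero, `Acquire` never succeeds and nothing ever calls `Release`, which is the deadlock the review flags for `max_cloud_concurrency`.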
Estimated code review effort: 4 (Complex), ~50 minutes
Force-pushed: e2b985f to 359c506
Force-pushed: 18f6269 to 5c7451a
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/cloud_storage_service.cpp`:
- Around line 108-121: In CloudStorageService::NotifyTaskFinished, replace the
unsafe reinterpret_cast<CloudStoreMgr *>(owner->IoManager()) with a type-safe
static_cast because CloudStoreMgr is a derived class in the AsyncIoManager
hierarchy; locate the cast expression in NotifyTaskFinished (where owner is a
Shard* and IoManager() is called), change to
static_cast<CloudStoreMgr*>(owner->IoManager()), and keep the subsequent call to
cloud_mgr->EnqueueCloudReadyTask(task->kv_task_) unchanged.
In `@src/file_gc.cpp`:
- Around line 139-141: The SubmitTask calls use an undefined identifier shard;
fetch a valid Shard* and pass it into SubmitTask instead. Locate the call sites
around cloud_mgr->AcquireCloudSlot(current_task),
cloud_mgr->GetObjectStore().SubmitTask(&list_task, shard),
current_task->WaitIo() and replace shard with a retrieved pointer (e.g., obtain
via a member or accessor on current_task such as current_task->GetShard() or via
CloudStoreMgr like cloud_mgr->GetShardForTask(current_task) /
cloud_mgr->GetActiveShard()), and apply the same fix to the other occurrences
(around lines 220-222 and 432-434) so SubmitTask receives a real Shard*.
In `@src/kv_options.cpp`:
- Around line 199-208: The config loader may set max_cloud_concurrency to 0
causing CloudStoreMgr::AcquireCloudSlot to deadlock; after reading both
max_cloud_concurrency and cloud_request_threads, clamp max_cloud_concurrency to
at least 1 and optionally cap it to cloud_request_threads if that value is
positive. Locate the code that reads values via reader.HasValue/GetUnsigned and
the variables max_cloud_concurrency and cloud_request_threads, then add logic to
enforce max_cloud_concurrency = max(1u, max_cloud_concurrency) and if
cloud_request_threads > 0 set max_cloud_concurrency = min(max_cloud_concurrency,
cloud_request_threads) so max_cloud_slots_ can never be zero.
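The clamping described for `src/kv_options.cpp` could look roughly like the following. It is a minimal sketch, assuming both options are unsigned 32-bit fields; `CloudOptions` and `NormalizeCloudOptions` are hypothetical names, not part of the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical subset of the options struct; the field names follow the
// review comment, the types are an assumption.
struct CloudOptions {
    uint32_t max_cloud_concurrency = 0;
    uint32_t cloud_request_threads = 0;
};

// Enforces max_cloud_concurrency >= 1 and, when cloud_request_threads is
// positive, caps max_cloud_concurrency at that value.
inline void NormalizeCloudOptions(CloudOptions &opts) {
    opts.max_cloud_concurrency =
        std::max<uint32_t>(1u, opts.max_cloud_concurrency);
    if (opts.cloud_request_threads > 0) {
        opts.max_cloud_concurrency =
            std::min(opts.max_cloud_concurrency, opts.cloud_request_threads);
    }
}
```

Running the normalization once, after both values have been read, keeps the result independent of the order in which the keys appear in the config.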
🧹 Nitpick comments (1)
src/cloud_storage_service.cpp (1)
53-72: Sentinels enqueued before checking service state.
Sentinel jobs are enqueued to all queues (lines 56-59) before checking if the service was actually running (line 60). If `Stop()` is called without a prior `Start()`, sentinels accumulate in queues with no consumers.
This is benign since the queues will be destroyed with the service, but consider reordering for clarity:
♻️ Suggested reordering
 void CloudStorageService::Stop()
 {
     bool was_running = !stopping_.exchange(true, std::memory_order_acq_rel);
+    if (!was_running && workers_.empty())
+    {
+        return;
+    }
     for (auto &queue : job_queues_)
     {
         queue.enqueue({nullptr, nullptr});
     }
-    if (!was_running && workers_.empty())
-    {
-        return;
-    }
     for (auto &worker : workers_)
     {
📜 Review details
📒 Files selected for processing (15)
- CMakeLists.txt
- include/async_io_manager.h
- include/cloud_storage_service.h
- include/eloq_store.h
- include/kv_options.h
- include/storage/object_store.h
- include/storage/shard.h
- src/async_io_manager.cpp
- src/cloud_storage_service.cpp
- src/eloq_store.cpp
- src/file_gc.cpp
- src/kv_options.cpp
- src/storage/object_store.cpp
- src/storage/shard.cpp
- src/tasks/background_write.cpp
✅ Files skipped from review due to trivial changes (1)
- src/tasks/background_write.cpp
🚧 Files skipped from review as they are similar to previous changes (3)
- include/storage/shard.h
- src/storage/shard.cpp
- include/eloq_store.h
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-16T08:55:51.185Z
Learnt from: thweetkomputer
Repo: eloqdata/eloqstore PR: 293
File: src/tasks/write_task.cpp:393-413
Timestamp: 2026-01-16T08:55:51.185Z
Learning: In C++ code, when you store raw pointers to objects managed by std::shared_ptr (e.g., MappingSnapshot*) in a container for iteration (via .get()), ensure the owning shared_ptrs remain alive for the duration of the iteration and any use of shared_from_this. In TriggerFileGC() in src/tasks/write_task.cpp, mapping_snapshots_ contains MappingSnapshot* obtained with .get(), and the underlying MappingSnapshot instances are owned by PageMapper::mapping_ inside RootMeta, which stays alive during GC. Prefer documenting the ownership and lifetime, and consider using weak_ptr or a safe dereference pattern if lifetime could end, to avoid use-after-free or dangling pointers.
Applied to files:
- src/eloq_store.cpp
- src/file_gc.cpp
- src/kv_options.cpp
- src/cloud_storage_service.cpp
- src/async_io_manager.cpp
- src/storage/object_store.cpp
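The "safe dereference pattern" mentioned in the learning can be sketched with `std::weak_ptr`. This is an illustration only: the `MappingSnapshot` struct below is a hypothetical stand-in with a single field, and `SumLive` is an invented helper, not code from the repository.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for the real MappingSnapshot type.
struct MappingSnapshot {
    int id;
};

// Iterates over weak references and touches only snapshots that are still
// alive. lock() yields an owning shared_ptr for the duration of the use,
// so an entry cannot be destroyed while it is being read.
int SumLive(const std::vector<std::weak_ptr<MappingSnapshot>> &snapshots) {
    int sum = 0;
    for (const auto &weak : snapshots) {
        if (auto snapshot = weak.lock()) {
            sum += snapshot->id;
        }
    }
    return sum;
}
```

Storing raw pointers obtained with `.get()` avoids the `lock()` cost, but then correctness rests entirely on the documented guarantee that the owner outlives the iteration.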
🧬 Code graph analysis (6)
- src/eloq_store.cpp (2)
  - include/async_io_manager.h (1): cloud_service_ (649-659)
  - include/storage/object_store.h (2): cloud_service_ (207-208), cloud_service_ (310-316)
- include/async_io_manager.h (4)
  - src/cloud_storage_service.cpp (2): CloudStorageService (19-32), CloudStorageService (34-37)
  - include/storage/shard.h (1): Shard (31-222)
  - src/async_io_manager.cpp (9): CloudStoreMgr (1964-1984), AcquireCloudSlot (2339-2350), AcquireCloudSlot (2339-2339), ReleaseCloudSlot (2352-2361), ReleaseCloudSlot (2352-2352), EnqueueCloudReadyTask (2363-2370), EnqueueCloudReadyTask (2363-2363), ProcessCloudReadyTasks (2372-2392), ProcessCloudReadyTasks (2372-2372)
  - include/storage/object_store.h (2): cloud_service_ (207-208), cloud_service_ (310-316)
- src/cloud_storage_service.cpp (5)
  - include/cloud_storage_service.h (2): store_ (44-55), worker_count_ (48-48)
  - src/async_io_manager.cpp (6): Stop (2233-2244), Stop (2233-2233), Submit (805-820), Submit (805-805), Submit (2246-2249), Submit (2246-2246)
  - src/storage/shard.cpp (4): Stop (164-169), Stop (164-164), Start (129-162), Start (129-129)
  - src/eloq_store.cpp (4): Stop (477-514), Stop (477-477), Start (170-245), Start (170-170)
  - include/async_io_manager.h (1): Submit (666-666)
- include/cloud_storage_service.h (3)
  - include/kv_options.h (1): eloqstore (11-239)
  - include/eloq_store.h (1): EloqStore (406-420)
  - src/storage/object_store.cpp (2): ObjectStore (854-874), ObjectStore (876-887)
- src/storage/object_store.cpp (3)
  - include/async_io_manager.h (1): cloud_service_ (649-659)
  - include/storage/object_store.h (2): cloud_service_ (207-208), cloud_service_ (310-316)
  - src/storage/shard.cpp (2): OnTaskFinished (462-488), OnTaskFinished (462-462)
- include/storage/object_store.h (4)
  - src/storage/shard.cpp (3): Shard (25-35), OnTaskFinished (462-488), OnTaskFinished (462-462)
  - src/cloud_storage_service.cpp (2): CloudStorageService (19-32), CloudStorageService (34-37)
  - src/storage/object_store.cpp (14): ObjectStore (854-874), ObjectStore (876-887), SubmitTask (909-917), SubmitTask (909-909), StartHttpRequest (919-923), StartHttpRequest (919-919), RunHttpWork (925-933), RunHttpWork (925-925), HttpWorkIdle (935-942), HttpWorkIdle (935-935), AsyncHttpManager (944-965), AsyncHttpManager (967-974), OnTaskFinished (1484-1487), OnTaskFinished (1484-1484)
  - include/async_io_manager.h (1): cloud_service_ (649-659)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (39)
src/kv_options.cpp (1)
347-348: Equality update looks good.
Including the new fields in `operator==` keeps state comparisons accurate.
include/kv_options.h (1)
118-127: Defaults and docs align with new cloud request model.
include/cloud_storage_service.h (1)
35-52: Confirm PendingJob pointer lifetimes across worker threads.
`PendingJob` holds raw `ObjectStore*` and `Task*`. Please ensure `Stop()` drains queues and that stores/tasks outlive worker threads; otherwise a use-after-free (UAF) is possible. If the lifetime isn't strict, consider stronger ownership or explicit lifetime docs.
include/async_io_manager.h (4)
38-39: No issues to flag.
465-467: Constructor injection for CloudStorageService looks good.
523-526: Cloud slot / ready-task APIs are clear and cohesive.
633-655: Queue + slot tracking fields align with the new flow.
include/storage/object_store.h (7)
37-38: No issues to flag.
44-63: New ObjectStore/HTTP APIs look consistent with the service split.
99-103: Verify `owner_shard_` lifetime across async HTTP completion.
`owner_shard_` is a raw pointer set before submission and then used after HTTP completion. Please confirm the shard always outlives the task completion path and that the handoff provides the necessary synchronization.
207-207: No issues to flag.
253-253: Constructor signature update is coherent with the new service.
293-293: Task-finished hook declaration looks good.
310-310: No issues to flag.
CMakeLists.txt (1)
98-103: Build integration looks good.
Adding `src/cloud_storage_service.cpp` ensures the new service is compiled into the library.
src/eloq_store.cpp (5)
19-20: No review needed for the include change.
66-82: Solid option validation for cloud mode.
The constraints on `max_cloud_concurrency` and `max_upload_batch` help prevent invalid configs.
156-159: Cloud service initialization in cloud mode looks good.
207-210: Service start hook is correctly gated.
498-501: Verify shutdown ordering with cloud service.
Line 498 stops shard threads before `cloud_service_->Stop()`. If the cloud service can still deliver task completions back to shards, this ordering can race with shard teardown. Please confirm `CloudStorageService::Stop()` drains/joins worker threads without touching shards after they stop, or consider stopping the cloud service before stopping shards.
src/storage/object_store.cpp (3)
29-31: No review needed for the include change.
854-874: Constructor wiring looks correct.
909-942: Verify completion path always signals KvTask waiters.
`SubmitTask` increments `kv_task_->inflight_io_`, and completion is now delegated via `CloudStorageService::NotifyTaskFinished()` (including early error paths and cleanup). Please confirm `NotifyTaskFinished()` always invokes the equivalent of `FinishIo()` (or otherwise decrements/informs waiters) for all paths to avoid hanging `WaitIo()` calls.
Also applies to: 1103-1155, 1456-1456, 1478-1487
src/async_io_manager.cpp (8)
36-38: No review needed for include additions.
89-91: CloudStoreMgr creation now correctly passes the cloud service.
1964-1975: Constructor updates look good.
1992-1995: ObjectStore registration on init is correct.
2240-2243: ObjectStore unregistration on stop looks appropriate.
2339-2392: Cloud slot throttling APIs look consistent.
2931-2941: Batched read loop looks fine.
3059-3076: Upload path integrates cloud slots correctly.
src/cloud_storage_service.cpp (8)
1-17: LGTM!
The includes and namespace setup are appropriate. The `kIdleWait{10}` constant provides a reasonable polling interval for balancing responsiveness and CPU usage.
19-32: LGTM!
Constructor properly validates the store pointer, handles edge cases for shard count and worker count, and initializes data structures appropriately.
34-37: LGTM!
The destructor delegates to `Stop()`, which is idempotent, ensuring proper cleanup regardless of prior state.
39-51: LGTM!
The `Start()` method correctly uses compare-exchange to ensure idempotent startup and spawns the configured number of worker threads.
74-95: LGTM!
Registration and unregistration are properly bounds-checked and mutex-protected. The asymmetric error handling (logging on registration failure, silent return on unregistration) is reasonable given that registration failures indicate configuration issues.
97-106: LGTM!
The `Submit()` method validates inputs with CHECK macros, correctly routes tasks to workers based on shard ownership for locality, and uses appropriate memory ordering for the counter.
180-199: LGTM!
The strided iteration efficiently distributes shard processing across workers. The short critical section (only pointer read under lock) maximizes concurrency. The design correctly assumes `ObjectStore` lifetime is managed externally and outlives the service.
123-178: Shutdown ordering is correctly enforced; no changes needed.
The verification confirms that `CloudStorageService::Stop()` properly waits for worker threads to complete via `worker.join()` before the shards (and their associated `ObjectStore` instances) are destroyed. In `EloqStore::Stop()`, the sequence is: (1) stop shards, (2) call `cloud_service_->Stop()`, which joins all workers, (3) clear shards. This ensures the worker drain loop and final `ProcessHttpWork()` calls complete while threads are still running, and `ObjectStore` instances remain valid for the entire worker lifetime.
Force-pushed: 5c7451a to d680360
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/cloud_storage_service.cpp`:
- Around line 97-106: CloudStorageService::Submit currently increments
pending_jobs_ and enqueues tasks without checking whether the service is
stopping, which can leak pending_jobs_ if Stop() has shut down workers; modify
Submit to check the service state (e.g., a boolean stopping_ or is_stopped()
used by Stop()) before incrementing pending_jobs_ and enqueuing: return/abort
submission when stopping_ is true, or if you must accept late submissions ensure
you decrement pending_jobs_ and do not enqueue when stopping_ is set. Update
Submit to read stopping_ under the same synchronization used by Stop(), and
reference CloudStorageService::Stop, pending_jobs_, job_queues_, and
owner->shard_id_ when implementing the guard.
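One way to picture the requested guard is a `Submit` that checks the stop flag under the same lock that `Stop` takes, so the pending counter is never bumped for a job that no worker will consume. This is a simplified sketch under assumptions: `MiniService` is hypothetical, it uses one mutex-protected `std::deque` instead of the per-worker queues in the PR, and jobs are plain integers.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical, single-queue stand-in for CloudStorageService.
class MiniService {
public:
    // Returns false and leaves the counter untouched once Stop() has run.
    bool Submit(int job) {
        std::lock_guard<std::mutex> lock(mu_);
        if (stopping_) {
            return false;  // reject late submissions instead of leaking them
        }
        queue_.push_back(job);
        pending_jobs_.fetch_add(1, std::memory_order_relaxed);
        return true;
    }

    // Sets the flag under the same mutex, so a Submit() either completes
    // before Stop() or observes stopping_ == true.
    void Stop() {
        std::lock_guard<std::mutex> lock(mu_);
        stopping_ = true;
    }

    std::size_t Pending() const {
        return pending_jobs_.load(std::memory_order_relaxed);
    }

private:
    std::mutex mu_;
    bool stopping_ = false;
    std::deque<int> queue_;
    std::atomic<std::size_t> pending_jobs_{0};
};
```

Callers then need a defined reaction to a rejected submission, for example completing the task with an error so that any waiter is released.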
♻️ Duplicate comments (1)
src/cloud_storage_service.cpp (1)
108-121: Change `reinterpret_cast` to `static_cast` for type-safe downcast.
Since `CloudStoreMgr` is a derived class of `AsyncIoManager`, use `static_cast` for the downcast instead of `reinterpret_cast`.
♻️ Suggested fix
 Shard *owner = task->owner_shard_;
-auto *cloud_mgr = reinterpret_cast<CloudStoreMgr *>(owner->IoManager());
+auto *cloud_mgr = static_cast<CloudStoreMgr *>(owner->IoManager());
 cloud_mgr->EnqueueCloudReadyTask(task->kv_task_);
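To illustrate why `static_cast` is the safer downcast, the sketch below uses a deliberately contrived hierarchy in which the base subobject does not sit at the start of the derived object. The class names mirror the review, but the bodies, the extra `Tracer` base, and `AsCloudMgr` are hypothetical; the real `CloudStoreMgr` may well use single inheritance, where both casts would typically produce the same address.

```cpp
#include <cassert>

// Hypothetical classes; only "CloudStoreMgr derives from AsyncIoManager"
// comes from the review.
struct Tracer {
    int tag = 7;
    virtual ~Tracer() = default;
};

struct AsyncIoManager {
    int base_field = 1;
    virtual ~AsyncIoManager() = default;
};

// AsyncIoManager is the second base here, so its subobject is placed after
// the Tracer subobject and a base-to-derived cast must adjust the pointer.
struct CloudStoreMgr : Tracer, AsyncIoManager {
    int ready_tasks = 0;
    void EnqueueCloudReadyTask() { ++ready_tasks; }
};

// static_cast applies the base-to-derived adjustment and fails to compile
// for unrelated types; reinterpret_cast would reuse the address unchanged.
CloudStoreMgr *AsCloudMgr(AsyncIoManager *io) {
    return static_cast<CloudStoreMgr *>(io);
}
```

The cast is still unchecked at run time, so it stays valid only while `IoManager()` really returns a `CloudStoreMgr`.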
🧹 Nitpick comments (5)
src/tasks/background_write.cpp (1)
282-282: Minor style inconsistency with line 97.
Line 97 uses `this->tbl_ident_` while this line now uses `tbl_ident_`. Consider using a consistent member access style within the same function.
Suggested fix for consistency
Either remove `this->` from line 97:
-    LOG(INFO) << "begin compaction on " << this->tbl_ident_;
+    LOG(INFO) << "begin compaction on " << tbl_ident_;
Or keep `this->` on line 282:
-    LOG(INFO) << "finish compaction on " << tbl_ident_;
+    LOG(INFO) << "finish compaction on " << this->tbl_ident_;
src/cloud_storage_service.cpp (3)
53-72: Minor inefficiency: sentinels enqueued before early-return check.
When `Stop()` is called on an already-stopped service, sentinels are still enqueued to all queues (lines 56-59) before the early-return check on line 60. Consider moving the early-return check before enqueueing.
♻️ Suggested optimization
 void CloudStorageService::Stop()
 {
     bool was_running = !stopping_.exchange(true, std::memory_order_acq_rel);
+    if (!was_running && workers_.empty())
+    {
+        return;
+    }
     for (auto &queue : job_queues_)
     {
         queue.enqueue({nullptr, nullptr});
     }
-    if (!was_running && workers_.empty())
-    {
-        return;
-    }
     for (auto &worker : workers_)
     {
87-95: Consider consistent error handling with `RegisterObjectStore`.
`RegisterObjectStore` logs an error when `shard_id` is out of range, but `UnregisterObjectStore` silently returns. Consider adding a similar log for consistency.
168-178: Remove commented-out code.
Dead/commented code adds noise. If this alternative implementation is needed for reference, consider documenting the design decision elsewhere or removing it entirely.
♻️ Suggested fix
             job.store->StartHttpRequest(job.task);
         }
     }
-
-    // PendingJob job;
-    // while (job_queues_[worker_index].try_dequeue(job))
-    // {
-    //     if (job.store != nullptr && job.task != nullptr)
-    //     {
-    //         pending_jobs_.fetch_sub(1, std::memory_order_relaxed);
-    //         job.store->StartHttpRequest(job.task);
-    //     }
-    // }
-    // ProcessHttpWork(worker_index);
 }
include/storage/object_store.h (1)
99-103: Consider documenting the ownership semantics of `owner_shard_`.
The raw pointer `owner_shard_` requires careful lifetime management. Based on learnings from this codebase, documenting the ownership invariant (e.g., "must outlive the Task") helps prevent use-after-free issues.
Force-pushed: 5160c27 to 18fb225
Force-pushed: 70f4cb0 to b5094b5
Here are some reminders before you submit the pull request
- fixes eloqdb/eloqstore#issue_id
- ctest --test-dir build/tests/
Summary by CodeRabbit
New Features
Improvements
Chores
Tests