Use brpc Module Interface to refactor ThreadGroup loop #232
xiexiaoy merged 8 commits into eloqdata:main
Conversation
WalkthroughCentralizes thread identity via new Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller
participant LocalThread
participant BThreadTLS as bthread::tls_task_group
Caller->>LocalThread: ID()
alt ELOQ_MODULE_ENABLED
LocalThread->>BThreadTLS: inspect tls_task_group
alt has tls_task_group
LocalThread-->>Caller: return group_id_
else no tls
LocalThread-->>Caller: return -1
end
else not enabled
LocalThread-->>Caller: return thread-local static id (or -1)
end
sequenceDiagram
autonumber
participant App
participant ServiceExec as ServiceExecutorCoroutine
participant Module as MongoModule
participant TG as ThreadGroup
participant Worker
App->>ServiceExec: start(reservedThreads)
alt ELOQ_MODULE_ENABLED
ServiceExec->>Module: Init(threadGroups)
loop per worker
ServiceExec->>Worker: spawn & LocalThread::SetID
Worker->>Module: ExtThdStart(id)
loop work loop
Module->>Worker: HasTask/Process(id)
Worker->>TG: TryDequeueBulk(resume/tasks)
Worker-->>Worker: execute tasks (use resume/lifecycle functors)
end
Worker->>Module: ExtThdEnd(id)
end
else legacy path
loop workers
ServiceExec->>Worker: spawn legacy loop (CV wake)
Worker->>TG: dequeue/resume via legacy queues
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
fc3ee7e to
97ae802
Compare
Actionable comments posted: 13
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mongo/transport/service_executor_coroutine.cpp (2)
300-318: Add bounds check for threadGroupId in schedule().
Indexing _threadGroups without validation can read out of bounds.
Apply:
 Status ServiceExecutorCoroutine::schedule(Task task,
                                           ScheduleFlags flags,
                                           ServiceExecutorTaskName taskName,
                                           uint16_t threadGroupId) {
     MONGO_LOG(3) << "schedule with group id: " << threadGroupId;
     if (!_stillRunning.load(std::memory_order_relaxed)) {
         return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
     }
+    if (threadGroupId >= _threadGroups.size()) {
+        return Status{ErrorCodes::BadValue, "Invalid threadGroupId"};
+    }
     _threadGroups[threadGroupId].enqueueTask(std::move(task));
     return Status::OK();
 }
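The bounds check suggested above can be tried in isolation with a minimal sketch. SketchStatus, SketchThreadGroup, and SketchExecutor are hypothetical stand-ins invented for illustration; they are not the real mongo::Status, ThreadGroup, or ServiceExecutorCoroutine types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for mongo::Status; only carries success and a reason.
struct SketchStatus {
    bool ok;
    std::string reason;
};

// Hypothetical stand-in for ThreadGroup; just records queued tasks.
struct SketchThreadGroup {
    std::vector<std::function<void()>> tasks;
    void enqueueTask(std::function<void()> task) {
        tasks.push_back(std::move(task));
    }
};

class SketchExecutor {
public:
    explicit SketchExecutor(std::size_t groups) : _threadGroups(groups) {}

    SketchStatus schedule(std::function<void()> task, std::uint16_t threadGroupId) {
        // Reject an out-of-range id before it is used as an index.
        if (threadGroupId >= _threadGroups.size()) {
            return {false, "Invalid threadGroupId"};
        }
        _threadGroups[threadGroupId].enqueueTask(std::move(task));
        return {true, ""};
    }

    // Number of tasks queued for a group; at() throws on a bad id.
    std::size_t queued(std::uint16_t threadGroupId) const {
        return _threadGroups.at(threadGroupId).tasks.size();
    }

private:
    std::vector<SketchThreadGroup> _threadGroups;
};
```

Returning an error status instead of asserting keeps a bad id from a caller recoverable; whether that or an invariant is preferable depends on whether an invalid id can only come from a programming error.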
321-325: Avoid dangling reference in coroutine resume functors; capture task by value.
The current lambdas capture const Task& by reference, which can dangle once the caller's Task goes out of scope.
Apply:
 std::function<void()> ServiceExecutorCoroutine::coroutineResumeFunctor(uint16_t threadGroupId,
                                                                        const Task& task) {
     invariant(threadGroupId < _threadGroups.size());
-    return [thd_group = &_threadGroups[threadGroupId], &task]() { thd_group->resumeTask(task); };
+    return [thd_group = &_threadGroups[threadGroupId], t = task]() { thd_group->resumeTask(t); };
 }

 std::function<void()> ServiceExecutorCoroutine::coroutineLongResumeFunctor(uint16_t threadGroupId,
                                                                            const Task& task) {
     invariant(threadGroupId < _threadGroups.size());
-    return [thd_group = &_threadGroups[threadGroupId], &task]() { thd_group->enqueueTask(task); };
+    return [thd_group = &_threadGroups[threadGroupId], t = task]() { thd_group->enqueueTask(t); };
 }
Also applies to: 327-331
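To see why the by-value capture matters, here is a small self-contained sketch. SketchGroup and makeResumeFunctor are hypothetical names, and Task is assumed to be a std::function<void()> alias purely for illustration.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Assumption for this sketch: a task is a plain std::function.
using Task = std::function<void()>;

// Hypothetical stand-in for ThreadGroup; stores the tasks it was asked to resume.
struct SketchGroup {
    std::vector<Task> resumed;
    void resumeTask(const Task& task) {
        resumed.push_back(task);
    }
};

// Copies the task into the closure (init-capture `t = task`), so the returned
// functor stays valid even after the caller's Task object has been destroyed.
std::function<void()> makeResumeFunctor(SketchGroup* group, const Task& task) {
    return [group, t = task]() { group->resumeTask(t); };
}
```

The copy costs one std::function copy per functor. If that ever matters, holding the task in a shared_ptr and capturing the pointer is a possible alternative, which is similar in spirit to the coroCtx->resumeTask pattern used in kill_sessions_local.cpp.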
🧹 Nitpick comments (5)
src/mongo/util/concurrency/thread_name.cpp (1)
146-147: Formatting change seems unrelated to PR objectives.
This multi-line formatting adjustment is unrelated to the thread ID refactoring described in the PR title. Consider keeping formatting changes separate from functional refactors for clearer commit history.
src/mongo/base/local_thread_state.h (1)
32-38: Consider adding documentation for the LocalThread API.
The LocalThread struct introduces a new threading abstraction but lacks documentation explaining:
- When ID() returns -1 and what that signifies
- Why SetID() is conditionally compiled (only when ELOQ_MODULE_ENABLED is not defined)
- The purpose and usage of IsBThread()
- Thread safety guarantees
Adding brief comments would help future maintainers understand the threading model.
src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (1)
274-277: Consider error handling for gflags configuration.
The SetCommandLineOption calls can fail if the flags don't exist or the values are invalid. Consider:
- Checking return values and logging warnings/errors if configuration fails
- Documenting why these specific values are needed when ELOQ_MODULE_ENABLED is defined
Example (SetCommandLineOption returns a std::string that is empty on failure, so the check uses .empty()):
 #ifdef ELOQ_MODULE_ENABLED
-    GFLAGS_NAMESPACE::SetCommandLineOption("use_pthread_event_dispatcher", "true");
-    GFLAGS_NAMESPACE::SetCommandLineOption("worker_polling_time_us", "100000");  // 100ms
+    if (GFLAGS_NAMESPACE::SetCommandLineOption("use_pthread_event_dispatcher", "true").empty()) {
+        log() << "Warning: Failed to set use_pthread_event_dispatcher flag";
+    }
+    if (GFLAGS_NAMESPACE::SetCommandLineOption("worker_polling_time_us", "100000").empty()) {
+        log() << "Warning: Failed to set worker_polling_time_us flag";
+    }
 #endif
src/mongo/base/local_thread_state.cpp (1)
33-37: Document asymmetric API: SetID() is only available in non-ELOQ mode.
LocalThread::SetID() is only available when ELOQ_MODULE_ENABLED is not defined, while LocalThread::ID() is available in both modes. This asymmetry may confuse users.
Consider either:
- Adding a comment explaining why SetID() is not needed in ELOQ mode (presumably because the bthread task group manages IDs automatically).
- Providing a no-op or error-throwing SetID() implementation for ELOQ mode to maintain API consistency.
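For readers unfamiliar with the non-module path, the ID()/SetID() contract described above can be sketched as follows. LocalThreadSketch is a hypothetical type written for illustration; it is not the contents of local_thread_state.h and it only models the case where ELOQ_MODULE_ENABLED is not defined.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sketch of a thread-local id holder (non-module path only).
struct LocalThreadSketch {
    // Returns the id assigned to the calling thread, or -1 if SetID() was
    // never called on this thread.
    static std::int16_t ID() {
        return slot();
    }

    // Assigns an id to the calling thread only; other threads are unaffected
    // because the storage is thread_local.
    static void SetID(std::int16_t newId) {
        slot() = newId;
    }

private:
    static std::int16_t& slot() {
        thread_local std::int16_t value = -1;  // -1 means "not a worker thread"
        return value;
    }
};
```

A worker would call SetID() once at startup, and every later ID() call on that thread returns the same value. In the module path described in the review, the id would instead be read from the bthread task group, which is why a setter has no obvious meaning there.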
src/mongo/transport/service_executor_coroutine.cpp (1)
108-119: Minor: memory order choice for _extWorkerActive.
Consider an acquire load in notifyIfAsleep to pair with the release store in ExtThdStart/ExtThdEnd, though relaxed is likely fine here.
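The acquire/release pairing mentioned in this nitpick can be illustrated with a small sketch. WorkerFlagSketch and its members are hypothetical; groupState stands in for whatever plain data the worker initializes before publishing the flag.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical sketch of a flag published with release and read with acquire.
struct WorkerFlagSketch {
    std::atomic<bool> extWorkerActive{false};
    int groupState = 0;  // plain (non-atomic) data written before the flag is set

    // Mirrors the role of ExtThdStart: write state, then publish with release.
    void start() {
        groupState = 42;
        extWorkerActive.store(true, std::memory_order_release);
    }

    // Mirrors the role of ExtThdEnd.
    void end() {
        extWorkerActive.store(false, std::memory_order_release);
    }

    // Mirrors the check in notifyIfAsleep: an acquire load that observes true
    // also makes the earlier write to groupState visible to this thread.
    bool shouldNotify() const {
        return extWorkerActive.load(std::memory_order_acquire);
    }
};
```

If the reader only uses the flag as a hint and never touches data written before the store, a relaxed load gives the same result, which matches the review's remark that relaxed is likely fine.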
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (24)
src/mongo/SConscript (1 hunks)
src/mongo/base/local_thread_state.cpp (1 hunks)
src/mongo/base/local_thread_state.h (1 hunks)
src/mongo/base/string_data.h (2 hunks)
src/mongo/db/catalog/database_holder_impl.cpp (5 hunks)
src/mongo/db/coro_sync.cpp (4 hunks)
src/mongo/db/kill_sessions_local.cpp (3 hunks)
src/mongo/db/modules/eloq/CMakeLists.txt (1 hunks)
src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (1 hunks)
src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp (1 hunks)
src/mongo/db/modules/eloq/store_handler (1 hunks)
src/mongo/db/server_options_server_helpers.cpp (1 hunks)
src/mongo/db/session.cpp (1 hunks)
src/mongo/db/session_catalog.cpp (2 hunks)
src/mongo/db/stats/top.cpp (3 hunks)
src/mongo/db/storage/kv/kv_storage_engine.cpp (2 hunks)
src/mongo/scripting/mozjs/proxyscope.cpp (3 hunks)
src/mongo/scripting/mozjs/proxyscope.h (2 hunks)
src/mongo/transport/SConscript (1 hunks)
src/mongo/transport/service_executor_coroutine.cpp (7 hunks)
src/mongo/transport/service_executor_coroutine.h (2 hunks)
src/mongo/transport/service_state_machine.cpp (4 hunks)
src/mongo/transport/service_state_machine.h (1 hunks)
src/mongo/util/concurrency/thread_name.cpp (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (11)
src/mongo/db/coro_sync.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/db/session.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/db/stats/top.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/scripting/mozjs/proxyscope.h (1)
src/mongo/db/coro_sync.h (2)
Mutex(20-33)ConditionVariable(35-92)
src/mongo/db/kill_sessions_local.cpp (1)
src/mongo/db/client.cpp (2)
setCurrent(169-172)setCurrent(169-169)
src/mongo/db/catalog/database_holder_impl.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/db/storage/kv/kv_storage_engine.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/db/session_catalog.cpp (1)
src/mongo/base/local_thread_state.cpp (2)
ID(21-31)ID(21-21)
src/mongo/transport/service_executor_coroutine.cpp (3)
src/mongo/util/concurrency/thread_name.cpp (2)
setThreadName(114-158)setThreadName(114-114)src/mongo/transport/service_executor_coroutine.h (1)
_ongoingCoroutineCnt(104-104)src/mongo/base/local_thread_state.cpp (2)
SetID(34-36)SetID(34-34)
src/mongo/transport/service_executor_coroutine.h (3)
src/mongo/base/local_thread_state.h (1)
mongo(9-30)src/mongo/transport/service_executor.h (6)
mongo(39-126)transport(43-122)std(108-110)std(112-115)void(117-119)Status(83-88)src/mongo/transport/service_executor_coroutine.cpp (27)
ExtThdStart(44-52)ExtThdStart(44-44)ExtThdEnd(54-59)ExtThdEnd(54-54)Process(61-81)Process(61-61)HasTask(83-87)HasTask(83-83)ServiceExecutorCoroutine(179-183)init(90-96)init(90-90)enqueueTask(98-101)enqueueTask(98-98)resumeTask(103-106)resumeTask(103-103)notifyIfAsleep(108-119)notifyIfAsleep(108-108)trySleep(130-170)trySleep(130-130)terminate(172-176)terminate(172-172)setTxServiceFunctors(121-123)setTxServiceFunctors(121-121)isBusy(125-127)isBusy(125-125)_startWorker(204-283)_startWorker(204-204)
🔇 Additional comments (40)
src/mongo/db/server_options_server_helpers.cpp (1)
598-599: String literal split is valid.
Splitting the long log message into adjacent literals is purely cosmetic; C++ concatenates them at compile time with no functional change.
src/mongo/transport/service_state_machine.h (2)
286-286: LGTM! Caching page size improves performance.
Adding _osPageSize as a member avoids repeated getpagesize() calls in the coroutine stack setup path. The const qualifier is appropriate since page size is fixed for the process lifetime.
285-286: AI summary inconsistency: kCoroStackSize was not removed.
The AI summary claims kCoroStackSize was removed from the header, but line 285 shows it's still present and in use. The actual change is the addition of _osPageSize at line 286 to cache the page size value.
src/mongo/transport/service_state_machine.cpp (4)
262-262: Consider validating the page size value.
While getpagesize() rarely fails, it could return 0 in edge cases. Consider adding a sanity check to ensure a valid page size (typically 4096 or larger).
-      _osPageSize{::getpagesize()},
+      _osPageSize{[] {
+          auto pageSize = ::getpagesize();
+          invariant(pageSize > 0 && pageSize <= kCoroStackSize);
+          return pageSize;
+      }()},
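The cache-and-validate idea above can be sketched outside the MongoDB tree. StackOwnerSketch, queryPageSize, and the value of kSketchCoroStackSize are assumptions made for this example (the real kCoroStackSize is not shown in the review), and assert stands in for invariant. The sketch assumes a POSIX system that provides getpagesize() in <unistd.h>.

```cpp
#include <cassert>
#include <cstddef>
#include <unistd.h>

// Hypothetical stack size; the real kCoroStackSize value is not known here.
constexpr std::size_t kSketchCoroStackSize = 3200 * 1024;

// Queries the page size once and checks that it is usable for stack setup.
inline int queryPageSize() {
    const int pageSize = ::getpagesize();
    assert(pageSize > 0 &&
           static_cast<std::size_t>(pageSize) <= kSketchCoroStackSize);
    return pageSize;
}

class StackOwnerSketch {
public:
    // The page size is read once at construction and never changes afterwards.
    StackOwnerSketch() : _osPageSize{queryPageSize()} {}

    int pageSize() const {
        return _osPageSize;
    }

private:
    const int _osPageSize;
};
```

Moving the check into a named helper instead of an immediately invoked lambda is only a readability choice; both forms run once per constructed object.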
272-272: LGTM! Using cached page size is more efficient.
Replacing the direct getpagesize() call with the cached _osPageSize avoids repeated calls on this path.
577-577: LGTM! Init-capture is more idiomatic.
The lambda capture change from a separate weak_ptr declaration to an init-capture [wssm = weak_from_this(), &guard] is cleaner and follows modern C++14 best practices.
736-736: LGTM! Using cached page size is more efficient.
Replacing the direct getpagesize() call with the cached _osPageSize eliminates redundant calls in the stack context setup.
src/mongo/base/string_data.h (3)
1-364: Major discrepancy: PR title does not match the code changes.
The PR is titled "Use brpc Module Interface to refactor ThreadGroup loop" but the actual changes in this file add a std::hash specialization for mongo::StringData. There is no code related to ThreadGroup, brpc, or loop refactoring in these changes.
Please verify:
- Is this the correct file for this PR?
- Should the PR title/description be updated to reflect the actual changes?
- Are there other files in this PR that match the stated objectives?
179-181: LGTM: Style consistency improvement.
The added space before the opening brace aligns with the formatting style used elsewhere in the file.
357-364: Add unit tests for std::hash<mongo::StringData>
No existing tests exercise this specialization with std::unordered_map or std::unordered_set; please add coverage to verify it works as intended.
src/mongo/transport/SConscript (1)
7-17: AI summary inconsistency detected.
The AI-generated summary claims that transport_layer_common now declares SYSLIBDEPS=["brpc", "gflags"], but no such change is visible in the provided code. Only the service_executor library at line 128 has this addition.
src/mongo/scripting/mozjs/proxyscope.h (1)
204-210: LGTM! Consistent application of conditional synchronization primitives.
The conditional member declarations correctly mirror the include guards, maintaining API compatibility while allowing backend selection. Both stdx::mutex/coro::Mutex and stdx::condition_variable/coro::ConditionVariable expose compatible interfaces that work with standard lock types.
src/mongo/scripting/mozjs/proxyscope.cpp (2)
282-299: Approve state machine logic; C++17 support confirmed.
Project build configurations enforce C++17 (-std=c++17 in SConstruct and CMAKE_CXX_STANDARD 17), so CTAD usage is valid and the state transitions remain correct.
274-276: Remove manual same-thread check verification.
The NSPR shim's PR_GetCurrentThread() uses thread-local storage and is consistently bound for both created and embedded threads, so comparing _thread to PR_GetCurrentThread() is reliable.
src/mongo/db/session.cpp (1)
897-897: LGTM: Thread ID validation updated correctly.
The assertion correctly uses LocalThread::ID() to validate thread context. The -1 sentinel check aligns with the implementation in local_thread_state.cpp, which returns -1 when no valid task group exists.
src/mongo/db/modules/eloq/CMakeLists.txt (1)
23-23: LGTM: Build option added appropriately.
The ELOQ_MODULE_ENABLED option provides a clear toggle for the Eloq module functionality with a sensible default (OFF).
src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp (1)
928-928: LGTM: Thread ID correctly passed to transaction initialization.
The change correctly uses LocalThread::ID() to obtain the thread identifier for transaction initialization.
src/mongo/db/coro_sync.cpp (2)
11-11: LGTM: Thread ID checks updated correctly.
The coroutine synchronization logic correctly uses LocalThread::ID() to determine if the current thread is part of a thread group. The -1 check ensures proper fallback to standard mutex operations when not in a managed thread context.
Also applies to line 41 for the condition variable wait.
25-25: LGTM: Log messages updated consistently.
The log messages correctly report thread group IDs using LocalThread::ID(). The pattern is applied consistently across all log statements in this file (lines 30, 51, 58).
src/mongo/db/stats/top.cpp (1)
99-99: Confirm +1 offset matches vector sizing.
Vectors _usageMutexVector, _usageVector, _histogramMutexVector, and _histogramVector are initialized to serverGlobalParams.reservedThreadNum + 1, covering index 0 (for LocalThread::ID() == -1) through reservedThreadNum. Ensure LocalThread::ID() never exceeds reservedThreadNum - 1 and consider adding a debug invariant such as invariant(id >= 0 && id < _usageVector.size()).
src/mongo/db/catalog/database_holder_impl.cpp (5)
126-126: Same concern: verify _dbMapVector bounds for index 0.
The same indexing pattern is used here. Confirm that _dbMapVector[0] is a valid access when LocalThread::ID() returns -1.
151-151: Same concern: verify _dbMapVector bounds for index 0.
The same indexing pattern is used here. Confirm that _dbMapVector[0] is a valid access when LocalThread::ID() returns -1.
225-225: Same concern: verify _dbMapVector bounds for index 0.
The same indexing pattern is used here. Confirm that _dbMapVector[0] is a valid access when LocalThread::ID() returns -1.
251-251: Same concern: verify _dbMapVector bounds for index 0.
The same indexing pattern is used here. Confirm that _dbMapVector[0] is a valid access when LocalThread::ID() returns -1.
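The ID() + 1 indexing pattern questioned in these comments can be checked with a small sketch. PerThreadSlotsSketch and slotForThreadId are hypothetical names; the sketch only assumes what the review states, namely that the vector holds reservedThreadNum + 1 entries and that slot 0 serves callers whose id is -1.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Maps a thread id in [-1, reservedThreadNum) to a slot in [0, reservedThreadNum].
inline std::size_t slotForThreadId(std::int16_t threadId) {
    return static_cast<std::size_t>(threadId + 1);
}

// Hypothetical per-thread container sized reservedThreadNum + 1.
struct PerThreadSlotsSketch {
    explicit PerThreadSlotsSketch(std::size_t reservedThreadNum)
        : slots(reservedThreadNum + 1, 0) {}

    int& at(std::int16_t threadId) {
        const std::size_t slot = slotForThreadId(threadId);
        // An id below -1 wraps to a huge slot and an id that is too large
        // exceeds the size, so both are caught by the same check.
        assert(slot < slots.size());
        return slots[slot];
    }

    std::vector<int> slots;
};
```

With this layout slot 0 is shared by every thread that has no worker id, so any state stored there still needs its own synchronization.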
101-101: No OOB risk.
_dbMapVector is initialized with serverGlobalParams.reservedThreadNum + 1, so id = LocalThread::ID() + 1 (even when ID() == -1, giving id == 0) always indexes within [0…reservedThreadNum].
src/mongo/db/session_catalog.cpp (2)
134-135: LGTM! Proper invariant check before using LocalThread::ID().
The invariant ensures that LocalThread::ID() returns a valid non-negative value before it's used as threadGroupId. This prevents the -1 case (when no task group is available) from causing issues downstream.
171-172: LGTM! Consistent invariant check pattern.
The same defensive invariant pattern is applied here, ensuring LocalThread::ID() returns a valid non-negative value.
src/mongo/db/storage/kv/kv_storage_engine.cpp (2)
543-543: Consider adding invariant check for LocalThread::ID().
Unlike src/mongo/db/session_catalog.cpp (lines 134, 171), this code doesn't verify that LocalThread::ID() returns a non-negative value before using it for indexing. When LocalThread::ID() returns -1, id becomes 0, which may cause incorrect indexing into _dbMapVector.
Consider adding an invariant similar to the pattern in session_catalog.cpp:
+    invariant(LocalThread::ID() >= 0);
     auto id = static_cast<int16_t>(LocalThread::ID() + 1);
Additionally, verify that _dbMapVector is sized to handle index 0 safely.
576-576: Same concern: add invariant check for LocalThread::ID().
Same issue as line 543. Consider adding an invariant check to ensure LocalThread::ID() returns a non-negative value before indexing into _dbMapVector.
src/mongo/db/kill_sessions_local.cpp (3)
80-80: LGTM! Added resumeTask member to CoroCtx.
This change promotes resumeTask from a local variable to a struct member, improving lifetime management and allowing shared access through the coroCtx shared_ptr.
113-122: LGTM! Refactored to use member resumeTask.
The coroutine resume functors now correctly reference coroCtx->resumeTask instead of capturing a local variable, ensuring proper lifetime and access semantics.
139-144: LGTM! Added coroutine lifecycle tracking.
The addition of ongoingCoroutineCountUpdate with MakeGuard ensures that the service executor can track active coroutines and properly clean up when the task completes or exits, even in exception paths.
src/mongo/base/local_thread_state.cpp (2)
21-31: Document when LocalThread::ID() returns -1.
LocalThread::ID() can return -1 when bthread::tls_task_group is null in ELOQ_MODULE_ENABLED mode. However, several call sites (e.g., src/mongo/db/catalog/database_holder_impl.cpp) compute LocalThread::ID() + 1 and use it directly as an array index without checking for -1, which would result in index 0.
Please clarify:
- Under what conditions is tls_task_group expected to be null?
- Should callers always check for -1 before using the ID for indexing?
- Should this method include a precondition check (invariant/uassert) to prevent -1 returns in contexts where it's invalid?
Consider adding documentation or assertions to make the contract clear.
39-45: LGTM! IsBThread() provides useful runtime detection.
This method allows callers to determine if they're running in a bthread context, which can be useful for conditional logic.
src/mongo/transport/service_executor_coroutine.cpp (3)
98-107: LGTM: enqueue/resume path with notifyIfAsleep.
Use of concurrent queues and notification looks good.
90-96: LGTM: per-group init and thread name.
Initialization and naming are straightforward.
179-184: Minor: setting gflags at runtime.
Ensure this runs early enough and is acceptable for your environment; it affects global bthread concurrency.
src/mongo/transport/service_executor_coroutine.h (3)
79-81: LGTM: setTxServiceFunctors() API using internal group id.
Simplifies callers and matches usage.
88-91: LGTM: _extWorkerActive flag for module path.
Appropriate for NotifyWorker gating.
148-151: LGTM: hide _startWorker when module enabled.
Prevents dead code in module builds.
src/mongo/SConscript
Outdated
    LIBDEPS_PRIVATE=[
        'util/debugger',
    ],
    SYSLIBDEPS=["brpc"],
🧩 Analysis chain
Critical: Violates base library dependency constraints.
Adding brpc as a system library dependency to the base library directly violates the explicit architectural constraint stated at lines 139-140:
NOTE: This library must not depend on any libraries than the ones declared here. Do not add to this list.
The base library is fundamental and used throughout the codebase. Adding external dependencies like brpc to it:
- Breaks architectural boundaries: The base library should remain minimal with only essential dependencies.
- Creates widespread coupling: All code depending on base (which is most of the codebase) will now transitively depend on brpc.
- Increases maintenance burden: Changes to brpc could ripple through the entire system.
- Ignores documented constraints: The comment explicitly forbids this addition.
Recommended approach: Instead of adding brpc to the base library, consider:
- Adding it only to the specific modules that need brpc functionality (e.g., transport layer, service executor)
- Creating a separate brpc_integration or brpc_support library that modules can opt into
- Re-architecting to minimize the scope of brpc dependencies
Run the following script to assess the impact:
🏁 Script executed:
#!/bin/bash
# Description: Assess the impact of adding brpc to base library
echo "=== Finding all libraries that depend on base ==="
rg -n --type=python "LIBDEPS.*'base'" src/mongo/
echo "=== Checking if brpc symbols are used in base library sources ==="
rg -n --type=cpp 'brpc::' src/mongo/base/ src/mongo/bson/ src/mongo/logger/ src/mongo/platform/ src/mongo/util/
echo "=== Checking for brpc includes in base library sources ==="
rg -n --type=cpp '#include.*brpc' src/mongo/base/ src/mongo/bson/ src/mongo/logger/ src/mongo/platform/ src/mongo/util/
Length of output: 205
🏁 Script executed:
#!/bin/bash
echo "=== SYSLIBDEPS definitions in SConscript files ==="
rg -n "SYSLIBDEPS" src/mongo
echo "=== brpc symbol usage in C++ sources ==="
rg -n "brpc::" -g '*.cpp' -g '*.h' src/mongo
echo "=== brpc includes in C++ sources ==="
rg -n "#include.*brpc" -g '*.cpp' -g '*.h' src/mongo
Length of output: 744
Critical: Don’t add brpc to the base library.
The NOTE at lines 139–140 in src/mongo/SConscript forbids expanding base’s dependencies. Neither src/mongo/base/ nor its callers reference brpc:: or include any <brpc/...> headers, so this addition is unnecessary and breaks the architectural boundary. Remove SYSLIBDEPS=["brpc"] from src/mongo/SConscript and keep brpc only in the modules that actually use it (e.g. transport/SConscript and db/modules/eloq/SConscript).
🤖 Prompt for AI Agents
In src/mongo/SConscript around line 152, the base library erroneously adds
SYSLIBDEPS=["brpc"] which violates the NOTE at lines 139–140 and the
architectural boundary; remove the SYSLIBDEPS=["brpc"] entry from
src/mongo/SConscript so that brpc is not pulled into the base library, and
ensure brpc remains declared only in the SConscript files for modules that
actually use it (e.g. transport/SConscript and db/modules/eloq/SConscript).
#ifndef D_USE_CORO_SYNC
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#else
#include "mongo/db/coro_sync.h"
#endif
🧩 Analysis chain
Verify that D_USE_CORO_SYNC is consistently defined across the entire build.
The conditional compilation approach is sound, but inconsistent definition of D_USE_CORO_SYNC across translation units would cause ODR violations. Ensure this macro is set consistently at the build-system level (e.g., CMake).
Consider adding a brief comment above the conditional block explaining what D_USE_CORO_SYNC controls, e.g.:
// Use coroutine-aware synchronization primitives when D_USE_CORO_SYNC is defined,
// otherwise fall back to standard stdx primitives.
#ifndef D_USE_CORO_SYNC
Run this script to verify that D_USE_CORO_SYNC is defined consistently through build configuration:
🏁 Script executed:
#!/bin/bash
# Check if D_USE_CORO_SYNC is defined in build system files
rg -g '*.cmake' -g 'CMakeLists.txt' -g '*.bzl' -g 'BUILD' -nC3 'D_USE_CORO_SYNC'
# Check for any conditional definitions that might cause inconsistency
rg -n '#define\s+D_USE_CORO_SYNC' --type=cpp --type=c
Length of output: 123
Define D_USE_CORO_SYNC via build configuration
- Move D_USE_CORO_SYNC into your CMakeLists.txt (e.g., via target_compile_definitions) and remove the header-based #define in src/mongo/db/service_context.h:54.
- Add a brief comment above the #ifndef D_USE_CORO_SYNC block in src/mongo/scripting/mozjs/proxyscope.h explaining that it toggles coroutine-aware vs. stdx synchronization.
🤖 Prompt for AI Agents
In src/mongo/scripting/mozjs/proxyscope.h around lines 39-44, D_USE_CORO_SYNC
must be provided by the build system rather than by a header; remove the
header-based #define from src/mongo/db/service_context.h line 54, add
target_compile_definitions (or equivalent) in the appropriate CMakeLists.txt for
the target(s) that build this code to define D_USE_CORO_SYNC where needed
(handle Debug/Release or platform-specific variants as required), and add a
brief comment immediately above the #ifndef D_USE_CORO_SYNC block in
proxyscope.h stating that this macro toggles coroutine-aware (coro_sync) vs stdx
synchronization so future readers know why the conditional exists.
src/mongo/transport/SConscript
Outdated
        '$BUILD_DIR/third_party/shim_asio',
        'transport_layer_common',
    ],
    SYSLIBDEPS=["brpc", "gflags"],
🧩 Analysis chain
Verify necessity of system library dependencies and confirm source code changes.
The addition of brpc and gflags as system library dependencies raises several questions:
- Missing source changes: The PR title mentions "refactor ThreadGroup loop" but only build configuration changes are provided. Please verify that corresponding source code changes exist that actually use brpc APIs.
- gflags dependency: Is gflags directly used by service_executor sources, or is it only a transitive dependency of brpc? If it's transitive, consider whether it needs to be explicitly listed.
- Scope verification: Confirm that only service_executor needs these dependencies and not other transport layer modules.
Run the following script to verify the dependencies and usage:
🏁 Script executed:
#!/bin/bash
# Description: Verify brpc and gflags usage in service_executor sources
echo "=== Checking service_executor source files for brpc usage ==="
rg -n --type=cpp -C3 'brpc::' src/mongo/transport/service_executor*.cpp
echo "=== Checking service_executor source files for gflags usage ==="
rg -n --type=cpp -C3 'gflags::' src/mongo/transport/service_executor*.cpp
echo "=== Checking for DEFINE_* macros (gflags) ==="
rg -n --type=cpp -C3 'DEFINE_(string|int|bool|double)' src/mongo/transport/service_executor*.cpp
echo "=== Checking ThreadGroup references ==="
rg -n --type=cpp -C3 'ThreadGroup' src/mongo/transport/service_executor*.cpp
Length of output: 7800
Remove unused system library dependencies
No occurrences of brpc::, gflags::, or DEFINE_* macros were found in the service_executor sources—these SYSLIBDEPS entries appear unnecessary. Remove brpc and gflags unless you’re adding code that directly depends on them.
🤖 Prompt for AI Agents
In src/mongo/transport/SConscript around line 128, the SYSLIBDEPS list includes
"brpc" and "gflags" which are unused by the service_executor sources; remove
"brpc" and "gflags" from the SYSLIBDEPS array so the build system no longer
links against unnecessary system libraries and to keep dependencies accurate.
#ifdef ELOQ_MODULE_ENABLED
void MongoModule::ExtThdStart(int thd_id) {
    MONGO_LOG(3) << "MongoModule::ExtThdStart " << thd_id;
    ThreadGroup& threadGroup = _threadGroups[thd_id];
    if (!threadGroup._initialized) {
        threadGroup.init(thd_id);
    }
    invariant(threadGroup._threadGroupId == thd_id);
    threadGroup._extWorkerActive.store(true, std::memory_order_release);
}
Module thread start: set LocalThread ID, init Tx functors, notify Tx proc, and guard bounds.
- No LocalThread::SetID for ELOQ path.
- _txProcessorExec/_updateExtProc never initialized in module path.
- No bounds check on thd_id before indexing.
Apply:
void MongoModule::ExtThdStart(int thd_id) {
MONGO_LOG(3) << "MongoModule::ExtThdStart " << thd_id;
- ThreadGroup& threadGroup = _threadGroups[thd_id];
+ invariant(thd_id >= 0 && static_cast<size_t>(thd_id) < _threadGroupCount);
+ LocalThread::SetID(static_cast<int16_t>(thd_id));
+ ThreadGroup& threadGroup = _threadGroups[thd_id];
if (!threadGroup._initialized) {
threadGroup.init(thd_id);
}
invariant(threadGroup._threadGroupId == thd_id);
+ threadGroup.setTxServiceFunctors();
+#ifdef EXT_TX_PROC_ENABLED
+ threadGroup._updateExtProc(1);
+#endif
threadGroup._extWorkerActive.store(true, std::memory_order_release);
}
Requires header changes: MongoModule stores _threadGroupCount and Init() takes count. See header comment.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/mongo/transport/service_executor_coroutine.cpp around lines 43-52, the
ELOQ module thread start path is missing LocalThread::SetID, doesn't initialize
or notify _txProcessorExec/_updateExtProc, and indexes _threadGroups without
validating thd_id; update the function to first bounds-check thd_id against the
module's _threadGroupCount (return/log on out-of-range), call
LocalThread::SetID(thd_id) for the ELOQ path before any thread-local usage,
ensure threadGroup.init(thd_id) also sets up _txProcessorExec and _updateExtProc
(or call their initialization routines) and explicitly notify the transaction
processor after init so external workers are aware, and update the header to
store _threadGroupCount and change Init() to accept a count parameter as noted
in the header comment so the bounds check can use that stored value.
void MongoModule::ExtThdEnd(int thd_id) {
    MONGO_LOG(3) << "MongoModule::ExtThdEnd " << thd_id;
    ThreadGroup& threadGroup = _threadGroups[thd_id];
    invariant(threadGroup._threadGroupId == thd_id);
    threadGroup._extWorkerActive.store(false, std::memory_order_release);
}
Module thread end: notify Tx processor stop and add bounds check.
Apply:
void MongoModule::ExtThdEnd(int thd_id) {
MONGO_LOG(3) << "MongoModule::ExtThdEnd " << thd_id;
- ThreadGroup& threadGroup = _threadGroups[thd_id];
+ invariant(thd_id >= 0 && static_cast<size_t>(thd_id) < _threadGroupCount);
+ ThreadGroup& threadGroup = _threadGroups[thd_id];
invariant(threadGroup._threadGroupId == thd_id);
+#ifdef EXT_TX_PROC_ENABLED
+ threadGroup._updateExtProc(-1);
+#endif
threadGroup._extWorkerActive.store(false, std::memory_order_release);
}
🤖 Prompt for AI Agents
In src/mongo/transport/service_executor_coroutine.cpp around lines 54 to 59, add
a bounds check for thd_id before indexing _threadGroups and ensure the
transaction processor is notified when an external thread ends: validate thd_id
is within [0, _threadGroups.size()) and log/return if out of range, then proceed
to set threadGroup._extWorkerActive to false and, if the module has a Tx
processor pointer/member, call its thread-stop notification method (e.g.,
notifyThreadStopped/OnThreadEnd) with thd_id after clearing the flag, guarding
the call with a null check.
Status ServiceExecutorCoroutine::start() {
    MONGO_LOG(0) << "ServiceExecutorCoroutine::start";
    {
        // stdx::unique_lock<stdx::mutex> lk(_mutex);
        _stillRunning.store(true, std::memory_order_release);
    }

#ifndef ELOQ_MODULE_ENABLED
    for (size_t i = 0; i < _reservedThreads; i++) {
        auto status = _startWorker(static_cast<int16_t>(i));
        if (!status.isOK()) {
            return status;
        }
    }

#else
    MongoModule::Instance()->Init(_threadGroups.data());
    int rc = eloq::register_module(MongoModule::Instance());
    invariant(rc == 0);
#endif
    _stillRunning.store(true, std::memory_order_release);
    return Status::OK();
}
start(): pass thread group count to MongoModule::Init.
So MongoModule can bounds‑check thread IDs.
Apply:
#else
- MongoModule::Instance()->Init(_threadGroups.data());
+ MongoModule::Instance()->Init(_threadGroups.data(), _threadGroups.size());
int rc = eloq::register_module(MongoModule::Instance());
invariant(rc == 0);
#endif
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Status ServiceExecutorCoroutine::start() { | |
| MONGO_LOG(0) << "ServiceExecutorCoroutine::start"; | |
| #ifndef ELOQ_MODULE_ENABLED | |
| for (size_t i = 0; i < _reservedThreads; i++) { | |
| auto status = _startWorker(static_cast<int16_t>(i)); | |
| if (!status.isOK()) { | |
| return status; | |
| } | |
| } | |
| #else | |
| MongoModule::Instance()->Init(_threadGroups.data(), _threadGroups.size()); | |
| int rc = eloq::register_module(MongoModule::Instance()); | |
| invariant(rc == 0); | |
| #endif | |
| _stillRunning.store(true, std::memory_order_release); | |
| return Status::OK(); | |
| } |
🤖 Prompt for AI Agents
In src/mongo/transport/service_executor_coroutine.cpp around lines 185–201, the
call MongoModule::Instance()->Init(_threadGroups.data()) passes only a pointer to
the thread group data and not the number of thread groups; change it to also pass
the thread group count so MongoModule can bounds‑check thread IDs (e.g. call
Init(_threadGroups.data(), _threadGroups.size())), and update Init's signature
and any related call sites to accept the count.
| #ifdef ELOQ_MODULE_ENABLED | ||
| class MongoModule final : public eloq::EloqModule { | ||
| public: | ||
| static MongoModule* Instance() { | ||
| static MongoModule mongoModule; | ||
| return &mongoModule; | ||
| } | ||
| void Init(ThreadGroup* threadGroups) { | ||
| _threadGroups = threadGroups; | ||
| } | ||
| void ExtThdStart(int thd_id) override; | ||
| void ExtThdEnd(int thd_id) override; | ||
| void Process(int thd_id) override; | ||
| bool HasTask(int thd_id) const override; | ||
| private: | ||
| MongoModule() = default; | ||
| private: | ||
| ThreadGroup* _threadGroups{nullptr}; | ||
| }; | ||
| #endif |
MongoModule should track group count and validate thread IDs.
Store count in Init and use for bounds checks in all methods.
Apply:
class MongoModule final : public eloq::EloqModule {
public:
- void Init(ThreadGroup* threadGroups) {
- _threadGroups = threadGroups;
- }
+ void Init(ThreadGroup* threadGroups, size_t count) {
+ _threadGroups = threadGroups;
+ _threadGroupCount = count;
+ }
@@
private:
MongoModule() = default;
private:
ThreadGroup* _threadGroups{nullptr};
+ size_t _threadGroupCount{0};
};
And add invariants in the .cpp before indexing (see cpp comments).
🤖 Prompt for AI Agents
In src/mongo/transport/service_executor_coroutine.h around lines 25 to 51,
MongoModule currently only stores a ThreadGroup* and does not track the number
of groups nor validate thread IDs; update Init to accept and store the group
count (e.g., an unsigned int or size_t member) and add a private member for it,
then modify ExtThdStart, ExtThdEnd, Process and HasTask to validate thd_id
against the stored count (return/handle out-of-range IDs safely). In addition,
add appropriate invariant checks in the corresponding .cpp before any
array/index access (use assertions or guards) so methods never index
out-of-bounds; ensure Init sets the count when called and that all methods
early-return or log/error on invalid thd_id.
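The idea of storing the count next to the pointer can be sketched on its own as follows. `DemoGroup`, `DemoModule`, `isValidId`, and `groupFor` are hypothetical names for illustration only; the real `MongoModule` overrides the `eloq::EloqModule` interface, which is not reproduced here.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ThreadGroup.
struct DemoGroup {
    int id{0};
};

class DemoModule {
public:
    // Store the count alongside the pointer so later lookups can be checked.
    void Init(DemoGroup* groups, std::size_t count) {
        _groups = groups;
        _count = count;
    }

    // True only after Init and only for IDs inside [0, count).
    bool isValidId(int thd_id) const {
        return _groups != nullptr && thd_id >= 0 &&
               static_cast<std::size_t>(thd_id) < _count;
    }

    // Returns nullptr instead of indexing out of bounds.
    DemoGroup* groupFor(int thd_id) const {
        return isValidId(thd_id) ? &_groups[thd_id] : nullptr;
    }

private:
    DemoGroup* _groups{nullptr};
    std::size_t _count{0};
};
```

Routing every per-thread method through one checked accessor keeps the validation in a single place; whether an invalid ID should trip an invariant or be logged and ignored is a policy choice the review leaves open.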
| uint16_t _ongoingCoroutineCnt{0}; | ||
Make _ongoingCoroutineCnt atomic signed; adjust isBusy().
Required to fix data race and underflow. Pair with cpp changes.
Apply:
- uint16_t _ongoingCoroutineCnt{0};
+ std::atomic<int32_t> _ongoingCoroutineCnt{0};
And declare isBusy() to use atomic load (cpp already adjusted).
Also applies to: 82-83
🤖 Prompt for AI Agents
In src/mongo/transport/service_executor_coroutine.h around line 104 (and also
apply the same change at lines 82-83), _ongoingCoroutineCnt must be changed from a
non-atomic unsigned type to an atomic signed type to prevent data races and
underflow; declare it as std::atomic<int32_t> (or std::atomic<int>) and ensure
all increments/decrements use atomic operations (e.g., fetch_add/fetch_sub) and
that isBusy() is implemented to use .load() on the atomic; make these header
changes to match the corresponding cpp adjustments already made.
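A minimal sketch of the counter change, assuming a hypothetical `DemoCoroutineCounter` wrapper rather than the real `ThreadGroup` members. It uses the default sequentially consistent ordering; a weaker ordering may be sufficient but is not assumed here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical wrapper around the counter; the real code keeps it in ThreadGroup.
class DemoCoroutineCounter {
public:
    void onStart() { _ongoing.fetch_add(1); }
    void onFinish() { _ongoing.fetch_sub(1); }

    // A signed counter makes an unbalanced decrement visible as a negative
    // value instead of wrapping around to a large positive one.
    bool isBusy() const { return _ongoing.load() > 0; }

    std::int32_t value() const { return _ongoing.load(); }

private:
    std::atomic<std::int32_t> _ongoing{0};
};
```

With the original `uint16_t`, one extra decrement would wrap to 65535 and the group would look busy; with a signed type the same bug reads as -1, and `isBusy()` stays false.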
137f1cc to
e226281
Compare
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (1)
1224-1243: Thread synchronization issue flagged in previous review remains unresolved.
The implementation still has the threading issues raised in the previous review:
- The thread is detached (Line 1239) but the main thread blocks waiting on it via cv.wait(), making the detach semantics unclear
- Stack-allocated variables (done, mux, cv) are captured by reference in the lambda, but the thread is detached, creating potential lifetime issues
- No timeout on the cv.wait() call
As suggested in the previous review, consider:
- Using thd.join() instead of thd.detach()
- Adding a timeout to prevent indefinite hangs:
if (!cv.wait_for(lk, std::chrono::seconds(30), [&done]() { return done; })) {
    error() << "Timeout waiting for tx service shutdown";
}
thd.join();
🧹 Nitpick comments (1)
src/mongo/db/modules/eloq/src/eloq_global_options.cpp (1)
792-804: Consider aligning option and field names for clarity.
The user-facing option is named txlogAsyncFsync (Line 179) but the internal field is raftlogAsyncFsync (Line 802). While the implementation is correct, this naming mismatch could cause confusion during debugging or maintenance.
Consider either:
- Renaming the option to match the internal field: raftlogAsyncFsync
- Or renaming the field to match the option: txlogAsyncFsync
This same inconsistency exists in eloq_global_options.h at Line 69.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- concourse/scripts/build_tarball.bash (0 hunks)
- concourse/scripts/build_tarball_open.bash (0 hunks)
- concourse/scripts/common.sh (4 hunks)
- docs/how-to-compile.md (0 hunks)
- src/mongo/SConscript (1 hunks)
- src/mongo/db/modules/eloq/src/eloq_global_options.cpp (3 hunks)
- src/mongo/db/modules/eloq/src/eloq_global_options.h (1 hunks)
- src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (9 hunks)
- src/mongo/transport/service_state_machine.cpp (4 hunks)
💤 Files with no reviewable changes (3)
- docs/how-to-compile.md
- concourse/scripts/build_tarball.bash
- concourse/scripts/build_tarball_open.bash
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-25T11:58:50.446Z
Learnt from: githubzilla
PR: eloqdata/eloqdoc#211
File: src/mongo/db/modules/eloq/cmake/build_log_service.cmake:116-119
Timestamp: 2025-09-25T11:58:50.446Z
Learning: The build_log_service.cmake file is specifically for "open log" functionality and only supports the ROCKSDB log state type (LOG_STATE_TYPE_RKDB). The full log state configuration with cloud variants (ROCKSDB_CLOUD_S3, ROCKSDB_CLOUD_GCS) is handled in build_eloq_log_service.cmake.
Applied to files:
concourse/scripts/common.sh
🧬 Code graph analysis (1)
src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (1)
src/mongo/db/coro_sync.h (1)
coro(28-113)
🔇 Additional comments (11)
concourse/scripts/common.sh (2)
66-66: CMake flag addition aligns with module gating
Enabling -DELOQ_MODULE_ENABLED=ON here matches the new module hooks and keeps the CMake path in sync with the rest of the refactor. Looks good.
83-83: SCons define keeps legacy build in step
Passing CPPDEFINES="ELOQ_MODULE_ENABLED" ensures the SCons build sees the same module guard as CMake. Thanks for keeping both build paths consistent.
src/mongo/transport/service_state_machine.cpp (4)
262-262: LGTM! Good optimization to cache page size.
Caching the getpagesize() result at construction time eliminates redundant syscalls. The static_cast<size_t> is appropriate since getpagesize() returns int but the size should be unsigned.
272-272: LGTM! Consistent use of cached page size.
The cached _osPageSize is correctly used in both memory protection and stack sizing calculations, replacing direct getpagesize() calls.
Also applies to: 736-736
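As a rough illustration of the caching pattern, the sketch below reads the page size once in a constructor and reuses it. `DemoStackSizer` and `roundUpToPage` are hypothetical; the rounding helper is only an assumed example of a stack sizing calculation and is not taken from service_state_machine.cpp. It assumes a POSIX-like system that provides `getpagesize()` in `<unistd.h>`.

```cpp
#include <cassert>
#include <cstddef>
#include <unistd.h>

// Hypothetical helper that caches the OS page size at construction time.
class DemoStackSizer {
public:
    DemoStackSizer() : _osPageSize(static_cast<std::size_t>(::getpagesize())) {}

    std::size_t pageSize() const { return _osPageSize; }

    // Rounds `bytes` up to a whole number of pages using the cached value.
    std::size_t roundUpToPage(std::size_t bytes) const {
        return (bytes + _osPageSize - 1) / _osPageSize * _osPageSize;
    }

private:
    const std::size_t _osPageSize;  // read once, reused by every calculation
};
```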
577-577: LGTM! Improved lambda capture style.
Capturing weak_from_this() directly by value in the capture list is cleaner and safer than capturing a reference to a local variable. This is more idiomatic modern C++ and eliminates potential lifetime issues.
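The capture style praised above looks roughly like this in a standalone form. `DemoSession` and `makeCallback` are hypothetical names, and `weak_from_this()` requires C++17; this is a sketch of the idiom, not the state machine's actual callback.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical class standing in for the state machine.
class DemoSession : public std::enable_shared_from_this<DemoSession> {
public:
    // The weak_ptr is copied into the closure, so the callback owns its own
    // handle and never refers to a local variable of this function.
    std::function<bool()> makeCallback() {
        return [weak = weak_from_this()]() {
            if (auto self = weak.lock()) {
                ++self->_calls;
                return true;
            }
            return false;  // the object is gone; do nothing
        };
    }

    int calls() const { return _calls; }

private:
    int _calls{0};
};
```

The callback can safely outlive the object: once the last `shared_ptr` is released, `lock()` returns an empty pointer and the body is skipped.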
266-266: kCoroStackSize is defined as a static constexpr in service_state_machine.h (line 285) and is accessible to all its usages; no action required.
src/mongo/db/modules/eloq/src/eloq_global_options.cpp (1)
164-183: LGTM! Option declarations are well-structured.
The two new boolean options (enableIOuring and txlogAsyncFsync) are properly declared with sensible defaults and clear documentation.
src/mongo/db/modules/eloq/src/eloq_global_options.h (1)
68-69: LGTM! Field declarations are consistent with the implementation.
The two new boolean fields are properly declared with sensible defaults. Note that the naming inconsistency between raftlogAsyncFsync (internal field) and txlogAsyncFsync (user-facing option) was already flagged in the review of eloq_global_options.cpp.
src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp (3)
274-287: LGTM! Robust validation and configuration.
The validation logic correctly enforces that txlogAsyncFsync requires enableIOuring to be enabled, with a clear error message. The GFLAGS configuration for brpc behavior and I/O completion is appropriately set based on the global options.
1212-1212: LGTM! Proper extraction of shutdown logic.
Refactoring to call shutdownTxService() improves code organization and enables conditional behavior based on build flags.
848-849: Client::getCurrent() is non-null in these server-thread contexts and coroutine functors are initialized on thread creation; no additional null-checks are required.
50458a9 to
d457e92
Compare
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/mongo/db/coro_sync.h (1)
4-5: LGTM! Required headers now included.
The missing <functional> and <cstdint> headers have been added as requested in the previous review, resolving the compilation issues for consumers that don't already include them.
🧹 Nitpick comments (1)
src/mongo/db/coro_sync.h (1)
12-28: Add documentation explaining struct's purpose, ownership semantics, and pointer-identity comparison design.
The struct's purpose is clear from usage, but critical details lack documentation. The static Unavailable member is correctly defined in src/mongo/db/coro_sync.cpp:10. However, the header should document:
- Ownership: These pointers reference externally-owned coroutine callback functions. Who establishes and manages their lifetime?
- Pointer-identity design: The equality operators intentionally compare pointer addresses (e.g., lhs.yieldFuncPtr == rhs.yieldFuncPtr), treating all pointers as a set. This works as a sentinel pattern where Unavailable (all nullptr) indicates no coroutine context. This design is reasonable but should be explicitly stated.
- Validity: When are null pointers valid vs. required to be non-null? Code shows null-checks before dereferencing (e.g., if (_coro.yieldFuncPtr && *_coro.yieldFuncPtr)).
Add a comment explaining the struct's role, pointer ownership, and why pointer identity (not semantic equivalence) is compared.
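A small sketch of the pointer-identity and sentinel design discussed above. `DemoCoroFunctors`, `resumeFuncPtr`, and `tryYield` are hypothetical; only `yieldFuncPtr` and `Unavailable` are names that appear in the review, and the real struct in coro_sync.h may have different members.

```cpp
#include <cassert>
#include <functional>

// Hypothetical struct modelled on the one described in the review.
// The pointers refer to externally owned callbacks; this struct owns nothing.
struct DemoCoroFunctors {
    const std::function<void()>* yieldFuncPtr{nullptr};
    const std::function<void()>* resumeFuncPtr{nullptr};

    // Sentinel: all pointers null means "no coroutine context".
    static const DemoCoroFunctors Unavailable;

    // Equality is pointer identity, not equivalence of the pointed-to callables.
    friend bool operator==(const DemoCoroFunctors& lhs, const DemoCoroFunctors& rhs) {
        return lhs.yieldFuncPtr == rhs.yieldFuncPtr &&
               lhs.resumeFuncPtr == rhs.resumeFuncPtr;
    }
    friend bool operator!=(const DemoCoroFunctors& lhs, const DemoCoroFunctors& rhs) {
        return !(lhs == rhs);
    }
};

const DemoCoroFunctors DemoCoroFunctors::Unavailable{};

// Calls the yield callback only when both the pointer and the callable are set.
bool tryYield(const DemoCoroFunctors& coro) {
    if (coro.yieldFuncPtr && *coro.yieldFuncPtr) {
        (*coro.yieldFuncPtr)();
        return true;
    }
    return false;
}
```

Two structs pointing at the same callbacks compare equal, while a struct pointing at a copy of a callback does not, which is the behavior a header comment would need to spell out.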
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/mongo/SConscript (1 hunks)
- src/mongo/db/coro_sync.h (2 hunks)
- src/mongo/db/modules/eloq/SConscript (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/mongo/SConscript
🧰 Additional context used
🧬 Code graph analysis (1)
src/mongo/db/coro_sync.h (2)
src/mongo/db/local_thread_state.h (1)
mongo(9-22)
src/mongo/db/client.h (1)
mongo(59-232)
🔇 Additional comments (1)
src/mongo/db/coro_sync.h (1)
30-30: The original review comment is incorrect: no breaking changes exist.
Verification of the codebase reveals:
- No references to mongo::Mutex or mongo::ConditionVariable exist in MongoDB source code (all search results are third-party libraries only)
- All 20+ usages of ConditionVariable in the codebase already use the properly qualified coro::ConditionVariable (e.g., in notification.h, service_context.h, operation_context.h, session_killer.h, and elsewhere)
- No unqualified Mutex or ConditionVariable references exist in MongoDB source code
- The namespace reorganization appears to have been coordinated across the entire codebase with complete updates
The concern that mongo::Mutex and mongo::ConditionVariable usages need updating is not substantiated by the codebase evidence.
Likely an incorrect or invalid review comment.
Summary by CodeRabbit