Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/mongo/db/kill_sessions_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
}

struct CoroCtx {
static constexpr size_t kCoroStackSize = 3200 * 1024;
// Coroutine stack size: Increased from 3.2MB to 8MB to prevent stack overflow
// in complex database operations (deep recursion, large BSON processing, etc.)
static constexpr size_t kCoroStackSize = 8192 * 1024; // 8MB
boost::context::protected_fixedsize_stack salloc{kCoroStackSize};
boost::context::continuation source;
std::function<void()> resumeTask;
Expand Down Expand Up @@ -111,7 +113,7 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
getGlobalServiceContext()->getServiceEntryPoint()->getServiceExecutor();

coroCtx->resumeTask = [&source = coroCtx->source, &client] {
log() << "abortArbitraryTransactionIfExpired call resume.";
error() << "abortArbitraryTransactionIfExpired call resume.";
Client::setCurrent(std::move(client));
source = source.resume();
};
Expand All @@ -129,7 +131,7 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
[&finished, &mux, &cv, coroCtx, opCtx, session, &client, serviceExecutor](
boost::context::continuation&& sink) {
coroCtx->yieldFunc = [&sink, &client]() {
log() << "abortArbitraryTransactionIfExpired call yield.";
error() << "abortArbitraryTransactionIfExpired call yield.";
client = Client::releaseCurrent();
sink = sink.resume();
};
Expand Down
13 changes: 11 additions & 2 deletions src/mongo/transport/service_executor_coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,22 @@ Status ServiceExecutorCoroutine::schedule(Task task,
std::function<void()> ServiceExecutorCoroutine::coroutineResumeFunctor(uint16_t threadGroupId,
const Task& task) {
invariant(threadGroupId < _threadGroups.size());
return [thd_group = &_threadGroups[threadGroupId], &task]() { thd_group->resumeTask(task); };
// IMPORTANT: capture task by value. Capturing by reference here would dangle because `task`
// is a reference to this function's parameter, which goes out of scope when we return.
Task taskCopy = task;
return [thd_group = &_threadGroups[threadGroupId], task = std::move(taskCopy)]() mutable {
thd_group->resumeTask(task);
};
}

std::function<void()> ServiceExecutorCoroutine::coroutineLongResumeFunctor(uint16_t threadGroupId,
const Task& task) {
invariant(threadGroupId < _threadGroups.size());
return [thd_group = &_threadGroups[threadGroupId], &task]() { thd_group->enqueueTask(task); };
// Same lifetime rule as coroutineResumeFunctor(): capture by value to avoid dangling refs.
Task taskCopy = task;
return [thd_group = &_threadGroups[threadGroupId], task = std::move(taskCopy)]() mutable {
thd_group->enqueueTask(task);
};
}

void ServiceExecutorCoroutine::ongoingCoroutineCountUpdate(uint16_t threadGroupId, int delta) {
Expand Down
97 changes: 83 additions & 14 deletions src/mongo/transport/service_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"

#include <boost/context/detail/exception.hpp>
#include <boost/context/preallocated.hpp>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <sys/mman.h>
#include <unistd.h>
Expand Down Expand Up @@ -277,6 +279,7 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
ServiceStateMachine::~ServiceStateMachine() {
MONGO_LOG(1) << "ServiceStateMachine::~ServiceStateMachine";
_source = {};
_coroSink = {};
::munmap(_coroStack, kCoroStackSize);
}

Expand All @@ -301,6 +304,7 @@ void ServiceStateMachine::reset(ServiceContext* svcContext,
_coroLongResume = {};
_coroMigrateThreadGroup = {};
_resumeTask = {};
_coroSink = {};
_migrating.store(false, std::memory_order_relaxed);
_threadGroupId.store(groupId, std::memory_order_relaxed);
_owned.store(Ownership::kUnowned);
Expand Down Expand Up @@ -543,11 +547,23 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
_coroStatus = CoroStatus::OnGoing;
_serviceExecutor->ongoingCoroutineCountUpdate(
_threadGroupId.load(std::memory_order_relaxed), 1);
_resumeTask = [ssm = this] {
// IMPORTANT: resume tasks can be queued and executed asynchronously.
// Avoid capturing a raw `this` pointer to prevent UAF during shutdown.
auto wssm = weak_from_this();
_resumeTask = [wssm] {
auto ssm = wssm.lock();
if (!ssm) {
return;
}

Client::setCurrent(std::move(ssm->_dbClient));
if (ssm->_migrating.load(std::memory_order_relaxed)) {
ssm->_coroResume();
ssm->_coroYield();
if (ssm->_coroResume) {
ssm->_coroResume();
}
if (ssm->_coroYield) {
ssm->_coroYield();
}
} else {
ssm->_runResumeProcess();
bool migrating = true;
Expand All @@ -565,8 +581,11 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
_threadGroupId.load(std::memory_order_relaxed), _resumeTask);
_coroLongResume = _serviceExecutor->coroutineLongResumeFunctor(
_threadGroupId.load(std::memory_order_relaxed), _resumeTask);
_coroMigrateThreadGroup = std::bind(
&ServiceStateMachine::_migrateThreadGroup, this, std::placeholders::_1);
_coroMigrateThreadGroup = [wssm](uint16_t threadGroupId) {
if (auto ssm = wssm.lock()) {
ssm->_migrateThreadGroup(threadGroupId);
}
};

boost::context::stack_context sc = _coroStackContext();
boost::context::preallocated prealloc(sc.sp, sc.size, sc);
Expand All @@ -579,14 +598,28 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
if (!ssm) {
return std::move(sink);
}
ssm->_coroYield = [ssm = ssm.get(), &sink]() {
// Store sink as a member so yield never captures references to
// stack-local continuation variables.
ssm->_coroSink = std::move(sink);

// Yield functor: only valid to call while executing on the
// coroutine stack. It resumes the caller continuation.
ssm->_coroYield = [wssm]() {
auto ssmLocked = wssm.lock();
if (!ssmLocked) {
return;
}
MONGO_LOG(3) << "call yield";
ssm->_dbClient = Client::releaseCurrent();
sink = sink.resume();
// If these invariants fail, it strongly suggests someone is
// calling yield outside the active coroutine window.
invariant(haveClient());
invariant(static_cast<bool>(ssmLocked->_coroSink));
ssmLocked->_dbClient = Client::releaseCurrent();
ssmLocked->_coroSink = ssmLocked->_coroSink.resume();
};
ssm->_processMessage(std::move(guard));

return std::move(sink);
return std::move(ssm->_coroSink);
});

bool migrating = true;
Expand Down Expand Up @@ -622,6 +655,9 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
}

return;
} catch (const boost::context::detail::forced_unwind&) {
// Boost.Context uses forced_unwind to unwind stacks safely. It must never be swallowed.
throw;
} catch (const DBException& e) {
// must be right above std::exception to avoid catching subclasses
log() << "DBException handling request, closing client connection: " << redact(e);
Expand Down Expand Up @@ -733,9 +769,26 @@ void ServiceStateMachine::setThreadGroupId(size_t id) {

boost::context::stack_context ServiceStateMachine::_coroStackContext() {
boost::context::stack_context sc;
sc.size = kCoroStackSize - _osPageSize;
// Because stack grows downwards from high address?
sc.sp = _coroStack + kCoroStackSize;
// Stack grows downwards from high address.
//
// We reserve the first OS page as a guard page (PROT_NONE), so the usable region is:
// [bottom, top)
// where:
// bottom = _coroStack + _osPageSize
// top = _coroStack + kCoroStackSize
//
// Boost.Context models the usable range as [sp - size, sp), so if we align sp down we must
// also recompute size from the aligned sp to keep the bottom boundary correct.
char* bottom = _coroStack + _osPageSize;
char* top = _coroStack + kCoroStackSize;

// Align down to 16-byte boundary (common ABI requirement on x86-64 and for SIMD).
char* sp =
reinterpret_cast<char*>(reinterpret_cast<uintptr_t>(top) & ~static_cast<uintptr_t>(15));

invariant(sp > bottom);
sc.sp = sp;
sc.size = static_cast<size_t>(sp - bottom);
return sc;
}

Expand All @@ -745,8 +798,12 @@ void ServiceStateMachine::_migrateThreadGroup(uint16_t threadGroupId) {
_coroResume = _serviceExecutor->coroutineResumeFunctor(threadGroupId, _resumeTask);
_coroLongResume = _serviceExecutor->coroutineLongResumeFunctor(threadGroupId, _resumeTask);
_migrating.store(true, std::memory_order_relaxed);
_coroResume();
_coroYield();
if (_coroResume) {
_coroResume();
}
if (_coroYield) {
_coroYield();
}
#ifdef MONGO_CONFIG_DEBUG_BUILD
_owningThread.store(stdx::this_thread::get_id());
#endif
Expand All @@ -769,6 +826,18 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {

_inMessage.reset();

// Disable coroutine functors early during teardown to prevent any further coroutine
// yield/resume calls racing with destruction of this SSM and its coroutine stack.
if (haveClient()) {
Client::getCurrent()->setCoroutineFunctors(CoroutineFunctors::Unavailable);
}
_coroYield = {};
_coroResume = {};
_coroLongResume = {};
_coroMigrateThreadGroup = {};
_resumeTask = {};
_coroSink = {};

// By ignoring the return value of Client::releaseCurrent() we destroy the session.
// _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
Client::releaseCurrent();
Expand Down
8 changes: 7 additions & 1 deletion src/mongo/transport/service_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,16 @@ class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMach

void _migrateThreadGroup(uint16_t threadGroupId);

static constexpr size_t kCoroStackSize = 3200 * 1024;
// Coroutine stack size: Increased from 3.2MB to 8MB to prevent stack overflow
// in complex database operations (deep recursion, large BSON processing, etc.)
static constexpr size_t kCoroStackSize = 8192 * 1024; // 8MB
Comment on lines +285 to +287
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for kCoroStackSize and related stack size references in codebase
echo "=== Searching for kCoroStackSize references ==="
rg -n "kCoroStackSize" --type=cpp

echo -e "\n=== Searching for coroutine stack size related code ==="
rg -n "CoroStackSize|coroutine.*stack|stack.*size" --type=cpp -i | head -50

echo -e "\n=== Checking service_state_machine.h file context ==="
rg -n -B5 -A5 "kCoroStackSize" --type=cpp

echo -e "\n=== Searching for tests related to coroutine stack or memory ==="
rg -n "coroutine|stack.*overflow|memory.*profile" --type=cpp -g '*test*' -g '*unittest*' | head -30

Repository: eloqdata/eloqdoc

Length of output: 13016


Document memory impact analysis and load testing for 8MB coroutine stack increase.

The 2.5x increase in coroutine stack size (3.2MB → 8MB) is implemented consistently across two critical code paths: service_state_machine.h and kill_sessions_local.cpp. Each mmap allocation of 8MB per connection introduces measurable memory overhead for deployments with thousands of concurrent connections.

The code rationale (preventing stack overflow in deep recursion and large BSON processing) is documented in both locations, but the following verification gaps remain:

  • No load testing results documenting memory consumption at scale
  • No configurable option to adjust stack size for different deployment requirements
  • No profiling data confirming actual stack usage justifies the 8MB threshold

These should be addressed in deployment documentation or as follow-up work, particularly for high-connection-count deployments where memory efficiency is critical.

🤖 Prompt for AI Agents
In src/mongo/transport/service_state_machine.h around lines 285 to 287, the
hardcoded coroutine stack size was increased to 8MB but lacks documentation,
load-test data, profiling evidence, and a runtime/configurable option; update
the code and repo as follows: add a clear note in the file header referencing an
external design/doc page that contains memory impact analysis and load-test
results for thousands of connections, expose the stack size as a configurable
parameter (compile-time constant fallback plus runtime override via config file
or env var/CLI flag) with validated bounds and a sensible default of 8MB, add a
short profiling hook or sample code comment showing how to capture maximum stack
usage during tests, and add a task in the repo docs (or CHANGELOG) linking to
completed profiling and load-test artifacts so operators can evaluate and tune
stack size for high-connection deployments.

const size_t _osPageSize;
char* _coroStack;
boost::context::continuation _source;
// Continuation representing the caller ("sink") while executing inside the coroutine created
// by callcc(). We store this as a member so yield/resume functors never capture references to
// stack-local continuation variables.
boost::context::continuation _coroSink;

enum class CoroStatus { Empty = 0, OnGoing, Finished };
CoroStatus _coroStatus{CoroStatus::Empty};
Expand Down