diff --git a/concourse/scripts/build_tarball.bash b/concourse/scripts/build_tarball.bash index a5fefb847e..7d05828dda 100755 --- a/concourse/scripts/build_tarball.bash +++ b/concourse/scripts/build_tarball.bash @@ -287,12 +287,6 @@ if [ -f ${DEST_DIR}/bin/dss_server ]; then patchelf --set-rpath '$ORIGIN/../lib' ${DEST_DIR}/bin/dss_server fi -# Preload libmimalloc and libbrpc at launch. -patchelf --remove-needed libmimalloc.so.2 ${DEST_DIR}/bin/eloqdoc -patchelf --remove-needed libbrpc.so ${DEST_DIR}/bin/eloqdoc -patchelf --add-needed libbrpc.so ${DEST_DIR}/bin/eloqdoc -patchelf --add-needed libmimalloc.so.2 ${DEST_DIR}/bin/eloqdoc - # Config files cp ${ELOQDOC_SRC}/concourse/artifact/${DATA_STORE_TYPE}/* ${DEST_DIR}/etc diff --git a/concourse/scripts/build_tarball_open.bash b/concourse/scripts/build_tarball_open.bash index 74297679ab..942afdc4c4 100755 --- a/concourse/scripts/build_tarball_open.bash +++ b/concourse/scripts/build_tarball_open.bash @@ -205,12 +205,6 @@ if [ -f ${DEST_DIR}/bin/host_manager ]; then patchelf --set-rpath '$ORIGIN/../lib' ${DEST_DIR}/bin/host_manager fi -# Preload libmimalloc and libbrpc at launch. -patchelf --remove-needed libmimalloc.so.2 ${DEST_DIR}/bin/eloqdoc -patchelf --remove-needed libbrpc.so ${DEST_DIR}/bin/eloqdoc -patchelf --add-needed libbrpc.so ${DEST_DIR}/bin/eloqdoc -patchelf --add-needed libmimalloc.so.2 ${DEST_DIR}/bin/eloqdoc - # Config files cp ${ELOQDOC_SRC}/concourse/artifact/${DATA_STORE_TYPE}/* ${DEST_DIR}/etc diff --git a/concourse/scripts/common.sh b/concourse/scripts/common.sh index df67b905bb..7fcb8182c7 100644 --- a/concourse/scripts/common.sh +++ b/concourse/scripts/common.sh @@ -63,6 +63,7 @@ compile_and_install() { -DCMAKE_CXX_FLAGS_DEBUG_INIT="-Wno-error -fPIC" \ -DCMAKE_BUILD_TYPE=Debug \ -DEXT_TX_PROC_ENABLED=ON \ + -DELOQ_MODULE_ENABLED=ON \ -DSTATISTICS=ON \ -DUSE_ASAN=OFF \ -DWITH_DATA_STORE=ELOQDSS_ROCKSDB_CLOUD_S3 @@ -79,6 +80,7 @@ compile_and_install() { VARIANT_DIR=Debug \ LIBPATH=/usr/local/lib \ CXXFLAGS="-Wno-nonnull -Wno-class-memaccess -Wno-interference-size -Wno-redundant-move" \ + CPPDEFINES="ELOQ_MODULE_ENABLED" \ --build-dir=#build \ --prefix="$PREFIX" \ --dbg=on \ @@ -110,6 +112,7 @@ compile_and_install_ent() { -DCMAKE_CXX_FLAGS_DEBUG_INIT="-Wno-error -fPIC" \ -DCMAKE_BUILD_TYPE=Debug \ -DEXT_TX_PROC_ENABLED=ON \ + -DELOQ_MODULE_ENABLED=ON \ -DSTATISTICS=ON \ -DUSE_ASAN=OFF \ -DWITH_LOG_STATE=ROCKSDB_CLOUD_S3 \ @@ -130,6 +133,7 @@ compile_and_install_ent() { VARIANT_DIR=Debug \ LIBPATH=/usr/local/lib \ CXXFLAGS="-Wno-nonnull -Wno-class-memaccess -Wno-interference-size -Wno-redundant-move" \ + CPPDEFINES="ELOQ_MODULE_ENABLED" \ --build-dir=#build \ --prefix="$PREFIX" \ --dbg=on \ @@ -151,7 +155,6 @@ launch_eloqdoc() { local bucket_name="$1" local bucket_prefix="$2" echo "launch eloqdoc with bucket name: $bucket_name, bucket prefix: $bucket_prefix" - export LD_PRELOAD=/usr/local/lib/libmimalloc.so mkdir -p "$PREFIX/log" "$PREFIX/data" sed -i "s|rocksdbCloudEndpointUrl: \"http://[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+:[0-9]\+\"|rocksdbCloudEndpointUrl: \"${MINIO_ENDPOINT}\"|g" /home/eloq/workspace/mongo/concourse/scripts/store_rocksdb_cloud.yaml sed -i "s|txlogRocksDBCloudEndpointUrl: \"http://[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+:[0-9]\+\"|txlogRocksDBCloudEndpointUrl: \"${MINIO_ENDPOINT}\"|g" /home/eloq/workspace/mongo/concourse/scripts/store_rocksdb_cloud.yaml @@ -173,7 +176,6 @@ launch_eloqdoc_fast() { local bucket_name="$1" local bucket_prefix="$2" echo "launch eloqdoc fast with bucket name: $bucket_name, bucket prefix: $bucket_prefix" - export LD_PRELOAD=/usr/local/lib/libmimalloc.so mkdir -p "$PREFIX/log" "$PREFIX/data" sed -i "s|rocksdbCloudEndpointUrl: \"http://[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+:[0-9]\+\"|rocksdbCloudEndpointUrl: \"${MINIO_ENDPOINT}\"|g" /home/eloq/workspace/mongo/concourse/scripts/store_rocksdb_cloud.yaml sed -i "s|txlogRocksDBCloudEndpointUrl: \"http://[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+:[0-9]\+\"|txlogRocksDBCloudEndpointUrl: \"${MINIO_ENDPOINT}\"|g" /home/eloq/workspace/mongo/concourse/scripts/store_rocksdb_cloud.yaml diff --git a/concourse/scripts/store_rocksdb_cloud.yaml b/concourse/scripts/store_rocksdb_cloud.yaml index fb990611d7..254ece9631 100644 --- a/concourse/scripts/store_rocksdb_cloud.yaml +++ b/concourse/scripts/store_rocksdb_cloud.yaml @@ -25,6 +25,8 @@ storage: collectActiveTxTsIntervalSec: 2 checkpointerDelaySec: 5 ccProtocol: "OccRead" + enableIOuring: true + txlogAsyncFsync: true # txlogServiceList: ["127.0.0.1:8500"] # txlogGroupReplicaNum: 1 # skipRedoLog: true diff --git a/docs/how-to-compile.md b/docs/how-to-compile.md index 3c2994736d..2e8f114428 100644 --- a/docs/how-to-compile.md +++ b/docs/how-to-compile.md @@ -117,20 +117,3 @@ python scripts/buildscripts/scons.py \ ``` All executable files will be installed to `$INSTALL_PREFIX/bin`, and all libraries will be installed to `$INSTALL_PREFIX/lib`. - -## 3 Adjust load order of libmimalloc and libbrpc - -EloqDoc depends on libmimalloc and libbrpc, and requires them to load before other libraries. - -```bash -patchelf --remove-needed libmimalloc.so.2 $INSTALL_PREFIX/bin/eloqdoc -patchelf --remove-needed libbrpc.so $INSTALL_PREFIX/bin/eloqdoc -patchelf --add-needed libbrpc.so $INSTALL_PREFIX/bin/eloqdoc -patchelf --add-needed libmimalloc.so.2 $INSTALL_PREFIX/bin/eloqdoc -``` - -If you don't adjust load order, then you must set LD_PRELOAD before run EloqDoc. - -```bash -export LD_PRELOAD=/usr/local/lib/libmimalloc.so.2:/usr/lib/libbrpc.so -``` diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 4b052e77dd..b1c4e6cc59 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -64,7 +64,6 @@ env.Library( 'base/init.cpp', 'base/initializer.cpp', 'base/initializer_dependency_graph.cpp', - 'base/local_thread_state.cpp', 'base/make_string_vector.cpp', 'base/parse_number.cpp', 'base/shim.cpp', @@ -365,6 +364,10 @@ mongod = env.Program( 'base', 'mongodmain', ], + SYSLIBDEPS=[ + 'mimalloc', + 'brpc', + ] if env.TargetOSIs('linux') else [], INSTALL_ALIAS=[ 'core', 'default', diff --git a/src/mongo/base/local_thread_state.cpp b/src/mongo/base/local_thread_state.cpp deleted file mode 100644 index de9544f9d9..0000000000 --- a/src/mongo/base/local_thread_state.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "mongo/base/local_thread_state.h" - -namespace mongo { - -thread_local int16_t localThreadId = -1; - -std::function, std::function>(int16_t)> - getTxServiceFunctors; - -const CoroutineFunctors CoroutineFunctors::Unavailable{}; -} // namespace mongo diff --git a/src/mongo/base/local_thread_state.h b/src/mongo/base/local_thread_state.h deleted file mode 100644 index eea35178f0..0000000000 --- a/src/mongo/base/local_thread_state.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - - -namespace mongo { - -extern thread_local int16_t localThreadId; - -extern std::function, std::function>(int16_t)> - getTxServiceFunctors; - - -struct CoroutineFunctors { - const std::function* yieldFuncPtr{nullptr}; - const std::function* resumeFuncPtr{nullptr}; - const std::function* longResumeFuncPtr{nullptr}; - const std::function* migrateThreadGroupFuncPtr{nullptr}; - - const static CoroutineFunctors Unavailable; - - friend bool operator==(const CoroutineFunctors& lhs, const CoroutineFunctors& rhs) { - return lhs.yieldFuncPtr == rhs.yieldFuncPtr && lhs.resumeFuncPtr == rhs.resumeFuncPtr && - lhs.longResumeFuncPtr == rhs.longResumeFuncPtr && - lhs.migrateThreadGroupFuncPtr == rhs.migrateThreadGroupFuncPtr; - } - friend bool operator!=(const CoroutineFunctors& lhs, const CoroutineFunctors& rhs) { - return !(lhs == rhs); - } -}; - - -} // namespace mongo diff --git a/src/mongo/base/string_data.h b/src/mongo/base/string_data.h index 4620216d70..3232cae92f 100644 --- a/src/mongo/base/string_data.h +++ b/src/mongo/base/string_data.h @@ -176,7 +176,7 @@ class StringData { std::string toString() const { return std::string(_data, size()); } - std::string_view toStringView() const{ + std::string_view toStringView() const { return std::string_view(_data, size()); } constexpr char operator[](unsigned pos) const { @@ -353,3 +353,12 @@ inline std::string operator+(StringData lhs, std::string rhs) { } } // namespace mongo + +namespace std { +template <> +struct hash { + size_t operator()(const mongo::StringData& sd) const { + return std::hash{}(sd.toStringView()); + } +}; +} // namespace std diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8dada52bde..9f92e861f5 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -510,6 +510,7 @@ env.Library( 'server_recovery.cpp', 'unclean_shutdown.cpp', 'coro_sync.cpp', + 'local_thread_state.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/logical_session_id', @@ -521,6 +522,7 @@ env.Library( '$BUILD_DIR/mongo/util/net/network', '$BUILD_DIR/mongo/util/periodic_runner', ], + SYSLIBDEPS=["brpc"], ) env.Library( diff --git a/src/mongo/db/catalog/database_holder_impl.cpp b/src/mongo/db/catalog/database_holder_impl.cpp index 846c7597d8..2ddbd531d3 100644 --- a/src/mongo/db/catalog/database_holder_impl.cpp +++ b/src/mongo/db/catalog/database_holder_impl.cpp @@ -51,7 +51,6 @@ #include "mongo/util/log.h" namespace mongo { -extern thread_local int16_t localThreadId; namespace { @@ -99,7 +98,7 @@ Database* DatabaseHolderImpl::get(OperationContext* opCtx, StringData ns) { const StringData db = _todb(ns); invariant(opCtx->lockState()->isDbLockedForMode(db, MODE_IS)); - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); // std::scoped_lock lock{_dbMapMutexVector[id]}; const auto& dbMap = _dbMapVector[id]; if (auto iter = dbMap.find(db); iter != dbMap.end()) { @@ -124,7 +123,7 @@ Database* DatabaseHolderImpl::get(OperationContext* opCtx, StringData ns) { std::set DatabaseHolderImpl::_getNamesWithConflictingCasing_inlock( StringData name) const { std::set duplicates; - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); for (const auto& [dbName, dbPtr] : _dbMapVector[id]) { // A name that's equal with case-insensitive match must be identical, or it's a duplicate. @@ -149,7 +148,7 @@ Database* DatabaseHolderImpl::openDb(OperationContext* opCtx, StringData ns, boo *justCreated = false; // Until proven otherwise. } - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); auto& dbMap = _dbMapVector[id]; // std::scoped_lock lock(_dbMapMutexVector[id]); @@ -223,7 +222,7 @@ void DatabaseHolderImpl::close(OperationContext* opCtx, StringData ns, const std const StringData dbName = _todb(ns); - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); { // std::scoped_lock lock{_dbMapMutexVector[id]}; auto& dbMap = _dbMapVector[id]; @@ -249,7 +248,7 @@ void DatabaseHolderImpl::close(OperationContext* opCtx, StringData ns, const std void DatabaseHolderImpl::closeAll(OperationContext* opCtx, const std::string& reason) { invariant(opCtx->lockState()->isW()); - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); auto& dbMap = _dbMapVector[id]; // std::scoped_lock lock{_dbMapMutexVector[i]}; diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index beb5c9f14c..d90971b0b9 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -39,9 +39,9 @@ #include #include "mongo/base/disallow_copying.h" -#include "mongo/base/local_thread_state.h" #include "mongo/db/client.h" #include "mongo/db/coro_sync.h" +#include "mongo/db/local_thread_state.h" #include "mongo/db/namespace_string.h" #include "mongo/db/service_context.h" #include "mongo/platform/random.h" @@ -52,6 +52,10 @@ #include "mongo/util/invariant.h" #include "mongo/util/net/hostandport.h" +#ifdef D_USE_CORO_SYNC +#include "mongo/db/coro_sync.h" +#endif + namespace mongo { class Collection; diff --git a/src/mongo/db/coro_sync.cpp b/src/mongo/db/coro_sync.cpp index 540c6954a2..dc6d903b90 100644 --- a/src/mongo/db/coro_sync.cpp +++ b/src/mongo/db/coro_sync.cpp @@ -2,16 +2,17 @@ #include "mongo/db/coro_sync.h" #include "mongo/db/client.h" +#include "mongo/db/local_thread_state.h" #include "mongo/util/log.h" namespace mongo { -extern thread_local int16_t localThreadId; +const CoroutineFunctors CoroutineFunctors::Unavailable{}; namespace coro { void Mutex::lock() { - if (localThreadId != -1) { + if (LocalThread::ID() != -1) { Client* client = Client::getCurrent(); if (client) { const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); @@ -24,13 +25,13 @@ void Mutex::lock() { (*coro.yieldFuncPtr)(); } } else { - MONGO_LOG(2) - << "ThreadGroup " << localThreadId + MONGO_LOG(1) + << "ThreadGroup " << LocalThread::ID() << " call std::mutex::lock because the coroutine context is unavailable."; _mux.lock(); } } else { - MONGO_LOG(2) << "ThreadGroup " << localThreadId + MONGO_LOG(1) << "ThreadGroup " << LocalThread::ID() << " call std::mutex::lock because the client object is unavailable."; _mux.lock(); } @@ -41,7 +42,7 @@ void Mutex::lock() { void ConditionVariable::wait(std::unique_lock& lock) { invariant(lock.owns_lock()); - if (localThreadId != -1) { + if (LocalThread::ID() != -1) { Client* client = Client::getCurrent(); if (client) { const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); @@ -51,14 +52,14 @@ void ConditionVariable::wait(std::unique_lock& lock) { (*coro.yieldFuncPtr)(); lock.lock(); } else { - MONGO_LOG(2) << "ThreadGroup " << localThreadId + MONGO_LOG(1) << "ThreadGroup " << LocalThread::ID() << " call std::condition_variable::wait because the coroutine context " "is unavailable."; _cv.wait(reinterpret_cast&>(lock)); } } else { - MONGO_LOG(2) - << "ThreadGroup " << localThreadId + MONGO_LOG(1) + << "ThreadGroup " << LocalThread::ID() << " call std::condition_variable::wait because the client object is unavailable."; _cv.wait(reinterpret_cast&>(lock)); } diff --git a/src/mongo/db/coro_sync.h b/src/mongo/db/coro_sync.h index cc2f071e0c..6c5cb78f9c 100644 --- a/src/mongo/db/coro_sync.h +++ b/src/mongo/db/coro_sync.h @@ -1,11 +1,33 @@ #pragma once #include +#include +#include #include #include "mongo/util/time_support.h" -namespace mongo::coro { +namespace mongo { + +struct CoroutineFunctors { + const std::function* yieldFuncPtr{nullptr}; + const std::function* resumeFuncPtr{nullptr}; + const std::function* longResumeFuncPtr{nullptr}; + const std::function* migrateThreadGroupFuncPtr{nullptr}; + + const static CoroutineFunctors Unavailable; + + friend bool operator==(const CoroutineFunctors& lhs, const CoroutineFunctors& rhs) { + return lhs.yieldFuncPtr == rhs.yieldFuncPtr && lhs.resumeFuncPtr == rhs.resumeFuncPtr && + lhs.longResumeFuncPtr == rhs.longResumeFuncPtr && + lhs.migrateThreadGroupFuncPtr == rhs.migrateThreadGroupFuncPtr; + } + friend bool operator!=(const CoroutineFunctors& lhs, const CoroutineFunctors& rhs) { + return !(lhs == rhs); + } +}; + +namespace coro { /** * coro::Mutex can be used in both pthread/coroutine. * coro::Mutex can be cast to std::mutex. @@ -90,4 +112,5 @@ class ConditionVariable { private: std::condition_variable _cv; }; -} // namespace mongo::coro +} // namespace coro +} // namespace mongo diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 1f1cd3a4b5..8419f61b5e 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -77,6 +77,7 @@ struct CoroCtx { static constexpr size_t kCoroStackSize = 3200 * 1024; boost::context::protected_fixedsize_stack salloc{kCoroStackSize}; boost::context::continuation source; + std::function resumeTask; std::function yieldFunc; std::function resumeFunc; std::function longResumeFunc; @@ -109,23 +110,23 @@ void killAllExpiredTransactions(OperationContext* opCtx) { transport::ServiceExecutor* serviceExecutor = getGlobalServiceContext()->getServiceEntryPoint()->getServiceExecutor(); - std::function resumeTask = [&source = coroCtx->source, &client] { + coroCtx->resumeTask = [&source = coroCtx->source, &client] { log() << "abortArbitraryTransactionIfExpired call resume."; Client::setCurrent(std::move(client)); source = source.resume(); }; - coroCtx->resumeFunc = - serviceExecutor->coroutineResumeFunctor(session->ThreadGroupId(), resumeTask); - coroCtx->longResumeFunc = - serviceExecutor->coroutineLongResumeFunctor(session->ThreadGroupId(), resumeTask); + coroCtx->resumeFunc = serviceExecutor->coroutineResumeFunctor(session->ThreadGroupId(), + coroCtx->resumeTask); + coroCtx->longResumeFunc = serviceExecutor->coroutineLongResumeFunctor( + session->ThreadGroupId(), coroCtx->resumeTask); - auto task = [&finished, &mux, &cv, coroCtx, opCtx, session, &client] { + auto task = [&finished, &mux, &cv, coroCtx, opCtx, session, &client, serviceExecutor] { Client::setCurrent(std::move(client)); coroCtx->source = boost::context::callcc( std::allocator_arg, coroCtx->salloc, - [&finished, &mux, &cv, coroCtx, opCtx, session, &client]( + [&finished, &mux, &cv, coroCtx, opCtx, session, &client, serviceExecutor]( boost::context::continuation&& sink) { coroCtx->yieldFunc = [&sink, &client]() { log() << "abortArbitraryTransactionIfExpired call yield."; @@ -135,6 +136,12 @@ void killAllExpiredTransactions(OperationContext* opCtx) { std::unique_lock lk(mux); try { + serviceExecutor->ongoingCoroutineCountUpdate(session->ThreadGroupId(), + +1); + const auto finally = MakeGuard([session, serviceExecutor] { + serviceExecutor->ongoingCoroutineCountUpdate( + session->ThreadGroupId(), -1); + }); session->abortArbitraryTransactionIfExpired(opCtx); } catch (const DBException& ex) { const Status& status = ex.toStatus(); diff --git a/src/mongo/db/local_thread_state.cpp b/src/mongo/db/local_thread_state.cpp new file mode 100644 index 0000000000..b18ec8eeaf --- /dev/null +++ b/src/mongo/db/local_thread_state.cpp @@ -0,0 +1,44 @@ +#include "mongo/db/local_thread_state.h" + +#ifndef ELOQ_MODULE_ENABLED +namespace { +thread_local int16_t localThreadId = -1; +} +#else +#include +namespace bthread { +extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; +} // namespace bthread +#endif + +namespace mongo { + +std::function, std::function>(int16_t)> + getTxServiceFunctors; + +int16_t LocalThread::ID() { +#ifndef ELOQ_MODULE_ENABLED + return localThreadId; +#else + if (bthread::tls_task_group) { + return bthread::tls_task_group->group_id_; + } else { + return -1; + } +#endif +} + +#ifndef ELOQ_MODULE_ENABLED +void LocalThread::SetID(int16_t id) { + localThreadId = id; +} +#endif + +bool LocalThread::IsBThread() { +#ifndef ELOQ_MODULE_ENABLED + return false; +#else + return bthread::tls_task_group != nullptr; +#endif +} +} // namespace mongo diff --git a/src/mongo/db/local_thread_state.h b/src/mongo/db/local_thread_state.h new file mode 100644 index 0000000000..585cd8b119 --- /dev/null +++ b/src/mongo/db/local_thread_state.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace mongo { + +extern std::function, std::function>(int16_t)> + getTxServiceFunctors; + +struct LocalThread { + static int16_t ID(); +#ifndef ELOQ_MODULE_ENABLED + static void SetID(int16_t id); +#endif + static bool IsBThread(); +}; + +} // namespace mongo diff --git a/src/mongo/db/modules/eloq/CMakeLists.txt b/src/mongo/db/modules/eloq/CMakeLists.txt index 114cc366bc..8fb93af00a 100644 --- a/src/mongo/db/modules/eloq/CMakeLists.txt +++ b/src/mongo/db/modules/eloq/CMakeLists.txt @@ -20,6 +20,10 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-color=always") option(EXT_TX_PROC_ENABLED "Allows external threads to move forward the tx service." ON) option(FORK_HM_PROCESS "Whether to fork a separate Host Manager process for Raft." OFF) option(STATISTICS "Whether to enable table statistics collection in the tx service." ON) +option(ELOQ_MODULE_ENABLED "Enable EloqModule" OFF) +if (ELOQ_MODULE_ENABLED) + add_definitions(-DELOQ_MODULE_ENABLED) +endif() # --- ELOQ Metrics Options --- option(ENABLE_BENCHMARK "Enable Google Benchmark for eloq_metrics." OFF) diff --git a/src/mongo/db/modules/eloq/SConscript b/src/mongo/db/modules/eloq/SConscript index dc85f1ab4d..021b14a5d9 100644 --- a/src/mongo/db/modules/eloq/SConscript +++ b/src/mongo/db/modules/eloq/SConscript @@ -189,12 +189,14 @@ base_deps = [ # FindLibPath("absl_bad_variant_access"), # FindLibPath("absl_bad_optional_access"), # FindLibPath("absl_str_format_internal"), + # + # mimalloc and brpc are also linked to eloqdoc in mongo/SConscript. FindLibPath("mimalloc"), + FindLibPath("brpc"), + FindLibPath("braft"), FindLibPath("gflags"), FindLibPath("protobuf"), FindLibPath("glog"), - FindLibPath("brpc"), - FindLibPath("braft"), # FindLibPath("tcmalloc_and_profiler"), ] diff --git a/src/mongo/db/modules/eloq/src/base/eloq_util.h b/src/mongo/db/modules/eloq/src/base/eloq_util.h index b466119d72..06a520c5d2 100644 --- a/src/mongo/db/modules/eloq/src/base/eloq_util.h +++ b/src/mongo/db/modules/eloq/src/base/eloq_util.h @@ -42,8 +42,10 @@ namespace txservice {} // namespace txservice namespace Eloq { extern std::unique_ptr storeHandler; -inline bool GetAllTables(std::vector& tables) { - bool success = Eloq::storeHandler->DiscoverAllTableNames(tables); +inline bool GetAllTables(std::vector& tables, + const std::function* yieldFuncPtr, + const std::function* resumeFuncPtr) { + bool success = Eloq::storeHandler->DiscoverAllTableNames(tables, yieldFuncPtr, resumeFuncPtr); if (!success) { return false; } diff --git a/src/mongo/db/modules/eloq/src/eloq_global_options.cpp b/src/mongo/db/modules/eloq/src/eloq_global_options.cpp index 70f1ee7964..6763db48b6 100644 --- a/src/mongo/db/modules/eloq/src/eloq_global_options.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_global_options.cpp @@ -161,6 +161,12 @@ Status EloqGlobalOptions::add(moe::OptionSection* options) { moe::Bool, "Enable heap defragment.") .setDefault(moe::Value(false)); + eloqOptions + .addOptionChaining("storage.eloq.txService.enableIOuring", + "eloqEnableIOURing", + moe::Bool, + "Enable io_uring") + .setDefault(moe::Value(false)); eloqOptions .addOptionChaining("storage.eloq.txService.nodeGroupReplicaNum", "eloqNodeGroupReplicaNum", @@ -169,6 +175,12 @@ Status EloqGlobalOptions::add(moe::OptionSection* options) { .setDefault(moe::Value(3)); // txlog + eloqOptions + .addOptionChaining("storage.eloq.txService.txlogAsyncFsync", + "eloqTxlogAsyncFsync", + moe::Bool, + "Enable async fsync with io_uring") + .setDefault(moe::Value(false)); eloqOptions .addOptionChaining( "storage.eloq.txService.txlogRocksDBStoragePath", @@ -777,12 +789,19 @@ Status EloqGlobalOptions::store(const moe::Environment& params, eloqGlobalOptions.enableHeapDefragment = params["storage.eloq.txService.enableHeapDefragment"].as(); } + if (params.count("storage.eloq.txService.enableIOuring")) { + eloqGlobalOptions.enableIOuring = params["storage.eloq.txService.enableIOuring"].as(); + } if (params.count("storage.eloq.txService.nodeGroupReplicaNum")) { eloqGlobalOptions.nodeGroupReplicaNum = params["storage.eloq.txService.nodeGroupReplicaNum"].as(); } // txlog + if (params.count("storage.eloq.txService.txlogAsyncFsync")) { + eloqGlobalOptions.raftlogAsyncFsync = + params["storage.eloq.txService.txlogAsyncFsync"].as(); + } if (params.count("storage.eloq.txService.txlogRocksDBStoragePath")) { eloqGlobalOptions.txlogRocksDBStoragePath = params["storage.eloq.txService.txlogRocksDBStoragePath"].as(); diff --git a/src/mongo/db/modules/eloq/src/eloq_global_options.h b/src/mongo/db/modules/eloq/src/eloq_global_options.h index ecac4108dd..7bcbccd198 100644 --- a/src/mongo/db/modules/eloq/src/eloq_global_options.h +++ b/src/mongo/db/modules/eloq/src/eloq_global_options.h @@ -65,6 +65,8 @@ class EloqGlobalOptions { bool kickoutDataForTest{false}; bool realtimeSampling{true}; bool enableHeapDefragment{false}; + bool enableIOuring{false}; + bool raftlogAsyncFsync{false}; // txlog std::string txlogRocksDBStoragePath; diff --git a/src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp b/src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp index 4464de73c6..16371f12b9 100644 --- a/src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp @@ -271,6 +271,21 @@ bool EloqKVEngine::InitMetricsRegistry() { } EloqKVEngine::EloqKVEngine(const std::string& path) : _dbPath(path) { +#ifdef ELOQ_MODULE_ENABLED + GFLAGS_NAMESPACE::SetCommandLineOption("use_pthread_event_dispatcher", "true"); + GFLAGS_NAMESPACE::SetCommandLineOption("worker_polling_time_us", "100000"); // 100ms +#endif + if (!eloqGlobalOptions.enableIOuring && eloqGlobalOptions.raftlogAsyncFsync) { + const char* errmsg = + "Invalid config: when set txlogAsyncFsync, should also set enableIOuring."; + error() << errmsg; + uasserted(ErrorCodes::InvalidOptions, errmsg); + } + GFLAGS_NAMESPACE::SetCommandLineOption("use_io_uring", + eloqGlobalOptions.enableIOuring ? "true" : "false"); + GFLAGS_NAMESPACE::SetCommandLineOption("raft_use_bthread_fsync", + eloqGlobalOptions.raftlogAsyncFsync ? "true" : "false"); + #if (defined(DATA_STORE_TYPE_DYNAMODB) || defined(LOG_STATE_TYPE_RKDB_S3) || \ defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3)) awsInit(); @@ -333,7 +348,7 @@ EloqKVEngine::EloqKVEngine(const std::string& path) : _dbPath(path) { eloqGlobalOptions.nodeGroupReplicaNum, 0); if (!parse_res) { - LOG(ERROR) << "Failed to extract cluster configs from ip_port_list."; + error() << "Failed to extract cluster configs from ip_port_list."; uasserted(ErrorCodes::InvalidOptions, "Failed to extract cluster configs from ip_port_list."); } @@ -829,8 +844,9 @@ void EloqKVEngine::listDatabases(std::vector& out) const { MONGO_LOG(1) << "EloqKVEngine::listDatabases"; std::vector tables; - // bool success = Eloq::storeHandler->DiscoverAllTableNames(tables); - bool success = Eloq::GetAllTables(tables); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + bool success = Eloq::GetAllTables(tables, coro.yieldFuncPtr, coro.resumeFuncPtr); if (!success) { error() << "Failed to discover table names."; uassertStatusOK(Status{ErrorCodes::InternalError, "Failed to discover collection names."}); @@ -862,8 +878,9 @@ bool EloqKVEngine::databaseExists(std::string_view dbName) const { << ". dbName: " << dbName; std::vector tables; - // Eloq::storeHandler->DiscoverAllTableNames(tables); - bool success = Eloq::GetAllTables(tables); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + bool success = Eloq::GetAllTables(tables, coro.yieldFuncPtr, coro.resumeFuncPtr); if (!success) { error() << "Failed to discover collection names."; @@ -881,8 +898,9 @@ void EloqKVEngine::listCollections(std::string_view dbName, std::vector allCollections; - // Eloq::storeHandler->DiscoverAllTableNames(allCollections); - bool success = Eloq::GetAllTables(allCollections); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + bool success = Eloq::GetAllTables(allCollections, coro.yieldFuncPtr, coro.resumeFuncPtr); if (!success) { error() << "Failed to discover collection names."; @@ -904,8 +922,9 @@ void EloqKVEngine::listCollections(std::string_view dbName, std::set allCollections; - // Eloq::storeHandler->DiscoverAllTableNames(allCollections); - bool success = Eloq::GetAllTables(allCollections); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + bool success = Eloq::GetAllTables(allCollections, coro.yieldFuncPtr, coro.resumeFuncPtr); if (!success) { error() << "Failed to discover collection names."; @@ -1129,8 +1148,9 @@ std::vector EloqKVEngine::getAllIdents(OperationContext* opCtx) con std::vector all; std::vector tableNameVector; - // Eloq::storeHandler->DiscoverAllTableNames(tableNameVector); - bool success = Eloq::GetAllTables(tableNameVector); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + bool success = Eloq::GetAllTables(tableNameVector, coro.yieldFuncPtr, coro.resumeFuncPtr); if (!success) { error() << "Failed to discover collection names."; @@ -1189,7 +1209,7 @@ std::vector EloqKVEngine::getAllIdents(OperationContext* opCtx) con void EloqKVEngine::cleanShutdown() { MONGO_LOG(0) << "EloqKVEngine::cleanShutdown"; - _txService->Shutdown(); + shutdownTxService(); Eloq::storeHandler.reset(); Eloq::dataStoreService.reset(); @@ -1201,6 +1221,27 @@ void EloqKVEngine::cleanShutdown() { _txService.reset(); } +void EloqKVEngine::shutdownTxService() { +#ifndef ELOQ_MODULE_ENABLED + _txService->Shutdown(); +#else + // 1.When merged into ConvergedDB, `_txService->Shutdown()` should be moved out. + // 2.eloq::unregister_module is not allowed to called in a brpc-worker thread. + bool done = false; + coro::Mutex mux; + coro::ConditionVariable cv; + std::thread thd([this, &done, &mux, &cv]() { + std::unique_lock lk(mux); + _txService->Shutdown(); + done = true; + cv.notify_one(); + }); + thd.detach(); + std::unique_lock lk(mux); + cv.wait(lk, [&done]() { return done; }); +#endif +} + void EloqKVEngine::setJournalListener(JournalListener* jl) { // } diff --git a/src/mongo/db/modules/eloq/src/eloq_kv_engine.h b/src/mongo/db/modules/eloq/src/eloq_kv_engine.h index 54daf3af67..4c2e540546 100644 --- a/src/mongo/db/modules/eloq/src/eloq_kv_engine.h +++ b/src/mongo/db/modules/eloq/src/eloq_kv_engine.h @@ -187,6 +187,9 @@ class EloqKVEngine final : public KVEngine { private: bool InitMetricsRegistry(); + void shutdownTxService(); + +private: std::unique_ptr _txService; std::unique_ptr _logServer; Eloq::MongoCatalogFactory _catalogFactory; diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index 6013e6fd42..bb215db6e2 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -68,8 +68,9 @@ class EloqCatalogRecordStoreCursor : public SeekableRecordCursor { : _ru{EloqRecoveryUnit::get(opCtx)} { MONGO_LOG(1) << "EloqCatalogRecordStoreCursor::EloqCatalogRecordStoreCursor"; // always do full table scan - // Eloq::storeHandler->DiscoverAllTableNames(_tableNameVector); - Eloq::GetAllTables(_tableNameVector); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + Eloq::GetAllTables(_tableNameVector, coro.yieldFuncPtr, coro.resumeFuncPtr); std::string output; for (const auto& name : _tableNameVector) { output.append(name).append("|"); @@ -285,40 +286,18 @@ StatusWith EloqCatalogRecordStore::insertRecord( txservice::CatalogKey catalogKey{tableName}; txservice::CatalogRecord catalogRecord; - // for (uint16_t i = 1; i < kMaxRetryLimit; ++i) { - // auto [exist, errorCode] = ru->readCatalog(catalogKey, catalogRecord, true); - // if (errorCode != txservice::TxErrorCode::NO_ERROR) { - // MONGO_LOG(1) << "Eloq readCatalog error with write intent. Another transaction " - // "may do DDL on the same table."; - // } else { - // if (exist) { - // const char* msg = "Collection already exists in Eloq storage engine"; - // warning() << msg << ", ns: " << tableName.StringView(); - // return {ErrorCodes::NamespaceExists, msg}; - // } - - // auto status = ru->createTable(tableName, metadata); - // if (status.isOK()) { - // return {recordId}; - // } - // } - - // mongo::Milliseconds duration{uniformDist(randomEngine)}; - // MONGO_LOG(1) << "Fail to create table in Eloq. Sleep for " << duration.count() << "ms"; - // opCtx->sleepFor(duration); - // MONGO_LOG(1) << "Retry count: " << i; - // catalogRecord.Reset(); - // } - - // return {ErrorCodes::InternalError, - // "[Create Table] opertion reaches the maximum number of retries."}; - auto [exist, errorCode] = ru->readCatalog(catalogKey, catalogRecord, true); if (errorCode != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "Eloq readCatalog error with write intent. Another transaction " - "may do DDL on the same table."; - return {ErrorCodes::InternalError, - "[Create Table] Another transaction may do DDL on the same table"}; + if (errorCode == txservice::TxErrorCode::WRITE_WRITE_CONFLICT) { + MONGO_LOG(1) << "Eloq readCatalog error with write intent. Another transaction " + "may do DDL on the same table."; + return {ErrorCodes::WriteConflict, + "[Create Table] Another transaction may do DDL on the same table"}; + } else { + MONGO_LOG(0) << "Eloq readCatalog error with write intent." + << txservice::TxErrorMessage(errorCode); + return {ErrorCodes::InternalError, txservice::TxErrorMessage(errorCode)}; + } } else { if (exist) { const char* msg = "Collection already exists in Eloq storage engine"; @@ -422,8 +401,9 @@ std::unique_ptr EloqCatalogRecordStore::getCursor(Operatio void EloqCatalogRecordStore::getAllCollections(std::vector& collections) const { MONGO_LOG(1) << "EloqCatalogRecordStore::getAllCollections"; - // Eloq::storeHandler->DiscoverAllTableNames(collections); - Eloq::GetAllTables(collections); + + const CoroutineFunctors& coro = Client::getCurrent()->coroutineFunctors(); + Eloq::GetAllTables(collections, coro.yieldFuncPtr, coro.resumeFuncPtr); std::string output; for (const auto& name : collections) { output.append(name).append("|"); diff --git a/src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp b/src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp index db4e66da0c..4b1a36fb45 100644 --- a/src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_recovery_unit.cpp @@ -61,8 +61,6 @@ extern std::unique_ptr storeHandler; namespace mongo { -extern thread_local int16_t localThreadId; - namespace { std::atomic nextSnapshotId{1}; @@ -544,7 +542,7 @@ Status EloqRecoveryUnit::createTable(const txservice::TableName& tableName, case txservice::UpsertResult::Failed: MONGO_LOG(1) << "UpsertTableTxRequest error. UpsertTableOp on multiple nodes at the " "same time may conflict and then backoff."; - return {ErrorCodes::Error::InternalError, upsertTableTxReq.ErrorMsg()}; + return {ErrorCodes::Error::WriteConflict, upsertTableTxReq.ErrorMsg()}; break; case txservice::UpsertResult::Unverified: MONGO_LOG(1) @@ -927,7 +925,7 @@ void EloqRecoveryUnit::_txnOpen(txservice::IsolationLevel isolationLevel) { } MONGO_LOG(1) << "Opening transaction with isolation level: " << isolationLevel; _txm = txservice::NewTxInit( - _txService, isolationLevel, eloqGlobalOptions.ccProtocol, UINT32_MAX, localThreadId); + _txService, isolationLevel, eloqGlobalOptions.ccProtocol, UINT32_MAX, LocalThread::ID()); _active = true; } diff --git a/src/mongo/db/server_options_server_helpers.cpp b/src/mongo/db/server_options_server_helpers.cpp index d2e134ce21..6150872b99 100644 --- a/src/mongo/db/server_options_server_helpers.cpp +++ b/src/mongo/db/server_options_server_helpers.cpp @@ -595,7 +595,8 @@ Status storeServerOptions(const moe::Environment& params) { if (params.count("storage.eloq.bootstrap")) { serverGlobalParams.bootstrap = params["storage.eloq.bootstrap"].as(); if (serverGlobalParams.bootstrap) { - log() << "This is a bootstrap for EloqDoc. The program will automatically exit after bootstrap."; + log() << "This is a bootstrap for EloqDoc. The program will automatically exit after " + "bootstrap."; } } diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 6717eda15d..9f8a6b3d5c 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -33,8 +33,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/global_initializer_registerer.h" -#include "mongo/base/local_thread_state.h" #include "mongo/db/coro_sync.h" +#include "mongo/db/local_thread_state.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/platform/atomic_word.h" diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index f199cbb503..984ee7e4e2 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -894,7 +894,7 @@ void Session::abortArbitraryTransactionIfExpired(OperationContext* opCtx) { << _sessionId.getId() << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; - invariant(localThreadId != -1); + invariant(LocalThread::ID() != -1); opCtx->setLogicalSessionId(_sessionId); opCtx->setTxnNumber(_activeTxnNumber); if (_txnResourceStash) { diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 45ee2f6d9f..c808a9779b 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -55,8 +55,6 @@ const auto operationSessionDecoration = } // namespace -extern thread_local int16_t localThreadId; - void SessionCatalog::reset() { _txnTable.clear(); } @@ -133,8 +131,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) stdx::unique_lock ul(_mutex); - invariant(localThreadId >= 0); - uint16_t threadGroupId = localThreadId; + invariant(LocalThread::ID() >= 0); + uint16_t threadGroupId = LocalThread::ID(); auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid, threadGroupId); // Wait until the session is no longer checked out @@ -170,8 +168,8 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, auto ss = [&] { stdx::unique_lock ul(_mutex); - invariant(localThreadId >= 0); - uint16_t threadGroupId = localThreadId; + invariant(LocalThread::ID() >= 0); + uint16_t threadGroupId = LocalThread::ID(); return ScopedSession(_getOrCreateSessionRuntimeInfo(ul, opCtx, lsid, threadGroupId)); }(); diff --git a/src/mongo/db/stats/top.cpp b/src/mongo/db/stats/top.cpp index c8159513fb..cdde04dc1e 100644 --- a/src/mongo/db/stats/top.cpp +++ b/src/mongo/db/stats/top.cpp @@ -48,8 +48,6 @@ using std::string; using std::stringstream; using std::vector; -extern thread_local int16_t localThreadId; - namespace { const auto getTop = ServiceContext::declareDecoration(); @@ -98,7 +96,7 @@ void Top::record(OperationContext* opCtx, return; } - int16_t id = localThreadId + 1; + int16_t id = LocalThread::ID() + 1; std::scoped_lock lock(_usageMutexVector[id]); CollectionData& coll = _usageVector[id][hashedNs]; @@ -229,7 +227,7 @@ void Top::incrementGlobalLatencyStats(OperationContext* opCtx, Command::ReadWriteType readWriteType) { // stdx::lock_guard guard(_lock); // MONGO_UNREACHABLE; - int16_t id = localThreadId + 1; + int16_t id = LocalThread::ID() + 1; std::scoped_lock lk(_histogramMutexVector[id]); _incrementHistogram(opCtx, latency, &_histogramVector[id], readWriteType); } @@ -248,7 +246,7 @@ void Top::appendGlobalLatencyStats(bool includeHistograms, BSONObjBuilder* build void Top::incrementGlobalTransactionLatencyStats(uint64_t latency) { // stdx::lock_guard guard(_lock); // MONGO_UNREACHABLE; - int16_t id = localThreadId + 1; + int16_t id = LocalThread::ID() + 1; std::scoped_lock lk(_histogramMutexVector[id]); _histogramVector[id].increment(latency, Command::ReadWriteType::kTransaction); } diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index 530770af50..d9623f23d9 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -55,10 +55,6 @@ namespace mongo { -// using std::string; -// using std::vector; -extern thread_local int16_t localThreadId; - namespace { const std::string catalogInfo = "_mdb_catalog"; const auto kCatalogLogLevel = logger::LogSeverity::Debug(2); @@ -544,7 +540,7 @@ void KVStorageEngine::listCollections(std::string_view dbName, std::set lk(_dbsLock); - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); auto& dbMap = _dbMapVector[id]; if (auto iter = dbMap.find(dbName); iter == dbMap.end()) { auto [newIter, _] = @@ -577,7 +573,7 @@ Status KVStorageEngine::dropDatabase(OperationContext* opCtx, StringData db) { // entry = it->second; // } - auto id = static_cast(localThreadId + 1); + auto id = static_cast(LocalThread::ID() + 1); auto& dbMap = _dbMapVector[id]; KVDatabaseCatalogEntryBase* entry{nullptr}; if (auto iter = dbMap.find(db); iter == dbMap.end()) { diff --git a/src/mongo/scripting/mozjs/proxyscope.cpp b/src/mongo/scripting/mozjs/proxyscope.cpp index 89c5bdfbf8..6b13874a1c 100644 --- a/src/mongo/scripting/mozjs/proxyscope.cpp +++ b/src/mongo/scripting/mozjs/proxyscope.cpp @@ -41,10 +41,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { -extern thread_local int16_t localThreadId; -extern std::function, std::function>(int16_t)> - getTxServiceFunctors; - namespace mozjs { MozJSProxyScope::MozJSProxyScope(MozJSScriptEngine* engine) @@ -283,23 +279,14 @@ void MozJSProxyScope::run(Closure&& closure) { } void MozJSProxyScope::runOnImplThread(stdx::function f) { - stdx::unique_lock lk(_mutex); + stdx::unique_lock lk(_mutex); _function = std::move(f); invariant(_state == State::Idle); _state = State::ProxyRequest; _condvar.notify_one(); - if (localThreadId != -1) { - // Run in a Mongo server - auto [txProcessorExec, updateExtProc] = getTxServiceFunctors(localThreadId); - updateExtProc(-1); - _condvar.wait(lk, [this] { return _state == State::ImplResponse; }); - updateExtProc(1); - } else { - // Run in a Mongo shell client - _condvar.wait(lk, [this] { return _state == State::ImplResponse; }); - } + _condvar.wait(lk, [this] { return _state == State::ImplResponse; }); _state = State::Idle; // Clear the _status state and throw it if necessary @@ -313,7 +300,7 @@ void MozJSProxyScope::runOnImplThread(stdx::function f) { void MozJSProxyScope::shutdownThread() { { - stdx::lock_guard lk(_mutex); + stdx::lock_guard lk(_mutex); invariant(_state == State::Idle); @@ -360,7 +347,7 @@ void MozJSProxyScope::implThread(void* arg) { const auto unbindImplScope = MakeGuard([&proxy] { proxy->_implScope = nullptr; }); while (true) { - stdx::unique_lock lk(proxy->_mutex); + stdx::unique_lock lk(proxy->_mutex); { MONGO_IDLE_THREAD_BLOCK; proxy->_condvar.wait(lk, [proxy] { diff --git a/src/mongo/scripting/mozjs/proxyscope.h b/src/mongo/scripting/mozjs/proxyscope.h index 1e3fe69999..46ca525261 100644 --- a/src/mongo/scripting/mozjs/proxyscope.h +++ b/src/mongo/scripting/mozjs/proxyscope.h @@ -32,11 +32,18 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/scripting/mozjs/engine.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" + #include "mongo/stdx/thread.h" +#ifndef D_USE_CORO_SYNC +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#else +#include "mongo/db/coro_sync.h" +#endif + + namespace mongo { namespace mozjs { @@ -194,12 +201,17 @@ class MozJSProxyScope final : public Scope { * This mutex protects _function, _state and _status as channels for * function invocation and exception handling */ +#ifndef D_USE_CORO_SYNC stdx::mutex _mutex; + stdx::condition_variable _condvar; +#else + coro::Mutex _mutex; + coro::ConditionVariable _condvar; +#endif stdx::function _function; State _state; Status _status; - stdx::condition_variable _condvar; PRThread* _thread; }; diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 28b717c8b4..c824bfd150 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -125,6 +125,7 @@ tlEnv.Library( '$BUILD_DIR/third_party/shim_asio', 'transport_layer_common', ], + SYSLIBDEPS=["brpc"], ) tlEnv.CppUnitTest( diff --git a/src/mongo/transport/service_executor_coroutine.cpp b/src/mongo/transport/service_executor_coroutine.cpp index f4386d57d0..58ecdec3ef 100644 --- a/src/mongo/transport/service_executor_coroutine.cpp +++ b/src/mongo/transport/service_executor_coroutine.cpp @@ -7,6 +7,7 @@ #include #include "mongo/base/string_data.h" +#include "mongo/db/local_thread_state.h" #include "mongo/db/server_parameters.h" #include "mongo/transport/service_entry_point_utils.h" #include "mongo/transport/service_executor_coroutine.h" @@ -14,13 +15,14 @@ #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/log.h" +#include + #ifndef EXT_TX_PROC_ENABLED #define EXT_TX_PROC_ENABLED #endif namespace mongo { -extern thread_local int16_t localThreadId; extern std::function, std::function>(int16_t)> getTxServiceFunctors; @@ -38,37 +40,86 @@ namespace transport { // constexpr auto kStartingThreads = "startingThreads"_sd; // } // namespace +#ifdef ELOQ_MODULE_ENABLED +void MongoModule::ExtThdStart(int thd_id) { + MONGO_LOG(3) << "MongoModule::ExtThdStart " << thd_id; + ThreadGroup& threadGroup = _threadGroups[thd_id]; + invariant(threadGroup._threadGroupId == thd_id); + if (!threadGroup._threadNameSet) { + setThreadName(str::stream() << "thread_group_" << thd_id); + threadGroup._threadNameSet = true; + } + threadGroup._extWorkerActive.store(true, std::memory_order_release); +} + +void MongoModule::ExtThdEnd(int thd_id) { + MONGO_LOG(3) << "MongoModule::ExtThdEnd " << thd_id; + ThreadGroup& threadGroup = _threadGroups[thd_id]; + invariant(threadGroup._threadGroupId == thd_id); + threadGroup._extWorkerActive.store(false, std::memory_order_release); +} -void ThreadGroup::enqueueTask(Task task) { - _taskQueueSize.fetch_add(1, std::memory_order_relaxed); - _taskQueue.enqueue(std::move(task)); +void MongoModule::Process(int thd_id) { + MONGO_LOG(3) << "MongoModule::Process " << thd_id; + ThreadGroup& threadGroup = _threadGroups[thd_id]; + invariant(threadGroup._threadGroupId == thd_id); + size_t cnt = 0; + // process resume task + cnt = threadGroup._resumeQueue.TryDequeueBulk( + std::make_move_iterator(threadGroup._taskBulk.begin()), threadGroup._taskBulk.size()); + for (size_t i = 0; i < cnt; ++i) { + threadGroup._taskBulk[i](); + } + + // process normal task + if (cnt < threadGroup._taskBulk.size()) { + cnt = threadGroup._taskQueue.TryDequeueBulk( + std::make_move_iterator(threadGroup._taskBulk.begin()), threadGroup._taskBulk.size()); + for (size_t i = 0; i < cnt; ++i) { + threadGroup._taskBulk[i](); + } + } +} + +bool MongoModule::HasTask(int thd_id) const { + ThreadGroup& threadGroup = _threadGroups[thd_id]; + invariant(threadGroup._threadGroupId == thd_id); + return threadGroup.isBusy(); +} +#endif +void ThreadGroup::enqueueTask(Task task) { + _taskQueue.Enqueue(std::move(task)); notifyIfAsleep(); } void ThreadGroup::resumeTask(Task task) { - _resumeQueueSize.fetch_add(1, std::memory_order_relaxed); - _resumeQueue.enqueue(std::move(task)); - + _resumeQueue.Enqueue(std::move(task)); notifyIfAsleep(); } void ThreadGroup::notifyIfAsleep() { +#ifndef ELOQ_MODULE_ENABLED if (_isSleep.load(std::memory_order_relaxed)) { std::unique_lock lk(_sleepMutex); _sleepCV.notify_one(); } +#else + if (!_extWorkerActive.load(std::memory_order_relaxed)) { + MongoModule::Instance()->NotifyWorker(_threadGroupId); + } +#endif } -void ThreadGroup::setTxServiceFunctors(int16_t id) { - std::tie(_txProcessorExec, _updateExtProc) = getTxServiceFunctors(id); +void ThreadGroup::setTxServiceFunctors() { + std::tie(_txProcessorExec, _updateExtProc) = getTxServiceFunctors(_threadGroupId); } bool ThreadGroup::isBusy() const { - return (_ongoingCoroutineCnt > 0) || (_taskQueueSize.load(std::memory_order_relaxed) > 0) || - (_resumeQueueSize.load(std::memory_order_relaxed) > 0); + return _ongoingCoroutineCnt > 0 || !_taskQueue.IsEmpty() || !_resumeQueue.IsEmpty(); } +#ifndef ELOQ_MODULE_ENABLED void ThreadGroup::trySleep() { // If there are tasks in the , does not sleep. // if (isBusy()) { @@ -116,32 +167,35 @@ void ThreadGroup::terminate() { std::unique_lock lk(_sleepMutex); _sleepCV.notify_one(); } - - -// thread_local std::deque ServiceExecutorCoroutine::_localWorkQueue = {}; -// thread_local int ServiceExecutorCoroutine::_localRecursionDepth = 0; -// thread_local int64_t ServiceExecutorCoroutine::_localThreadIdleCounter = 0; +#endif ServiceExecutorCoroutine::ServiceExecutorCoroutine(ServiceContext* ctx, size_t reservedThreads) - : _reservedThreads(reservedThreads), _threadGroups(reservedThreads) {} + : _reservedThreads(reservedThreads), _threadGroups(reservedThreads) { + bthread_setconcurrency(reservedThreads); + for (int16_t thdGroupId = 0; thdGroupId < reservedThreads; ++thdGroupId) { + _threadGroups[thdGroupId].setThreadGroupID(thdGroupId); + } +} Status ServiceExecutorCoroutine::start() { MONGO_LOG(0) << "ServiceExecutorCoroutine::start"; - { - // stdx::unique_lock lk(_mutex); - _stillRunning.store(true, std::memory_order_release); - } - +#ifndef ELOQ_MODULE_ENABLED for (size_t i = 0; i < _reservedThreads; i++) { auto status = _startWorker(static_cast(i)); if (!status.isOK()) { return status; } } - +#else + MongoModule::Instance()->Init(_threadGroups.data()); + int rc = eloq::register_module(MongoModule::Instance()); + invariant(rc == 0); +#endif + _stillRunning.store(true, std::memory_order_release); return Status::OK(); } +#ifndef ELOQ_MODULE_ENABLED Status ServiceExecutorCoroutine::_startWorker(int16_t groupId) { MONGO_LOG(0) << "Starting new worker thread for " << _name << " service executor. " << " group id: " << groupId; @@ -149,12 +203,8 @@ Status ServiceExecutorCoroutine::_startWorker(int16_t groupId) { return launchServiceWorkerThread([this, threadGroupId = groupId] { while (!_stillRunning.load(std::memory_order_acquire)) { } - localThreadId = threadGroupId; - - std::string threadName("thread_group_" + std::to_string(threadGroupId)); - // std::string threadName("thread_group"); - StringData threadNameSD(threadName); - setThreadName(threadNameSD); + LocalThread::SetID(threadGroupId); + setThreadName(str::stream() << "thread_group_" << threadGroupId); // std::unique_lock lk(_mutex); // _numRunningWorkerThreads.addAndFetch(1); @@ -167,13 +217,12 @@ Status ServiceExecutorCoroutine::_startWorker(int16_t groupId) { ThreadGroup& threadGroup = _threadGroups[threadGroupId]; #ifdef EXT_TX_PROC_ENABLED - threadGroup.setTxServiceFunctors(threadGroupId); + threadGroup.setTxServiceFunctors(); MONGO_LOG(0) << "threadGroup._updateExtProc(1)"; threadGroup._updateExtProc(1); #endif - std::array taskBulk; - moodycamel::ConsumerToken taskToken(threadGroup._taskQueue); - moodycamel::ConsumerToken resumeToken(threadGroup._resumeQueue); + + auto& taskBulk = threadGroup._taskBulk; size_t idleCnt = 0; std::chrono::steady_clock::time_point idleStartTime; @@ -184,21 +233,17 @@ Status ServiceExecutorCoroutine::_startWorker(int16_t groupId) { size_t cnt = 0; // process resume task - if (threadGroup._resumeQueueSize.load(std::memory_order_relaxed) > 0) { - cnt = threadGroup._resumeQueue.try_dequeue_bulk( - resumeToken, taskBulk.begin(), taskBulk.size()); - threadGroup._resumeQueueSize.fetch_sub(cnt); - for (size_t i = 0; i < cnt; ++i) { - // setThreadName(threadNameSD); - taskBulk[i](); - } + cnt = threadGroup._resumeQueue.TryDequeueBulk(std::make_move_iterator(taskBulk.begin()), + taskBulk.size()); + for (size_t i = 0; i < cnt; ++i) { + // setThreadName(threadNameSD); + taskBulk[i](); } // process normal task - if (cnt == 0 && threadGroup._taskQueueSize.load(std::memory_order_relaxed) > 0) { - cnt = threadGroup._taskQueue.try_dequeue_bulk( - taskToken, taskBulk.begin(), taskBulk.size()); - threadGroup._taskQueueSize.fetch_sub(cnt); + if (cnt < taskBulk.size()) { + cnt = threadGroup._taskQueue.TryDequeueBulk( + std::make_move_iterator(taskBulk.begin()), taskBulk.size()); for (size_t i = 0; i < cnt; ++i) { // setThreadName(threadNameSD); taskBulk[i](); @@ -231,31 +276,20 @@ Status ServiceExecutorCoroutine::_startWorker(int16_t groupId) { LOG(0) << "Exiting worker thread in " << _name << " service executor"; }); } - +#endif Status ServiceExecutorCoroutine::shutdown(Milliseconds timeout) { - LOG(0) << "Shutting down coroutine executor"; - - // stdx::unique_lock lock(_mutex); - _stillRunning.store(false, std::memory_order_relaxed); - // _threadWakeup.notify_all(); - // if (_backgroundTimeService.joinable()) { - // _backgroundTimeService.join(); - // } - + MONGO_LOG(0) << "Shutting down coroutine executor"; + _stillRunning.store(false, std::memory_order_release); +#ifndef ELOQ_MODULE_ENABLED for (ThreadGroup& thd_group : _threadGroups) { thd_group.terminate(); } - - // bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() { - // return _numRunningWorkerThreads.load() == 0; - // }); - +#else + int rc = eloq::unregister_module(MongoModule::Instance()); + invariant(rc == 0); +#endif return Status::OK(); - // return result - // ? Status::OK() - // : Status(ErrorCodes::Error::ExceededTimeLimit, - // "coroutine executor couldn't shutdown all worker threads within time limit."); } Status ServiceExecutorCoroutine::schedule(Task task, @@ -273,33 +307,6 @@ Status ServiceExecutorCoroutine::schedule(Task task, return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; } - // if (!_localWorkQueue.empty()) { - // MONGO_LOG(0) << "here?"; - // /* - // * In perf testing we found that yielding after running a each request produced - // * at 5% performance boost in microbenchmarks if the number of worker threads - // * was greater than the number of available cores. - // */ - // if (flags & ScheduleFlags::kMayYieldBeforeSchedule) { - // if ((_localThreadIdleCounter++ & 0xf) == 0) { - // markThreadIdle(); - // } - // } - - // // Execute task directly (recurse) if allowed by the caller as it produced better - // // performance in testing. Try to limit the amount of recursion so we don't blow up the - // // stack, even though this shouldn't happen with this executor that uses blocking network - // // I/O. - // if ((flags & ScheduleFlags::kMayRecurse) && - // (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) { - // ++_localRecursionDepth; - // task(); - // } else { - // _localWorkQueue.emplace_back(std::move(task)); - // } - // return Status::OK(); - // } - _threadGroups[threadGroupId].enqueueTask(std::move(task)); return Status::OK(); @@ -329,6 +336,5 @@ void ServiceExecutorCoroutine::appendStats(BSONObjBuilder* bob) const { // << static_cast(_numReadyThreads) << kStartingThreads // << static_cast(_numStartingThreads); } - } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/service_executor_coroutine.h b/src/mongo/transport/service_executor_coroutine.h index 83a43c04eb..0713788aa0 100644 --- a/src/mongo/transport/service_executor_coroutine.h +++ b/src/mongo/transport/service_executor_coroutine.h @@ -1,53 +1,108 @@ #pragma once +#include #include #include #include #include #include "mongo/base/status.h" +#include "mongo/db/modules/eloq/tx_service/include/concurrent_queue_wsize.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_task_names.h" +#ifdef ELOQ_MODULE_ENABLED +#include +#endif #include namespace mongo::transport { +class ThreadGroup; + +#ifdef ELOQ_MODULE_ENABLED +class MongoModule final : public eloq::EloqModule { +public: + static MongoModule* Instance() { + static MongoModule mongoModule; + return &mongoModule; + } + + void Init(ThreadGroup* threadGroups) { + _threadGroups = threadGroups; + } + + void ExtThdStart(int thd_id) override; + + void ExtThdEnd(int thd_id) override; + + void Process(int thd_id) override; + + bool HasTask(int thd_id) const override; + +private: + MongoModule() = default; + +private: + ThreadGroup* _threadGroups{nullptr}; +}; +#endif + class ThreadGroup { friend class ServiceExecutorCoroutine; +#ifdef ELOQ_MODULE_ENABLED + friend class MongoModule; +#endif using Task = std::function; public: + ThreadGroup() = default; + void enqueueTask(Task task); void resumeTask(Task task); void notifyIfAsleep(); +#ifndef ELOQ_MODULE_ENABLED /** * @brief Called by the thread bound to this thread group. */ void trySleep(); void terminate(); +#endif + + void setTxServiceFunctors(); - void setTxServiceFunctors(int16_t id); + void setThreadGroupID(int16_t id) { + _threadGroupId = id; + } private: bool isBusy() const; - // uint16_t id; +private: + int16_t _threadGroupId{-1}; + +#ifdef ELOQ_MODULE_ENABLED + bool _threadNameSet{false}; + std::atomic _extWorkerActive{false}; +#endif - moodycamel::ConcurrentQueue _taskQueue; - std::atomic _taskQueueSize{0}; - moodycamel::ConcurrentQueue _resumeQueue; - std::atomic _resumeQueueSize{0}; + constexpr static size_t kTaskBatchSize{100}; + std::array _taskBulk; + txservice::ConcurrentQueueWSize _taskQueue; + txservice::ConcurrentQueueWSize _resumeQueue; + +#ifndef ELOQ_MODULE_ENABLED std::atomic _isSleep{false}; std::mutex _sleepMutex; std::condition_variable _sleepCV; +#endif std::atomic _isTerminated{false}; uint16_t _ongoingCoroutineCnt{0}; @@ -93,7 +148,9 @@ class ServiceExecutorCoroutine final : public ServiceExecutor { void appendStats(BSONObjBuilder* bob) const override; private: +#ifndef ELOQ_MODULE_ENABLED Status _startWorker(int16_t groupId); +#endif // static thread_local std::deque _localWorkQueue; // static thread_local int _localRecursionDepth; @@ -113,7 +170,6 @@ class ServiceExecutorCoroutine final : public ServiceExecutor { // std::thread _backgroundTimeService; constexpr static std::string_view _name{"coroutine"}; - constexpr static size_t kTaskBatchSize{100}; constexpr static uint32_t kIdleCycle = (1 << 10) - 1; // 2^n-1 constexpr static uint32_t kIdleTimeoutMs = 1000; }; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 69efb3b855..eaa2720af4 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -259,6 +259,7 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext, _dbClient{ svcContext->makeClient(_threadName, std::move(session), makeCoroutineFunctors(*this))}, _dbClientPtr{_dbClient.get()}, + _osPageSize{static_cast(::getpagesize())}, _threadGroupId(groupId) { MONGO_LOG(1) << "ServiceStateMachine::ServiceStateMachine"; _coroStack = (char*)::mmap( @@ -268,7 +269,7 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext, std::abort(); } - if (::mprotect(_coroStack, getpagesize(), PROT_NONE) != 0) { + if (::mprotect(_coroStack, _osPageSize, PROT_NONE) != 0) { error() << "mprotect coroutine stack failed, " << ::strerror(errno); std::abort(); } @@ -567,15 +568,13 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) { _coroMigrateThreadGroup = std::bind( &ServiceStateMachine::_migrateThreadGroup, this, std::placeholders::_1); - std::weak_ptr wssm = weak_from_this(); - boost::context::stack_context sc = _coroStackContext(); boost::context::preallocated prealloc(sc.sp, sc.size, sc); _source = boost::context::callcc( std::allocator_arg, prealloc, NoopAllocator(), - [wssm, &guard](boost::context::continuation&& sink) { + [wssm = weak_from_this(), &guard](boost::context::continuation&& sink) { auto ssm = wssm.lock(); if (!ssm) { return std::move(sink); @@ -734,8 +733,7 @@ void ServiceStateMachine::setThreadGroupId(size_t id) { boost::context::stack_context ServiceStateMachine::_coroStackContext() { boost::context::stack_context sc; - const auto pageSize = static_cast(::getpagesize()); - sc.size = kCoroStackSize - pageSize; + sc.size = kCoroStackSize - _osPageSize; // Because stack grows downwards from high address? sc.sp = _coroStack + kCoroStackSize; return sc; diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index ac27212e1f..e68db8f705 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -282,8 +282,8 @@ class ServiceStateMachine : public std::enable_shared_from_this 15) { - std::string shortName = str::stream() << threadName.substr(0, 7) << '.' - << threadName.substr(threadName.size() - 7); + std::string shortName = str::stream() + << threadName.substr(0, 7) << '.' << threadName.substr(threadName.size() - 7); error = pthread_setname_np(pthread_self(), shortName.c_str()); } else { error = pthread_setname_np(pthread_self(), threadName.rawData());