From d4276f75a13e8cb50c199ff027599a0406768206 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Wed, 25 Feb 2026 23:00:57 +0000 Subject: [PATCH 01/10] Core: Add ASIO-based Scheduler --- src/common/application.cpp | 17 +- src/common/application.h | 10 +- src/common/scheduler.h | 295 ++++++++++++++++++++++++++++++ src/common/utils.h | 7 + src/login/connect_application.cpp | 4 +- src/login/connect_engine.cpp | 2 +- src/login/connect_engine.h | 5 +- src/map/map_application.cpp | 4 +- src/search/search_application.cpp | 4 +- src/test/test_application.cpp | 2 +- src/world/world_application.cpp | 4 +- 11 files changed, 331 insertions(+), 23 deletions(-) create mode 100644 src/common/scheduler.h diff --git a/src/common/application.cpp b/src/common/application.cpp index d5cd54dd3dc..6755cc9a608 100644 --- a/src/common/application.cpp +++ b/src/common/application.cpp @@ -1,4 +1,4 @@ -/* +/* =========================================================================== Copyright (c) 2022 LandSandBoat Dev Teams @@ -39,6 +39,7 @@ #endif #include +#include namespace { @@ -84,7 +85,8 @@ unsigned long prevQuickEditMode; } // namespace Application::Application(const ApplicationConfig& appConfig, int argc, char** argv) -: signals_(io_context_) +: scheduler_() +, signals_(scheduler_.ioContext()) , serverName_(appConfig.serverName) , args_(std::make_unique(appConfig, argc, argv)) { @@ -270,8 +272,8 @@ void Application::run() try { - // NOTE: io_context_.run() takes over and blocks this thread. Anything after this point will only fire - // if io_context_ finishes! + // NOTE: ioContext_.run() takes over and blocks this thread. Anything after this point will only fire + // if ioContext_ finishes! // // This busy loop looks nasty, however -- // https://think-async.com/Asio/asio-1.24.0/doc/asio/reference/io_service.html @@ -284,12 +286,11 @@ void Application::run() { try { - io_context_.run(); + scheduler_.run(); break; } catch (std::exception& e) { - // TODO: make a list of "allowed exceptions", the rest can/should cause shutdown. ShowErrorFmt("Inner fatal: {}", e.what()); } } @@ -300,9 +301,9 @@ void Application::run() } } -auto Application::ioContext() -> asio::io_context& +auto Application::scheduler() -> Scheduler& { - return io_context_; + return scheduler_; } auto Application::args() const -> Arguments& diff --git a/src/common/application.h b/src/common/application.h index 0ce00428515..64bb77888a6 100644 --- a/src/common/application.h +++ b/src/common/application.h @@ -1,4 +1,4 @@ -/* +/* =========================================================================== Copyright (c) 2022 LandSandBoat Dev Teams @@ -23,7 +23,9 @@ #include "arguments.h" #include "common/engine.h" -#include +#include "common/scheduler.h" + +#include // for signal_set #include #include @@ -93,12 +95,12 @@ class Application // Member accessors // - auto ioContext() -> asio::io_context&; + auto scheduler() -> Scheduler&; auto args() const -> Arguments&; auto console() const -> ConsoleService&; protected: - asio::io_context io_context_; + Scheduler scheduler_; asio::signal_set signals_; std::string serverName_; diff --git a/src/common/scheduler.h b/src/common/scheduler.h new file mode 100644 index 00000000000..1162d8f9e0b --- /dev/null +++ b/src/common/scheduler.h @@ -0,0 +1,295 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef TRACY_ENABLE +#include +#endif + +namespace detail +{ + +template +struct IsASIOAwaitable : std::false_type +{ +}; + +template +struct IsASIOAwaitable> : std::true_type +{ +}; + +template +concept IsAwaitable = IsASIOAwaitable>::value; + +template +struct AwaitableResult; + +template +struct AwaitableResult> +{ + using type = T; +}; + +} // namespace detail + +template +using Task = asio::awaitable; + +// +// Combinators +// + +// +// All +// Await multiple tasks in parallel and return their results as a tuple. +// +template + requires(sizeof...(Tasks) > 0) +auto All(Tasks&&... tasks) +{ + using namespace asio::experimental::awaitable_operators; + return (std::forward(tasks) && ...); +} + +// +// One +// Race multiple tasks and return a variant of the winner. +// +template + requires(sizeof...(Tasks) > 0) +auto One(Tasks&&... tasks) +{ + using namespace asio::experimental::awaitable_operators; + return (std::forward(tasks) || ...); +} + +// +// Scheduler +// +class Scheduler final +{ +public: + Scheduler(std::size_t numThreads = std::max(1U, std::thread::hardware_concurrency() - 1U)) + : mainContext_() + , workerContext_() + , mainGuard_(asio::make_work_guard(mainContext_)) + , workerGuard_(asio::make_work_guard(workerContext_)) + { +#ifdef TRACY_ENABLE + tracy::SetThreadName("Main Thread"); +#endif + + workerThreads_.reserve(numThreads); + + for (size_t i = 0; i < numThreads; ++i) + { + workerThreads_.emplace_back( + [this, i] + { +#ifdef TRACY_ENABLE + const auto threadName = std::format("Worker Thread {}", i + 1); + tracy::SetThreadName(threadName.c_str()); +#else + std::ignore = i; +#endif + // Try and do work, but if an exception is encountered capture it and post it back + // to the main thread. + try + { + workerContext_.run(); + } + catch (...) + { + asio::post( + mainContext_, + [ex = std::current_exception()] + { + std::rethrow_exception(ex); + }); + } + }); + } + } + + ~Scheduler() + { + stop(); + for (auto& t : workerThreads_) + { + if (t.joinable()) + { + t.join(); + } + } + } + + Scheduler(const Scheduler&) = delete; + Scheduler& operator=(const Scheduler&) = delete; + Scheduler(Scheduler&&) = delete; + Scheduler& operator=(Scheduler&&) = delete; + + void run() + { + isRunning_ = true; + + try + { + mainContext_.run(); // block thread + } + catch (...) + { + stop(); + throw; // Throw exception back up to user + } + + // Main loop finished. Allow workers to finish their tasks and exit. + workerGuard_.reset(); + for (auto& t : workerThreads_) + { + if (t.joinable()) + { + t.join(); + } + } + isRunning_ = false; + } + + void stop() + { + isRunning_ = false; + mainGuard_.reset(); + workerGuard_.reset(); + + mainContext_.stop(); + workerContext_.stop(); + } + + [[nodiscard]] auto isRunning() const -> bool + { + return isRunning_; + } + + // TODO: + // Variants of onMainThread/onWorkerThread/postToMainThread/postToWorkerThread that take + // std::invocable (lambdas, std::bind, etc.) + + // onMainThread + // Queue work lazily on the main thread. It won't start executing until you co_await the + // returned task. + template + [[nodiscard]] auto onMainThread(T&& task) -> Task>::type> + { + return asio::co_spawn(mainContext_.get_executor(), std::forward(task), asio::use_awaitable); + } + + // onWorkerThread + // Queue work lazily on the worker thread pool. It won't start executing until you co_await + // the returned task. + template + [[nodiscard]] auto onWorkerThread(T&& task) -> Task>::type> + { + return asio::co_spawn(workerContext_.get_executor(), std::forward(task), asio::use_awaitable); + } + + // postToMainThread + // Queue work eagerly on the main thread. It will start executing immediately. + template + void postToMainThread(T&& task) + { + asio::co_spawn(mainContext_.get_executor(), std::forward(task), asio::detached); + } + + // postToWorkerThread + // Queue work eagerly on the worker thread pool. It will start executing immediately. + template + void postToWorkerThread(T&& task) + { + asio::co_spawn(workerContext_.get_executor(), std::forward(task), asio::detached); + } + + // yield + // co_await on this to hand control back to the scheduler. + [[nodiscard]] auto yield() -> Task + { + auto executor = co_await asio::this_coro::executor; + co_await asio::post(executor, asio::use_awaitable); + } + + // yieldFor + // co_await on this to hand control back to the scheduler, without re-scheduling until the + // duration has elapsed. + [[nodiscard]] auto yieldFor(std::chrono::steady_clock::duration duration) -> Task + { + auto executor = co_await asio::this_coro::executor; + auto timer = asio::steady_timer(executor); + timer.expires_after(duration); + co_await timer.async_wait(asio::use_awaitable); + } + + // ioContext + // Return the main io_context. + // TODO: We should be trying to get rid of this accessor as soon as possible! + [[nodiscard]] auto ioContext() -> asio::io_context& + { + return mainContext_; + } + +private: + std::atomic isRunning_{ false }; + asio::io_context mainContext_; + asio::io_context workerContext_; + std::vector workerThreads_; + + asio::executor_work_guard mainGuard_; + asio::executor_work_guard workerGuard_; +}; diff --git a/src/common/utils.h b/src/common/utils.h index 874ddcb07bc..2bcd54b8b5f 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -173,6 +173,13 @@ bool definitelyLessThan(float a, float b); void crash(); +inline auto thread_id_to_string(std::thread::id id) -> std::string +{ + std::ostringstream oss; + oss << id; + return oss.str(); +} + template std::set sorted_directory_iterator(std::string path_name) { diff --git a/src/login/connect_application.cpp b/src/login/connect_application.cpp index 32da60aa633..8ca9ebd90e0 100644 --- a/src/login/connect_application.cpp +++ b/src/login/connect_application.cpp @@ -48,7 +48,7 @@ ConnectApplication::~ConnectApplication() = default; auto ConnectApplication::createEngine() -> std::unique_ptr { certificateHelpers::generateSelfSignedCert(); - return std::make_unique(ioContext()); + return std::make_unique(scheduler_.ioContext()); } void ConnectApplication::registerCommands(ConsoleService& console) @@ -73,5 +73,5 @@ void ConnectApplication::registerCommands(ConsoleService& console) void ConnectApplication::requestExit() { Application::requestExit(); - io_context_.stop(); + scheduler_.stop(); } diff --git a/src/login/connect_engine.cpp b/src/login/connect_engine.cpp index eb5fc609122..5d58152a018 100644 --- a/src/login/connect_engine.cpp +++ b/src/login/connect_engine.cpp @@ -57,7 +57,7 @@ ConnectEngine::ConnectEngine(asio::io_context& io_context) ConnectEngine::~ConnectEngine() { m_sessionCleanupTimer.cancel(); -}; +} void ConnectEngine::periodicCleanup(const asio::error_code& error) { diff --git a/src/login/connect_engine.h b/src/login/connect_engine.h index 573f82ccc4b..f6141e928ee 100644 --- a/src/login/connect_engine.h +++ b/src/login/connect_engine.h @@ -47,7 +47,10 @@ class ConnectEngine final : public Engine void periodicCleanup(const asio::error_code& error); private: - ZMQDealerWrapper zmqDealerWrapper_; + Scheduler& scheduler_; + + ZMQDealerWrapper zmqDealerWrapper_; + handler m_authHandler; handler m_dataHandler; handler m_viewHandler; diff --git a/src/map/map_application.cpp b/src/map/map_application.cpp index 600d855a9ac..5a49cec16fa 100644 --- a/src/map/map_application.cpp +++ b/src/map/map_application.cpp @@ -88,7 +88,7 @@ MapApplication::~MapApplication() auto MapApplication::createEngine() -> std::unique_ptr { - return std::make_unique(ioContext(), engineConfig_); + return std::make_unique(scheduler_.ioContext(), engineConfig_); } void MapApplication::registerCommands(ConsoleService& console) @@ -122,7 +122,7 @@ void MapApplication::run() // MapEngine destructor must occur before Application destructor engine_.reset(); - io_context_.stop(); + scheduler_.stop(); const auto taskManager = CTaskManager::getInstance(); while (!taskManager->getTaskList().empty()) diff --git a/src/search/search_application.cpp b/src/search/search_application.cpp index e26598402f6..3bfa420243b 100644 --- a/src/search/search_application.cpp +++ b/src/search/search_application.cpp @@ -46,7 +46,7 @@ SearchApplication::~SearchApplication() = default; auto SearchApplication::createEngine() -> std::unique_ptr { - return std::make_unique(ioContext()); + return std::make_unique(scheduler_.ioContext()); } void SearchApplication::registerCommands(ConsoleService& console) @@ -66,5 +66,5 @@ void SearchApplication::registerCommands(ConsoleService& console) void SearchApplication::requestExit() { Application::requestExit(); - io_context_.stop(); + scheduler_.stop(); } diff --git a/src/test/test_application.cpp b/src/test/test_application.cpp index 4501ca4c9fb..1f60822b44f 100644 --- a/src/test/test_application.cpp +++ b/src/test/test_application.cpp @@ -113,7 +113,7 @@ auto TestApplication::createEngine() -> std::unique_ptr }, }; - return std::make_unique(ioContext(), config); + return std::make_unique(scheduler_.ioContext(), config); } void TestApplication::run() diff --git a/src/world/world_application.cpp b/src/world/world_application.cpp index b40bcea074b..cf9b98526f0 100644 --- a/src/world/world_application.cpp +++ b/src/world/world_application.cpp @@ -49,11 +49,11 @@ WorldApplication::~WorldApplication() = default; auto WorldApplication::createEngine() -> std::unique_ptr { - return std::make_unique(ioContext()); + return std::make_unique(scheduler_.ioContext()); } void WorldApplication::requestExit() { Application::requestExit(); - io_context_.stop(); + scheduler_.stop(); } From 94434779eef2b8c5be610538696bc700262cefad Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Wed, 25 Feb 2026 23:08:42 +0000 Subject: [PATCH 02/10] Core: Make ConnectEngine::periodicCleanup() a coroutine --- src/login/connect_application.cpp | 2 +- src/login/connect_engine.cpp | 24 +++++++++++------------- src/login/connect_engine.h | 5 ++--- src/login/handler.h | 4 ++-- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/login/connect_application.cpp b/src/login/connect_application.cpp index 8ca9ebd90e0..2c6e049dc67 100644 --- a/src/login/connect_application.cpp +++ b/src/login/connect_application.cpp @@ -48,7 +48,7 @@ ConnectApplication::~ConnectApplication() = default; auto ConnectApplication::createEngine() -> std::unique_ptr { certificateHelpers::generateSelfSignedCert(); - return std::make_unique(scheduler_.ioContext()); + return std::make_unique(scheduler_); } void ConnectApplication::registerCommands(ConsoleService& console) diff --git a/src/login/connect_engine.cpp b/src/login/connect_engine.cpp index 5d58152a018..2b5d626cc0b 100644 --- a/src/login/connect_engine.cpp +++ b/src/login/connect_engine.cpp @@ -44,25 +44,26 @@ constexpr auto kSessionCleanTime = 15min; } // namespace -ConnectEngine::ConnectEngine(asio::io_context& io_context) -: zmqDealerWrapper_(getZMQEndpointString(), getZMQRoutingId()) -, m_authHandler(io_context, settings::get("network.LOGIN_AUTH_PORT"), zmqDealerWrapper_) -, m_dataHandler(io_context, settings::get("network.LOGIN_DATA_PORT"), zmqDealerWrapper_) -, m_viewHandler(io_context, settings::get("network.LOGIN_VIEW_PORT"), zmqDealerWrapper_) -, m_sessionCleanupTimer(io_context, kSessionCleanTime) +ConnectEngine::ConnectEngine(Scheduler& scheduler) +: scheduler_(scheduler) +, zmqDealerWrapper_(getZMQEndpointString(), getZMQRoutingId()) +, m_authHandler(scheduler_, settings::get("network.LOGIN_AUTH_PORT"), zmqDealerWrapper_) +, m_dataHandler(scheduler_, settings::get("network.LOGIN_DATA_PORT"), zmqDealerWrapper_) +, m_viewHandler(scheduler_, settings::get("network.LOGIN_VIEW_PORT"), zmqDealerWrapper_) { - m_sessionCleanupTimer.async_wait(std::bind(&ConnectEngine::periodicCleanup, this, std::placeholders::_1)); + scheduler.postToMainThread(periodicCleanup()); } ConnectEngine::~ConnectEngine() { - m_sessionCleanupTimer.cancel(); } -void ConnectEngine::periodicCleanup(const asio::error_code& error) +auto ConnectEngine::periodicCleanup() -> Task { - if (!error) + while (scheduler_.isRunning()) { + co_await scheduler_.yieldFor(kSessionCleanTime); + auto& sessions = loginHelpers::getAuthenticatedSessions(); auto ipAddrIterator = sessions.begin(); while (ipAddrIterator != sessions.end()) @@ -95,8 +96,5 @@ void ConnectEngine::periodicCleanup(const asio::error_code& error) ++ipAddrIterator; } } - - m_sessionCleanupTimer.expires_at(m_sessionCleanupTimer.expiry() + kSessionCleanTime); - m_sessionCleanupTimer.async_wait(std::bind(&ConnectEngine::periodicCleanup, this, std::placeholders::_1)); } } diff --git a/src/login/connect_engine.h b/src/login/connect_engine.h index f6141e928ee..1cdffbf68aa 100644 --- a/src/login/connect_engine.h +++ b/src/login/connect_engine.h @@ -38,13 +38,13 @@ class ConnectEngine final : public Engine { public: - ConnectEngine(asio::io_context& io_context); + ConnectEngine(Scheduler& scheduler); ~ConnectEngine() override; // This cleanup function is to periodically poll for auth sessions that were successful but xiloader failed to actually launch FFXI // When this happens, the data/view socket are never opened and will never be cleaned up normally. // Auth is closed before any other sessions are open, so the data/view cleanups aren't sufficient - void periodicCleanup(const asio::error_code& error); + auto periodicCleanup() -> Task; private: Scheduler& scheduler_; @@ -54,5 +54,4 @@ class ConnectEngine final : public Engine handler m_authHandler; handler m_dataHandler; handler m_viewHandler; - asio::steady_timer m_sessionCleanupTimer; }; diff --git a/src/login/handler.h b/src/login/handler.h index 035f876c189..b0dc6b5603c 100644 --- a/src/login/handler.h +++ b/src/login/handler.h @@ -35,8 +35,8 @@ template class handler { public: - handler(asio::io_context& io_context, unsigned int port, ZMQDealerWrapper& zmqDealerWrapper) - : acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) + handler(Scheduler& scheduler, unsigned int port, ZMQDealerWrapper& zmqDealerWrapper) + : acceptor_(scheduler.ioContext(), asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) , sslContext_(asio::ssl::context::tls_server) , zmqDealerWrapper_(zmqDealerWrapper) { From e3bc0cc44fb6ace776551bda84a0ae3e0b0c81ca Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 00:40:10 +0000 Subject: [PATCH 03/10] Core: Make ConsoleService use new Scheduler --- src/common/console_service.cpp | 136 +++++++++++++++------------ src/common/console_service.h | 9 +- src/common/scheduler.h | 163 ++++++++++++++++++++++----------- src/login/connect_engine.cpp | 2 +- 4 files changed, 191 insertions(+), 119 deletions(-) diff --git a/src/common/console_service.cpp b/src/common/console_service.cpp index d9406392cb6..5676a5a9de1 100644 --- a/src/common/console_service.cpp +++ b/src/common/console_service.cpp @@ -108,23 +108,16 @@ bool getLine(std::string& line) ConsoleService::ConsoleService(Application& application) : application_(application) -, m_consoleThreadRun(true) { registerDefaultCommands(); - if (application_.isRunningInCI()) + if (!application_.isRunningInCI()) { - return; + run(); } - - run(); } -ConsoleService::~ConsoleService() -{ - m_consoleThreadRun = false; - m_consoleStopCondition.notify_all(); -} +ConsoleService::~ConsoleService() = default; // NOTE: If you capture things in this function, make sure they're protected (locked or atomic)! // NOTE: If you're going to print, use fmt::print, rather than ShowInfo etc. @@ -192,23 +185,30 @@ void ConsoleService::registerDefaultCommands() auto input = fmt::format("local var = {}; if type(var) ~= \"nil\" then print(var) end", fmt::join(inputs, " ")); - // TODO: Make sure to execute on the main thread lua.safe_script(input); } }); registerCommand( - "crash", "Crash the process", [](std::vector& inputs) + "crash", "Crash the process (main thread)", [](std::vector& inputs) { - // TODO: Make sure to execute on the main thread crash(); }); + registerCommand( + "crash_worker", "Crash the process (worker thread)", [&](std::vector& inputs) + { + application_.scheduler().postToWorkerThread( + [] + { + crash(); + }); + }); + registerCommand( "throw", "Throw an exception", [](std::vector& inputs) { - // TODO: Make sure to execute on the main thread - throw std::runtime_error("Exception thrown from console command"); + throw std::runtime_error("Exception thrown from console command (on main thread)"); }); registerCommand( @@ -221,62 +221,80 @@ void ConsoleService::registerDefaultCommands() void ConsoleService::run() { - bool attached = isatty(0); - if (attached) + if (isatty(0)) { - m_consoleInputThread = std::jthread( - [&]() - { - std::string line; + application_.scheduler().postToWorkerThread(consoleLoop()); + } +} - const auto predicate = [&] - { - return !m_consoleThreadRun; - }; +auto ConsoleService::consoleLoop() -> Task +{ + auto& scheduler = application_.scheduler(); - while (!predicate()) - { - std::unique_lock lock(m_consoleInputBottleneck); + std::string line; - // https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for - if (!m_consoleStopCondition.wait_for(lock, 50ms, predicate)) - { - if (!getLine(line)) - { - continue; - } + while (!scheduler.closeRequested()) + { + // If there is data, process as much as possible before yielding. + // This removes the 50ms delay between every single keystroke. + bool hasLine = false; + while (stdinHasData()) + { + if (getLine(line)) + { + hasLine = true; + break; // We have a full command to process + } + } - std::istringstream stream(line); - std::string input; + if (hasLine) + { + std::istringstream stream(line); + std::string part; + std::vector inputs; - std::vector inputs; - while (stream >> input) - { - for (auto& part : split(input)) - { - inputs.emplace_back(part); - } - } + while (stream >> part) + { + for (auto& s : split(part)) + { + inputs.emplace_back(s); + } + } - if (!inputs.empty()) - { - TracyZoneScoped; + if (!inputs.empty()) + { + TracyZoneScoped; - auto entry = m_commands.find(inputs[0]); - if (entry != m_commands.end()) + auto it = m_commands.find(inputs[0]); + if (it != m_commands.end()) + { + // Dispatch to main thread + scheduler.postToMainThread( + [cmd = it->second.func, args = std::move(inputs)]() mutable + { + try { - // TODO: Execute this on the main thread, not the worker thread - entry->second.func(inputs); + cmd(args); } - else + catch (const std::exception& e) { - fmt::print(fmt::runtime("> Unknown command: {}\n"), inputs[0]); + fmt::print(stderr, "> Command error: {}\n", e.what()); } - } - - line = std::string(); - } + }); + } + else + { + fmt::print("> Unknown command: {}\n", inputs[0]); } - }); + } + line.clear(); + + // Immediately check for more input without yielding + // if we just processed a line. + continue; + } + + // Yield only when there is no data to process. + co_await scheduler.yieldFor(100ms); } } diff --git a/src/common/console_service.h b/src/common/console_service.h index d9ec32aa313..1177c873f49 100644 --- a/src/common/console_service.h +++ b/src/common/console_service.h @@ -30,6 +30,8 @@ #include #include +#include + class Application; class ConsoleService final @@ -55,13 +57,10 @@ class ConsoleService final private: void registerDefaultCommands(); void run(); + auto consoleLoop() -> Task; Application& application_; - std::mutex m_consoleInputBottleneck; - std::atomic m_consoleThreadRun; - std::jthread m_consoleInputThread; - std::condition_variable m_consoleStopCondition; - + std::mutex m_consoleInputBottleneck; std::unordered_map m_commands; }; diff --git a/src/common/scheduler.h b/src/common/scheduler.h index 1162d8f9e0b..1efe2830cd1 100644 --- a/src/common/scheduler.h +++ b/src/common/scheduler.h @@ -80,6 +80,9 @@ struct AwaitableResult> using type = T; }; +template +concept IsInvocable = std::invocable>; + } // namespace detail template @@ -130,47 +133,15 @@ class Scheduler final #endif workerThreads_.reserve(numThreads); - - for (size_t i = 0; i < numThreads; ++i) + for (size_t idx = 0; idx < numThreads; ++idx) { - workerThreads_.emplace_back( - [this, i] - { -#ifdef TRACY_ENABLE - const auto threadName = std::format("Worker Thread {}", i + 1); - tracy::SetThreadName(threadName.c_str()); -#else - std::ignore = i; -#endif - // Try and do work, but if an exception is encountered capture it and post it back - // to the main thread. - try - { - workerContext_.run(); - } - catch (...) - { - asio::post( - mainContext_, - [ex = std::current_exception()] - { - std::rethrow_exception(ex); - }); - } - }); + workerThreads_.emplace_back(&Scheduler::workerLoop, this, idx); } } ~Scheduler() { stop(); - for (auto& t : workerThreads_) - { - if (t.joinable()) - { - t.join(); - } - } } Scheduler(const Scheduler&) = delete; @@ -180,8 +151,6 @@ class Scheduler final void run() { - isRunning_ = true; - try { mainContext_.run(); // block thread @@ -192,37 +161,61 @@ class Scheduler final throw; // Throw exception back up to user } - // Main loop finished. Allow workers to finish their tasks and exit. - workerGuard_.reset(); - for (auto& t : workerThreads_) + // If we break out of context::run(), begin shutting down + stop(); + } + + void workerLoop(std::size_t index) + { +#ifdef TRACY_ENABLE + const auto threadName = std::format("Worker Thread {}", index + 1); + tracy::SetThreadName(threadName.c_str()); +#else + std::ignore = index; +#endif + // Try and do work, but if an exception is encountered capture it and post it back + // to the main thread. + try { - if (t.joinable()) - { - t.join(); - } + workerContext_.run(); + } + catch (...) + { + asio::post( + mainContext_, + [ex = std::current_exception()] + { + std::rethrow_exception(ex); + }); } - isRunning_ = false; } void stop() { - isRunning_ = false; + bool expected = false; + if (!closeRequested_.compare_exchange_strong(expected, true)) + { + return; + } + mainGuard_.reset(); workerGuard_.reset(); - mainContext_.stop(); - workerContext_.stop(); + for (auto& t : workerThreads_) + { + if (t.joinable()) + { + t.join(); + } + } + workerThreads_.clear(); } - [[nodiscard]] auto isRunning() const -> bool + [[nodiscard]] auto closeRequested() const -> bool { - return isRunning_; + return closeRequested_; } - // TODO: - // Variants of onMainThread/onWorkerThread/postToMainThread/postToWorkerThread that take - // std::invocable (lambdas, std::bind, etc.) - // onMainThread // Queue work lazily on the main thread. It won't start executing until you co_await the // returned task. @@ -232,6 +225,22 @@ class Scheduler final return asio::co_spawn(mainContext_.get_executor(), std::forward(task), asio::use_awaitable); } + // onMainThread + // Queue work lazily on the main thread. It won't start executing until you co_await the + // returned task. + template + [[nodiscard]] auto onMainThread(F&& func) -> Task>::type> + { + return asio::co_spawn( + mainContext_.get_executor(), + [fn = std::forward(func)]() mutable -> Task + { + fn(); + co_return; + }, + asio::use_awaitable); + } + // onWorkerThread // Queue work lazily on the worker thread pool. It won't start executing until you co_await // the returned task. @@ -241,6 +250,22 @@ class Scheduler final return asio::co_spawn(workerContext_.get_executor(), std::forward(task), asio::use_awaitable); } + // onWorkerThread + // Queue work lazily on the worker thread pool. It won't start executing until you co_await + // the returned task. + template + [[nodiscard]] auto onWorkerThread(F&& func) -> Task>::type> + { + return asio::co_spawn( + workerContext_.get_executor(), + [fn = std::forward(func)]() mutable -> Task + { + fn(); + co_return; + }, + asio::use_awaitable); + } + // postToMainThread // Queue work eagerly on the main thread. It will start executing immediately. template @@ -249,6 +274,21 @@ class Scheduler final asio::co_spawn(mainContext_.get_executor(), std::forward(task), asio::detached); } + // postToMainThread + // Queue work eagerly on the main thread. It will start executing immediately. + template + void postToMainThread(F&& func) + { + asio::co_spawn( + mainContext_.get_executor(), + [fn = std::forward(func)]() mutable -> Task + { + fn(); + co_return; + }, + asio::detached); + } + // postToWorkerThread // Queue work eagerly on the worker thread pool. It will start executing immediately. template @@ -257,6 +297,21 @@ class Scheduler final asio::co_spawn(workerContext_.get_executor(), std::forward(task), asio::detached); } + // postToWorkerThread + // Queue work eagerly on the worker thread pool. It will start executing immediately. + template + void postToWorkerThread(F&& func) + { + asio::co_spawn( + workerContext_.get_executor(), + [fn = std::forward(func)]() mutable -> Task + { + fn(); + co_return; + }, + asio::detached); + } + // yield // co_await on this to hand control back to the scheduler. [[nodiscard]] auto yield() -> Task @@ -285,7 +340,7 @@ class Scheduler final } private: - std::atomic isRunning_{ false }; + std::atomic closeRequested_{ false }; asio::io_context mainContext_; asio::io_context workerContext_; std::vector workerThreads_; diff --git a/src/login/connect_engine.cpp b/src/login/connect_engine.cpp index 2b5d626cc0b..3e5a4340395 100644 --- a/src/login/connect_engine.cpp +++ b/src/login/connect_engine.cpp @@ -60,7 +60,7 @@ ConnectEngine::~ConnectEngine() auto ConnectEngine::periodicCleanup() -> Task { - while (scheduler_.isRunning()) + while (!scheduler_.closeRequested()) { co_await scheduler_.yieldFor(kSessionCleanTime); From 3c2e2912c629df23cd72b68886f53783826c8c84 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 11:37:32 +0000 Subject: [PATCH 04/10] Core: Propagate new Scheduler out to xi_world --- src/common/scheduler.h | 16 +++++++++++ src/world/world_application.cpp | 2 +- src/world/world_engine.cpp | 48 +++++++++++++++------------------ src/world/world_engine.h | 13 ++++----- 4 files changed, 45 insertions(+), 34 deletions(-) diff --git a/src/common/scheduler.h b/src/common/scheduler.h index 1efe2830cd1..31dc894657b 100644 --- a/src/common/scheduler.h +++ b/src/common/scheduler.h @@ -22,6 +22,7 @@ #pragma once #include +#include #include #include #include @@ -331,6 +332,21 @@ class Scheduler final co_await timer.async_wait(asio::use_awaitable); } + // withTimeout + // Executes a task with a given timeout. Returns std::optional, which is empty if the timeout + // was reached before the task completed. + template + [[nodiscard]] auto withTimeout(Task task, std::chrono::steady_clock::duration timeout) -> Task> + { + using namespace asio::experimental::awaitable_operators; + auto result = co_await (std::move(task) || yieldFor(timeout)); + if (result.index() == 0) + { + co_return std::move(std::get<0>(result)); + } + co_return std::nullopt; + } + // ioContext // Return the main io_context. // TODO: We should be trying to get rid of this accessor as soon as possible! diff --git a/src/world/world_application.cpp b/src/world/world_application.cpp index cf9b98526f0..a15258746e7 100644 --- a/src/world/world_application.cpp +++ b/src/world/world_application.cpp @@ -49,7 +49,7 @@ WorldApplication::~WorldApplication() = default; auto WorldApplication::createEngine() -> std::unique_ptr { - return std::make_unique(scheduler_.ioContext()); + return std::make_unique(scheduler_); } void WorldApplication::requestExit() diff --git a/src/world/world_engine.cpp b/src/world/world_engine.cpp index 201d18924c6..ede9474db0f 100644 --- a/src/world/world_engine.cpp +++ b/src/world/world_engine.cpp @@ -41,52 +41,46 @@ constexpr auto kPumpQueuesTime = 250ms; } // namespace -WorldEngine::WorldEngine(asio::io_context& io_context) -: ipcServer_(std::make_unique(*this)) +WorldEngine::WorldEngine(Scheduler& scheduler) +: scheduler_(scheduler) +, ipcServer_(std::make_unique(*this)) , partySystem_(std::make_unique(*this)) , conquestSystem_(std::make_unique(*this)) , besiegedSystem_(std::make_unique(*this)) , campaignSystem_(std::make_unique(*this)) , colonizationSystem_(std::make_unique(*this)) , httpServer_(std::make_unique()) -, m_timeServerTimer(io_context, kTimeServerTickInterval) -, m_queuePumpTimer(io_context, kPumpQueuesTime) { - m_timeServerTimer.async_wait(std::bind(&WorldEngine::timeServer, this, std::placeholders::_1)); + scheduler_.postToMainThread(timeServer()); + // TODO: Bind ZMQ socket FD to ASIO directly - m_queuePumpTimer.async_wait(std::bind(&WorldEngine::pumpQueues, this, std::placeholders::_1)); + scheduler_.postToMainThread(pumpQueues()); } -WorldEngine::~WorldEngine() -{ - m_timeServerTimer.cancel(); - m_queuePumpTimer.cancel(); -}; +WorldEngine::~WorldEngine() = default; -void WorldEngine::timeServer(const asio::error_code ec) +auto WorldEngine::timeServer() -> Task { - TracyZoneScoped; - - if (!ec) + while (!scheduler_.closeRequested()) { - time_server(this); + co_await scheduler_.yieldFor(kTimeServerTickInterval); - // Reschedule - m_timeServerTimer.expires_at(m_timeServerTimer.expiry() + kPumpQueuesTime); - m_timeServerTimer.async_wait(std::bind(&WorldEngine::timeServer, this, std::placeholders::_1)); + if (!scheduler_.closeRequested()) + { + time_server(this); + } } } -void WorldEngine::pumpQueues(const asio::error_code ec) +auto WorldEngine::pumpQueues() -> Task { - TracyZoneScoped; - - if (!ec) + while (!scheduler_.closeRequested()) { - ipcServer_->handleIncomingMessages(); + co_await scheduler_.yieldFor(kPumpQueuesTime); - // Reschedule - m_queuePumpTimer.expires_at(m_queuePumpTimer.expiry() + kPumpQueuesTime); - m_queuePumpTimer.async_wait(std::bind(&WorldEngine::pumpQueues, this, std::placeholders::_1)); + if (!scheduler_.closeRequested()) + { + ipcServer_->handleIncomingMessages(); + } } } diff --git a/src/world/world_engine.h b/src/world/world_engine.h index 93bba2ce995..66d2d9ce88e 100644 --- a/src/world/world_engine.h +++ b/src/world/world_engine.h @@ -22,6 +22,7 @@ #pragma once #include "common/application.h" +#include "common/scheduler.h" #include "common/zmq_router_wrapper.h" #include "http_server.h" @@ -40,9 +41,12 @@ class ColonizationSystem; class WorldEngine final : public Engine { public: - WorldEngine(asio::io_context& io_context); + WorldEngine(Scheduler& scheduler); ~WorldEngine() override; + // TODO: Make all of these members private + Scheduler& scheduler_; + std::unique_ptr ipcServer_; std::unique_ptr partySystem_; @@ -55,9 +59,6 @@ class WorldEngine final : public Engine std::unique_ptr httpServer_; private: - void timeServer(asio::error_code ec); - void pumpQueues(asio::error_code ec); - - asio::steady_timer m_timeServerTimer; - asio::steady_timer m_queuePumpTimer; + auto timeServer() -> Task; + auto pumpQueues() -> Task; }; From a27be118bc1153ccd844e0991c96a6a6f6257c17 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 11:38:04 +0000 Subject: [PATCH 05/10] Core: Propagate new Scheduler out to xi_search --- src/common/scheduler.h | 48 +++++-- src/search/CMakeLists.txt | 2 +- src/search/handler.h | 71 ---------- src/search/search_application.cpp | 2 +- src/search/search_engine.cpp | 39 ++++-- src/search/search_engine.h | 17 +-- src/search/search_handler.cpp | 223 +++++++++++------------------- src/search/search_handler.h | 78 ++++------- src/search/search_listener.h | 68 +++++++++ 9 files changed, 253 insertions(+), 295 deletions(-) delete mode 100644 src/search/handler.h create mode 100644 src/search/search_listener.h diff --git a/src/common/scheduler.h b/src/common/scheduler.h index 31dc894657b..24f3079cb39 100644 --- a/src/common/scheduler.h +++ b/src/common/scheduler.h @@ -230,14 +230,21 @@ class Scheduler final // Queue work lazily on the main thread. It won't start executing until you co_await the // returned task. template - [[nodiscard]] auto onMainThread(F&& func) -> Task>::type> + [[nodiscard]] auto onMainThread(F&& func) -> Task>> { return asio::co_spawn( mainContext_.get_executor(), - [fn = std::forward(func)]() mutable -> Task + [fn = std::forward(func)]() mutable -> Task>> { - fn(); - co_return; + if constexpr (std::is_void_v>>) + { + fn(); + co_return; + } + else + { + co_return fn(); + } }, asio::use_awaitable); } @@ -261,8 +268,15 @@ class Scheduler final workerContext_.get_executor(), [fn = std::forward(func)]() mutable -> Task { - fn(); - co_return; + if constexpr (std::is_void_v>>) + { + fn(); + co_return; + } + else + { + co_return fn(); + } }, asio::use_awaitable); } @@ -284,8 +298,15 @@ class Scheduler final mainContext_.get_executor(), [fn = std::forward(func)]() mutable -> Task { - fn(); - co_return; + if constexpr (std::is_void_v>>) + { + fn(); + co_return; + } + else + { + co_return fn(); + } }, asio::detached); } @@ -307,8 +328,15 @@ class Scheduler final workerContext_.get_executor(), [fn = std::forward(func)]() mutable -> Task { - fn(); - co_return; + if constexpr (std::is_void_v>>) + { + fn(); + co_return; + } + else + { + co_return fn(); + } }, asio::detached); } diff --git a/src/search/CMakeLists.txt b/src/search/CMakeLists.txt index bcfb5da6695..ad19f9bf9a9 100644 --- a/src/search/CMakeLists.txt +++ b/src/search/CMakeLists.txt @@ -5,7 +5,7 @@ set(SOURCES ${PACKET_SOURCES} data_loader.cpp data_loader.h - handler.h + search_listener.h search_application.h search_application.cpp search_engine.h diff --git a/src/search/handler.h b/src/search/handler.h deleted file mode 100644 index 507f1f15250..00000000000 --- a/src/search/handler.h +++ /dev/null @@ -1,71 +0,0 @@ -/* -=========================================================================== - - Copyright (c) 2024 LandSandBoat Dev Teams - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see http://www.gnu.org/licenses/ - -=========================================================================== -*/ - -#pragma once - -#include -#include - -#include "search_handler.h" - -class handler -{ -public: - handler(asio::io_context& io_context, unsigned int port, SynchronizedShared>& ipWhitelist) - : acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) - { - acceptor_.set_option(asio::socket_base::reuse_address(true)); - - do_accept( - [&](asio::ip::tcp::socket socket) - { - const auto handler = std::make_shared(std::move(socket), io_context, ipInFlight_, ipWhitelist); - handler->start(); - }); - } - -private: - void do_accept(std::function acceptFn) - { - acceptor_.async_accept( - [this, acceptFn](const std::error_code ec, asio::ip::tcp::socket socket) - { - if (!ec) - { - acceptFn(std::move(socket)); - } - else - { - // TODO: This can't be the Fmt variant because of constexpr things? - ShowError(ec.message()); - } - - do_accept(acceptFn); - }); - } - - asio::ip::tcp::acceptor acceptor_; - - // A single IP should only have one request in flight at a time, so we are going to - // be tracking the IP addresses of incoming requests and if we haven't cleared the - // record for it - we block until it's done - SynchronizedShared> ipInFlight_; -}; diff --git a/src/search/search_application.cpp b/src/search/search_application.cpp index 3bfa420243b..1c2208a455c 100644 --- a/src/search/search_application.cpp +++ b/src/search/search_application.cpp @@ -46,7 +46,7 @@ SearchApplication::~SearchApplication() = default; auto SearchApplication::createEngine() -> std::unique_ptr { - return std::make_unique(scheduler_.ioContext()); + return std::make_unique(scheduler_); } void SearchApplication::registerCommands(ConsoleService& console) diff --git a/src/search/search_engine.cpp b/src/search/search_engine.cpp index 6c29e67b8e8..1499de979db 100644 --- a/src/search/search_engine.cpp +++ b/src/search/search_engine.cpp @@ -1,4 +1,4 @@ -/* +/* =========================================================================== Copyright (c) 2025 LandSandBoat Dev Teams @@ -23,15 +23,15 @@ #include "common/lua.h" #include "data_loader.h" -SearchEngine::SearchEngine(asio::io_context& io_context) -: m_searchHandler(io_context, settings::get("network.SEARCH_PORT"), m_ipWhitelist) -, m_periodicCleanupTimer(io_context, std::chrono::seconds(settings::get("search.EXPIRE_INTERVAL"))) +SearchEngine::SearchEngine(Scheduler& scheduler) +: scheduler_(scheduler) +, searchListener_(scheduler_, settings::get("network.SEARCH_PORT"), ipWhitelist_) { const auto accessWhitelist = lua["xi"]["settings"]["search"]["ACCESS_WHITELIST"].get_or_create(); for (const auto& [_, value] : accessWhitelist) { auto str = value.as(); - m_ipWhitelist.write( + ipWhitelist_.write( [str](auto& ipWhitelist) { ipWhitelist.insert(str); @@ -40,21 +40,29 @@ SearchEngine::SearchEngine(asio::io_context& io_context) if (settings::get("search.EXPIRE_AUCTIONS")) { - m_periodicCleanupTimer.async_wait(std::bind(&SearchEngine::periodicCleanup, this, std::placeholders::_1)); + scheduler_.postToMainThread( + [this]() -> Task + { + co_await periodicCleanup(); + }); } } SearchEngine::~SearchEngine() { - m_periodicCleanupTimer.cancel(); -}; +} void SearchEngine::onInitialize() { if (settings::get("search.EXPIRE_AUCTIONS")) { ShowInfoFmt("AH task to return items older than {} days is running", settings::get("search.EXPIRE_DAYS")); - expireAH(settings::get("search.EXPIRE_DAYS")); + + scheduler_.postToWorkerThread( + [this, days = settings::get("search.EXPIRE_DAYS")] + { + expireAH(days); + }); } } @@ -74,14 +82,15 @@ void SearchEngine::expireAH(const std::optional days) const data.ExpireAHItems(days.value_or(0)); } -void SearchEngine::periodicCleanup(const asio::error_code& error) +auto SearchEngine::periodicCleanup() -> Task { - if (!error) + while (!scheduler_.closeRequested()) { - expireAH(settings::get("search.EXPIRE_DAYS")); + co_await scheduler_.yieldFor(std::chrono::seconds(settings::get("search.EXPIRE_INTERVAL"))); - // reset timer - m_periodicCleanupTimer.expires_at(m_periodicCleanupTimer.expiry() + std::chrono::seconds(settings::get("search.EXPIRE_INTERVAL"))); - m_periodicCleanupTimer.async_wait(std::bind(&SearchEngine::periodicCleanup, this, std::placeholders::_1)); + if (!scheduler_.closeRequested()) + { + expireAH(settings::get("search.EXPIRE_DAYS")); + } } } diff --git a/src/search/search_engine.h b/src/search/search_engine.h index d84e5a038d1..b3115361bf3 100644 --- a/src/search/search_engine.h +++ b/src/search/search_engine.h @@ -21,8 +21,9 @@ #pragma once -#include "common/application.h" -#include "common/utils.h" +#include +#include +#include #include @@ -31,12 +32,12 @@ #endif // search specific stuff -#include "handler.h" +#include "search_listener.h" class SearchEngine final : public Engine { public: - SearchEngine(asio::io_context& io_context); + SearchEngine(Scheduler& scheduler); ~SearchEngine() override; void onInitialize() override; @@ -47,11 +48,11 @@ class SearchEngine final : public Engine void expireAH(std::optional days) const; private: - handler m_searchHandler; - asio::steady_timer m_periodicCleanupTimer; + auto periodicCleanup() -> Task; - void periodicCleanup(const asio::error_code& error); + Scheduler& scheduler_; + SearchListener searchListener_; // NOTE: We're only using the read-lock for this - SynchronizedShared> m_ipWhitelist; + SynchronizedShared> ipWhitelist_; }; diff --git a/src/search/search_handler.cpp b/src/search/search_handler.cpp index c8452a8c1c6..2c625966c5c 100644 --- a/src/search/search_handler.cpp +++ b/src/search/search_handler.cpp @@ -1,4 +1,4 @@ -/* +/* =========================================================================== Copyright (c) 2023 LandSandBoat Dev Teams @@ -37,142 +37,133 @@ #include "packets/search_comment.h" #include "packets/search_list.h" -search_handler::search_handler(asio::ip::tcp::socket socket, asio::io_context& io_context, SynchronizedShared>& IPAddressesInUseList, SynchronizedShared>& IPAddressWhitelist) -: socket_(std::move(socket)) +SearchHandler::SearchHandler(Scheduler& scheduler, asio::ip::tcp::socket socket, SynchronizedShared>& IPAddressesInUseList, SynchronizedShared>& IPAddressWhitelist) +: scheduler_(scheduler) +, socket_(std::move(socket)) , buffer_{} , IPAddressesInUse_(IPAddressesInUseList) , IPAddressWhitelist_(IPAddressWhitelist) -, deadline_(io_context) { DebugSocketsFmt("New connection from IP {}", socket_.lowest_layer().remote_endpoint().address().to_string()); asio::error_code ec = {}; socket_.lowest_layer().set_option(asio::socket_base::reuse_address(true)); - ipAddress = socket_.lowest_layer().remote_endpoint(ec).address().to_string(); + ipAddress_ = socket_.lowest_layer().remote_endpoint(ec).address().to_string(); if (ec) { - ipAddress = "error"; + ipAddress_ = "error"; socket_.lowest_layer().close(); } else { - addToUsedIPAddresses(ipAddress); + addToUsedIPAddresses(ipAddress_); - if (getNumSessionsInUse(ipAddress) > 5) + if (getNumSessionsInUse(ipAddress_) > 5) { - ShowErrorFmt("More than 5 simultaneous connections from {}. Closing socket.", ipAddress); + ShowErrorFmt("More than 5 simultaneous connections from {}. Closing socket.", ipAddress_); socket_.lowest_layer().close(); return; } } } -search_handler::~search_handler() +SearchHandler::~SearchHandler() { - DebugSocketsFmt("Connection from IP {} closed", ipAddress); - removeFromUsedIPAddresses(ipAddress); + DebugSocketsFmt("Connection from IP {} closed", ipAddress_); + removeFromUsedIPAddresses(ipAddress_); } -void search_handler::start() +auto SearchHandler::run() -> Task { - if (socket_.lowest_layer().is_open()) - { - deadline_.expires_after(10s); // AH searches can take quite a while - deadline_.async_wait(std::bind(&search_handler::checkDeadline, this, shared_from_this())); + auto self = shared_from_this(); - do_read(); - } -} + try + { + while (socket_.lowest_layer().is_open() && !scheduler_.closeRequested()) + { + std::memset(buffer_.data(), 0, buffer_.size()); -void search_handler::do_read() -{ - std::memset(buffer_.data(), 0, buffer_.size()); + auto result = co_await scheduler_.withTimeout( + socket_.async_read_some(asio::buffer(buffer_.data(), buffer_.size()), asio::use_awaitable), + 10s); - socket_.async_read_some( - asio::buffer(buffer_.data(), buffer_.size()), - [this, self = shared_from_this()](std::error_code ec, std::size_t length) - { - if (!ec) + if (!result.has_value()) // timed out { - DebugSocketsFmt("async_read_some: Received packet from IP {} ({} bytes)", ipAddress, length); - read_func(length); + DebugSocketsFmt("Socket timed out from {}", ipAddress_); + break; } - else + + const auto length = result.value(); + if (length == 0) // EOF { - // EOF when searchPackets is empty is normal. Any other state is a legitimate error. - if (!searchPackets.empty() || (searchPackets.empty() && ec.value() != asio::error::eof)) - { - DebugSocketsFmt("async_read_some error in from IP {} ({}: {})", ipAddress, ec.value(), ec.message()); - handle_error(ec, self); - } + break; } - }); -} -void search_handler::do_write() -{ - auto packet = searchPackets.front(); - auto length = packet.getSize(); + DebugSocketsFmt("Received packet from IP {} ({} bytes)", ipAddress_, length); - std::memset(buffer_.data(), 0, buffer_.size()); - std::memcpy(buffer_.data(), packet.getData(), packet.getSize()); + read_func(static_cast(length)); - searchPackets.pop_front(); + while (!searchPackets_.empty()) + { + auto packet = searchPackets_.front(); + auto write_len = packet.getSize(); - encrypt(length); + std::memset(buffer_.data(), 0, buffer_.size()); + std::memcpy(buffer_.data(), packet.getData(), write_len); - DebugSocketsFmt("async_write: Sending packet to IP {} ({} bytes)", ipAddress, length); - socket_.async_write_some( - asio::buffer(buffer_.data(), length), - [this, self = shared_from_this()](std::error_code ec, std::size_t /*length*/) - { - if (!ec) - { - // Apparently a reply is expected. Not sure what the reply contains exactly, but bad things happen if we don't wait for it. - do_read(); - } - else - { - DebugSocketsFmt("async_write_some error in from IP {} ({}: {})", ipAddress, ec.value(), ec.message()); - handle_error(ec, self); + searchPackets_.pop_front(); + + encrypt(write_len); + + DebugSocketsFmt("Sending packet to IP {} ({} bytes)", ipAddress_, write_len); + + co_await socket_.async_write_some(asio::buffer(buffer_.data(), write_len), asio::use_awaitable); } - }); + } + } + catch (const std::exception& e) + { + DebugSocketsFmt("Socket error from IP {}: {}", ipAddress_, e.what()); + } + + asio::error_code ec; + socket_.lowest_layer().close(ec); } -void search_handler::decrypt(uint16_t length) +void SearchHandler::decrypt(uint16_t length) { - DebugSocketsFmt("Decrypting packet from IP {} ({} bytes)", ipAddress, length); + DebugSocketsFmt("Decrypting packet from IP {} ({} bytes)", ipAddress_, length); // Get key from packet ref(key, 16) = ref(buffer_.data(), length - 4); // Decrypt packet - md5(reinterpret_cast(key), blowfish.hash, 20); + md5(reinterpret_cast(key), blowfish_.hash, 20); - blowfish_init(reinterpret_cast(blowfish.hash), 16, blowfish.P, blowfish.S[0]); + blowfish_init(reinterpret_cast(blowfish_.hash), 16, blowfish_.P, blowfish_.S[0]); uint16_t tmp = (length - 12) / 4; tmp -= tmp % 2; for (uint16_t i = 0; i < tmp; i += 2) { - blowfish_decipher(reinterpret_cast(buffer_.data()) + i + 2, reinterpret_cast(buffer_.data()) + i + 3, blowfish.P, blowfish.S[0]); + blowfish_decipher(reinterpret_cast(buffer_.data()) + i + 2, reinterpret_cast(buffer_.data()) + i + 3, blowfish_.P, blowfish_.S[0]); } ref(key, 20) = ref(buffer_.data(), length - 0x18); } -void search_handler::encrypt(uint16_t length) +void SearchHandler::encrypt(uint16_t length) { - DebugSocketsFmt("Encrypting packet for IP {} ({} bytes)", ipAddress, length); + DebugSocketsFmt("Encrypting packet for IP {} ({} bytes)", ipAddress_, length); ref(buffer_.data(), 0x00) = length; // packet size ref(buffer_.data(), 0x04) = 0x46465849; // "IXFF" - md5(reinterpret_cast(key), blowfish.hash, 24); + md5(reinterpret_cast(key), blowfish_.hash, 24); - blowfish_init((int8*)blowfish.hash, 16, blowfish.P, blowfish.S[0]); + blowfish_init((int8*)blowfish_.hash, 16, blowfish_.P, blowfish_.S[0]); md5(buffer_.data() + 8, buffer_.data() + length - 0x18 + 0x04, length - 0x18 - 0x04); @@ -181,15 +172,15 @@ void search_handler::encrypt(uint16_t length) for (uint8 i = 0; i < tmp; i += 2) { - blowfish_encipher(reinterpret_cast(buffer_.data()) + i + 2, reinterpret_cast(buffer_.data()) + i + 3, blowfish.P, blowfish.S[0]); + blowfish_encipher(reinterpret_cast(buffer_.data()) + i + 2, reinterpret_cast(buffer_.data()) + i + 3, blowfish_.P, blowfish_.S[0]); } memcpy(&buffer_[length] - 0x04, key + 16, 4); } -bool search_handler::validatePacket(uint16_t length) +bool SearchHandler::validatePacket(uint16_t length) { - DebugSocketsFmt("Validating packet from IP {} ({} bytes)", ipAddress, length); + DebugSocketsFmt("Validating packet from IP {} ({} bytes)", ipAddress_, length); // Check if packet is valid uint8 PacketHash[16]{}; @@ -239,29 +230,21 @@ inline std::string searchTypeToString(uint8 type) } } -void search_handler::read_func(uint16_t length) +void SearchHandler::read_func(uint16_t length) { - // if we already have a query in-flight... - if (!searchPackets.empty()) - { - do_write(); - return; - } - if (length != ref(buffer_.data(), 0x00) || length < 28) { ShowErrorFmt("Search packetsize wrong. Size {} should be {}.", length, ref(buffer_.data(), 0x00)); return; } - deadline_.cancel(); // If we read, don't abort the deadline in the future decrypt(length); if (validatePacket(length)) { uint8 packetType = buffer_[0x0B]; - ShowInfoFmt("Search Request: {} ({}), size: {}, ip: {}", searchTypeToString(packetType), packetType, length, ipAddress); + ShowInfoFmt("Search Request: {} ({}), size: {}, ip: {}", searchTypeToString(packetType), packetType, length, ipAddress_); switch (packetType) { @@ -301,13 +284,6 @@ void search_handler::read_func(uint16_t length) } } -void search_handler::handle_error(std::error_code ec, std::shared_ptr self) -{ - std::ignore = ec; - - self = nullptr; -} - // Mostly copy-pasted DSP era code. It works, so why change it? /************************************************************************ * * @@ -332,7 +308,6 @@ void DebugPrintPacket(char* data, uint16_t size) } } - // TODO: This can't be the Fmt variant because of constexpr things? ShowDebug(outStr); } @@ -342,7 +317,7 @@ void DebugPrintPacket(char* data, uint16_t size) * * ************************************************************************/ -void search_handler::HandleGroupListRequest() +void SearchHandler::HandleGroupListRequest() { uint32 partyid = ref(buffer_.data(), 0x10); uint32 allianceid = ref(buffer_.data(), 0x14); @@ -368,7 +343,7 @@ void search_handler::HandleGroupListRequest() uint16_t length = PPartyPacket.GetSize(); DebugPrintPacket((char*)PPartyPacket.GetData(), length); - searchPackets.emplace_back(PPartyPacket.GetData(), length); + searchPackets_.emplace_back(PPartyPacket.GetData(), length); } else if (linkshellid1 != 0 || linkshellid2 != 0) { @@ -406,18 +381,13 @@ void search_handler::HandleGroupListRequest() uint16_t length = PLinkshellPacket.GetSize(); DebugPrintPacket((char*)PLinkshellPacket.GetData(), length); - searchPackets.emplace_back(PLinkshellPacket.GetData(), length); + searchPackets_.emplace_back(PLinkshellPacket.GetData(), length); } while (currentResult < totalResults); } - - if (!searchPackets.empty()) - { - do_write(); - } } -void search_handler::HandleSearchComment() +void SearchHandler::HandleSearchComment() { uint32 playerId = ref(buffer_.data(), 0x10); @@ -433,12 +403,10 @@ void search_handler::HandleSearchComment() uint16_t length = commentPacket.GetSize(); DebugPrintPacket((char*)commentPacket.GetData(), length); - searchPackets.emplace_back(commentPacket.GetData(), length); - - do_write(); + searchPackets_.emplace_back(commentPacket.GetData(), length); } -void search_handler::HandleSearchRequest() +void SearchHandler::HandleSearchRequest() { const search_req sr = _HandleSearchRequest(); @@ -478,17 +446,12 @@ void search_handler::HandleSearchRequest() uint16_t length = PSearchPacket.GetSize(); DebugPrintPacket((char*)PSearchPacket.GetData(), length); - searchPackets.emplace_back(PSearchPacket.GetData(), length); + searchPackets_.emplace_back(PSearchPacket.GetData(), length); } while (currentResult < totalResults); - - if (!searchPackets.empty()) - { - do_write(); - } } -void search_handler::HandleAuctionHouseRequest() +void SearchHandler::HandleAuctionHouseRequest() { uint8 AHCatID = ref(buffer_.data(), 0x16); @@ -546,16 +509,11 @@ void search_handler::HandleAuctionHouseRequest() uint16_t length = PAHPacket.GetSize(); DebugPrintPacket((char*)PAHPacket.GetData(), length); - searchPackets.emplace_back(PAHPacket.GetData(), length); - } - - if (!searchPackets.empty()) - { - do_write(); + searchPackets_.emplace_back(PAHPacket.GetData(), length); } } -void search_handler::HandleAuctionHouseHistory() +void SearchHandler::HandleAuctionHouseHistory() { uint16 ItemID = ref(buffer_.data(), 0x12); uint8 stack = ref(buffer_.data(), 0x15); @@ -574,12 +532,10 @@ void search_handler::HandleAuctionHouseHistory() uint16_t length = PAHPacket.GetSize(); DebugPrintPacket((char*)PAHPacket.GetData(), length); - searchPackets.emplace_back(PAHPacket.GetData(), length); - - do_write(); + searchPackets_.emplace_back(PAHPacket.GetData(), length); } -search_req search_handler::_HandleSearchRequest() +search_req SearchHandler::_HandleSearchRequest() { // This function constructs a `search_req` based on which query should be sent to the database. // The results from the database will eventually be sent to the client. @@ -828,7 +784,7 @@ search_req search_handler::_HandleSearchRequest() // For example: "/blacklist delete Name" and "/sea all Name" } -uint16_t search_handler::getNumSessionsInUse(const std::string& ipAddressStr) +uint16_t SearchHandler::getNumSessionsInUse(const std::string& ipAddressStr) { DebugSocketsFmt("Checking if IP is in use: {}", ipAddressStr); @@ -841,8 +797,6 @@ uint16_t search_handler::getNumSessionsInUse(const std::string& ipAddressStr) return 0; } - // ShowInfoFmt("Checking if IP is in use: {}", ipAddressStr); - return IPAddressesInUse_.read( [ipAddressStr](const auto& ipAddrsInUse) -> uint16_t { @@ -855,7 +809,7 @@ uint16_t search_handler::getNumSessionsInUse(const std::string& ipAddressStr) }); } -void search_handler::removeFromUsedIPAddresses(const std::string& ipAddressStr) +void SearchHandler::removeFromUsedIPAddresses(const std::string& ipAddressStr) { DebugSocketsFmt("Removing IP from active set: {}", ipAddressStr); @@ -868,8 +822,6 @@ void search_handler::removeFromUsedIPAddresses(const std::string& ipAddressStr) return; } - // ShowInfoFmt("Removing IP from set: {}", ipAddressStr); - IPAddressesInUse_.write( [ipAddressStr](auto& ipAddrsInUse) { @@ -890,7 +842,7 @@ void search_handler::removeFromUsedIPAddresses(const std::string& ipAddressStr) }); } -void search_handler::addToUsedIPAddresses(const std::string& ipAddressStr) +void SearchHandler::addToUsedIPAddresses(const std::string& ipAddressStr) { DebugSocketsFmt("Adding IP to active set: {}", ipAddressStr); @@ -903,8 +855,6 @@ void search_handler::addToUsedIPAddresses(const std::string& ipAddressStr) return; } - // ShowInfoFmt("Adding IP to set: {}", ipAddressStr); - IPAddressesInUse_.write( [ipAddressStr](auto& ipAddrsInUse) { @@ -918,12 +868,3 @@ void search_handler::addToUsedIPAddresses(const std::string& ipAddressStr) } }); } - -void search_handler::checkDeadline(const std::shared_ptr& self) // self to keep the object alive -{ - if (timer::now() > deadline_.expiry()) - { - DebugSocketsFmt("Socket timed out from {}", ipAddress); - socket_.cancel(); - } -} diff --git a/src/search/search_handler.h b/src/search/search_handler.h index 3d7af3eac66..847d4b1a5f1 100644 --- a/src/search/search_handler.h +++ b/src/search/search_handler.h @@ -1,4 +1,4 @@ -/* +/* =========================================================================== Copyright (c) 2023 LandSandBoat Dev Teams @@ -24,11 +24,14 @@ #include #include #include +#include #include #include #include "common/blowfish.h" +#include "common/scheduler.h" #include "common/synchronized.h" + #include "search.h" enum TCPREQUESTTYPE @@ -43,31 +46,44 @@ enum TCPREQUESTTYPE TCP_AH_REQUEST = 0x15, }; -class search_handler -: public std::enable_shared_from_this +class SearchHandler : public std::enable_shared_from_this { public: - search_handler(asio::ip::tcp::socket socket, asio::io_context& io_context, SynchronizedShared>& IPAddressesInUseList, SynchronizedShared>& IPAddressWhitelist); - - ~search_handler(); + SearchHandler(Scheduler& scheduler, asio::ip::tcp::socket socket, SynchronizedShared>& IPAddressesInUseList, SynchronizedShared>& IPAddressWhitelist); + ~SearchHandler(); + auto run() -> Task; - void start(); +private: + void read_func(uint16_t length); - void do_read(); + uint16_t getNumSessionsInUse(const std::string& ipAddressStr); + void addToUsedIPAddresses(const std::string& ipAddressStr); + void removeFromUsedIPAddresses(const std::string& ipAddressStr); - void handle_error(std::error_code ec, std::shared_ptr self); + bool validatePacket(uint16_t length); + void decrypt(uint16_t length); + void encrypt(uint16_t length); - void do_write(); + void HandleSearchRequest(); + void HandleGroupListRequest(); + void HandleSearchComment(); + void HandleAuctionHouseRequest(); + void HandleAuctionHouseHistory(); - void read_func(uint16_t length); + auto _HandleSearchRequest() -> search_req; - std::string ipAddress; // Store IP address in class -- once the file handle is invalid this can no longer be obtained from socket_ - asio::ip::tcp::socket socket_; + Scheduler& scheduler_; + blowfish_t blowfish_; + std::deque searchPackets_; + std::string ipAddress_; + asio::ip::tcp::socket socket_; std::array buffer_; - // Blowfish key + SynchronizedShared>& IPAddressesInUse_; + SynchronizedShared>& IPAddressWhitelist_; + // Blowfish key uint8 key[24] = { 0x30, 0x73, @@ -94,38 +110,4 @@ class search_handler 0x00, 0x00, }; - -private: - // A single IP should only have one request in flight at a time, so we are going to - // be tracking the IP addresses of incoming requests and if we haven't cleared the - // record for it - we block until it's done - SynchronizedShared>& IPAddressesInUse_; - - // NOTE: We're only using the read-lock for this - SynchronizedShared>& IPAddressWhitelist_; - - // Deadline timer to drop a read - asio::steady_timer deadline_; - - void checkDeadline(const std::shared_ptr& self); - - uint16_t getNumSessionsInUse(const std::string& ipAddressStr); - void addToUsedIPAddresses(const std::string& ipAddressStr); - void removeFromUsedIPAddresses(const std::string& ipAddressStr); - - bool validatePacket(uint16_t length); - void decrypt(uint16_t length); - void encrypt(uint16_t length); - - void HandleSearchRequest(); - void HandleGroupListRequest(); - void HandleSearchComment(); - void HandleAuctionHouseRequest(); - void HandleAuctionHouseHistory(); - - auto _HandleSearchRequest() -> search_req; - - blowfish_t blowfish; - - std::deque searchPackets; }; diff --git a/src/search/search_listener.h b/src/search/search_listener.h new file mode 100644 index 00000000000..51c8c36c713 --- /dev/null +++ b/src/search/search_listener.h @@ -0,0 +1,68 @@ +/* +=========================================================================== + + Copyright (c) 2024 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include +#include +#include + +#include "common/scheduler.h" +#include "search_handler.h" + +class SearchListener +{ +public: + SearchListener(Scheduler& scheduler, unsigned int port, SynchronizedShared>& ipWhitelist) + : scheduler_(scheduler) + , acceptor_(scheduler_.ioContext(), asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) + , ipWhitelist_(ipWhitelist) + { + acceptor_.set_option(asio::socket_base::reuse_address(true)); + + scheduler_.postToMainThread(accept_loop()); + } + +private: + auto accept_loop() -> Task + { + while (!scheduler_.closeRequested()) + { + auto [ec, socket] = co_await acceptor_.async_accept(asio::as_tuple(asio::use_awaitable)); + + if (!ec) + { + auto handler = std::make_shared(scheduler_, std::move(socket), ipInFlight_, ipWhitelist_); + scheduler_.postToMainThread(handler->run()); + } + else + { + ShowErrorFmt("Failed to accept connection: {}", ec.message()); + } + } + } + + Scheduler& scheduler_; + asio::ip::tcp::acceptor acceptor_; + + SynchronizedShared>& ipWhitelist_; + SynchronizedShared> ipInFlight_; +}; From e1bde4e09e1cbbd683bd69aa6393eb60a00033e0 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 11:44:01 +0000 Subject: [PATCH 06/10] Core: Propagate new Scheduler out to xi_map and xi_test --- src/map/map_application.cpp | 8 +++++++- src/map/map_application.h | 1 + src/map/map_engine.cpp | 6 +++--- src/map/map_engine.h | 4 ++-- src/map/map_networking.cpp | 7 ++++--- src/map/map_networking.h | 4 +++- src/map/map_socket.cpp | 21 ++++++++++----------- src/map/map_socket.h | 7 ++++--- src/test/test_application.cpp | 2 +- src/test/test_engine.cpp | 7 ++++--- src/test/test_engine.h | 3 ++- 11 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/map/map_application.cpp b/src/map/map_application.cpp index 5a49cec16fa..6905337f49c 100644 --- a/src/map/map_application.cpp +++ b/src/map/map_application.cpp @@ -88,7 +88,7 @@ MapApplication::~MapApplication() auto MapApplication::createEngine() -> std::unique_ptr { - return std::make_unique(scheduler_.ioContext(), engineConfig_); + return std::make_unique(scheduler_, engineConfig_); } void MapApplication::registerCommands(ConsoleService& console) @@ -101,6 +101,12 @@ void MapApplication::registerCommands(ConsoleService& console) console.registerCommand("backtrace", "Print backtrace", std::bind(&MapEngine::onBacktrace, mapEngine, std::placeholders::_1)); } +void MapApplication::requestExit() +{ + Application::requestExit(); + scheduler_.stop(); +} + void MapApplication::run() { engine_ = createEngine(); diff --git a/src/map/map_application.h b/src/map/map_application.h index 66f3a5ac2f9..473f127c092 100644 --- a/src/map/map_application.h +++ b/src/map/map_application.h @@ -48,6 +48,7 @@ class MapApplication final : public Application auto createEngine() -> std::unique_ptr override; void registerCommands(ConsoleService& console) override; void run() override; + void requestExit() override; private: MapConfig engineConfig_{}; diff --git a/src/map/map_engine.cpp b/src/map/map_engine.cpp index 91258c1b0dd..18f2d62d11b 100644 --- a/src/map/map_engine.cpp +++ b/src/map/map_engine.cpp @@ -94,10 +94,10 @@ extern std::map g_PZoneList; // Global array of pointers for zones -MapEngine::MapEngine(asio::io_context& io_context, MapConfig& config) -: ioContext_(io_context) +MapEngine::MapEngine(Scheduler& scheduler, MapConfig& config) +: scheduler_(scheduler) , mapStatistics_(std::make_unique()) -, networking_(std::make_unique(*mapStatistics_, config, ioContext_)) +, networking_(std::make_unique(scheduler_, *mapStatistics_, config)) , engineConfig_(config) { do_init(); diff --git a/src/map/map_engine.h b/src/map/map_engine.h index f3bb152a94d..b93cc4d554d 100644 --- a/src/map/map_engine.h +++ b/src/map/map_engine.h @@ -57,7 +57,7 @@ extern std::map g_PZoneList; // Global array of pointers for zon class MapEngine final : public Engine { public: - MapEngine(asio::io_context& io_context, MapConfig& config); + MapEngine(Scheduler& scheduler, MapConfig& config); ~MapEngine() override; void gameLoop(); @@ -99,7 +99,7 @@ class MapEngine final : public Engine void requestExit(); private: - asio::io_context& ioContext_; // this is also shared with networking_ + Scheduler& scheduler_; std::unique_ptr mapStatistics_; std::unique_ptr networking_; std::unique_ptr watchdog_; diff --git a/src/map/map_networking.cpp b/src/map/map_networking.cpp index 2843de7dd61..433d28ff3b6 100644 --- a/src/map/map_networking.cpp +++ b/src/map/map_networking.cpp @@ -66,8 +66,9 @@ uint32 TotalPacketsDelayedPerTick = 0U; } // namespace -MapNetworking::MapNetworking(MapStatistics& mapStatistics, const MapConfig& mapConfig, asio::io_context& io_context) -: mapStatistics_(mapStatistics) +MapNetworking::MapNetworking(Scheduler& scheduler, MapStatistics& mapStatistics, const MapConfig& mapConfig) +: scheduler_(scheduler) +, mapStatistics_(mapStatistics) , mapIPP_(mapConfig.ipp) { TracyZoneScoped; @@ -82,7 +83,7 @@ MapNetworking::MapNetworking(MapStatistics& mapStatistics, const MapConfig& mapC try { const auto udpPort = mapIPP_.getPort() == 0 ? settings::get("network.MAP_PORT") : mapIPP_.getPort(); - mapSocket_ = std::make_unique(io_context, udpPort, std::bind(&MapNetworking::handle_incoming_packet, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + mapSocket_ = std::make_unique(scheduler_, udpPort, std::bind(&MapNetworking::handle_incoming_packet, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } catch (const std::exception& e) { diff --git a/src/map/map_networking.h b/src/map/map_networking.h index 22df265c18c..f1af8ded8b2 100644 --- a/src/map/map_networking.h +++ b/src/map/map_networking.h @@ -24,6 +24,7 @@ #include "common/blowfish.h" #include "common/cbasetypes.h" #include "common/ipp.h" +#include "common/scheduler.h" #include "map_constants.h" #include "map_session.h" @@ -40,7 +41,7 @@ class MapEngine; class MapNetworking { public: - MapNetworking(MapStatistics& mapStatistics, const MapConfig& mapConfig, asio::io_context& io_context); + MapNetworking(Scheduler& scheduler, MapStatistics& mapStatistics, const MapConfig& mapConfig); // // Networking @@ -69,6 +70,7 @@ class MapNetworking auto socket() -> MapSocket&; private: + Scheduler& scheduler_; MapStatistics& mapStatistics_; IPP mapIPP_; MapSessionContainer mapSessions_; diff --git a/src/map/map_socket.cpp b/src/map/map_socket.cpp index b94b89cbf20..26ac92bc085 100644 --- a/src/map/map_socket.cpp +++ b/src/map/map_socket.cpp @@ -23,12 +23,11 @@ #include "common/logging.h" -MapSocket::MapSocket(asio::io_context& io_context, const uint16 port, ReceiveFn onReceiveFn) -: port_(port) -, io_context_(io_context) -, socket_(io_context) +MapSocket::MapSocket(Scheduler& scheduler, const uint16 port, ReceiveFn onReceiveFn) +: scheduler_(scheduler) +, port_(port) +, socket_(scheduler_.ioContext()) , buffer_{} -, isRunning(true) , onReceiveFn_(std::move(onReceiveFn)) { TracyZoneScoped; @@ -71,7 +70,7 @@ void MapSocket::startReceive() onReceiveFn_(ec, buffer, ipp); - if (!io_context_.stopped() && socket_.is_open()) + if (!scheduler_.closeRequested() && socket_.is_open()) { startReceive(); // Queue up more work } @@ -83,14 +82,14 @@ void MapSocket::recvFor(timer::duration duration) TracyZoneScoped; // Blocks until the duration is up - io_context_.run_for(duration); + scheduler_.ioContext().run_for(duration); // Once run_for() or run() return the io_context enters a stopped state, // even if there are still pending asynchronous operations. You need to // call restart() to clear that state before you can run it again. - if (isRunning) + if (isRunning_) { - io_context_.restart(); + scheduler_.ioContext().restart(); } } @@ -122,6 +121,6 @@ void MapSocket::send(const IPP& ipp, std::span buffer) void MapSocket::requestExit() { - isRunning = false; - io_context_.stop(); + isRunning_ = false; + scheduler_.stop(); } diff --git a/src/map/map_socket.h b/src/map/map_socket.h index a66316cc697..5ed15cd018c 100644 --- a/src/map/map_socket.h +++ b/src/map/map_socket.h @@ -24,6 +24,7 @@ #include "common/blowfish.h" #include "common/cbasetypes.h" #include "common/ipp.h" +#include "common/scheduler.h" #include "map_constants.h" @@ -37,7 +38,7 @@ class MapSocket public: using ReceiveFn = std::function, IPP)>; - MapSocket(asio::io_context& io_context, uint16 port, ReceiveFn onReceiveFn); // TODO: Move passing in onReceiveFn to recvFor + MapSocket(Scheduler& scheduler, uint16 port, ReceiveFn onReceiveFn); ~MapSocket(); void recvFor(timer::duration duration); @@ -48,12 +49,12 @@ class MapSocket private: void startReceive(); + Scheduler& scheduler_; uint16 port_; - asio::io_context& io_context_; asio::ip::udp::socket socket_; NetworkBuffer buffer_; // TODO: Pass in the global buffer, or only use this one asio::ip::udp::endpoint remote_endpoint_; - bool isRunning; + bool isRunning_{ true }; ReceiveFn onReceiveFn_; }; diff --git a/src/test/test_application.cpp b/src/test/test_application.cpp index 1f60822b44f..e34e4bcc99b 100644 --- a/src/test/test_application.cpp +++ b/src/test/test_application.cpp @@ -113,7 +113,7 @@ auto TestApplication::createEngine() -> std::unique_ptr }, }; - return std::make_unique(scheduler_.ioContext(), config); + return std::make_unique(scheduler_, config); } void TestApplication::run() diff --git a/src/test/test_engine.cpp b/src/test/test_engine.cpp index 07e7571acab..142b89d3759 100644 --- a/src/test/test_engine.cpp +++ b/src/test/test_engine.cpp @@ -39,8 +39,9 @@ #include #include -TestEngine::TestEngine(asio::io_context& io_context, TestConfig testConfig) -: worldEngine_(std::make_unique(io_context)) +TestEngine::TestEngine(Scheduler& scheduler, TestConfig testConfig) +: scheduler_(scheduler) +, worldEngine_(std::make_unique(scheduler_)) , mockManager_(std::make_unique()) , testConfig_(std::move(testConfig)) , reporters_(testConfig_.verbose, testConfig_.output) @@ -62,7 +63,7 @@ TestEngine::TestEngine(asio::io_context& io_context, TestConfig testConfig) .controlledWeather = true, }; - mapEngine_ = std::make_unique(io_context, mapConfig); + mapEngine_ = std::make_unique(scheduler, mapConfig); worldEngine_->onInitialize(); mapEngine_->onInitialize(); diff --git a/src/test/test_engine.h b/src/test/test_engine.h index 7cce307d725..1a7f58dc246 100644 --- a/src/test/test_engine.h +++ b/src/test/test_engine.h @@ -73,7 +73,7 @@ struct HookContext class TestEngine final : public Engine { public: - TestEngine(asio::io_context& io_context, TestConfig testConfig); + TestEngine(Scheduler& scheduler, TestConfig testConfig); ~TestEngine() override; DISALLOW_COPY_AND_MOVE(TestEngine); @@ -87,6 +87,7 @@ class TestEngine final : public Engine auto runBeforeHooks(const HookContext& context, const std::string& testName) const -> std::optional; void runAfterHooks(const HookContext& context, const std::string& testName) const; + Scheduler& scheduler_; std::unique_ptr mapEngine_; std::unique_ptr worldEngine_; std::unique_ptr mockManager_; From 90a067cd93a113d47ce39f54e78894d70f4cf4ba Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 13:16:34 +0000 Subject: [PATCH 07/10] Login: Handle sessions on worker thread --- src/login/handler.h | 51 ++++++++++++++++---------------------- src/login/view_session.cpp | 2 +- src/login/view_session.h | 8 +++++- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/src/login/handler.h b/src/login/handler.h index b0dc6b5603c..fbb1c379ee2 100644 --- a/src/login/handler.h +++ b/src/login/handler.h @@ -29,6 +29,7 @@ #include "data_session.h" #include "view_session.h" +#include "common/scheduler.h" #include "common/zmq_dealer_wrapper.h" template @@ -36,7 +37,8 @@ class handler { public: handler(Scheduler& scheduler, unsigned int port, ZMQDealerWrapper& zmqDealerWrapper) - : acceptor_(scheduler.ioContext(), asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) + : scheduler_(scheduler) + , acceptor_(scheduler_.ioContext(), asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) , sslContext_(asio::ssl::context::tls_server) , zmqDealerWrapper_(zmqDealerWrapper) { @@ -47,42 +49,33 @@ class handler sslContext_.use_rsa_private_key_file("login.key", asio::ssl::context::file_format::pem); sslContext_.use_certificate_chain_file("login.cert"); - do_accept(); + scheduler_.postToMainThread(accept_loop()); } private: - void do_accept() + auto accept_loop() -> Task { - acceptor_.async_accept( - [this](std::error_code ec, asio::ip::tcp::socket socket) + while (!scheduler_.closeRequested()) + { + auto [ec, socket] = co_await acceptor_.async_accept(asio::as_tuple(asio::use_awaitable)); + + if (!ec) { - if (!ec) - { - if constexpr (std::is_same_v) - { - const auto auth_handler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_), zmqDealerWrapper_); - auth_handler->start(); - } - else if constexpr (std::is_same_v) + const auto sessionHandler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_), zmqDealerWrapper_); + scheduler_.postToWorkerThread( + [sessionHandler] { - const auto view_handler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_)); - view_handler->start(); - } - else if constexpr (std::is_same_v) - { - const auto data_handler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_), zmqDealerWrapper_); - data_handler->start(); - } - } - else - { - ShowError(ec.message()); - } - - do_accept(); - }); + sessionHandler->start(); + }); + } + else + { + ShowError(ec.message()); + } + } } + Scheduler& scheduler_; asio::ip::tcp::acceptor acceptor_; asio::ssl::context sslContext_; diff --git a/src/login/view_session.cpp b/src/login/view_session.cpp index 7fddde4632f..b26eadcbad8 100644 --- a/src/login/view_session.cpp +++ b/src/login/view_session.cpp @@ -41,7 +41,7 @@ void view_session::read_func() session_t& session = loginHelpers::get_authenticated_session(ipAddress, sessionHash); if (!session.view_session) { - session.view_session = std::make_shared(std::forward>(socket_)); + session.view_session = std::make_shared(std::forward>(socket_), zmqDealerWrapper_); } session.view_session->sessionHash = sessionHash; diff --git a/src/login/view_session.h b/src/login/view_session.h index a71247e7ffb..657b24ea8b0 100644 --- a/src/login/view_session.h +++ b/src/login/view_session.h @@ -27,6 +27,8 @@ #include "handler_session.h" #include "login_helpers.h" +#include "common/zmq_dealer_wrapper.h" + // Main menu (Lobby), port 54001 // A comment on the packets below, defined as macros. // byte 0 - packet size @@ -38,8 +40,9 @@ class view_session : public handler_session { public: - view_session(asio::ssl::stream socket) + view_session(asio::ssl::stream socket, ZMQDealerWrapper& zmqDealerWrapper) : handler_session(std::move(socket)) + , zmqDealerWrapper_(zmqDealerWrapper) { DebugSockets("view_session from IP %s", ipAddress); } @@ -53,4 +56,7 @@ class view_session : public handler_session } void handle_error(std::error_code ec, std::shared_ptr self) override; + +private: + ZMQDealerWrapper& zmqDealerWrapper_; }; From f44129419df0bd0f4fcf5d3012675bb7147da2ec Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 14:44:47 +0000 Subject: [PATCH 08/10] Core: Enforce Scheduler fire-and-forget --- src/common/scheduler.h | 40 +++++++++++++++--------------------- src/search/search_engine.cpp | 6 +----- 2 files changed, 17 insertions(+), 29 deletions(-) diff --git a/src/common/scheduler.h b/src/common/scheduler.h index 24f3079cb39..48076a7658c 100644 --- a/src/common/scheduler.h +++ b/src/common/scheduler.h @@ -84,6 +84,12 @@ struct AwaitableResult> template concept IsInvocable = std::invocable>; +template +concept IsInvocableReturnsVoid = IsInvocable && std::is_void_v>>; + +template +concept IsAwaitableReturnsVoid = IsAwaitable && std::is_void_v>::type>; + } // namespace detail template @@ -262,11 +268,11 @@ class Scheduler final // Queue work lazily on the worker thread pool. It won't start executing until you co_await // the returned task. template - [[nodiscard]] auto onWorkerThread(F&& func) -> Task>::type> + [[nodiscard]] auto onWorkerThread(F&& func) -> Task>> { return asio::co_spawn( workerContext_.get_executor(), - [fn = std::forward(func)]() mutable -> Task + [fn = std::forward(func)]() mutable -> Task>> { if constexpr (std::is_void_v>>) { @@ -283,7 +289,7 @@ class Scheduler final // postToMainThread // Queue work eagerly on the main thread. It will start executing immediately. - template + template void postToMainThread(T&& task) { asio::co_spawn(mainContext_.get_executor(), std::forward(task), asio::detached); @@ -291,29 +297,22 @@ class Scheduler final // postToMainThread // Queue work eagerly on the main thread. It will start executing immediately. - template + template void postToMainThread(F&& func) { asio::co_spawn( mainContext_.get_executor(), [fn = std::forward(func)]() mutable -> Task { - if constexpr (std::is_void_v>>) - { - fn(); - co_return; - } - else - { - co_return fn(); - } + fn(); + co_return; }, asio::detached); } // postToWorkerThread // Queue work eagerly on the worker thread pool. It will start executing immediately. - template + template void postToWorkerThread(T&& task) { asio::co_spawn(workerContext_.get_executor(), std::forward(task), asio::detached); @@ -321,22 +320,15 @@ class Scheduler final // postToWorkerThread // Queue work eagerly on the worker thread pool. It will start executing immediately. - template + template void postToWorkerThread(F&& func) { asio::co_spawn( workerContext_.get_executor(), [fn = std::forward(func)]() mutable -> Task { - if constexpr (std::is_void_v>>) - { - fn(); - co_return; - } - else - { - co_return fn(); - } + fn(); + co_return; }, asio::detached); } diff --git a/src/search/search_engine.cpp b/src/search/search_engine.cpp index 1499de979db..1f9fb89b8fe 100644 --- a/src/search/search_engine.cpp +++ b/src/search/search_engine.cpp @@ -40,11 +40,7 @@ SearchEngine::SearchEngine(Scheduler& scheduler) if (settings::get("search.EXPIRE_AUCTIONS")) { - scheduler_.postToMainThread( - [this]() -> Task - { - co_await periodicCleanup(); - }); + scheduler_.postToMainThread(periodicCleanup()); } } From 74868b81fbb07ef8a287bc6283a6a3ee64c51d14 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 15:16:07 +0000 Subject: [PATCH 09/10] Ext: Bump ASIO version to 1.36.0 --- ext/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ext/CMakeLists.txt b/ext/CMakeLists.txt index 65aea6cfd28..ddacceefd17 100644 --- a/ext/CMakeLists.txt +++ b/ext/CMakeLists.txt @@ -153,9 +153,9 @@ endif() CPMAddPackage( NAME asio - VERSION 1.32.0 + VERSION 1.36.0 GITHUB_REPOSITORY chriskohlhoff/asio - GIT_TAG asio-1-32-0 # asio uses non-standard version tag, we must specify GIT_TAG + GIT_TAG asio-1-36-0 # asio uses non-standard version tag, we must specify GIT_TAG ) #defines asio # ASIO doesn't use CMake, we have to configure it manually. Extra notes for using on Windows: From 2bc710507d9a14ec6f35a5b5bfd522bdc0409fe7 Mon Sep 17 00:00:00 2001 From: Zach Toogood Date: Thu, 26 Feb 2026 15:52:25 +0000 Subject: [PATCH 10/10] Ext: Bump to Tracy 0.13.1 Also improves reporting for flamegraph --- ext/CMakeLists.txt | 2 +- src/common/zmq_dealer_wrapper.h | 3 +-- src/common/zmq_router_wrapper.h | 4 +--- src/map/map_engine.cpp | 1 - 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/ext/CMakeLists.txt b/ext/CMakeLists.txt index ddacceefd17..ca56ce8741a 100644 --- a/ext/CMakeLists.txt +++ b/ext/CMakeLists.txt @@ -9,7 +9,7 @@ add_subdirectory(sol) # CPM Modules if(TRACY_ENABLE) # Tracy version tag, without the leading 'v' - set(TRACY_VERSION 0.12.2) + set(TRACY_VERSION 0.13.1) # Download client library CPMAddPackage( diff --git a/src/common/zmq_dealer_wrapper.h b/src/common/zmq_dealer_wrapper.h index b7e4ec2047f..4964b39ef6b 100644 --- a/src/common/zmq_dealer_wrapper.h +++ b/src/common/zmq_dealer_wrapper.h @@ -74,8 +74,6 @@ class ZMQDealerWrapper final { while (!requestExit_) { - TracyZoneScoped; - zmq::message_t msg; try { @@ -121,6 +119,7 @@ class ZMQDealerWrapper final , thread_( [this, endpoint, routingId]() { + TracySetThreadName("ZMQ Dealer"); ZMQWorker worker(requestExit_, incomingQueue_, outgoingQueue_, endpoint, routingId); }) { diff --git a/src/common/zmq_router_wrapper.h b/src/common/zmq_router_wrapper.h index 1add7849b76..8c383ee020e 100644 --- a/src/common/zmq_router_wrapper.h +++ b/src/common/zmq_router_wrapper.h @@ -78,8 +78,6 @@ class ZMQRouterWrapper final { while (!requestExit_) { - TracyZoneScoped; - // Since we are a zmq::socket_type::router, we expect a multipart message: // [routing id (IPP), message] std::array msgs; @@ -140,7 +138,7 @@ class ZMQRouterWrapper final , thread_( [this, endpoint]() { - TracySetThreadName("Message Server (ZMQ)"); + TracySetThreadName("ZMQ Router"); ZMQWorker worker(requestExit_, incomingQueue_, outgoingQueue_, endpoint); }) { diff --git a/src/map/map_engine.cpp b/src/map/map_engine.cpp index 18f2d62d11b..5f21707fba9 100644 --- a/src/map/map_engine.cpp +++ b/src/map/map_engine.cpp @@ -197,7 +197,6 @@ void MapEngine::gameLoop() if (tickDiffTime > 0ms) { - TracyZoneNamed(_sleep, "MapEngine Sleep"); std::this_thread::sleep_for(tickDiffTime); } else if (tickDiffTime < -kMainLoopBacklogThreshold)