From 6165cfb66e69f6afe5c988ee09b082dfdc4c4c42 Mon Sep 17 00:00:00 2001 From: Andrzej Pijanowski Date: Fri, 6 Mar 2026 13:50:52 +0100 Subject: [PATCH 1/6] Implement Binance exchange adapter and Prometheus metrics integration - Added BinanceAdapter class for handling WebSocket connections and snapshot logic. - Introduced IExchangeAdapter interface for exchange feed adapters. - Created utility functions for symbol extraction and Prometheus metrics formatting. - Updated main application logic to utilize the new BinanceAdapter. - Added unit tests for streamToSymbol function and Prometheus output generation. --- CMakeLists.txt | 2 + grafana/dashboards/lob.json | 31 ++++++-- src/binance_utils.hpp | 17 +++++ src/feed_handler.cpp | 38 +++++++--- src/feed_handler.hpp | 31 +++++--- src/i_exchange_adapter.hpp | 38 ++++++++++ src/main.cpp | 22 ++---- src/metrics_server.hpp | 91 ++-------------------- src/prometheus_format.hpp | 102 +++++++++++++++++++++++++ tests/test_binance_utils.cpp | 24 ++++++ tests/test_prometheus_format.cpp | 126 +++++++++++++++++++++++++++++++ 11 files changed, 390 insertions(+), 132 deletions(-) create mode 100644 src/binance_utils.hpp create mode 100644 src/i_exchange_adapter.hpp create mode 100644 src/prometheus_format.hpp create mode 100644 tests/test_binance_utils.cpp create mode 100644 tests/test_prometheus_format.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 72f6cb8..e0cac0d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,8 @@ if(BUILD_TESTS) tests/test_order_book.cpp tests/test_decimal.cpp tests/test_safe_arithmetic.cpp + tests/test_binance_utils.cpp + tests/test_prometheus_format.cpp src/order_book.cpp ) diff --git a/grafana/dashboards/lob.json b/grafana/dashboards/lob.json index 8bf0611..2e4455c 100644 --- a/grafana/dashboards/lob.json +++ b/grafana/dashboards/lob.json @@ -14,12 +14,27 @@ "annotations": { "list": [] }, "templating": { "list": [ + { + "name": "exchange", + "label": "Exchange", + "type": "query", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "query": "label_values(lob_messages_total, exchange)", + "refresh": 1, + "multi": false, + "includeAll": true, + "allValue": ".*", + "sort": 1, + "hide": 0, + "options": [], + "current": {} + }, { "name": "symbol", "label": "Symbol", "type": "query", "datasource": { "type": "prometheus", "uid": "prometheus" }, - "query": "label_values(lob_messages_total, symbol)", + "query": "label_values(lob_messages_total{exchange=~\"$exchange\"}, symbol)", "refresh": 1, "multi": false, "includeAll": false, @@ -66,19 +81,19 @@ { "id": 2, "title": "Event Lag", - "description": "Worst-case lag across all symbols, with selected symbol overlaid for comparison", + "description": "Worst-case lag per exchange, with selected symbol overlaid for comparison", "type": "timeseries", "gridPos": { "x": 12, "y": 0, "w": 12, "h": 8 }, "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "max(lob_event_lag_milliseconds)", - "legendFormat": "worst lag ms (all)", + "expr": "max by(exchange) (lob_event_lag_milliseconds)", + "legendFormat": "{{exchange}} worst lag ms", "refId": "A" }, { - "expr": "lob_event_lag_milliseconds{symbol=\"$symbol\"}", - "legendFormat": "{{symbol}} lag ms", + "expr": "lob_event_lag_milliseconds{exchange=~\"$exchange\",symbol=\"$symbol\"}", + "legendFormat": "{{exchange}} {{symbol}} lag ms", "refId": "B" } ], @@ -255,8 +270,8 @@ "refId": "A" }, { - "expr": "max(lob_event_lag_milliseconds)", - "legendFormat": "Worst Lag (ms)", + "expr": "max by(exchange) (lob_event_lag_milliseconds)", + "legendFormat": "{{exchange}} Lag (ms)", "refId": "B" }, { diff --git a/src/binance_utils.hpp b/src/binance_utils.hpp new file mode 100644 index 0000000..41b8379 --- /dev/null +++ b/src/binance_utils.hpp @@ -0,0 +1,17 @@ +#pragma once +#include +#include + +// Extracts the symbol from a Binance combined-stream name and uppercases it. +// +// Binance stream names look like: +// "btcusdt@depth" (basic depth stream) +// "ethusdt@depth@100ms" (depth stream with interval suffix) +// +// Returns everything before the first '@', uppercased, which is the key +// used in the order book and metrics maps. +inline std::string streamToSymbol(const std::string& stream) { + std::string sym = stream.substr(0, stream.find('@')); + std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); + return sym; +} diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index 288e0dc..2da0d8c 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -13,13 +13,18 @@ namespace { constexpr std::size_t kMaxBufferedMsgsPerSymbol = 5000; } -std::string FeedHandler::streamToSymbol(const std::string& stream) { - std::string sym = stream.substr(0, stream.find('@')); - std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); - return sym; +BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIntervalMs) {} + +BinanceAdapter::~BinanceAdapter() { stop(); } + +void BinanceAdapter::stop() { + resyncThread.request_stop(); + resyncCv.notify_one(); + resyncThread = {}; // join + destroy + webSocket.stop(); } -bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook) { +bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook) { ix::HttpClient httpClient; auto args = std::make_shared(); args->connectTimeout = 5; @@ -71,7 +76,7 @@ bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& or } } -void FeedHandler::handleWsMessage(const ix::WebSocketMessagePtr& msg) { +void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { printf("connected\n"); std::lock_guard lock(wsReadyMutex); @@ -156,7 +161,7 @@ void FeedHandler::handleWsMessage(const ix::WebSocketMessagePtr& msg) { } } -void FeedHandler::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { +void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { std::stop_callback wake(stoken, [this] { resyncCv.notify_one(); }); while (!stoken.stop_requested()) { std::string symbol; @@ -194,11 +199,10 @@ void FeedHandler::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken } } -bool FeedHandler::initialize( - const std::vector& symbols, - std::unordered_map>& booksRef, ix::WebSocket& webSocket, - std::unordered_map>& metricsMapRef, int snapshotDepthArg, - int maxSnapshotRetries) { +bool BinanceAdapter::start(const std::vector& symbols, + std::unordered_map>& booksRef, + std::unordered_map>& metricsMapRef, + int snapshotDepthArg, int maxSnapshotRetries) { books = &booksRef; metricsMap = &metricsMapRef; snapshotDepth = snapshotDepthArg; @@ -215,6 +219,16 @@ bool FeedHandler::initialize( wsConnected = false; } + // Build the combo stream URL from the symbol list. + std::string urlStreams; + for (const auto& sym : symbols) { + std::string lower = sym; + std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); + if (!urlStreams.empty()) urlStreams += "/"; + urlStreams += lower + "@depth@" + std::to_string(updateIntervalMs) + "ms"; + } + webSocket.setUrl("wss://stream.binance.com:9443/stream?streams=" + urlStreams); + webSocket.setOnMessageCallback( [this](const ix::WebSocketMessagePtr& msg) { handleWsMessage(msg); }); diff --git a/src/feed_handler.hpp b/src/feed_handler.hpp index 0be865f..4b7a297 100644 --- a/src/feed_handler.hpp +++ b/src/feed_handler.hpp @@ -8,25 +8,37 @@ #include #include #include +#include #include #include #include +#include "binance_utils.hpp" +#include "i_exchange_adapter.hpp" #include "metrics.hpp" #include "order_book.hpp" -class FeedHandler { +class BinanceAdapter : public IExchangeAdapter { public: - // Connects the WebSocket, starts the resync worker, and fetches initial snapshots for all - // symbols. Snapshot requests are staggered to stay within Binance REST rate limits. + // updateIntervalMs must be 100 or 1000, matching the Binance stream suffix (@depth@100ms). + explicit BinanceAdapter(int updateIntervalMs = 100); + ~BinanceAdapter() override; + + std::string_view exchangeName() const override { return "binance"; } + + // Builds the WebSocket URL from symbols, connects, fetches snapshots, and starts streaming. // Returns false if the WebSocket doesn't connect or any snapshot can't be fetched. - bool initialize(const std::vector& symbols, - std::unordered_map>& books, - ix::WebSocket& webSocket, - std::unordered_map>& metricsMap, - int snapshotDepth = 1000, int maxSnapshotRetries = 5); + bool start(const std::vector& symbols, + std::unordered_map>& books, + std::unordered_map>& metricsMap, + int snapshotDepth = 1000, int maxSnapshotRetries = 5) override; + + void stop() override; private: + ix::WebSocket webSocket; + int updateIntervalMs; + std::unordered_map>* books = nullptr; std::unordered_map>* metricsMap = nullptr; @@ -43,9 +55,6 @@ class FeedHandler { std::jthread resyncThread; - // "btcusdt@depth" → "BTCUSDT" - static std::string streamToSymbol(const std::string& stream); - int snapshotDepth = 1000; // Fetches a depth snapshot from the Binance REST API and applies it to the order book. diff --git a/src/i_exchange_adapter.hpp b/src/i_exchange_adapter.hpp new file mode 100644 index 0000000..c77c3d6 --- /dev/null +++ b/src/i_exchange_adapter.hpp @@ -0,0 +1,38 @@ +#pragma once +#include +#include +#include +#include +#include + +#include "metrics.hpp" +#include "order_book.hpp" + +// Interface for exchange feed adapters. +// Each adapter owns its WebSocket connection, snapshot logic, and rate-limit handling. +class IExchangeAdapter { +public: + virtual ~IExchangeAdapter() = default; + + // Returns the exchange name (e.g. "binance", "kraken"). + virtual std::string_view exchangeName() const = 0; + + // Connects to the exchange, fetches initial snapshots for all symbols, + // and begins streaming live updates into books and metricsMap. + // Returns false if the connection or any snapshot fails. + virtual bool start(const std::vector& symbols, + std::unordered_map>& books, + std::unordered_map>& metricsMap, + int snapshotDepth = 1000, int maxSnapshotRetries = 5) = 0; + + // Stops the WebSocket connection and all background threads. + virtual void stop() = 0; + + IExchangeAdapter(const IExchangeAdapter&) = delete; + IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; + IExchangeAdapter(IExchangeAdapter&&) = delete; + IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; + +protected: + IExchangeAdapter() = default; +}; diff --git a/src/main.cpp b/src/main.cpp index 3fd021f..2403fc4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,5 +1,3 @@ -#include - #include #include #include @@ -12,6 +10,7 @@ #include #include "feed_handler.hpp" +#include "i_exchange_adapter.hpp" #include "metrics.hpp" #include "metrics_server.hpp" #include "order_book.hpp" @@ -81,9 +80,8 @@ int main(int argc, char* argv[]) { return -1; } - // Validate each entry and canonicalise: UPPERCASE for map keys, lowercase for the URL. + // Validate each entry and canonicalise to UPPERCASE for map keys. std::vector symbols; - std::string urlStreams; for (const auto& entry : config["symbols"]) { if (!entry.is_string()) { fprintf(stderr, "config error: every entry in 'symbols' must be a string\n"); @@ -92,15 +90,7 @@ int main(int argc, char* argv[]) { std::string sym = entry.get(); std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); symbols.push_back(sym); - - std::string lower = sym; - std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); - if (!urlStreams.empty()) { - urlStreams += "/"; - } - urlStreams += lower + "@depth@" + std::to_string(updateIntervalMs) + "ms"; } - const std::string url = "wss://stream.binance.com:9443/stream?streams=" + urlStreams; std::string primarySymbol = config["primary_symbol"].get(); std::transform(primarySymbol.begin(), primarySymbol.end(), primarySymbol.begin(), ::toupper); @@ -118,10 +108,9 @@ int main(int argc, char* argv[]) { metricsMap[sym] = std::make_unique(); } - ix::WebSocket webSocket; - webSocket.setUrl(url); + BinanceAdapter adapter(updateIntervalMs); - MetricsServer metricsServer(metricsMap, books); + MetricsServer metricsServer(adapter.exchangeName(), metricsMap, books); if (!metricsServer.start()) { fprintf(stderr, "couldn't bind metrics server on port 9090\n"); return -1; @@ -133,8 +122,7 @@ int main(int argc, char* argv[]) { } printf("\n"); - FeedHandler feedHandler; - if (!feedHandler.initialize(symbols, books, webSocket, metricsMap, snapshotDepth)) { + if (!adapter.start(symbols, books, metricsMap, snapshotDepth)) { fprintf(stderr, "failed to start feed handler\n"); return -1; } diff --git a/src/metrics_server.hpp b/src/metrics_server.hpp index 5653097..e81dd47 100644 --- a/src/metrics_server.hpp +++ b/src/metrics_server.hpp @@ -5,18 +5,21 @@ #include #include #include +#include #include #include #include "metrics.hpp" #include "order_book.hpp" +#include "prometheus_format.hpp" class MetricsServer { public: - MetricsServer(std::unordered_map>& metricsMap, + MetricsServer(std::string_view exchange, + std::unordered_map>& metricsMap, std::unordered_map>& books, int port = 9090) - : metricsMap(metricsMap), books(books), port(port) {} + : exchange(exchange), metricsMap(metricsMap), books(books), port(port) {} ~MetricsServer() { svr.stop(); @@ -63,90 +66,10 @@ class MetricsServer { private: std::string buildPrometheusMetrics() { - std::ostringstream ss; - - // Helper: emit HELP+TYPE header once, then one line per symbol. - auto writeCounterHeader = [&](const char* name, const char* help) { - ss << "# HELP " << name << " " << help << "\n"; - ss << "# TYPE " << name << " counter\n"; - }; - auto writeGaugeHeader = [&](const char* name, const char* help) { - ss << "# HELP " << name << " " << help << "\n"; - ss << "# TYPE " << name << " gauge\n"; - }; - auto writeLine = [&](const char* name, const std::string& symbol, double value) { - ss << name << "{symbol=\"" << symbol << "\"} " << value << "\n"; - }; - - writeCounterHeader("lob_messages_total", "Total messages received from Binance"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_messages_total", sym, static_cast(m->msgCount.load())); - } - - writeGaugeHeader("lob_event_lag_milliseconds", "Last event lag in milliseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_event_lag_milliseconds", sym, - static_cast(m->lastEventLagMs.load())); - } - - writeGaugeHeader("lob_processing_time_microseconds", - "Last update processing time in microseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_processing_time_microseconds", sym, - static_cast(m->lastProcessingUs.load())); - } - - writeGaugeHeader("lob_max_processing_time_microseconds", - "Maximum observed processing time in microseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_max_processing_time_microseconds", sym, - static_cast(m->maxProcessingUs.load())); - } - - // Collect stats once per symbol to avoid locking each book multiple times. - std::vector> allStats; - allStats.reserve(books.size()); - for (const auto& [sym, book] : books) { - allStats.emplace_back(sym, book->getStats()); - } - - writeGaugeHeader("lob_orderbook_asks_count", - "Number of ask price levels in the order book"); - for (const auto& [sym, stats] : allStats) { - writeLine("lob_orderbook_asks_count", sym, static_cast(stats.asksCount)); - } - - writeGaugeHeader("lob_orderbook_bids_count", - "Number of bid price levels in the order book"); - for (const auto& [sym, stats] : allStats) { - writeLine("lob_orderbook_bids_count", sym, static_cast(stats.bidsCount)); - } - - writeGaugeHeader("lob_orderbook_best_ask_price", "Best ask price in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestAsk > 0.0) { - writeLine("lob_orderbook_best_ask_price", sym, stats.bestAsk); - } - } - - writeGaugeHeader("lob_orderbook_best_bid_price", "Best bid price in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestBid > 0.0) { - writeLine("lob_orderbook_best_bid_price", sym, stats.bestBid); - } - } - - writeGaugeHeader("lob_orderbook_spread_price", - "Spread between best ask and best bid in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { - writeLine("lob_orderbook_spread_price", sym, stats.bestAsk - stats.bestBid); - } - } - - return ss.str(); + return buildPrometheusOutput(exchange, metricsMap, books); } + std::string exchange; std::unordered_map>& metricsMap; std::unordered_map>& books; int port; diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp new file mode 100644 index 0000000..a3cb606 --- /dev/null +++ b/src/prometheus_format.hpp @@ -0,0 +1,102 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#include "metrics.hpp" +#include "order_book.hpp" + +// Builds a Prometheus text exposition for all symbols managed by one adapter. +// +// Each metric line gets both exchange="..." and symbol="..." labels so series +// from different adapters don't collide in a multi-exchange setup. +// +// Spread and best-price lines are skipped when the book is empty (no snapshot +// yet) to avoid zero-valued prices breaking min() aggregations. +inline std::string buildPrometheusOutput( + std::string_view exchange, + const std::unordered_map>& metricsMap, + const std::unordered_map>& books) { + std::ostringstream ss; + + auto writeCounterHeader = [&](const char* name, const char* help) { + ss << "# HELP " << name << " " << help << "\n"; + ss << "# TYPE " << name << " counter\n"; + }; + auto writeGaugeHeader = [&](const char* name, const char* help) { + ss << "# HELP " << name << " " << help << "\n"; + ss << "# TYPE " << name << " gauge\n"; + }; + auto writeLine = [&](const char* name, const std::string& symbol, double value) { + ss << name << "{exchange=\"" << exchange << "\",symbol=\"" << symbol << "\"} " << value + << "\n"; + }; + + writeCounterHeader("lob_messages_total", "Total messages received from exchange"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_messages_total", sym, static_cast(m->msgCount.load())); + } + + writeGaugeHeader("lob_event_lag_milliseconds", "Last event lag in milliseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_event_lag_milliseconds", sym, static_cast(m->lastEventLagMs.load())); + } + + writeGaugeHeader("lob_processing_time_microseconds", + "Last update processing time in microseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_processing_time_microseconds", sym, + static_cast(m->lastProcessingUs.load())); + } + + writeGaugeHeader("lob_max_processing_time_microseconds", + "Maximum observed processing time in microseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_max_processing_time_microseconds", sym, + static_cast(m->maxProcessingUs.load())); + } + + // Collect stats once per symbol to avoid locking each book multiple times. + std::vector> allStats; + allStats.reserve(books.size()); + for (const auto& [sym, book] : books) { + allStats.emplace_back(sym, book->getStats()); + } + + writeGaugeHeader("lob_orderbook_asks_count", "Number of ask price levels in the order book"); + for (const auto& [sym, stats] : allStats) { + writeLine("lob_orderbook_asks_count", sym, static_cast(stats.asksCount)); + } + + writeGaugeHeader("lob_orderbook_bids_count", "Number of bid price levels in the order book"); + for (const auto& [sym, stats] : allStats) { + writeLine("lob_orderbook_bids_count", sym, static_cast(stats.bidsCount)); + } + + writeGaugeHeader("lob_orderbook_best_ask_price", "Best ask price in USD"); + for (const auto& [sym, stats] : allStats) { + if (stats.bestAsk > 0.0) { + writeLine("lob_orderbook_best_ask_price", sym, stats.bestAsk); + } + } + + writeGaugeHeader("lob_orderbook_best_bid_price", "Best bid price in USD"); + for (const auto& [sym, stats] : allStats) { + if (stats.bestBid > 0.0) { + writeLine("lob_orderbook_best_bid_price", sym, stats.bestBid); + } + } + + writeGaugeHeader("lob_orderbook_spread_price", "Spread between best ask and best bid in USD"); + for (const auto& [sym, stats] : allStats) { + if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { + writeLine("lob_orderbook_spread_price", sym, stats.bestAsk - stats.bestBid); + } + } + + return ss.str(); +} diff --git a/tests/test_binance_utils.cpp b/tests/test_binance_utils.cpp new file mode 100644 index 0000000..9f3912f --- /dev/null +++ b/tests/test_binance_utils.cpp @@ -0,0 +1,24 @@ +#include + +#include "binance_utils.hpp" + +// ─── streamToSymbol ─────────────────────────────────────────────────────────── +// Covers the stream name to uppercase symbol conversion used as map keys. + +TEST(StreamToSymbol, BasicDepthStreamExtractsSymbol) { + EXPECT_EQ(streamToSymbol("btcusdt@depth"), "BTCUSDT"); +} + +TEST(StreamToSymbol, DepthWithIntervalSuffixIsStripped) { + // The @100ms / @1000ms parameter must not bleed into the symbol. + EXPECT_EQ(streamToSymbol("ethusdt@depth@100ms"), "ETHUSDT"); + EXPECT_EQ(streamToSymbol("solusdt@depth@1000ms"), "SOLUSDT"); +} + +TEST(StreamToSymbol, OutputIsAlwaysUppercase) { + EXPECT_EQ(streamToSymbol("BtcUsdt@depth"), "BTCUSDT"); +} + +TEST(StreamToSymbol, AlreadyUppercaseInputIsIdempotent) { + EXPECT_EQ(streamToSymbol("BNBUSDT@depth"), "BNBUSDT"); +} diff --git a/tests/test_prometheus_format.cpp b/tests/test_prometheus_format.cpp new file mode 100644 index 0000000..dda789c --- /dev/null +++ b/tests/test_prometheus_format.cpp @@ -0,0 +1,126 @@ +#include + +#include +#include +#include +#include + +#include "prometheus_format.hpp" + +namespace { + +// Builds a minimal JSON snapshot suitable for OrderBook::applySnapshot. +nlohmann::json makeSnap(long long lastUpdateId, + const std::vector>& asks, + const std::vector>& bids) { + nlohmann::json snap; + snap["lastUpdateId"] = lastUpdateId; + snap["asks"] = nlohmann::json::array(); + for (const auto& [p, q] : asks) snap["asks"].push_back({p, q}); + snap["bids"] = nlohmann::json::array(); + for (const auto& [p, q] : bids) snap["bids"].push_back({p, q}); + return snap; +} + +// Collect every non-comment, non-empty line from a Prometheus exposition block. +std::vector dataLines(const std::string& output) { + std::vector result; + std::istringstream ss(output); + std::string line; + while (std::getline(ss, line)) { + if (!line.empty() && line[0] != '#') { + result.push_back(line); + } + } + return result; +} + +// Fixture that owns a single-symbol metrics + book pair for BTCUSDT on "testex". +struct PrometheusFormatTest : testing::Test { + std::unordered_map> metrics; + std::unordered_map> books; + + void SetUp() override { + metrics["BTCUSDT"] = std::make_unique(); + books["BTCUSDT"] = std::make_unique(); + } + + std::string build(std::string_view exchange = "testex") { + return buildPrometheusOutput(exchange, metrics, books); + } +}; + +} // namespace + +// ─── Label correctness ──────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, ExchangeLabelAppearsOnEveryDataLine) { + const auto lines = dataLines(build("binance")); + ASSERT_FALSE(lines.empty()); + for (const auto& line : lines) { + EXPECT_NE(line.find("exchange=\"binance\""), std::string::npos) << line; + } +} + +TEST_F(PrometheusFormatTest, SymbolLabelAppearsOnEveryDataLine) { + const auto lines = dataLines(build()); + ASSERT_FALSE(lines.empty()); + for (const auto& line : lines) { + EXPECT_NE(line.find("symbol=\"BTCUSDT\""), std::string::npos) << line; + } +} + +// ─── Counter value ──────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, MessageCountIsReflectedInOutput) { + metrics["BTCUSDT"]->msgCount.store(42); + const auto output = build(); + // The exact label+value fragment must be present. + EXPECT_NE(output.find("lob_messages_total{exchange=\"testex\",symbol=\"BTCUSDT\"} 42"), + std::string::npos); +} + +// ─── Spread guard ───────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, SpreadOmittedWhenBookIsEmpty) { + // No snapshot applied → both ask and bid are 0.0 → spread must not appear. + const auto output = build(); + EXPECT_EQ(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +TEST_F(PrometheusFormatTest, SpreadOmittedWhenOnlyAsksPopulated) { + books["BTCUSDT"]->applySnapshot(makeSnap(1, {{"50001.00", "1.0"}}, {})); + const auto output = build(); + EXPECT_EQ(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +TEST_F(PrometheusFormatTest, SpreadEmittedWhenBothSidesPopulated) { + books["BTCUSDT"]->applySnapshot(makeSnap(1, {{"50002.00", "1.0"}}, {{"50000.00", "1.0"}})); + const auto output = build(); + EXPECT_NE(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +// ─── HELP / TYPE headers ────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, HelpAndTypeHeadersPresentForAllMetrics) { + const auto output = build(); + for (const auto* name : + {"lob_messages_total", "lob_event_lag_milliseconds", "lob_processing_time_microseconds", + "lob_orderbook_asks_count", "lob_orderbook_bids_count"}) { + EXPECT_NE(output.find(std::string("# HELP ") + name), std::string::npos) << name; + EXPECT_NE(output.find(std::string("# TYPE ") + name), std::string::npos) << name; + } +} + +// ─── Multi-symbol ───────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, AllSymbolsAppearInOutput) { + // Add a second symbol. + metrics["ETHUSDT"] = std::make_unique(); + books["ETHUSDT"] = std::make_unique(); + metrics["ETHUSDT"]->msgCount.store(7); + + const auto output = build(); + EXPECT_NE(output.find("symbol=\"BTCUSDT\""), std::string::npos); + EXPECT_NE(output.find("symbol=\"ETHUSDT\""), std::string::npos); +} From 6a9965c550ccc70c8e9b3fbacf61ffdfcd2e15df Mon Sep 17 00:00:00 2001 From: Andrzej Pijanowski Date: Fri, 6 Mar 2026 13:53:21 +0100 Subject: [PATCH 2/6] Refactor Prometheus output to collect order book spread prices in a separate vector before writing --- src/prometheus_format.hpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp index a3cb606..f26d193 100644 --- a/src/prometheus_format.hpp +++ b/src/prometheus_format.hpp @@ -91,10 +91,19 @@ inline std::string buildPrometheusOutput( } } - writeGaugeHeader("lob_orderbook_spread_price", "Spread between best ask and best bid in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { - writeLine("lob_orderbook_spread_price", sym, stats.bestAsk - stats.bestBid); + { + std::vector> spreadLines; + for (const auto& [sym, stats] : allStats) { + if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { + spreadLines.emplace_back(sym, stats.bestAsk - stats.bestBid); + } + } + if (!spreadLines.empty()) { + writeGaugeHeader("lob_orderbook_spread_price", + "Spread between best ask and best bid in USD"); + for (const auto& [sym, spread] : spreadLines) { + writeLine("lob_orderbook_spread_price", sym, spread); + } } } From 3deac56d43499834dc8ad7c799192c5a829c6dcd Mon Sep 17 00:00:00 2001 From: Andrzej Pijanowski Date: Fri, 6 Mar 2026 14:18:07 +0100 Subject: [PATCH 3/6] Enhance BinanceAdapter and Prometheus integration with improved error handling and label escaping --- grafana/dashboards/lob.json | 22 +++++++++++----------- src/binance_utils.hpp | 4 +++- src/feed_handler.cpp | 24 ++++++++++++++++++++++-- src/main.cpp | 2 +- src/prometheus_format.hpp | 23 +++++++++++++++++++++-- 5 files changed, 58 insertions(+), 17 deletions(-) diff --git a/grafana/dashboards/lob.json b/grafana/dashboards/lob.json index 2e4455c..3e1b630 100644 --- a/grafana/dashboards/lob.json +++ b/grafana/dashboards/lob.json @@ -60,7 +60,7 @@ "refId": "A" }, { - "expr": "rate(lob_messages_total{symbol=\"$symbol\"}[1m])", + "expr": "rate(lob_messages_total{exchange=~\"$exchange\",symbol=\"$symbol\"}[1m])", "legendFormat": "{{symbol}} msg/s", "refId": "B" } @@ -137,7 +137,7 @@ "refId": "B" }, { - "expr": "lob_processing_time_microseconds{symbol=\"$symbol\"}", + "expr": "lob_processing_time_microseconds{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "{{symbol}} µs", "refId": "C" } @@ -172,12 +172,12 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_best_ask_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_ask_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Ask", "refId": "A" }, { - "expr": "lob_orderbook_best_bid_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_bid_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Bid", "refId": "B" } @@ -205,7 +205,7 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_spread_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_spread_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "spread USD", "refId": "A" } @@ -233,12 +233,12 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_asks_count{symbol=\"$symbol\"}", + "expr": "lob_orderbook_asks_count{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Asks", "refId": "A" }, { - "expr": "lob_orderbook_bids_count{symbol=\"$symbol\"}", + "expr": "lob_orderbook_bids_count{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Bids", "refId": "B" } @@ -271,7 +271,7 @@ }, { "expr": "max by(exchange) (lob_event_lag_milliseconds)", - "legendFormat": "{{exchange}} Lag (ms)", + "legendFormat": "Event Lag (ms)", "refId": "B" }, { @@ -280,17 +280,17 @@ "refId": "C" }, { - "expr": "lob_orderbook_best_ask_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_ask_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Ask", "refId": "D" }, { - "expr": "lob_orderbook_best_bid_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_bid_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Bid", "refId": "E" }, { - "expr": "lob_orderbook_spread_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_spread_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Spread", "refId": "F" } diff --git a/src/binance_utils.hpp b/src/binance_utils.hpp index 41b8379..21c0266 100644 --- a/src/binance_utils.hpp +++ b/src/binance_utils.hpp @@ -1,5 +1,6 @@ #pragma once #include +#include #include // Extracts the symbol from a Binance combined-stream name and uppercases it. @@ -12,6 +13,7 @@ // used in the order book and metrics maps. inline std::string streamToSymbol(const std::string& stream) { std::string sym = stream.substr(0, stream.find('@')); - std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); + std::transform(sym.begin(), sym.end(), sym.begin(), + [](unsigned char c) { return ::toupper(c); }); return sym; } diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index 2da0d8c..5e965a0 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -13,7 +14,12 @@ namespace { constexpr std::size_t kMaxBufferedMsgsPerSymbol = 5000; } -BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIntervalMs) {} +BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIntervalMs) { + if (updateIntervalMs != 100 && updateIntervalMs != 1000) { + throw std::invalid_argument("BinanceAdapter: updateIntervalMs must be 100 or 1000, got " + + std::to_string(updateIntervalMs)); + } +} BinanceAdapter::~BinanceAdapter() { stop(); } @@ -58,9 +64,12 @@ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& try { auto snapshot = nlohmann::json::parse(response->body); - orderBook.applySnapshot(snapshot); + // Hold bufferMutex across applySnapshot + buffer replay so that + // handleWsMessage cannot observe isSnapshotApplied()==true and apply + // a live update while we are still draining the pre-snapshot buffer. std::lock_guard lock(bufferMutex); + orderBook.applySnapshot(snapshot); for (const auto& msg : symbolBuffers[symbol]) { if (!orderBook.applyUpdate(msg)) { fprintf(stderr, "[%s] buffered message out of order, resyncing\n", symbol.c_str()); @@ -203,6 +212,15 @@ bool BinanceAdapter::start(const std::vector& symbols, std::unordered_map>& booksRef, std::unordered_map>& metricsMapRef, int snapshotDepthArg, int maxSnapshotRetries) { + for (const auto& sym : symbols) { + if (booksRef.find(sym) == booksRef.end() || + metricsMapRef.find(sym) == metricsMapRef.end()) { + fprintf(stderr, "BinanceAdapter::start: symbol '%s' missing from books or metricsMap\n", + sym.c_str()); + return false; + } + } + books = &booksRef; metricsMap = &metricsMapRef; snapshotDepth = snapshotDepthArg; @@ -239,6 +257,8 @@ bool BinanceAdapter::start(const std::vector& symbols, wsReady.wait_for(lock, std::chrono::seconds(30), [this] { return wsConnected; }); if (!connected) { fprintf(stderr, "WebSocket didn't connect within 30s\n"); + lock.unlock(); + webSocket.stop(); return false; } } diff --git a/src/main.cpp b/src/main.cpp index 2403fc4..cc49c71 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -123,7 +123,7 @@ int main(int argc, char* argv[]) { printf("\n"); if (!adapter.start(symbols, books, metricsMap, snapshotDepth)) { - fprintf(stderr, "failed to start feed handler\n"); + fprintf(stderr, "failed to start exchange adapter\n"); return -1; } diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp index f26d193..e6d59bf 100644 --- a/src/prometheus_format.hpp +++ b/src/prometheus_format.hpp @@ -10,6 +10,25 @@ #include "metrics.hpp" #include "order_book.hpp" +// Escapes a Prometheus label value per the text exposition format spec: +// backslash → \\, double-quote → \", newline → \n +inline std::string escapeLabelValue(std::string_view v) { + std::string out; + out.reserve(v.size()); + for (char c : v) { + if (c == '\\') { + out += "\\\\"; + } else if (c == '"') { + out += "\\\""; + } else if (c == '\n') { + out += "\\n"; + } else { + out += c; + } + } + return out; +} + // Builds a Prometheus text exposition for all symbols managed by one adapter. // // Each metric line gets both exchange="..." and symbol="..." labels so series @@ -32,8 +51,8 @@ inline std::string buildPrometheusOutput( ss << "# TYPE " << name << " gauge\n"; }; auto writeLine = [&](const char* name, const std::string& symbol, double value) { - ss << name << "{exchange=\"" << exchange << "\",symbol=\"" << symbol << "\"} " << value - << "\n"; + ss << name << "{exchange=\"" << escapeLabelValue(exchange) << "\",symbol=\"" + << escapeLabelValue(symbol) << "\"} " << value << "\n"; }; writeCounterHeader("lob_messages_total", "Total messages received from exchange"); From 7d104fd57b63149058dbadcad9a183cfc45f86a0 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Fri, 6 Mar 2026 13:22:31 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`fea?= =?UTF-8?q?t/iexchange-adapter-interface`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @bountx. The following files were modified: * `src/binance_utils.hpp` * `src/feed_handler.cpp` * `src/feed_handler.hpp` * `src/i_exchange_adapter.hpp` * `src/main.cpp` * `src/metrics_server.hpp` * `src/prometheus_format.hpp` * `tests/test_prometheus_format.cpp` These files were kept as they were: * `tests/test_binance_utils.cpp` These file types are not supported: * `CMakeLists.txt` * `grafana/dashboards/lob.json` --- src/binance_utils.hpp | 11 ++++- src/feed_handler.cpp | 72 ++++++++++++++++++++++++++++++++ src/feed_handler.hpp | 7 +++- src/i_exchange_adapter.hpp | 46 +++++++++++++++++--- src/main.cpp | 21 ++++------ src/metrics_server.hpp | 24 ++++++++++- src/prometheus_format.hpp | 33 ++++++++++++++- tests/test_prometheus_format.cpp | 32 +++++++++++++- 8 files changed, 221 insertions(+), 25 deletions(-) diff --git a/src/binance_utils.hpp b/src/binance_utils.hpp index 21c0266..dbf6eab 100644 --- a/src/binance_utils.hpp +++ b/src/binance_utils.hpp @@ -10,7 +10,16 @@ // "ethusdt@depth@100ms" (depth stream with interval suffix) // // Returns everything before the first '@', uppercased, which is the key -// used in the order book and metrics maps. +/** + * Extracts the symbol portion from a Binance combined-stream name and returns it uppercased. + * + * The function takes the substring before the first '@' in `stream` and converts it to uppercase. + * If `stream` contains no '@', the entire input is uppercased. If `stream` is empty, an empty + * string is returned. + * + * @param stream Combined-stream name (e.g., "btcusdt@depth" or "ethusdt@aggTrade"). + * @return Uppercased symbol extracted from before the first '@' (or the entire uppercased input if no '@'). + */ inline std::string streamToSymbol(const std::string& stream) { std::string sym = stream.substr(0, stream.find('@')); std::transform(sym.begin(), sym.end(), sym.begin(), diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index 5e965a0..b67bb8b 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -14,6 +14,14 @@ namespace { constexpr std::size_t kMaxBufferedMsgsPerSymbol = 5000; } +/** + * @brief Construct a BinanceAdapter configured for the specified depth update interval. + * + * Initializes the adapter's update interval used when subscribing to Binance depth streams. + * + * @param updateIntervalMs Milliseconds between depth update snapshots; must be either 100 or 1000. + * @throws std::invalid_argument if `updateIntervalMs` is not 100 or 1000. + */ BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIntervalMs) { if (updateIntervalMs != 100 && updateIntervalMs != 1000) { throw std::invalid_argument("BinanceAdapter: updateIntervalMs must be 100 or 1000, got " + @@ -21,8 +29,20 @@ BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIn } } +/** + * @brief Cleans up the adapter, ensuring background workers and connections are stopped. + * + * Ensures the resync worker is requested to stop, the resync condition variable is + * notified, the resync thread is joined/reset, and the WebSocket client is stopped. + */ BinanceAdapter::~BinanceAdapter() { stop(); } +/** + * @brief Stops background processing and shuts down the WebSocket connection. + * + * Requests cancellation of the resync worker, wakes the resync condition variable, + * joins and destroys the resync thread, and stops the WebSocket client. + */ void BinanceAdapter::stop() { resyncThread.request_stop(); resyncCv.notify_one(); @@ -30,6 +50,17 @@ void BinanceAdapter::stop() { webSocket.stop(); } +/** + * @brief Fetches a depth snapshot for a symbol from Binance and applies it to the given order book. + * + * Attempts to download the REST order-book snapshot for `symbol`, apply it to `orderBook`, and then replay any + * pre-snapshot buffered websocket updates for that symbol atomically. If the HTTP response indicates rate limiting + * or an IP ban the function will wait according to the server's guidance (or a default) and return `false`. + * + * @param symbol Symbol identifier used in the Binance REST request (e.g., "BTCUSDT"). + * @param orderBook OrderBook instance to receive the applied snapshot and subsequent replayed updates. + * @return `true` if the snapshot was applied and all buffered updates were successfully replayed; `false` otherwise. + */ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook) { ix::HttpClient httpClient; auto args = std::make_shared(); @@ -85,6 +116,17 @@ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& } } +/** + * @brief Process an incoming WebSocket event from Binance and update state or metrics accordingly. + * + * Handles connection lifecycle events (Open/Close/Error) and parses combo-stream messages for order-book updates. + * While a symbol's initial snapshot is missing, messages are buffered. After snapshot application, updates are applied to + * the corresponding OrderBook; on update failures the symbol is scheduled for resynchronization and its buffer is cleared. + * Processing time and event lag are recorded in the associated Metrics. + * + * @param msg Pointer to the WebSocket message; for data messages it is expected to contain a Binance combo stream JSON + * object with fields `"stream"` and `"data"`. + */ void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { printf("connected\n"); @@ -170,6 +212,19 @@ void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { } } +/** + * @brief Background worker that processes symbols needing resynchronization. + * + * Waits for symbols enqueued for resync, then attempts up to `maxSnapshotRetries` + * to fetch and apply a fresh snapshot for each symbol. Retries use an increasing + * delay (1000ms * attempt) between attempts, and a 500ms pause is added after + * each symbol to avoid bursting REST requests. The worker wakes and exits when + * `stoken` requests stop. + * + * @param maxSnapshotRetries Maximum number of snapshot fetch attempts per symbol. + * @param stoken Stop token used to interrupt waiting, wake the worker, and + * request graceful shutdown. + */ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { std::stop_callback wake(stoken, [this] { resyncCv.notify_one(); }); while (!stoken.stop_requested()) { @@ -208,6 +263,23 @@ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token sto } } +/** + * @brief Initialize the adapter for a set of trading symbols, establish streaming, start + * the resync worker, and load initial order book snapshots. + * + * Validates that each symbol has an associated OrderBook and Metrics, configures internal + * references and snapshot depth, opens the Binance combined WebSocket stream for the symbols, + * starts the background resync thread, and concurrently fetches and applies initial REST + * snapshots for each symbol with retry/backoff and a staggered launch to avoid rate limits. + * + * @param symbols List of symbol names to subscribe and initialize (e.g., "BTCUSDT"). + * @param booksRef Map of symbol -> owned OrderBook instances; must contain every symbol. + * @param metricsMapRef Map of symbol -> owned Metrics instances; must contain every symbol. + * @param snapshotDepthArg Depth parameter used when requesting REST order book snapshots. + * @param maxSnapshotRetries Maximum number of attempts per-symbol to fetch and apply the initial snapshot. + * @return true if the adapter connected and all initial snapshots were successfully applied; `false` if + * validation failed, the WebSocket failed to connect, or any symbol failed to obtain a snapshot. + */ bool BinanceAdapter::start(const std::vector& symbols, std::unordered_map>& booksRef, std::unordered_map>& metricsMapRef, diff --git a/src/feed_handler.hpp b/src/feed_handler.hpp index 4b7a297..100b85f 100644 --- a/src/feed_handler.hpp +++ b/src/feed_handler.hpp @@ -24,7 +24,12 @@ class BinanceAdapter : public IExchangeAdapter { explicit BinanceAdapter(int updateIntervalMs = 100); ~BinanceAdapter() override; - std::string_view exchangeName() const override { return "binance"; } + /** + * @brief Identifies the exchange implementation. + * + * @return std::string_view Exchange identifier "binance". + */ +std::string_view exchangeName() const override { return "binance"; } // Builds the WebSocket URL from symbols, connects, fetches snapshots, and starts streaming. // Returns false if the WebSocket doesn't connect or any snapshot can't be fetched. diff --git a/src/i_exchange_adapter.hpp b/src/i_exchange_adapter.hpp index c77c3d6..15aaa53 100644 --- a/src/i_exchange_adapter.hpp +++ b/src/i_exchange_adapter.hpp @@ -12,7 +12,13 @@ // Each adapter owns its WebSocket connection, snapshot logic, and rate-limit handling. class IExchangeAdapter { public: - virtual ~IExchangeAdapter() = default; + /** + * @brief Ensures derived adapters are destroyed correctly when deleted via the interface. + * + * Guarantees that deleting an IExchangeAdapter pointer calls the concrete adapter's destructor + * so derived resources (connections, threads, etc.) are released properly. + */ +virtual ~IExchangeAdapter() = default; // Returns the exchange name (e.g. "binance", "kraken"). virtual std::string_view exchangeName() const = 0; @@ -28,11 +34,39 @@ class IExchangeAdapter { // Stops the WebSocket connection and all background threads. virtual void stop() = 0; - IExchangeAdapter(const IExchangeAdapter&) = delete; - IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; - IExchangeAdapter(IExchangeAdapter&&) = delete; - IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; + /** + * @brief Deleted copy constructor to prevent copying of adapter instances. + * + * Ensures that exchange adapter objects (and their derived types) cannot be copy-constructed, enforcing unique ownership of connection and thread resources. + */ +IExchangeAdapter(const IExchangeAdapter&) = delete; + /** + * @brief Deleted copy assignment operator to prevent copying of adapters. + * + * Adapter instances cannot be copy-assigned; ownership and resources must be managed by the concrete implementation. + */ +IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; + /** + * @brief Deleted move constructor to prevent moving an adapter instance. + * + * Ensures concrete adapter implementations are not movable so ownership of + * connections, threads, and other resources remains with the original instance. + */ +IExchangeAdapter(IExchangeAdapter&&) = delete; + /** + * @brief Deleted move-assignment operator to prevent move-assignment of adapters. + * + * Prevents transferring ownership of adapter resources (connections, threads, etc.) + * via move-assignment; implementations must manage lifecycle without relying on move + * semantics. + */ +IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; protected: - IExchangeAdapter() = default; + /** + * @brief Protected default constructor. + * + * Allows derived exchange adapter implementations to construct the base interface subobject. + */ +IExchangeAdapter() = default; }; diff --git a/src/main.cpp b/src/main.cpp index cc49c71..9b96239 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,18 +15,15 @@ #include "metrics_server.hpp" #include "order_book.hpp" -/* - Setup (do once): - 1. Open WebSocket to combo @depth stream — start buffering messages immediately - 2. For each symbol, fetch REST snapshot: GET /api/v3/depth?symbol=X&limit=5000 - 3. Apply buffered messages that arrived during snapshot fetch - 4. Process normally - - Every subsequent message: - - If u < your current book's lastUpdateId → stale, ignore it - - If U > your current book's lastUpdateId + 1 → you missed events, restart from scratch - - Otherwise → apply the update, set your lastUpdateId = u -*/ +/** + * @brief Program entry point that loads configuration, initializes order books, metrics, the exchange adapter and metrics server, then runs the monitoring loop. + * + * The first command-line argument (if present) is treated as the path to the JSON configuration file; otherwise "config.json" is used. The configuration controls update interval, snapshot depth, the list of symbols to watch, and the primary symbol. After successful initialization the function starts the metrics HTTP server and the exchange adapter, then enters an infinite loop that periodically prints per-symbol order book statistics and metrics. + * + * @param argc Number of command-line arguments. + * @param argv Array of command-line argument strings; argv[1] may provide the config file path. + * @return int `0` on normal termination (unreachable under normal operation), `-1` on configuration, initialization, or runtime startup errors. + */ int main(int argc, char* argv[]) { const char* configPath = (argc > 1) ? argv[1] : "config.json"; diff --git a/src/metrics_server.hpp b/src/metrics_server.hpp index e81dd47..8d5a086 100644 --- a/src/metrics_server.hpp +++ b/src/metrics_server.hpp @@ -15,12 +15,26 @@ class MetricsServer { public: - MetricsServer(std::string_view exchange, + /** + * @brief Construct a MetricsServer that serves Prometheus metrics and health for an exchange. + * + * @param exchange Identifier of the exchange whose metrics will be exposed. + * @param metricsMap Reference to the map of metric objects indexed by symbol; the server stores this reference and reads metrics from it. + * @param books Reference to the map of order books indexed by symbol; the server stores this reference and reads book state from it. + * @param port TCP port to bind the HTTP metrics/health server to (default 9090). + */ + MetricsServer(std::string_view exchange, std::unordered_map>& metricsMap, std::unordered_map>& books, int port = 9090) : exchange(exchange), metricsMap(metricsMap), books(books), port(port) {} + /** + * @brief Stop the internal HTTP server and join the worker thread. + * + * Ensures the HTTP server is stopped and, if the internal thread is joinable, + * waits for the thread to finish before destruction. + */ ~MetricsServer() { svr.stop(); if (thread.joinable()) { @@ -65,6 +79,14 @@ class MetricsServer { } private: + /** + * @brief Produce Prometheus-formatted metrics for the server's configured exchange. + * + * Builds the full Prometheus exposition text representing the current state of the stored + * metrics and order books for the exchange associated with this MetricsServer. + * + * @return std::string Prometheus exposition body ready to be served (plain text in Prometheus format). + */ std::string buildPrometheusMetrics() { return buildPrometheusOutput(exchange, metricsMap, books); } diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp index e6d59bf..394736c 100644 --- a/src/prometheus_format.hpp +++ b/src/prometheus_format.hpp @@ -11,7 +11,17 @@ #include "order_book.hpp" // Escapes a Prometheus label value per the text exposition format spec: -// backslash → \\, double-quote → \", newline → \n +/** + * Escape a Prometheus label value by replacing special characters with backslash-escaped sequences. + * + * Replaces: + * - backslash (`\`) with `\\` + * - double-quote (`"`) with `\"` + * - newline with `\n` + * + * @param v Label value to escape. + * @return Escaped string where backslash is replaced with `\\`, double-quote with `\"`, and newline with `\n`. + */ inline std::string escapeLabelValue(std::string_view v) { std::string out; out.reserve(v.size()); @@ -35,7 +45,26 @@ inline std::string escapeLabelValue(std::string_view v) { // from different adapters don't collide in a multi-exchange setup. // // Spread and best-price lines are skipped when the book is empty (no snapshot -// yet) to avoid zero-valued prices breaking min() aggregations. +/** + * Builds a Prometheus text exposition containing adapter metrics and order book statistics, + * emitting metric lines labeled with `exchange` and `symbol`. + * + * Emits the following metrics when available: + * - lob_messages_total + * - lob_event_lag_milliseconds + * - lob_processing_time_microseconds + * - lob_max_processing_time_microseconds + * - lob_orderbook_asks_count + * - lob_orderbook_bids_count + * - lob_orderbook_best_ask_price (only for positive best-ask) + * - lob_orderbook_best_bid_price (only for positive best-bid) + * - lob_orderbook_spread_price (only when both best-ask and best-bid are positive) + * + * @param exchange Identifier for the adapter; used as the `exchange` label value. + * @param metricsMap Map from symbol to Metrics; used for message counts and processing/lag values. + * @param books Map from symbol to OrderBook; used to obtain per-symbol order book stats (asks/bids counts, best prices). + * @return std::string Prometheus exposition text including HELP/TYPE headers and metric lines. Best-price and spread metrics are omitted when price values are not positive. + */ inline std::string buildPrometheusOutput( std::string_view exchange, const std::unordered_map>& metricsMap, diff --git a/tests/test_prometheus_format.cpp b/tests/test_prometheus_format.cpp index dda789c..62f2575 100644 --- a/tests/test_prometheus_format.cpp +++ b/tests/test_prometheus_format.cpp @@ -9,7 +9,18 @@ namespace { -// Builds a minimal JSON snapshot suitable for OrderBook::applySnapshot. +/** + * @brief Build a minimal JSON snapshot for OrderBook::applySnapshot. + * + * Constructs a JSON object containing a numeric `lastUpdateId` and two arrays + * `"asks"` and `"bids"`, where each entry is a two-element array holding + * price and quantity as strings. + * + * @param lastUpdateId The snapshot's last update identifier. + * @param asks Vector of (price, quantity) pairs to populate the `"asks"` array. + * @param bids Vector of (price, quantity) pairs to populate the `"bids"` array. + * @return nlohmann::json JSON object with keys `lastUpdateId`, `asks`, and `bids`. + */ nlohmann::json makeSnap(long long lastUpdateId, const std::vector>& asks, const std::vector>& bids) { @@ -22,7 +33,12 @@ nlohmann::json makeSnap(long long lastUpdateId, return snap; } -// Collect every non-comment, non-empty line from a Prometheus exposition block. +/** + * Extracts all non-empty, non-comment data lines from a Prometheus exposition text block. + * + * @param output The full Prometheus exposition text (may contain comment lines starting with `#` and blank lines). + * @return std::vector A vector containing each data line (lines that are not empty and do not start with `#`), in their original order without trailing newlines. + */ std::vector dataLines(const std::string& output) { std::vector result; std::istringstream ss(output); @@ -40,11 +56,23 @@ struct PrometheusFormatTest : testing::Test { std::unordered_map> metrics; std::unordered_map> books; + /** + * @brief Prepare test fixture state by creating default entries for "BTCUSDT". + * + * Initializes the fixture's metrics and books maps with a fresh Metrics and + * OrderBook instance at key "BTCUSDT". + */ void SetUp() override { metrics["BTCUSDT"] = std::make_unique(); books["BTCUSDT"] = std::make_unique(); } + /** + * @brief Builds a Prometheus exposition-format text block for the fixture's metrics and order books. + * + * @param exchange Exchange label value to include on each metric line; defaults to "testex". + * @return std::string Prometheus-formatted text containing HELP/TYPE headers and metric data lines for the fixture's metrics and books. + */ std::string build(std::string_view exchange = "testex") { return buildPrometheusOutput(exchange, metrics, books); } From 4ab6f385a01114653da597510a624452ad442083 Mon Sep 17 00:00:00 2001 From: Andrzej Pijanowski Date: Fri, 6 Mar 2026 14:41:20 +0100 Subject: [PATCH 5/6] Enhance BinanceAdapter with interruptible snapshot fetching and graceful shutdown support --- src/feed_handler.cpp | 81 ++++++++++++++++++++++++++++---------------- src/feed_handler.hpp | 7 ++-- src/main.cpp | 7 ++++ 3 files changed, 63 insertions(+), 32 deletions(-) diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index b67bb8b..e7838a3 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -59,9 +59,11 @@ void BinanceAdapter::stop() { * * @param symbol Symbol identifier used in the Binance REST request (e.g., "BTCUSDT"). * @param orderBook OrderBook instance to receive the applied snapshot and subsequent replayed updates. + * @param stoken Stop token used to interrupt any waits caused by rate limiting or IP bans, allowing for graceful shutdown. * @return `true` if the snapshot was applied and all buffered updates were successfully replayed; `false` otherwise. */ -bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook) { +bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook, + std::stop_token stoken) { ix::HttpClient httpClient; auto args = std::make_shared(); args->connectTimeout = 5; @@ -79,12 +81,14 @@ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& } } fprintf(stderr, "[%s] got 429, waiting %ds before retry\n", symbol.c_str(), retryAfterSec); - std::this_thread::sleep_for(std::chrono::seconds(retryAfterSec)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::seconds(retryAfterSec), [] { return false; }); return false; } if (response->statusCode == 418) { fprintf(stderr, "[%s] IP banned (418), waiting 120s\n", symbol.c_str()); - std::this_thread::sleep_for(std::chrono::seconds(120)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::seconds(120), [] { return false; }); return false; } if (response->statusCode != 200) { @@ -226,13 +230,12 @@ void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { * request graceful shutdown. */ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { - std::stop_callback wake(stoken, [this] { resyncCv.notify_one(); }); - while (!stoken.stop_requested()) { + while (true) { std::string symbol; { std::unique_lock lock(resyncMutex); - resyncCv.wait(lock, [&] { return !resyncQueue.empty() || stoken.stop_requested(); }); - if (stoken.stop_requested()) { + // wait() with stop_token: returns false immediately if stop is requested. + if (!resyncCv.wait(lock, stoken, [&] { return !resyncQueue.empty(); })) { break; } symbol = resyncQueue.front(); @@ -240,13 +243,21 @@ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token sto } for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { printf("[%s] resyncing (attempt %d/%d)\n", symbol.c_str(), attempt, maxSnapshotRetries); - if (fetchAndApplySnapshot(symbol, *books->at(symbol))) { + if (fetchAndApplySnapshot(symbol, *books->at(symbol), stoken)) { break; } - if (attempt < maxSnapshotRetries && !stoken.stop_requested()) { + if (stoken.stop_requested()) { + return; + } + if (attempt < maxSnapshotRetries) { int delayMs = 1000 * attempt; printf("[%s] resync failed, retrying in %dms\n", symbol.c_str(), delayMs); - std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::milliseconds(delayMs), + [] { return false; }); + if (stoken.stop_requested()) { + return; + } } } if (!books->at(symbol)->isSnapshotApplied()) { @@ -257,8 +268,12 @@ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token sto // Small gap before picking up the next symbol. When several symbols fall out of sync // at once (common after a network hiccup), they all land in the queue together, and // processing them without any delay would fire REST requests back-to-back. - if (!stoken.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + { + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::milliseconds(500), [] { return false; }); + } + if (stoken.stop_requested()) { + return; } } } @@ -343,6 +358,7 @@ bool BinanceAdapter::start(const std::vector& symbols, // which is fine in isolation, but Docker restarts compound this fast enough to earn a 429 // or 418 ban. A 300ms gap between launches keeps only a handful of requests in-flight at a // time. + std::stop_token startStoken = resyncThread.get_stop_token(); std::vector> futures; futures.reserve(symbols.size()); for (size_t i = 0; i < symbols.size(); ++i) { @@ -350,24 +366,28 @@ bool BinanceAdapter::start(const std::vector& symbols, std::this_thread::sleep_for(std::chrono::milliseconds(300)); } const auto& symbol = symbols[i]; - futures.push_back(std::async(std::launch::async, [this, symbol, maxSnapshotRetries]() { - for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { - printf("[%s] fetching snapshot (attempt %d/%d)\n", symbol.c_str(), attempt, - maxSnapshotRetries); - if (fetchAndApplySnapshot(symbol, *books->at(symbol))) { - return true; - } - if (attempt == maxSnapshotRetries) { - break; + futures.push_back( + std::async(std::launch::async, [this, symbol, maxSnapshotRetries, startStoken]() { + for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { + printf("[%s] fetching snapshot (attempt %d/%d)\n", symbol.c_str(), attempt, + maxSnapshotRetries); + if (fetchAndApplySnapshot(symbol, *books->at(symbol), startStoken)) { + return true; + } + if (startStoken.stop_requested() || attempt == maxSnapshotRetries) { + break; + } + int delayMs = 1000 * attempt; + printf("[%s] snapshot fetch failed, retrying in %dms\n", symbol.c_str(), + delayMs); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, startStoken, std::chrono::milliseconds(delayMs), + [] { return false; }); } - int delayMs = 1000 * attempt; - printf("[%s] snapshot fetch failed, retrying in %dms\n", symbol.c_str(), delayMs); - std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); - } - fprintf(stderr, "[%s] snapshot fetch gave up after %d attempts\n", symbol.c_str(), - maxSnapshotRetries); - return false; - })); + fprintf(stderr, "[%s] snapshot fetch gave up after %d attempts\n", symbol.c_str(), + maxSnapshotRetries); + return false; + })); } bool allOk = true; @@ -376,5 +396,8 @@ bool BinanceAdapter::start(const std::vector& symbols, allOk = false; } } + if (!allOk) { + stop(); + } return allOk; } diff --git a/src/feed_handler.hpp b/src/feed_handler.hpp index 100b85f..519c7d4 100644 --- a/src/feed_handler.hpp +++ b/src/feed_handler.hpp @@ -52,7 +52,7 @@ std::string_view exchangeName() const override { return "binance"; } std::queue resyncQueue; std::mutex resyncMutex; - std::condition_variable resyncCv; + std::condition_variable_any resyncCv; std::mutex wsReadyMutex; std::condition_variable wsReady; @@ -64,8 +64,9 @@ std::string_view exchangeName() const override { return "binance"; } // Fetches a depth snapshot from the Binance REST API and applies it to the order book. // On 429 (rate limited) or 418 (IP banned), sleeps before returning false so the caller - // doesn't immediately retry and dig the hole deeper. - bool fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook); + // doesn't immediately retry and dig the hole deeper. The sleep is interruptible via stoken. + bool fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook, + std::stop_token stoken); void handleWsMessage(const ix::WebSocketMessagePtr& msg); diff --git a/src/main.cpp b/src/main.cpp index 9b96239..e1b060f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "feed_handler.hpp" @@ -79,6 +80,7 @@ int main(int argc, char* argv[]) { // Validate each entry and canonicalise to UPPERCASE for map keys. std::vector symbols; + std::unordered_set seen; for (const auto& entry : config["symbols"]) { if (!entry.is_string()) { fprintf(stderr, "config error: every entry in 'symbols' must be a string\n"); @@ -86,6 +88,11 @@ int main(int argc, char* argv[]) { } std::string sym = entry.get(); std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); + if (seen.count(sym)) { + fprintf(stderr, "config error: duplicate symbol '%s'\n", sym.c_str()); + return -1; + } + seen.insert(sym); symbols.push_back(sym); } From e28bd48c482a37682d36ae4b864b7201d19f13fb Mon Sep 17 00:00:00 2001 From: Andrzej Pijanowski Date: Fri, 6 Mar 2026 14:41:37 +0100 Subject: [PATCH 6/6] pre-commit --- src/binance_utils.hpp | 3 +- src/feed_handler.cpp | 78 +++++++++++++++++--------------- src/feed_handler.hpp | 10 ++-- src/i_exchange_adapter.hpp | 70 ++++++++++++++-------------- src/main.cpp | 12 +++-- src/metrics_server.hpp | 21 +++++---- src/prometheus_format.hpp | 9 ++-- tests/test_prometheus_format.cpp | 12 +++-- 8 files changed, 120 insertions(+), 95 deletions(-) diff --git a/src/binance_utils.hpp b/src/binance_utils.hpp index dbf6eab..b26994e 100644 --- a/src/binance_utils.hpp +++ b/src/binance_utils.hpp @@ -18,7 +18,8 @@ * string is returned. * * @param stream Combined-stream name (e.g., "btcusdt@depth" or "ethusdt@aggTrade"). - * @return Uppercased symbol extracted from before the first '@' (or the entire uppercased input if no '@'). + * @return Uppercased symbol extracted from before the first '@' (or the entire uppercased input if + * no '@'). */ inline std::string streamToSymbol(const std::string& stream) { std::string sym = stream.substr(0, stream.find('@')); diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index e7838a3..007fc7d 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -53,14 +53,18 @@ void BinanceAdapter::stop() { /** * @brief Fetches a depth snapshot for a symbol from Binance and applies it to the given order book. * - * Attempts to download the REST order-book snapshot for `symbol`, apply it to `orderBook`, and then replay any - * pre-snapshot buffered websocket updates for that symbol atomically. If the HTTP response indicates rate limiting - * or an IP ban the function will wait according to the server's guidance (or a default) and return `false`. + * Attempts to download the REST order-book snapshot for `symbol`, apply it to `orderBook`, and then + * replay any pre-snapshot buffered websocket updates for that symbol atomically. If the HTTP + * response indicates rate limiting or an IP ban the function will wait according to the server's + * guidance (or a default) and return `false`. * * @param symbol Symbol identifier used in the Binance REST request (e.g., "BTCUSDT"). - * @param orderBook OrderBook instance to receive the applied snapshot and subsequent replayed updates. - * @param stoken Stop token used to interrupt any waits caused by rate limiting or IP bans, allowing for graceful shutdown. - * @return `true` if the snapshot was applied and all buffered updates were successfully replayed; `false` otherwise. + * @param orderBook OrderBook instance to receive the applied snapshot and subsequent replayed + * updates. + * @param stoken Stop token used to interrupt any waits caused by rate limiting or IP bans, allowing + * for graceful shutdown. + * @return `true` if the snapshot was applied and all buffered updates were successfully replayed; + * `false` otherwise. */ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook, std::stop_token stoken) { @@ -123,13 +127,14 @@ bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& /** * @brief Process an incoming WebSocket event from Binance and update state or metrics accordingly. * - * Handles connection lifecycle events (Open/Close/Error) and parses combo-stream messages for order-book updates. - * While a symbol's initial snapshot is missing, messages are buffered. After snapshot application, updates are applied to - * the corresponding OrderBook; on update failures the symbol is scheduled for resynchronization and its buffer is cleared. - * Processing time and event lag are recorded in the associated Metrics. + * Handles connection lifecycle events (Open/Close/Error) and parses combo-stream messages for + * order-book updates. While a symbol's initial snapshot is missing, messages are buffered. After + * snapshot application, updates are applied to the corresponding OrderBook; on update failures the + * symbol is scheduled for resynchronization and its buffer is cleared. Processing time and event + * lag are recorded in the associated Metrics. * - * @param msg Pointer to the WebSocket message; for data messages it is expected to contain a Binance combo stream JSON - * object with fields `"stream"` and `"data"`. + * @param msg Pointer to the WebSocket message; for data messages it is expected to contain a + * Binance combo stream JSON object with fields `"stream"` and `"data"`. */ void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { @@ -291,9 +296,11 @@ void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token sto * @param booksRef Map of symbol -> owned OrderBook instances; must contain every symbol. * @param metricsMapRef Map of symbol -> owned Metrics instances; must contain every symbol. * @param snapshotDepthArg Depth parameter used when requesting REST order book snapshots. - * @param maxSnapshotRetries Maximum number of attempts per-symbol to fetch and apply the initial snapshot. - * @return true if the adapter connected and all initial snapshots were successfully applied; `false` if - * validation failed, the WebSocket failed to connect, or any symbol failed to obtain a snapshot. + * @param maxSnapshotRetries Maximum number of attempts per-symbol to fetch and apply the initial + * snapshot. + * @return true if the adapter connected and all initial snapshots were successfully applied; + * `false` if validation failed, the WebSocket failed to connect, or any symbol failed to obtain a + * snapshot. */ bool BinanceAdapter::start(const std::vector& symbols, std::unordered_map>& booksRef, @@ -366,28 +373,27 @@ bool BinanceAdapter::start(const std::vector& symbols, std::this_thread::sleep_for(std::chrono::milliseconds(300)); } const auto& symbol = symbols[i]; - futures.push_back( - std::async(std::launch::async, [this, symbol, maxSnapshotRetries, startStoken]() { - for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { - printf("[%s] fetching snapshot (attempt %d/%d)\n", symbol.c_str(), attempt, - maxSnapshotRetries); - if (fetchAndApplySnapshot(symbol, *books->at(symbol), startStoken)) { - return true; - } - if (startStoken.stop_requested() || attempt == maxSnapshotRetries) { - break; - } - int delayMs = 1000 * attempt; - printf("[%s] snapshot fetch failed, retrying in %dms\n", symbol.c_str(), - delayMs); - std::unique_lock lock(resyncMutex); - resyncCv.wait_for(lock, startStoken, std::chrono::milliseconds(delayMs), - [] { return false; }); + futures.push_back(std::async(std::launch::async, [this, symbol, maxSnapshotRetries, + startStoken]() { + for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { + printf("[%s] fetching snapshot (attempt %d/%d)\n", symbol.c_str(), attempt, + maxSnapshotRetries); + if (fetchAndApplySnapshot(symbol, *books->at(symbol), startStoken)) { + return true; } - fprintf(stderr, "[%s] snapshot fetch gave up after %d attempts\n", symbol.c_str(), - maxSnapshotRetries); - return false; - })); + if (startStoken.stop_requested() || attempt == maxSnapshotRetries) { + break; + } + int delayMs = 1000 * attempt; + printf("[%s] snapshot fetch failed, retrying in %dms\n", symbol.c_str(), delayMs); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, startStoken, std::chrono::milliseconds(delayMs), + [] { return false; }); + } + fprintf(stderr, "[%s] snapshot fetch gave up after %d attempts\n", symbol.c_str(), + maxSnapshotRetries); + return false; + })); } bool allOk = true; diff --git a/src/feed_handler.hpp b/src/feed_handler.hpp index 519c7d4..034ac00 100644 --- a/src/feed_handler.hpp +++ b/src/feed_handler.hpp @@ -25,11 +25,11 @@ class BinanceAdapter : public IExchangeAdapter { ~BinanceAdapter() override; /** - * @brief Identifies the exchange implementation. - * - * @return std::string_view Exchange identifier "binance". - */ -std::string_view exchangeName() const override { return "binance"; } + * @brief Identifies the exchange implementation. + * + * @return std::string_view Exchange identifier "binance". + */ + std::string_view exchangeName() const override { return "binance"; } // Builds the WebSocket URL from symbols, connects, fetches snapshots, and starts streaming. // Returns false if the WebSocket doesn't connect or any snapshot can't be fetched. diff --git a/src/i_exchange_adapter.hpp b/src/i_exchange_adapter.hpp index 15aaa53..3351ced 100644 --- a/src/i_exchange_adapter.hpp +++ b/src/i_exchange_adapter.hpp @@ -13,12 +13,12 @@ class IExchangeAdapter { public: /** - * @brief Ensures derived adapters are destroyed correctly when deleted via the interface. - * - * Guarantees that deleting an IExchangeAdapter pointer calls the concrete adapter's destructor - * so derived resources (connections, threads, etc.) are released properly. - */ -virtual ~IExchangeAdapter() = default; + * @brief Ensures derived adapters are destroyed correctly when deleted via the interface. + * + * Guarantees that deleting an IExchangeAdapter pointer calls the concrete adapter's destructor + * so derived resources (connections, threads, etc.) are released properly. + */ + virtual ~IExchangeAdapter() = default; // Returns the exchange name (e.g. "binance", "kraken"). virtual std::string_view exchangeName() const = 0; @@ -35,38 +35,40 @@ virtual ~IExchangeAdapter() = default; virtual void stop() = 0; /** - * @brief Deleted copy constructor to prevent copying of adapter instances. - * - * Ensures that exchange adapter objects (and their derived types) cannot be copy-constructed, enforcing unique ownership of connection and thread resources. - */ -IExchangeAdapter(const IExchangeAdapter&) = delete; + * @brief Deleted copy constructor to prevent copying of adapter instances. + * + * Ensures that exchange adapter objects (and their derived types) cannot be copy-constructed, + * enforcing unique ownership of connection and thread resources. + */ + IExchangeAdapter(const IExchangeAdapter&) = delete; /** - * @brief Deleted copy assignment operator to prevent copying of adapters. - * - * Adapter instances cannot be copy-assigned; ownership and resources must be managed by the concrete implementation. - */ -IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; + * @brief Deleted copy assignment operator to prevent copying of adapters. + * + * Adapter instances cannot be copy-assigned; ownership and resources must be managed by the + * concrete implementation. + */ + IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; /** - * @brief Deleted move constructor to prevent moving an adapter instance. - * - * Ensures concrete adapter implementations are not movable so ownership of - * connections, threads, and other resources remains with the original instance. - */ -IExchangeAdapter(IExchangeAdapter&&) = delete; + * @brief Deleted move constructor to prevent moving an adapter instance. + * + * Ensures concrete adapter implementations are not movable so ownership of + * connections, threads, and other resources remains with the original instance. + */ + IExchangeAdapter(IExchangeAdapter&&) = delete; /** - * @brief Deleted move-assignment operator to prevent move-assignment of adapters. - * - * Prevents transferring ownership of adapter resources (connections, threads, etc.) - * via move-assignment; implementations must manage lifecycle without relying on move - * semantics. - */ -IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; + * @brief Deleted move-assignment operator to prevent move-assignment of adapters. + * + * Prevents transferring ownership of adapter resources (connections, threads, etc.) + * via move-assignment; implementations must manage lifecycle without relying on move + * semantics. + */ + IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; protected: /** - * @brief Protected default constructor. - * - * Allows derived exchange adapter implementations to construct the base interface subobject. - */ -IExchangeAdapter() = default; + * @brief Protected default constructor. + * + * Allows derived exchange adapter implementations to construct the base interface subobject. + */ + IExchangeAdapter() = default; }; diff --git a/src/main.cpp b/src/main.cpp index e1b060f..7296a60 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,13 +17,19 @@ #include "order_book.hpp" /** - * @brief Program entry point that loads configuration, initializes order books, metrics, the exchange adapter and metrics server, then runs the monitoring loop. + * @brief Program entry point that loads configuration, initializes order books, metrics, the + * exchange adapter and metrics server, then runs the monitoring loop. * - * The first command-line argument (if present) is treated as the path to the JSON configuration file; otherwise "config.json" is used. The configuration controls update interval, snapshot depth, the list of symbols to watch, and the primary symbol. After successful initialization the function starts the metrics HTTP server and the exchange adapter, then enters an infinite loop that periodically prints per-symbol order book statistics and metrics. + * The first command-line argument (if present) is treated as the path to the JSON configuration + * file; otherwise "config.json" is used. The configuration controls update interval, snapshot + * depth, the list of symbols to watch, and the primary symbol. After successful initialization the + * function starts the metrics HTTP server and the exchange adapter, then enters an infinite loop + * that periodically prints per-symbol order book statistics and metrics. * * @param argc Number of command-line arguments. * @param argv Array of command-line argument strings; argv[1] may provide the config file path. - * @return int `0` on normal termination (unreachable under normal operation), `-1` on configuration, initialization, or runtime startup errors. + * @return int `0` on normal termination (unreachable under normal operation), `-1` on + * configuration, initialization, or runtime startup errors. */ int main(int argc, char* argv[]) { diff --git a/src/metrics_server.hpp b/src/metrics_server.hpp index 8d5a086..f3ec438 100644 --- a/src/metrics_server.hpp +++ b/src/metrics_server.hpp @@ -16,14 +16,16 @@ class MetricsServer { public: /** - * @brief Construct a MetricsServer that serves Prometheus metrics and health for an exchange. - * - * @param exchange Identifier of the exchange whose metrics will be exposed. - * @param metricsMap Reference to the map of metric objects indexed by symbol; the server stores this reference and reads metrics from it. - * @param books Reference to the map of order books indexed by symbol; the server stores this reference and reads book state from it. - * @param port TCP port to bind the HTTP metrics/health server to (default 9090). - */ - MetricsServer(std::string_view exchange, + * @brief Construct a MetricsServer that serves Prometheus metrics and health for an exchange. + * + * @param exchange Identifier of the exchange whose metrics will be exposed. + * @param metricsMap Reference to the map of metric objects indexed by symbol; the server stores + * this reference and reads metrics from it. + * @param books Reference to the map of order books indexed by symbol; the server stores this + * reference and reads book state from it. + * @param port TCP port to bind the HTTP metrics/health server to (default 9090). + */ + MetricsServer(std::string_view exchange, std::unordered_map>& metricsMap, std::unordered_map>& books, int port = 9090) @@ -85,7 +87,8 @@ class MetricsServer { * Builds the full Prometheus exposition text representing the current state of the stored * metrics and order books for the exchange associated with this MetricsServer. * - * @return std::string Prometheus exposition body ready to be served (plain text in Prometheus format). + * @return std::string Prometheus exposition body ready to be served (plain text in Prometheus + * format). */ std::string buildPrometheusMetrics() { return buildPrometheusOutput(exchange, metricsMap, books); diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp index 394736c..20c0e8d 100644 --- a/src/prometheus_format.hpp +++ b/src/prometheus_format.hpp @@ -20,7 +20,8 @@ * - newline with `\n` * * @param v Label value to escape. - * @return Escaped string where backslash is replaced with `\\`, double-quote with `\"`, and newline with `\n`. + * @return Escaped string where backslash is replaced with `\\`, double-quote with `\"`, and newline + * with `\n`. */ inline std::string escapeLabelValue(std::string_view v) { std::string out; @@ -62,8 +63,10 @@ inline std::string escapeLabelValue(std::string_view v) { * * @param exchange Identifier for the adapter; used as the `exchange` label value. * @param metricsMap Map from symbol to Metrics; used for message counts and processing/lag values. - * @param books Map from symbol to OrderBook; used to obtain per-symbol order book stats (asks/bids counts, best prices). - * @return std::string Prometheus exposition text including HELP/TYPE headers and metric lines. Best-price and spread metrics are omitted when price values are not positive. + * @param books Map from symbol to OrderBook; used to obtain per-symbol order book stats (asks/bids + * counts, best prices). + * @return std::string Prometheus exposition text including HELP/TYPE headers and metric lines. + * Best-price and spread metrics are omitted when price values are not positive. */ inline std::string buildPrometheusOutput( std::string_view exchange, diff --git a/tests/test_prometheus_format.cpp b/tests/test_prometheus_format.cpp index 62f2575..9a0a21d 100644 --- a/tests/test_prometheus_format.cpp +++ b/tests/test_prometheus_format.cpp @@ -36,8 +36,10 @@ nlohmann::json makeSnap(long long lastUpdateId, /** * Extracts all non-empty, non-comment data lines from a Prometheus exposition text block. * - * @param output The full Prometheus exposition text (may contain comment lines starting with `#` and blank lines). - * @return std::vector A vector containing each data line (lines that are not empty and do not start with `#`), in their original order without trailing newlines. + * @param output The full Prometheus exposition text (may contain comment lines starting with `#` + * and blank lines). + * @return std::vector A vector containing each data line (lines that are not empty and + * do not start with `#`), in their original order without trailing newlines. */ std::vector dataLines(const std::string& output) { std::vector result; @@ -68,10 +70,12 @@ struct PrometheusFormatTest : testing::Test { } /** - * @brief Builds a Prometheus exposition-format text block for the fixture's metrics and order books. + * @brief Builds a Prometheus exposition-format text block for the fixture's metrics and order + * books. * * @param exchange Exchange label value to include on each metric line; defaults to "testex". - * @return std::string Prometheus-formatted text containing HELP/TYPE headers and metric data lines for the fixture's metrics and books. + * @return std::string Prometheus-formatted text containing HELP/TYPE headers and metric data + * lines for the fixture's metrics and books. */ std::string build(std::string_view exchange = "testex") { return buildPrometheusOutput(exchange, metrics, books);