diff --git a/CMakeLists.txt b/CMakeLists.txt index 72f6cb8..e0cac0d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,8 @@ if(BUILD_TESTS) tests/test_order_book.cpp tests/test_decimal.cpp tests/test_safe_arithmetic.cpp + tests/test_binance_utils.cpp + tests/test_prometheus_format.cpp src/order_book.cpp ) diff --git a/grafana/dashboards/lob.json b/grafana/dashboards/lob.json index 8bf0611..3e1b630 100644 --- a/grafana/dashboards/lob.json +++ b/grafana/dashboards/lob.json @@ -14,12 +14,27 @@ "annotations": { "list": [] }, "templating": { "list": [ + { + "name": "exchange", + "label": "Exchange", + "type": "query", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "query": "label_values(lob_messages_total, exchange)", + "refresh": 1, + "multi": false, + "includeAll": true, + "allValue": ".*", + "sort": 1, + "hide": 0, + "options": [], + "current": {} + }, { "name": "symbol", "label": "Symbol", "type": "query", "datasource": { "type": "prometheus", "uid": "prometheus" }, - "query": "label_values(lob_messages_total, symbol)", + "query": "label_values(lob_messages_total{exchange=~\"$exchange\"}, symbol)", "refresh": 1, "multi": false, "includeAll": false, @@ -45,7 +60,7 @@ "refId": "A" }, { - "expr": "rate(lob_messages_total{symbol=\"$symbol\"}[1m])", + "expr": "rate(lob_messages_total{exchange=~\"$exchange\",symbol=\"$symbol\"}[1m])", "legendFormat": "{{symbol}} msg/s", "refId": "B" } @@ -66,19 +81,19 @@ { "id": 2, "title": "Event Lag", - "description": "Worst-case lag across all symbols, with selected symbol overlaid for comparison", + "description": "Worst-case lag per exchange, with selected symbol overlaid for comparison", "type": "timeseries", "gridPos": { "x": 12, "y": 0, "w": 12, "h": 8 }, "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "max(lob_event_lag_milliseconds)", - "legendFormat": "worst lag ms (all)", + "expr": "max by(exchange) (lob_event_lag_milliseconds)", + "legendFormat": "{{exchange}} worst lag ms", "refId": "A" }, { - "expr": "lob_event_lag_milliseconds{symbol=\"$symbol\"}", - "legendFormat": "{{symbol}} lag ms", + "expr": "lob_event_lag_milliseconds{exchange=~\"$exchange\",symbol=\"$symbol\"}", + "legendFormat": "{{exchange}} {{symbol}} lag ms", "refId": "B" } ], @@ -122,7 +137,7 @@ "refId": "B" }, { - "expr": "lob_processing_time_microseconds{symbol=\"$symbol\"}", + "expr": "lob_processing_time_microseconds{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "{{symbol}} µs", "refId": "C" } @@ -157,12 +172,12 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_best_ask_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_ask_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Ask", "refId": "A" }, { - "expr": "lob_orderbook_best_bid_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_bid_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Bid", "refId": "B" } @@ -190,7 +205,7 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_spread_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_spread_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "spread USD", "refId": "A" } @@ -218,12 +233,12 @@ "datasource": { "type": "prometheus", "uid": "prometheus" }, "targets": [ { - "expr": "lob_orderbook_asks_count{symbol=\"$symbol\"}", + "expr": "lob_orderbook_asks_count{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Asks", "refId": "A" }, { - "expr": "lob_orderbook_bids_count{symbol=\"$symbol\"}", + "expr": "lob_orderbook_bids_count{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Bids", "refId": "B" } @@ -255,8 +270,8 @@ "refId": "A" }, { - "expr": "max(lob_event_lag_milliseconds)", - "legendFormat": "Worst Lag (ms)", + "expr": "max by(exchange) (lob_event_lag_milliseconds)", + "legendFormat": "Event Lag (ms)", "refId": "B" }, { @@ -265,17 +280,17 @@ "refId": "C" }, { - "expr": "lob_orderbook_best_ask_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_ask_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Ask", "refId": "D" }, { - "expr": "lob_orderbook_best_bid_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_best_bid_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Best Bid", "refId": "E" }, { - "expr": "lob_orderbook_spread_price{symbol=\"$symbol\"}", + "expr": "lob_orderbook_spread_price{exchange=~\"$exchange\",symbol=\"$symbol\"}", "legendFormat": "Spread", "refId": "F" } diff --git a/src/binance_utils.hpp b/src/binance_utils.hpp new file mode 100644 index 0000000..b26994e --- /dev/null +++ b/src/binance_utils.hpp @@ -0,0 +1,29 @@ +#pragma once +#include +#include +#include + +// Extracts the symbol from a Binance combined-stream name and uppercases it. +// +// Binance stream names look like: +// "btcusdt@depth" (basic depth stream) +// "ethusdt@depth@100ms" (depth stream with interval suffix) +// +// Returns everything before the first '@', uppercased, which is the key +/** + * Extracts the symbol portion from a Binance combined-stream name and returns it uppercased. + * + * The function takes the substring before the first '@' in `stream` and converts it to uppercase. + * If `stream` contains no '@', the entire input is uppercased. If `stream` is empty, an empty + * string is returned. + * + * @param stream Combined-stream name (e.g., "btcusdt@depth" or "ethusdt@aggTrade"). + * @return Uppercased symbol extracted from before the first '@' (or the entire uppercased input if + * no '@'). + */ +inline std::string streamToSymbol(const std::string& stream) { + std::string sym = stream.substr(0, stream.find('@')); + std::transform(sym.begin(), sym.end(), sym.begin(), + [](unsigned char c) { return ::toupper(c); }); + return sym; +} diff --git a/src/feed_handler.cpp b/src/feed_handler.cpp index 288e0dc..007fc7d 100644 --- a/src/feed_handler.cpp +++ b/src/feed_handler.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -13,13 +14,60 @@ namespace { constexpr std::size_t kMaxBufferedMsgsPerSymbol = 5000; } -std::string FeedHandler::streamToSymbol(const std::string& stream) { - std::string sym = stream.substr(0, stream.find('@')); - std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); - return sym; +/** + * @brief Construct a BinanceAdapter configured for the specified depth update interval. + * + * Initializes the adapter's update interval used when subscribing to Binance depth streams. + * + * @param updateIntervalMs Milliseconds between depth update snapshots; must be either 100 or 1000. + * @throws std::invalid_argument if `updateIntervalMs` is not 100 or 1000. + */ +BinanceAdapter::BinanceAdapter(int updateIntervalMs) : updateIntervalMs(updateIntervalMs) { + if (updateIntervalMs != 100 && updateIntervalMs != 1000) { + throw std::invalid_argument("BinanceAdapter: updateIntervalMs must be 100 or 1000, got " + + std::to_string(updateIntervalMs)); + } +} + +/** + * @brief Cleans up the adapter, ensuring background workers and connections are stopped. + * + * Ensures the resync worker is requested to stop, the resync condition variable is + * notified, the resync thread is joined/reset, and the WebSocket client is stopped. + */ +BinanceAdapter::~BinanceAdapter() { stop(); } + +/** + * @brief Stops background processing and shuts down the WebSocket connection. + * + * Requests cancellation of the resync worker, wakes the resync condition variable, + * joins and destroys the resync thread, and stops the WebSocket client. + */ +void BinanceAdapter::stop() { + resyncThread.request_stop(); + resyncCv.notify_one(); + resyncThread = {}; // join + destroy + webSocket.stop(); } -bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook) { +/** + * @brief Fetches a depth snapshot for a symbol from Binance and applies it to the given order book. + * + * Attempts to download the REST order-book snapshot for `symbol`, apply it to `orderBook`, and then + * replay any pre-snapshot buffered websocket updates for that symbol atomically. If the HTTP + * response indicates rate limiting or an IP ban the function will wait according to the server's + * guidance (or a default) and return `false`. + * + * @param symbol Symbol identifier used in the Binance REST request (e.g., "BTCUSDT"). + * @param orderBook OrderBook instance to receive the applied snapshot and subsequent replayed + * updates. + * @param stoken Stop token used to interrupt any waits caused by rate limiting or IP bans, allowing + * for graceful shutdown. + * @return `true` if the snapshot was applied and all buffered updates were successfully replayed; + * `false` otherwise. + */ +bool BinanceAdapter::fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook, + std::stop_token stoken) { ix::HttpClient httpClient; auto args = std::make_shared(); args->connectTimeout = 5; @@ -37,12 +85,14 @@ bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& or } } fprintf(stderr, "[%s] got 429, waiting %ds before retry\n", symbol.c_str(), retryAfterSec); - std::this_thread::sleep_for(std::chrono::seconds(retryAfterSec)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::seconds(retryAfterSec), [] { return false; }); return false; } if (response->statusCode == 418) { fprintf(stderr, "[%s] IP banned (418), waiting 120s\n", symbol.c_str()); - std::this_thread::sleep_for(std::chrono::seconds(120)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::seconds(120), [] { return false; }); return false; } if (response->statusCode != 200) { @@ -53,9 +103,12 @@ bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& or try { auto snapshot = nlohmann::json::parse(response->body); - orderBook.applySnapshot(snapshot); + // Hold bufferMutex across applySnapshot + buffer replay so that + // handleWsMessage cannot observe isSnapshotApplied()==true and apply + // a live update while we are still draining the pre-snapshot buffer. std::lock_guard lock(bufferMutex); + orderBook.applySnapshot(snapshot); for (const auto& msg : symbolBuffers[symbol]) { if (!orderBook.applyUpdate(msg)) { fprintf(stderr, "[%s] buffered message out of order, resyncing\n", symbol.c_str()); @@ -71,7 +124,19 @@ bool FeedHandler::fetchAndApplySnapshot(const std::string& symbol, OrderBook& or } } -void FeedHandler::handleWsMessage(const ix::WebSocketMessagePtr& msg) { +/** + * @brief Process an incoming WebSocket event from Binance and update state or metrics accordingly. + * + * Handles connection lifecycle events (Open/Close/Error) and parses combo-stream messages for + * order-book updates. While a symbol's initial snapshot is missing, messages are buffered. After + * snapshot application, updates are applied to the corresponding OrderBook; on update failures the + * symbol is scheduled for resynchronization and its buffer is cleared. Processing time and event + * lag are recorded in the associated Metrics. + * + * @param msg Pointer to the WebSocket message; for data messages it is expected to contain a + * Binance combo stream JSON object with fields `"stream"` and `"data"`. + */ +void BinanceAdapter::handleWsMessage(const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { printf("connected\n"); std::lock_guard lock(wsReadyMutex); @@ -156,14 +221,26 @@ void FeedHandler::handleWsMessage(const ix::WebSocketMessagePtr& msg) { } } -void FeedHandler::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { - std::stop_callback wake(stoken, [this] { resyncCv.notify_one(); }); - while (!stoken.stop_requested()) { +/** + * @brief Background worker that processes symbols needing resynchronization. + * + * Waits for symbols enqueued for resync, then attempts up to `maxSnapshotRetries` + * to fetch and apply a fresh snapshot for each symbol. Retries use an increasing + * delay (1000ms * attempt) between attempts, and a 500ms pause is added after + * each symbol to avoid bursting REST requests. The worker wakes and exits when + * `stoken` requests stop. + * + * @param maxSnapshotRetries Maximum number of snapshot fetch attempts per symbol. + * @param stoken Stop token used to interrupt waiting, wake the worker, and + * request graceful shutdown. + */ +void BinanceAdapter::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken) { + while (true) { std::string symbol; { std::unique_lock lock(resyncMutex); - resyncCv.wait(lock, [&] { return !resyncQueue.empty() || stoken.stop_requested(); }); - if (stoken.stop_requested()) { + // wait() with stop_token: returns false immediately if stop is requested. + if (!resyncCv.wait(lock, stoken, [&] { return !resyncQueue.empty(); })) { break; } symbol = resyncQueue.front(); @@ -171,13 +248,21 @@ void FeedHandler::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken } for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { printf("[%s] resyncing (attempt %d/%d)\n", symbol.c_str(), attempt, maxSnapshotRetries); - if (fetchAndApplySnapshot(symbol, *books->at(symbol))) { + if (fetchAndApplySnapshot(symbol, *books->at(symbol), stoken)) { break; } - if (attempt < maxSnapshotRetries && !stoken.stop_requested()) { + if (stoken.stop_requested()) { + return; + } + if (attempt < maxSnapshotRetries) { int delayMs = 1000 * attempt; printf("[%s] resync failed, retrying in %dms\n", symbol.c_str(), delayMs); - std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::milliseconds(delayMs), + [] { return false; }); + if (stoken.stop_requested()) { + return; + } } } if (!books->at(symbol)->isSnapshotApplied()) { @@ -188,17 +273,48 @@ void FeedHandler::runResyncWorker(int maxSnapshotRetries, std::stop_token stoken // Small gap before picking up the next symbol. When several symbols fall out of sync // at once (common after a network hiccup), they all land in the queue together, and // processing them without any delay would fire REST requests back-to-back. - if (!stoken.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + { + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, stoken, std::chrono::milliseconds(500), [] { return false; }); + } + if (stoken.stop_requested()) { + return; } } } -bool FeedHandler::initialize( - const std::vector& symbols, - std::unordered_map>& booksRef, ix::WebSocket& webSocket, - std::unordered_map>& metricsMapRef, int snapshotDepthArg, - int maxSnapshotRetries) { +/** + * @brief Initialize the adapter for a set of trading symbols, establish streaming, start + * the resync worker, and load initial order book snapshots. + * + * Validates that each symbol has an associated OrderBook and Metrics, configures internal + * references and snapshot depth, opens the Binance combined WebSocket stream for the symbols, + * starts the background resync thread, and concurrently fetches and applies initial REST + * snapshots for each symbol with retry/backoff and a staggered launch to avoid rate limits. + * + * @param symbols List of symbol names to subscribe and initialize (e.g., "BTCUSDT"). + * @param booksRef Map of symbol -> owned OrderBook instances; must contain every symbol. + * @param metricsMapRef Map of symbol -> owned Metrics instances; must contain every symbol. + * @param snapshotDepthArg Depth parameter used when requesting REST order book snapshots. + * @param maxSnapshotRetries Maximum number of attempts per-symbol to fetch and apply the initial + * snapshot. + * @return true if the adapter connected and all initial snapshots were successfully applied; + * `false` if validation failed, the WebSocket failed to connect, or any symbol failed to obtain a + * snapshot. + */ +bool BinanceAdapter::start(const std::vector& symbols, + std::unordered_map>& booksRef, + std::unordered_map>& metricsMapRef, + int snapshotDepthArg, int maxSnapshotRetries) { + for (const auto& sym : symbols) { + if (booksRef.find(sym) == booksRef.end() || + metricsMapRef.find(sym) == metricsMapRef.end()) { + fprintf(stderr, "BinanceAdapter::start: symbol '%s' missing from books or metricsMap\n", + sym.c_str()); + return false; + } + } + books = &booksRef; metricsMap = &metricsMapRef; snapshotDepth = snapshotDepthArg; @@ -215,6 +331,16 @@ bool FeedHandler::initialize( wsConnected = false; } + // Build the combo stream URL from the symbol list. + std::string urlStreams; + for (const auto& sym : symbols) { + std::string lower = sym; + std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); + if (!urlStreams.empty()) urlStreams += "/"; + urlStreams += lower + "@depth@" + std::to_string(updateIntervalMs) + "ms"; + } + webSocket.setUrl("wss://stream.binance.com:9443/stream?streams=" + urlStreams); + webSocket.setOnMessageCallback( [this](const ix::WebSocketMessagePtr& msg) { handleWsMessage(msg); }); @@ -225,6 +351,8 @@ bool FeedHandler::initialize( wsReady.wait_for(lock, std::chrono::seconds(30), [this] { return wsConnected; }); if (!connected) { fprintf(stderr, "WebSocket didn't connect within 30s\n"); + lock.unlock(); + webSocket.stop(); return false; } } @@ -237,6 +365,7 @@ bool FeedHandler::initialize( // which is fine in isolation, but Docker restarts compound this fast enough to earn a 429 // or 418 ban. A 300ms gap between launches keeps only a handful of requests in-flight at a // time. + std::stop_token startStoken = resyncThread.get_stop_token(); std::vector> futures; futures.reserve(symbols.size()); for (size_t i = 0; i < symbols.size(); ++i) { @@ -244,19 +373,22 @@ bool FeedHandler::initialize( std::this_thread::sleep_for(std::chrono::milliseconds(300)); } const auto& symbol = symbols[i]; - futures.push_back(std::async(std::launch::async, [this, symbol, maxSnapshotRetries]() { + futures.push_back(std::async(std::launch::async, [this, symbol, maxSnapshotRetries, + startStoken]() { for (int attempt = 1; attempt <= maxSnapshotRetries; ++attempt) { printf("[%s] fetching snapshot (attempt %d/%d)\n", symbol.c_str(), attempt, maxSnapshotRetries); - if (fetchAndApplySnapshot(symbol, *books->at(symbol))) { + if (fetchAndApplySnapshot(symbol, *books->at(symbol), startStoken)) { return true; } - if (attempt == maxSnapshotRetries) { + if (startStoken.stop_requested() || attempt == maxSnapshotRetries) { break; } int delayMs = 1000 * attempt; printf("[%s] snapshot fetch failed, retrying in %dms\n", symbol.c_str(), delayMs); - std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); + std::unique_lock lock(resyncMutex); + resyncCv.wait_for(lock, startStoken, std::chrono::milliseconds(delayMs), + [] { return false; }); } fprintf(stderr, "[%s] snapshot fetch gave up after %d attempts\n", symbol.c_str(), maxSnapshotRetries); @@ -270,5 +402,8 @@ bool FeedHandler::initialize( allOk = false; } } + if (!allOk) { + stop(); + } return allOk; } diff --git a/src/feed_handler.hpp b/src/feed_handler.hpp index 0be865f..034ac00 100644 --- a/src/feed_handler.hpp +++ b/src/feed_handler.hpp @@ -8,25 +8,42 @@ #include #include #include +#include #include #include #include +#include "binance_utils.hpp" +#include "i_exchange_adapter.hpp" #include "metrics.hpp" #include "order_book.hpp" -class FeedHandler { +class BinanceAdapter : public IExchangeAdapter { public: - // Connects the WebSocket, starts the resync worker, and fetches initial snapshots for all - // symbols. Snapshot requests are staggered to stay within Binance REST rate limits. + // updateIntervalMs must be 100 or 1000, matching the Binance stream suffix (@depth@100ms). + explicit BinanceAdapter(int updateIntervalMs = 100); + ~BinanceAdapter() override; + + /** + * @brief Identifies the exchange implementation. + * + * @return std::string_view Exchange identifier "binance". + */ + std::string_view exchangeName() const override { return "binance"; } + + // Builds the WebSocket URL from symbols, connects, fetches snapshots, and starts streaming. // Returns false if the WebSocket doesn't connect or any snapshot can't be fetched. - bool initialize(const std::vector& symbols, - std::unordered_map>& books, - ix::WebSocket& webSocket, - std::unordered_map>& metricsMap, - int snapshotDepth = 1000, int maxSnapshotRetries = 5); + bool start(const std::vector& symbols, + std::unordered_map>& books, + std::unordered_map>& metricsMap, + int snapshotDepth = 1000, int maxSnapshotRetries = 5) override; + + void stop() override; private: + ix::WebSocket webSocket; + int updateIntervalMs; + std::unordered_map>* books = nullptr; std::unordered_map>* metricsMap = nullptr; @@ -35,7 +52,7 @@ class FeedHandler { std::queue resyncQueue; std::mutex resyncMutex; - std::condition_variable resyncCv; + std::condition_variable_any resyncCv; std::mutex wsReadyMutex; std::condition_variable wsReady; @@ -43,15 +60,13 @@ class FeedHandler { std::jthread resyncThread; - // "btcusdt@depth" → "BTCUSDT" - static std::string streamToSymbol(const std::string& stream); - int snapshotDepth = 1000; // Fetches a depth snapshot from the Binance REST API and applies it to the order book. // On 429 (rate limited) or 418 (IP banned), sleeps before returning false so the caller - // doesn't immediately retry and dig the hole deeper. - bool fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook); + // doesn't immediately retry and dig the hole deeper. The sleep is interruptible via stoken. + bool fetchAndApplySnapshot(const std::string& symbol, OrderBook& orderBook, + std::stop_token stoken); void handleWsMessage(const ix::WebSocketMessagePtr& msg); diff --git a/src/i_exchange_adapter.hpp b/src/i_exchange_adapter.hpp new file mode 100644 index 0000000..3351ced --- /dev/null +++ b/src/i_exchange_adapter.hpp @@ -0,0 +1,74 @@ +#pragma once +#include +#include +#include +#include +#include + +#include "metrics.hpp" +#include "order_book.hpp" + +// Interface for exchange feed adapters. +// Each adapter owns its WebSocket connection, snapshot logic, and rate-limit handling. +class IExchangeAdapter { +public: + /** + * @brief Ensures derived adapters are destroyed correctly when deleted via the interface. + * + * Guarantees that deleting an IExchangeAdapter pointer calls the concrete adapter's destructor + * so derived resources (connections, threads, etc.) are released properly. + */ + virtual ~IExchangeAdapter() = default; + + // Returns the exchange name (e.g. "binance", "kraken"). + virtual std::string_view exchangeName() const = 0; + + // Connects to the exchange, fetches initial snapshots for all symbols, + // and begins streaming live updates into books and metricsMap. + // Returns false if the connection or any snapshot fails. + virtual bool start(const std::vector& symbols, + std::unordered_map>& books, + std::unordered_map>& metricsMap, + int snapshotDepth = 1000, int maxSnapshotRetries = 5) = 0; + + // Stops the WebSocket connection and all background threads. + virtual void stop() = 0; + + /** + * @brief Deleted copy constructor to prevent copying of adapter instances. + * + * Ensures that exchange adapter objects (and their derived types) cannot be copy-constructed, + * enforcing unique ownership of connection and thread resources. + */ + IExchangeAdapter(const IExchangeAdapter&) = delete; + /** + * @brief Deleted copy assignment operator to prevent copying of adapters. + * + * Adapter instances cannot be copy-assigned; ownership and resources must be managed by the + * concrete implementation. + */ + IExchangeAdapter& operator=(const IExchangeAdapter&) = delete; + /** + * @brief Deleted move constructor to prevent moving an adapter instance. + * + * Ensures concrete adapter implementations are not movable so ownership of + * connections, threads, and other resources remains with the original instance. + */ + IExchangeAdapter(IExchangeAdapter&&) = delete; + /** + * @brief Deleted move-assignment operator to prevent move-assignment of adapters. + * + * Prevents transferring ownership of adapter resources (connections, threads, etc.) + * via move-assignment; implementations must manage lifecycle without relying on move + * semantics. + */ + IExchangeAdapter& operator=(IExchangeAdapter&&) = delete; + +protected: + /** + * @brief Protected default constructor. + * + * Allows derived exchange adapter implementations to construct the base interface subobject. + */ + IExchangeAdapter() = default; +}; diff --git a/src/main.cpp b/src/main.cpp index 3fd021f..7296a60 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,5 +1,3 @@ -#include - #include #include #include @@ -9,25 +7,30 @@ #include #include #include +#include #include #include "feed_handler.hpp" +#include "i_exchange_adapter.hpp" #include "metrics.hpp" #include "metrics_server.hpp" #include "order_book.hpp" -/* - Setup (do once): - 1. Open WebSocket to combo @depth stream — start buffering messages immediately - 2. For each symbol, fetch REST snapshot: GET /api/v3/depth?symbol=X&limit=5000 - 3. Apply buffered messages that arrived during snapshot fetch - 4. Process normally - - Every subsequent message: - - If u < your current book's lastUpdateId → stale, ignore it - - If U > your current book's lastUpdateId + 1 → you missed events, restart from scratch - - Otherwise → apply the update, set your lastUpdateId = u -*/ +/** + * @brief Program entry point that loads configuration, initializes order books, metrics, the + * exchange adapter and metrics server, then runs the monitoring loop. + * + * The first command-line argument (if present) is treated as the path to the JSON configuration + * file; otherwise "config.json" is used. The configuration controls update interval, snapshot + * depth, the list of symbols to watch, and the primary symbol. After successful initialization the + * function starts the metrics HTTP server and the exchange adapter, then enters an infinite loop + * that periodically prints per-symbol order book statistics and metrics. + * + * @param argc Number of command-line arguments. + * @param argv Array of command-line argument strings; argv[1] may provide the config file path. + * @return int `0` on normal termination (unreachable under normal operation), `-1` on + * configuration, initialization, or runtime startup errors. + */ int main(int argc, char* argv[]) { const char* configPath = (argc > 1) ? argv[1] : "config.json"; @@ -81,9 +84,9 @@ int main(int argc, char* argv[]) { return -1; } - // Validate each entry and canonicalise: UPPERCASE for map keys, lowercase for the URL. + // Validate each entry and canonicalise to UPPERCASE for map keys. std::vector symbols; - std::string urlStreams; + std::unordered_set seen; for (const auto& entry : config["symbols"]) { if (!entry.is_string()) { fprintf(stderr, "config error: every entry in 'symbols' must be a string\n"); @@ -91,16 +94,13 @@ int main(int argc, char* argv[]) { } std::string sym = entry.get(); std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper); - symbols.push_back(sym); - - std::string lower = sym; - std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); - if (!urlStreams.empty()) { - urlStreams += "/"; + if (seen.count(sym)) { + fprintf(stderr, "config error: duplicate symbol '%s'\n", sym.c_str()); + return -1; } - urlStreams += lower + "@depth@" + std::to_string(updateIntervalMs) + "ms"; + seen.insert(sym); + symbols.push_back(sym); } - const std::string url = "wss://stream.binance.com:9443/stream?streams=" + urlStreams; std::string primarySymbol = config["primary_symbol"].get(); std::transform(primarySymbol.begin(), primarySymbol.end(), primarySymbol.begin(), ::toupper); @@ -118,10 +118,9 @@ int main(int argc, char* argv[]) { metricsMap[sym] = std::make_unique(); } - ix::WebSocket webSocket; - webSocket.setUrl(url); + BinanceAdapter adapter(updateIntervalMs); - MetricsServer metricsServer(metricsMap, books); + MetricsServer metricsServer(adapter.exchangeName(), metricsMap, books); if (!metricsServer.start()) { fprintf(stderr, "couldn't bind metrics server on port 9090\n"); return -1; @@ -133,9 +132,8 @@ int main(int argc, char* argv[]) { } printf("\n"); - FeedHandler feedHandler; - if (!feedHandler.initialize(symbols, books, webSocket, metricsMap, snapshotDepth)) { - fprintf(stderr, "failed to start feed handler\n"); + if (!adapter.start(symbols, books, metricsMap, snapshotDepth)) { + fprintf(stderr, "failed to start exchange adapter\n"); return -1; } diff --git a/src/metrics_server.hpp b/src/metrics_server.hpp index 5653097..f3ec438 100644 --- a/src/metrics_server.hpp +++ b/src/metrics_server.hpp @@ -5,19 +5,38 @@ #include #include #include +#include #include #include #include "metrics.hpp" #include "order_book.hpp" +#include "prometheus_format.hpp" class MetricsServer { public: - MetricsServer(std::unordered_map>& metricsMap, + /** + * @brief Construct a MetricsServer that serves Prometheus metrics and health for an exchange. + * + * @param exchange Identifier of the exchange whose metrics will be exposed. + * @param metricsMap Reference to the map of metric objects indexed by symbol; the server stores + * this reference and reads metrics from it. + * @param books Reference to the map of order books indexed by symbol; the server stores this + * reference and reads book state from it. + * @param port TCP port to bind the HTTP metrics/health server to (default 9090). + */ + MetricsServer(std::string_view exchange, + std::unordered_map>& metricsMap, std::unordered_map>& books, int port = 9090) - : metricsMap(metricsMap), books(books), port(port) {} - + : exchange(exchange), metricsMap(metricsMap), books(books), port(port) {} + + /** + * @brief Stop the internal HTTP server and join the worker thread. + * + * Ensures the HTTP server is stopped and, if the internal thread is joinable, + * waits for the thread to finish before destruction. + */ ~MetricsServer() { svr.stop(); if (thread.joinable()) { @@ -62,91 +81,20 @@ class MetricsServer { } private: + /** + * @brief Produce Prometheus-formatted metrics for the server's configured exchange. + * + * Builds the full Prometheus exposition text representing the current state of the stored + * metrics and order books for the exchange associated with this MetricsServer. + * + * @return std::string Prometheus exposition body ready to be served (plain text in Prometheus + * format). + */ std::string buildPrometheusMetrics() { - std::ostringstream ss; - - // Helper: emit HELP+TYPE header once, then one line per symbol. - auto writeCounterHeader = [&](const char* name, const char* help) { - ss << "# HELP " << name << " " << help << "\n"; - ss << "# TYPE " << name << " counter\n"; - }; - auto writeGaugeHeader = [&](const char* name, const char* help) { - ss << "# HELP " << name << " " << help << "\n"; - ss << "# TYPE " << name << " gauge\n"; - }; - auto writeLine = [&](const char* name, const std::string& symbol, double value) { - ss << name << "{symbol=\"" << symbol << "\"} " << value << "\n"; - }; - - writeCounterHeader("lob_messages_total", "Total messages received from Binance"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_messages_total", sym, static_cast(m->msgCount.load())); - } - - writeGaugeHeader("lob_event_lag_milliseconds", "Last event lag in milliseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_event_lag_milliseconds", sym, - static_cast(m->lastEventLagMs.load())); - } - - writeGaugeHeader("lob_processing_time_microseconds", - "Last update processing time in microseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_processing_time_microseconds", sym, - static_cast(m->lastProcessingUs.load())); - } - - writeGaugeHeader("lob_max_processing_time_microseconds", - "Maximum observed processing time in microseconds"); - for (const auto& [sym, m] : metricsMap) { - writeLine("lob_max_processing_time_microseconds", sym, - static_cast(m->maxProcessingUs.load())); - } - - // Collect stats once per symbol to avoid locking each book multiple times. - std::vector> allStats; - allStats.reserve(books.size()); - for (const auto& [sym, book] : books) { - allStats.emplace_back(sym, book->getStats()); - } - - writeGaugeHeader("lob_orderbook_asks_count", - "Number of ask price levels in the order book"); - for (const auto& [sym, stats] : allStats) { - writeLine("lob_orderbook_asks_count", sym, static_cast(stats.asksCount)); - } - - writeGaugeHeader("lob_orderbook_bids_count", - "Number of bid price levels in the order book"); - for (const auto& [sym, stats] : allStats) { - writeLine("lob_orderbook_bids_count", sym, static_cast(stats.bidsCount)); - } - - writeGaugeHeader("lob_orderbook_best_ask_price", "Best ask price in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestAsk > 0.0) { - writeLine("lob_orderbook_best_ask_price", sym, stats.bestAsk); - } - } - - writeGaugeHeader("lob_orderbook_best_bid_price", "Best bid price in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestBid > 0.0) { - writeLine("lob_orderbook_best_bid_price", sym, stats.bestBid); - } - } - - writeGaugeHeader("lob_orderbook_spread_price", - "Spread between best ask and best bid in USD"); - for (const auto& [sym, stats] : allStats) { - if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { - writeLine("lob_orderbook_spread_price", sym, stats.bestAsk - stats.bestBid); - } - } - - return ss.str(); + return buildPrometheusOutput(exchange, metricsMap, books); } + std::string exchange; std::unordered_map>& metricsMap; std::unordered_map>& books; int port; diff --git a/src/prometheus_format.hpp b/src/prometheus_format.hpp new file mode 100644 index 0000000..20c0e8d --- /dev/null +++ b/src/prometheus_format.hpp @@ -0,0 +1,162 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#include "metrics.hpp" +#include "order_book.hpp" + +// Escapes a Prometheus label value per the text exposition format spec: +/** + * Escape a Prometheus label value by replacing special characters with backslash-escaped sequences. + * + * Replaces: + * - backslash (`\`) with `\\` + * - double-quote (`"`) with `\"` + * - newline with `\n` + * + * @param v Label value to escape. + * @return Escaped string where backslash is replaced with `\\`, double-quote with `\"`, and newline + * with `\n`. + */ +inline std::string escapeLabelValue(std::string_view v) { + std::string out; + out.reserve(v.size()); + for (char c : v) { + if (c == '\\') { + out += "\\\\"; + } else if (c == '"') { + out += "\\\""; + } else if (c == '\n') { + out += "\\n"; + } else { + out += c; + } + } + return out; +} + +// Builds a Prometheus text exposition for all symbols managed by one adapter. +// +// Each metric line gets both exchange="..." and symbol="..." labels so series +// from different adapters don't collide in a multi-exchange setup. +// +// Spread and best-price lines are skipped when the book is empty (no snapshot +/** + * Builds a Prometheus text exposition containing adapter metrics and order book statistics, + * emitting metric lines labeled with `exchange` and `symbol`. + * + * Emits the following metrics when available: + * - lob_messages_total + * - lob_event_lag_milliseconds + * - lob_processing_time_microseconds + * - lob_max_processing_time_microseconds + * - lob_orderbook_asks_count + * - lob_orderbook_bids_count + * - lob_orderbook_best_ask_price (only for positive best-ask) + * - lob_orderbook_best_bid_price (only for positive best-bid) + * - lob_orderbook_spread_price (only when both best-ask and best-bid are positive) + * + * @param exchange Identifier for the adapter; used as the `exchange` label value. + * @param metricsMap Map from symbol to Metrics; used for message counts and processing/lag values. + * @param books Map from symbol to OrderBook; used to obtain per-symbol order book stats (asks/bids + * counts, best prices). + * @return std::string Prometheus exposition text including HELP/TYPE headers and metric lines. + * Best-price and spread metrics are omitted when price values are not positive. + */ +inline std::string buildPrometheusOutput( + std::string_view exchange, + const std::unordered_map>& metricsMap, + const std::unordered_map>& books) { + std::ostringstream ss; + + auto writeCounterHeader = [&](const char* name, const char* help) { + ss << "# HELP " << name << " " << help << "\n"; + ss << "# TYPE " << name << " counter\n"; + }; + auto writeGaugeHeader = [&](const char* name, const char* help) { + ss << "# HELP " << name << " " << help << "\n"; + ss << "# TYPE " << name << " gauge\n"; + }; + auto writeLine = [&](const char* name, const std::string& symbol, double value) { + ss << name << "{exchange=\"" << escapeLabelValue(exchange) << "\",symbol=\"" + << escapeLabelValue(symbol) << "\"} " << value << "\n"; + }; + + writeCounterHeader("lob_messages_total", "Total messages received from exchange"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_messages_total", sym, static_cast(m->msgCount.load())); + } + + writeGaugeHeader("lob_event_lag_milliseconds", "Last event lag in milliseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_event_lag_milliseconds", sym, static_cast(m->lastEventLagMs.load())); + } + + writeGaugeHeader("lob_processing_time_microseconds", + "Last update processing time in microseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_processing_time_microseconds", sym, + static_cast(m->lastProcessingUs.load())); + } + + writeGaugeHeader("lob_max_processing_time_microseconds", + "Maximum observed processing time in microseconds"); + for (const auto& [sym, m] : metricsMap) { + writeLine("lob_max_processing_time_microseconds", sym, + static_cast(m->maxProcessingUs.load())); + } + + // Collect stats once per symbol to avoid locking each book multiple times. + std::vector> allStats; + allStats.reserve(books.size()); + for (const auto& [sym, book] : books) { + allStats.emplace_back(sym, book->getStats()); + } + + writeGaugeHeader("lob_orderbook_asks_count", "Number of ask price levels in the order book"); + for (const auto& [sym, stats] : allStats) { + writeLine("lob_orderbook_asks_count", sym, static_cast(stats.asksCount)); + } + + writeGaugeHeader("lob_orderbook_bids_count", "Number of bid price levels in the order book"); + for (const auto& [sym, stats] : allStats) { + writeLine("lob_orderbook_bids_count", sym, static_cast(stats.bidsCount)); + } + + writeGaugeHeader("lob_orderbook_best_ask_price", "Best ask price in USD"); + for (const auto& [sym, stats] : allStats) { + if (stats.bestAsk > 0.0) { + writeLine("lob_orderbook_best_ask_price", sym, stats.bestAsk); + } + } + + writeGaugeHeader("lob_orderbook_best_bid_price", "Best bid price in USD"); + for (const auto& [sym, stats] : allStats) { + if (stats.bestBid > 0.0) { + writeLine("lob_orderbook_best_bid_price", sym, stats.bestBid); + } + } + + { + std::vector> spreadLines; + for (const auto& [sym, stats] : allStats) { + if (stats.bestAsk > 0.0 && stats.bestBid > 0.0) { + spreadLines.emplace_back(sym, stats.bestAsk - stats.bestBid); + } + } + if (!spreadLines.empty()) { + writeGaugeHeader("lob_orderbook_spread_price", + "Spread between best ask and best bid in USD"); + for (const auto& [sym, spread] : spreadLines) { + writeLine("lob_orderbook_spread_price", sym, spread); + } + } + } + + return ss.str(); +} diff --git a/tests/test_binance_utils.cpp b/tests/test_binance_utils.cpp new file mode 100644 index 0000000..9f3912f --- /dev/null +++ b/tests/test_binance_utils.cpp @@ -0,0 +1,24 @@ +#include + +#include "binance_utils.hpp" + +// ─── streamToSymbol ─────────────────────────────────────────────────────────── +// Covers the stream name to uppercase symbol conversion used as map keys. + +TEST(StreamToSymbol, BasicDepthStreamExtractsSymbol) { + EXPECT_EQ(streamToSymbol("btcusdt@depth"), "BTCUSDT"); +} + +TEST(StreamToSymbol, DepthWithIntervalSuffixIsStripped) { + // The @100ms / @1000ms parameter must not bleed into the symbol. + EXPECT_EQ(streamToSymbol("ethusdt@depth@100ms"), "ETHUSDT"); + EXPECT_EQ(streamToSymbol("solusdt@depth@1000ms"), "SOLUSDT"); +} + +TEST(StreamToSymbol, OutputIsAlwaysUppercase) { + EXPECT_EQ(streamToSymbol("BtcUsdt@depth"), "BTCUSDT"); +} + +TEST(StreamToSymbol, AlreadyUppercaseInputIsIdempotent) { + EXPECT_EQ(streamToSymbol("BNBUSDT@depth"), "BNBUSDT"); +} diff --git a/tests/test_prometheus_format.cpp b/tests/test_prometheus_format.cpp new file mode 100644 index 0000000..9a0a21d --- /dev/null +++ b/tests/test_prometheus_format.cpp @@ -0,0 +1,158 @@ +#include + +#include +#include +#include +#include + +#include "prometheus_format.hpp" + +namespace { + +/** + * @brief Build a minimal JSON snapshot for OrderBook::applySnapshot. + * + * Constructs a JSON object containing a numeric `lastUpdateId` and two arrays + * `"asks"` and `"bids"`, where each entry is a two-element array holding + * price and quantity as strings. + * + * @param lastUpdateId The snapshot's last update identifier. + * @param asks Vector of (price, quantity) pairs to populate the `"asks"` array. + * @param bids Vector of (price, quantity) pairs to populate the `"bids"` array. + * @return nlohmann::json JSON object with keys `lastUpdateId`, `asks`, and `bids`. + */ +nlohmann::json makeSnap(long long lastUpdateId, + const std::vector>& asks, + const std::vector>& bids) { + nlohmann::json snap; + snap["lastUpdateId"] = lastUpdateId; + snap["asks"] = nlohmann::json::array(); + for (const auto& [p, q] : asks) snap["asks"].push_back({p, q}); + snap["bids"] = nlohmann::json::array(); + for (const auto& [p, q] : bids) snap["bids"].push_back({p, q}); + return snap; +} + +/** + * Extracts all non-empty, non-comment data lines from a Prometheus exposition text block. + * + * @param output The full Prometheus exposition text (may contain comment lines starting with `#` + * and blank lines). + * @return std::vector A vector containing each data line (lines that are not empty and + * do not start with `#`), in their original order without trailing newlines. + */ +std::vector dataLines(const std::string& output) { + std::vector result; + std::istringstream ss(output); + std::string line; + while (std::getline(ss, line)) { + if (!line.empty() && line[0] != '#') { + result.push_back(line); + } + } + return result; +} + +// Fixture that owns a single-symbol metrics + book pair for BTCUSDT on "testex". +struct PrometheusFormatTest : testing::Test { + std::unordered_map> metrics; + std::unordered_map> books; + + /** + * @brief Prepare test fixture state by creating default entries for "BTCUSDT". + * + * Initializes the fixture's metrics and books maps with a fresh Metrics and + * OrderBook instance at key "BTCUSDT". + */ + void SetUp() override { + metrics["BTCUSDT"] = std::make_unique(); + books["BTCUSDT"] = std::make_unique(); + } + + /** + * @brief Builds a Prometheus exposition-format text block for the fixture's metrics and order + * books. + * + * @param exchange Exchange label value to include on each metric line; defaults to "testex". + * @return std::string Prometheus-formatted text containing HELP/TYPE headers and metric data + * lines for the fixture's metrics and books. + */ + std::string build(std::string_view exchange = "testex") { + return buildPrometheusOutput(exchange, metrics, books); + } +}; + +} // namespace + +// ─── Label correctness ──────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, ExchangeLabelAppearsOnEveryDataLine) { + const auto lines = dataLines(build("binance")); + ASSERT_FALSE(lines.empty()); + for (const auto& line : lines) { + EXPECT_NE(line.find("exchange=\"binance\""), std::string::npos) << line; + } +} + +TEST_F(PrometheusFormatTest, SymbolLabelAppearsOnEveryDataLine) { + const auto lines = dataLines(build()); + ASSERT_FALSE(lines.empty()); + for (const auto& line : lines) { + EXPECT_NE(line.find("symbol=\"BTCUSDT\""), std::string::npos) << line; + } +} + +// ─── Counter value ──────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, MessageCountIsReflectedInOutput) { + metrics["BTCUSDT"]->msgCount.store(42); + const auto output = build(); + // The exact label+value fragment must be present. + EXPECT_NE(output.find("lob_messages_total{exchange=\"testex\",symbol=\"BTCUSDT\"} 42"), + std::string::npos); +} + +// ─── Spread guard ───────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, SpreadOmittedWhenBookIsEmpty) { + // No snapshot applied → both ask and bid are 0.0 → spread must not appear. + const auto output = build(); + EXPECT_EQ(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +TEST_F(PrometheusFormatTest, SpreadOmittedWhenOnlyAsksPopulated) { + books["BTCUSDT"]->applySnapshot(makeSnap(1, {{"50001.00", "1.0"}}, {})); + const auto output = build(); + EXPECT_EQ(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +TEST_F(PrometheusFormatTest, SpreadEmittedWhenBothSidesPopulated) { + books["BTCUSDT"]->applySnapshot(makeSnap(1, {{"50002.00", "1.0"}}, {{"50000.00", "1.0"}})); + const auto output = build(); + EXPECT_NE(output.find("lob_orderbook_spread_price"), std::string::npos); +} + +// ─── HELP / TYPE headers ────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, HelpAndTypeHeadersPresentForAllMetrics) { + const auto output = build(); + for (const auto* name : + {"lob_messages_total", "lob_event_lag_milliseconds", "lob_processing_time_microseconds", + "lob_orderbook_asks_count", "lob_orderbook_bids_count"}) { + EXPECT_NE(output.find(std::string("# HELP ") + name), std::string::npos) << name; + EXPECT_NE(output.find(std::string("# TYPE ") + name), std::string::npos) << name; + } +} + +// ─── Multi-symbol ───────────────────────────────────────────────────────────── + +TEST_F(PrometheusFormatTest, AllSymbolsAppearInOutput) { + // Add a second symbol. + metrics["ETHUSDT"] = std::make_unique(); + books["ETHUSDT"] = std::make_unique(); + metrics["ETHUSDT"]->msgCount.store(7); + + const auto output = build(); + EXPECT_NE(output.find("symbol=\"BTCUSDT\""), std::string::npos); + EXPECT_NE(output.find("symbol=\"ETHUSDT\""), std::string::npos); +}