
Implement Binance exchange adapter and Prometheus metrics integration#21

Merged
bountx merged 6 commits into main from feat/iexchange-adapter-interface on Mar 6, 2026

Conversation

@bountx
Owner

@bountx bountx commented Mar 6, 2026

  • Added BinanceAdapter class for handling WebSocket connections and snapshot logic.
  • Introduced IExchangeAdapter interface for exchange feed adapters.
  • Created utility functions for symbol extraction and Prometheus metrics formatting.
  • Updated main application logic to utilize the new BinanceAdapter.
  • Added unit tests for streamToSymbol function and Prometheus output generation.

Summary by CodeRabbit

  • New Features

    • Grafana: added an "exchange" dashboard variable; dashboards now filter and show event lag and stats per exchange with updated legends.
    • Metrics/Prometheus: Prometheus output now includes an exchange label on all metrics, consistent HELP/TYPE headers, and conditional price/spread metrics only when valid.
  • Chores

    • Added unit tests for symbol parsing and Prometheus formatting.

@coderabbitai
Contributor

coderabbitai Bot commented Mar 6, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 5077627d-4784-43c2-afba-91a9c1e94c4f

📥 Commits

Reviewing files that changed from the base of the PR and between 7d104fd and e28bd48.

📒 Files selected for processing (8)
  • src/binance_utils.hpp
  • src/feed_handler.cpp
  • src/feed_handler.hpp
  • src/i_exchange_adapter.hpp
  • src/main.cpp
  • src/metrics_server.hpp
  • src/prometheus_format.hpp
  • tests/test_prometheus_format.cpp

📝 Walkthrough

Walkthrough

Replace FeedHandler with a BinanceAdapter implementing IExchangeAdapter, move WebSocket/resync/snapshot lifecycle into the adapter, add per-exchange Prometheus exposition and MetricsServer exchange awareness, add binance stream-to-symbol utility, update main and Grafana dashboard templating, and add unit tests for utilities and Prometheus output.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Adapter & Interface: src/i_exchange_adapter.hpp, src/feed_handler.hpp, src/feed_handler.cpp | Introduce IExchangeAdapter; replace FeedHandler with BinanceAdapter (ctor, start, stop, dtor). Adapter owns ix::WebSocket, composes stream URL, manages resync/snapshot worker, handles WS messages and snapshot application. |
| Prometheus & MetricsServer: src/prometheus_format.hpp, src/metrics_server.hpp | Add escapeLabelValue and buildPrometheusOutput(exchange, ...); MetricsServer now accepts exchange (string_view), delegates text generation, is non-copyable, and joins its worker thread on destruction. |
| Binance Utilities & Tests: src/binance_utils.hpp, tests/test_binance_utils.cpp | Add streamToSymbol(const std::string&) (extract before '@' and uppercase) and unit tests covering suffixes, casing, and idempotence. |
| Prometheus Tests & Test CMake: tests/test_prometheus_format.cpp, CMakeLists.txt | Add GoogleTest suite validating Prometheus output (labels, metrics, conditional fields) and include new test sources in the lob_tests target. |
| Main & Integration: src/main.cpp | Swap FeedHandler/WebSocket wiring for BinanceAdapter; pass adapter.exchangeName() into MetricsServer; adjust startup ordering and error handling around adapter.start. |
| Grafana Dashboard: grafana/dashboards/lob.json | Add exchange templating variable; scope symbol query by $exchange; update event lag and current-stats queries and legend formats to aggregate/display per-exchange. |

Sequence Diagram(s)

sequenceDiagram
    participant Main as Main
    participant Adapter as BinanceAdapter
    participant WS as WebSocket
    participant REST as REST/Snapshot
    participant Books as OrderBooks
    participant Metrics as MetricsMap
    participant MetricsSrv as MetricsServer

    Main->>Adapter: create(updateIntervalMs) / start(symbols, books, metrics, snapshotDepth)
    Adapter->>WS: open(stream URL composed from symbols@depth@interval)
    Adapter->>REST: fetch snapshot per symbol (with retry/backoff)
    REST-->>Adapter: snapshot JSON
    Adapter->>Books: fetchAndApplySnapshot(symbol, snapshot)
    Adapter->>Metrics: init/update metrics for symbol
    WS->>Adapter: onMessage(ws message)
    Adapter->>Books: apply incremental update
    Adapter->>Metrics: update counters & lags
    MetricsSrv->>Metrics: read metrics map
    MetricsSrv->>Books: read order book state
    MetricsSrv->>MetricsSrv: buildPrometheusOutput(Adapter.exchangeName(), metrics, books)
    MetricsSrv-->>HTTP Client: expose /metrics text

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

I nibble streams and split at the '@',
Uppercase symbols hop into the map.
Adapters hum, snapshots line by line,
Metrics bloom per-exchange, tidy and fine.
A rabbit cheers — observability is mine! 🐰✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 32.43% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main changes: implementing a Binance exchange adapter (class BinanceAdapter, IExchangeAdapter interface) and Prometheus metrics integration (prometheus_format.hpp, metrics updates in Grafana dashboard). |


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/feed_handler.cpp (3)

202-209: ⚠️ Potential issue | 🟠 Major

Preflight-validate symbol state before launching async snapshot fetches

Startup dereferences books->at(symbol) in async tasks (Line 265). If symbol keys drift from symbols (including case differences), this throws in background startup flow.

Suggested preflight check
 bool BinanceAdapter::start(const std::vector<std::string>& symbols,
                            std::unordered_map<std::string, std::unique_ptr<OrderBook>>& booksRef,
                            std::unordered_map<std::string, std::unique_ptr<Metrics>>& metricsMapRef,
                            int snapshotDepthArg, int maxSnapshotRetries) {
     books = &booksRef;
     metricsMap = &metricsMapRef;
     snapshotDepth = snapshotDepthArg;
+
+    if (symbols.empty()) {
+        fprintf(stderr, "No symbols configured\n");
+        return false;
+    }
+    for (const auto& symbol : symbols) {
+        if (books->find(symbol) == books->end() || metricsMap->find(symbol) == metricsMap->end()) {
+            fprintf(stderr, "Missing book/metrics state for symbol %s\n", symbol.c_str());
+            return false;
+        }
+    }

Also applies to: 256-266

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/feed_handler.cpp` around lines 202 - 209, BinanceAdapter::start launches
async snapshot fetches that dereference books->at(symbol) (and similarly access
metricsMap), which can throw if the provided symbols list and the keys in
booksRef/metricsMapRef differ (including casing); before launching any async
tasks, iterate the symbols vector and validate that each symbol key exists in
booksRef and metricsMapRef (or normalize casing to the map key convention), log
a clear error and return false (or remove/skip invalid symbols) if any are
missing, and only then proceed to set books, metricsMap and spawn the background
tasks so the async code never dereferences missing keys.

235-243: ⚠️ Potential issue | 🟠 Major

Call webSocket.stop() on connect-timeout failure path

When webSocket.start() (line 236) times out, the code returns false (line 242) without stopping the socket. This leaves the socket in a started but not connected state, preventing subsequent connection attempts since the condition at line 235 (getReadyState() == Closed) will be false.

Proposed fix
         if (!connected) {
             fprintf(stderr, "WebSocket didn't connect within 30s\n");
+            webSocket.stop();
             return false;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/feed_handler.cpp` around lines 235 - 243, The timeout failure path after
calling webSocket.start() leaves the socket started but not connected; modify
the block that checks the wsReady.wait_for result so that when connected is
false you call webSocket.stop() before returning false. Update the failure
branch in the code around webSocket.getReadyState(), webSocket.start(), wsReady,
wsReadyMutex and wsConnected to ensure webSocket.stop() is invoked on timeout
(while preserving the existing locking around wsReady if needed) so subsequent
connection attempts can succeed.

27-71: ⚠️ Potential issue | 🔴 Critical

Serialize snapshot application and buffered replay to prevent live updates from racing ahead

fetchAndApplySnapshot applies the snapshot (line 61) and sets snapshotApplied=true before replaying buffered updates (lines 63–71). handleWsMessage checks the snapshot flag (line 119) outside the buffer lock, then applies live updates (line 134) without synchronization. This creates a window where live messages bypass the buffer queue and race with replay, corrupting update sequence ordering that the gap check (U > lastUpdateId + 1) depends on.

Acquire a shared state lock across snapshot application, buffer replay, and the check that gates live updates to ensure they form an atomic unit.

Example fix
+// feed_handler.hpp
+std::mutex stateMutex; // Guards snapshot application, replay, and live update gating.

 // fetchAndApplySnapshot(...)
-        orderBook.applySnapshot(snapshot);
-        std::lock_guard<std::mutex> lock(bufferMutex);
+        std::lock_guard<std::mutex> stateLock(stateMutex);
+        orderBook.applySnapshot(snapshot);
         for (const auto& msg : symbolBuffers[symbol]) {
             if (!orderBook.applyUpdate(msg)) {
                 ...
             }
         }

 // handleWsMessage(...)
-        {
-            std::lock_guard<std::mutex> lock(bufferMutex);
+        {
+            std::lock_guard<std::mutex> stateLock(stateMutex);
             if (!book.isSnapshotApplied()) {
                 symbolBuffers[symbol].push_back(jsonMsg);
                 return;
             }
         }
         if (!book.applyUpdate(jsonMsg)) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/feed_handler.cpp` around lines 27 - 71, Summary: live WebSocket updates
can race with snapshot application/replay because snapshotApplied and buffer
replay aren't synchronized with handleWsMessage checks, corrupting update
ordering. Fix: make snapshot application and buffered replay atomic with the
live-update gate by reusing the same mutex (bufferMutex) or a new shared state
mutex: in fetchAndApplySnapshot (function fetchAndApplySnapshot) acquire the
mutex before calling orderBook.applySnapshot, keep it held while setting
snapshotApplied=true, iterating symbolBuffers[symbol] and calling
orderBook.applyUpdate for each buffered msg, clearing symbolBuffers[symbol],
then release; in handleWsMessage, hold the same mutex when reading
snapshotApplied and when deciding to enqueue vs apply live updates (and when
calling orderBook.applyUpdate) so the check-and-apply path is synchronized with
snapshot replay. Ensure symbols referenced: fetchAndApplySnapshot,
handleWsMessage, snapshotApplied, bufferMutex, symbolBuffers, applySnapshot,
applyUpdate.
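
The gating described above can be sketched in isolation. This is a toy model, not the adapter's code: `GatedBook`, `Update`, `onLiveUpdate`, and `onSnapshot` are hypothetical names, and the rule that buffered updates already covered by the snapshot are skipped is an assumption about the replay logic. Only the gap check (`U > lastUpdateId + 1`) comes from the comment above.

```cpp
#include <cstdint>
#include <mutex>
#include <vector>

// Toy stand-in for a depth update: [firstId, lastId] is its update-id range.
struct Update {
    std::uint64_t firstId;
    std::uint64_t lastId;
};

// Hypothetical model of one symbol's state. A single mutex guards the
// snapshot flag, the buffer, and the book's last applied update id.
class GatedBook {
public:
    // Live path: buffer until the snapshot has been applied, otherwise apply.
    // Returns false on a sequence gap (the caller would trigger a resync).
    bool onLiveUpdate(const Update& u) {
        std::lock_guard<std::mutex> lock(stateMutex_);
        if (!snapshotApplied_) {
            buffer_.push_back(u);
            return true;
        }
        return applyLocked(u);
    }

    // Snapshot path: apply the snapshot, replay the buffer, and open the gate
    // while holding the same lock, so no live update can slip in between.
    bool onSnapshot(std::uint64_t snapshotLastId) {
        std::lock_guard<std::mutex> lock(stateMutex_);
        lastUpdateId_ = snapshotLastId;
        bool ok = true;
        for (const auto& u : buffer_) {
            if (u.lastId <= lastUpdateId_) continue;  // assumed: already covered by the snapshot
            if (!applyLocked(u)) {
                ok = false;
                break;
            }
        }
        buffer_.clear();
        snapshotApplied_ = ok;
        return ok;
    }

    std::uint64_t lastUpdateId() const {
        std::lock_guard<std::mutex> lock(stateMutex_);
        return lastUpdateId_;
    }

private:
    // Caller must hold stateMutex_.
    bool applyLocked(const Update& u) {
        if (u.firstId > lastUpdateId_ + 1) return false;  // gap: U > lastUpdateId + 1
        if (u.lastId > lastUpdateId_) lastUpdateId_ = u.lastId;
        return true;
    }

    mutable std::mutex stateMutex_;
    bool snapshotApplied_ = false;
    std::uint64_t lastUpdateId_ = 0;
    std::vector<Update> buffer_;
};
```

Because the flag is only ever read and written under `stateMutex_`, a live message either lands in the buffer before the replay starts or is applied after the replay has finished.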
🧹 Nitpick comments (1)
tests/test_prometheus_format.cpp (1)

79-80: Avoid exact label-order matching in metric assertions

Line 79 hardcodes label order (exchange, then symbol). Prometheus label order is not semantically significant, so this is fragile against harmless formatter changes.

Suggested test hardening
-    EXPECT_NE(output.find("lob_messages_total{exchange=\"testex\",symbol=\"BTCUSDT\"} 42"),
-              std::string::npos);
+    EXPECT_NE(output.find("lob_messages_total{"), std::string::npos);
+    EXPECT_NE(output.find("exchange=\"testex\""), std::string::npos);
+    EXPECT_NE(output.find("symbol=\"BTCUSDT\""), std::string::npos);
+    EXPECT_NE(output.find(" 42"), std::string::npos);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_prometheus_format.cpp` around lines 79 - 80, The test currently
asserts the exact label order by checking
EXPECT_NE(output.find("lob_messages_total{exchange=\"testex\",symbol=\"BTCUSDT\"}
42"), std::string::npos); — change this to a label-order-agnostic check: either
use a regex that matches
lob_messages_total\{[^}]*exchange="testex"[^}]*symbol="BTCUSDT"[^}]*\}\s+42 (or
the same with symbol before exchange) or parse the metric labels from the output
and assert both exchange=="testex" and symbol=="BTCUSDT" are present for
lob_messages_total; update the EXPECT_NE usage accordingly so the assertion no
longer depends on label ordering.
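
One possible shape for the order-agnostic check is a small helper that inspects a single sample line at a time. `hasSample` is a hypothetical test helper, not part of the PR; it assumes label values contain no `}` character, which holds for the plain values used in these tests.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: returns true if `output` contains a sample line for
// `metric` whose label block includes every fragment in `labels` (in any
// order) and whose value text equals `value`.
inline bool hasSample(const std::string& output, const std::string& metric,
                      const std::vector<std::string>& labels, const std::string& value) {
    std::istringstream in(output);
    std::string line;
    const std::string prefix = metric + "{";
    while (std::getline(in, line)) {
        // Skip HELP/TYPE headers and samples of other metrics.
        if (line.compare(0, prefix.size(), prefix) != 0) continue;
        const auto close = line.find('}', prefix.size());
        if (close == std::string::npos) continue;
        const std::string block = line.substr(prefix.size(), close - prefix.size());
        bool allLabels = true;
        for (const auto& label : labels) {
            if (block.find(label) == std::string::npos) {
                allLabels = false;
                break;
            }
        }
        // The value must sit on the same line as the matched labels.
        if (allLabels && line.substr(close + 1) == " " + value) return true;
    }
    return false;
}
```

In a GoogleTest case this could be used as `EXPECT_TRUE(hasSample(output, "lob_messages_total", {"exchange=\"testex\"", "symbol=\"BTCUSDT\""}, "42"));`. Unlike independent `find` calls over the whole output, it requires the labels and the value to appear on the same sample line.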
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@grafana/dashboards/lob.json`:
- Around line 273-275: The legendFormat was changed to "{{exchange}} Lag (ms)"
but the stat override matcher still targets "Event Lag (ms)"; update the
override matcher to match the new legend text so the ms unit is applied. Locate
the panel's series override configuration (the override matcher referencing
"Event Lag (ms)") and change its match string to "{{exchange}} Lag (ms)" (or
switch the legendFormat back to "Event Lag (ms)" if you prefer); ensure the
override and the legendFormat are identical so the unit override on the series
will take effect.
- Around line 17-37: The symbol selector and downstream panel queries are
missing the exchange filter and thus mix identical symbols across exchanges;
update the symbol variable definition and every metric expression that is
symbol-scoped to include the exchange label matcher (use exchange=~"$exchange"),
e.g. change label_values(lob_messages_total, symbol) to
label_values(lob_messages_total{exchange=~"$exchange"}, symbol) and add
{exchange=~"$exchange"} to the metric selectors used in panels that reference
the symbol variable (those using lob_messages_total and related metrics such as
in the panels currently filtering only by symbol).

In `@src/binance_utils.hpp`:
- Line 15: The call std::transform(sym.begin(), sym.end(), sym.begin(),
::toupper) can invoke undefined behavior for negative char values; fix it by
including <cctype> and changing the transform to pass each character cast to
unsigned char before calling ::toupper (i.e., use a lambda that calls
::toupper(static_cast<unsigned char>(c))) so the code in src/binance_utils.hpp
that manipulates sym is safe and standards-compliant.
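
A minimal sketch of the safe uppercase conversion, written as a stand-alone version of `streamToSymbol`. The body is an assumption based on the walkthrough's description (extract the part before `'@'` and uppercase it); the real function in src/binance_utils.hpp may differ.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Illustrative re-implementation, not the PR's exact code.
inline std::string streamToSymbol(const std::string& stream) {
    // Keep everything before the first '@' (the whole string if there is none).
    std::string sym = stream.substr(0, stream.find('@'));
    // Take each character as unsigned char: std::toupper has undefined
    // behavior for negative values other than EOF.
    std::transform(sym.begin(), sym.end(), sym.begin(), [](unsigned char c) {
        return static_cast<char>(std::toupper(c));
    });
    return sym;
}
```

For example, `streamToSymbol("btcusdt@depth@100ms")` yields `"BTCUSDT"`, and calling it on an already uppercase symbol returns it unchanged.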

In `@src/feed_handler.cpp`:
- Line 16: The BinanceAdapter constructor currently accepts any int and can
produce unsupported stream names; add validation in
BinanceAdapter::BinanceAdapter(int updateIntervalMs) to only accept 100 or 1000
and reject others (e.g., throw std::invalid_argument or assert) so invalid
values cannot be constructed; include a clear error message mentioning the
invalid updateIntervalMs and the allowed values.
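
The validation could look roughly like the following. `validateUpdateIntervalMs` is a hypothetical free function used here so the check can be shown on its own; in the PR the check would sit inside the `BinanceAdapter` constructor. The allowed values (100 and 1000) are the ones named in the comment above.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper: returns the interval unchanged if it is supported,
// otherwise throws with a message naming the bad value and the allowed ones.
inline int validateUpdateIntervalMs(int updateIntervalMs) {
    if (updateIntervalMs != 100 && updateIntervalMs != 1000) {
        throw std::invalid_argument("invalid updateIntervalMs " +
                                    std::to_string(updateIntervalMs) +
                                    " (allowed values: 100, 1000)");
    }
    return updateIntervalMs;
}
```

A constructor could call this in its member initializer list so that an adapter with an unsupported interval is never constructed.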

In `@src/main.cpp`:
- Around line 125-127: The startup failure message is outdated; when
adapter.start(symbols, books, metricsMap, snapshotDepth) fails, update the
stderr message to mention the exchange adapter (e.g., "failed to start exchange
adapter") and include context from the call (use the adapter.start invocation
and its parameters symbols, books, metricsMap, snapshotDepth to locate the
check) so the error clearly indicates the exchange adapter startup failure.

In `@src/prometheus_format.hpp`:
- Around line 34-36: The Prometheus label values are written raw in the
writeLine lambda, which can break the exposition format if exchange or symbol
contain backslashes, quotes, or newlines; implement a helper (e.g.,
escapeLabelValue or escapePrometheusLabel) that replaces '\' -> "\\", '"' ->
"\\\"", and newline -> "\\n", then call that helper on both exchange and symbol
inside the writeLine lambda before interpolating them into the label string so
all written labels are properly escaped.
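
A sketch of such a helper, covering exactly the three replacements listed above. The `std::string_view` parameter is an assumption (it requires C++17); the PR's actual `escapeLabelValue` signature is not shown in this conversation.

```cpp
#include <string>
#include <string_view>

// Escapes a label value for the Prometheus text exposition format:
// backslash -> \\, double quote -> \", newline -> \n.
inline std::string escapeLabelValue(std::string_view value) {
    std::string out;
    out.reserve(value.size());
    for (char c : value) {
        switch (c) {
            case '\\': out += "\\\\"; break;
            case '"':  out += "\\\""; break;
            case '\n': out += "\\n";  break;
            default:   out += c;      break;
        }
    }
    return out;
}
```

Plain alphanumeric values such as `binance` or `BTCUSDT` pass through unchanged, so existing dashboards and queries are unaffected.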


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a46ad5d1-d6ab-449e-a605-578408437b50

📥 Commits

Reviewing files that changed from the base of the PR and between e6bfff1 and 6165cfb.

📒 Files selected for processing (11)
  • CMakeLists.txt
  • grafana/dashboards/lob.json
  • src/binance_utils.hpp
  • src/feed_handler.cpp
  • src/feed_handler.hpp
  • src/i_exchange_adapter.hpp
  • src/main.cpp
  • src/metrics_server.hpp
  • src/prometheus_format.hpp
  • tests/test_binance_utils.cpp
  • tests/test_prometheus_format.cpp

Comment thread grafana/dashboards/lob.json
Comment thread grafana/dashboards/lob.json
Comment thread src/binance_utils.hpp Outdated
Comment thread src/feed_handler.cpp Outdated
Comment thread src/main.cpp
Comment thread src/prometheus_format.hpp Outdated
@coderabbitai
Contributor

coderabbitai Bot commented Mar 6, 2026

Note

Docstrings generation - SUCCESS
Generated docstrings and committed to branch feat/iexchange-adapter-interface (commit: 7d104fd57b63149058dbadcad9a183cfc45f86a0)

Docstrings generation was requested by @bountx.

The following files were modified:

* `src/binance_utils.hpp`
* `src/feed_handler.cpp`
* `src/feed_handler.hpp`
* `src/i_exchange_adapter.hpp`
* `src/main.cpp`
* `src/metrics_server.hpp`
* `src/prometheus_format.hpp`
* `tests/test_prometheus_format.cpp`

These files were kept as they were:
* `tests/test_binance_utils.cpp`

These file types are not supported:
* `CMakeLists.txt`
* `grafana/dashboards/lob.json`
Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/main.cpp (1)

83-93: ⚠️ Potential issue | 🟠 Major

Reject duplicate symbols after uppercase canonicalization.

After Line 91 canonicalizes entries, duplicates are still pushed into symbols. This can create duplicate stream subscriptions for the same market and unstable update ordering for that symbol.

💡 Suggested fix
+#include <unordered_set>
...
     std::vector<std::string> symbols;
+    std::unordered_set<std::string> seenSymbols;
     for (const auto& entry : config["symbols"]) {
         if (!entry.is_string()) {
             fprintf(stderr, "config error: every entry in 'symbols' must be a string\n");
             return -1;
         }
         std::string sym = entry.get<std::string>();
         std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper);
+        if (!seenSymbols.insert(sym).second) {
+            fprintf(stderr, "config error: duplicate symbol '%s'\n", sym.c_str());
+            return -1;
+        }
         symbols.push_back(sym);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.cpp` around lines 83 - 93, After canonicalizing each entry from
config["symbols"] into sym (uppercase), check for duplicates and reject them:
maintain an unordered_set<std::string> (e.g., seen) alongside the symbols
vector, and after transforming sym verify if seen contains sym — if so, print an
error (fprintf) and return -1; otherwise insert sym into seen and push_back into
symbols. Update the loop that processes config["symbols"] to perform this
duplicate detection using the sym variable and the symbols/seen containers.
src/feed_handler.cpp (1)

301-307: ⚠️ Potential issue | 🟠 Major

Clean up running resources before returning false from start.

At Line 266 the resync thread and websocket can already be active; if any initial snapshot future fails, Line 307 returns false without stopping them. That leaves the adapter in a partially running failed state for callers that keep the object alive.

💡 Suggested fix
     bool allOk = true;
     for (auto& f : futures) {
         if (!f.get()) {
             allOk = false;
         }
     }
-    return allOk;
+    if (!allOk) {
+        stop();
+        return false;
+    }
+    return true;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/feed_handler.cpp` around lines 301 - 307, In start(), when iterating the
futures vector any failed future should trigger cleanup before returning false:
if any f.get() is false, stop and join the resync thread, close/terminate the
websocket, and perform any adapter shutdown/cleanup steps (e.g. signal threads,
join resync_thread, call the websocket close/cleanup method) before returning
false; ensure you reference the same members used in this class (futures,
resync_thread, websocket or the class-specific shutdown/join methods) so the
adapter is not left in a partially running state.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: dd36ed9b-cefc-4801-ae43-00cc56761c8e

📥 Commits

Reviewing files that changed from the base of the PR and between 6a9965c and 3deac56.

📒 Files selected for processing (5)
  • grafana/dashboards/lob.json
  • src/binance_utils.hpp
  • src/feed_handler.cpp
  • src/main.cpp
  • src/prometheus_format.hpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/binance_utils.hpp

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/main.cpp (1)

80-90: ⚠️ Potential issue | 🟠 Major

Reject duplicate symbols after canonicalization.

If config contains duplicates (including case variants like btcusdt + BTCUSDT), this path will subscribe to the same stream multiple times. That can produce duplicate update application and unnecessary resync churn.

💡 Proposed fix
+#include <unordered_set>
 ...
     // Validate each entry and canonicalise to UPPERCASE for map keys.
     std::vector<std::string> symbols;
+    std::unordered_set<std::string> seenSymbols;
     for (const auto& entry : config["symbols"]) {
         if (!entry.is_string()) {
             fprintf(stderr, "config error: every entry in 'symbols' must be a string\n");
             return -1;
         }
         std::string sym = entry.get<std::string>();
         std::transform(sym.begin(), sym.end(), sym.begin(), ::toupper);
+        if (!seenSymbols.insert(sym).second) {
+            fprintf(stderr, "config error: duplicate symbol '%s' in symbols list\n", sym.c_str());
+            return -1;
+        }
         symbols.push_back(sym);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.cpp` around lines 80 - 90, After canonicalizing each entry from
config["symbols"] into sym and before pushing into the symbols vector, detect
duplicates and reject the config: maintain a std::unordered_set<std::string>
(e.g., seenSymbols) and if sym already exists in seenSymbols log an error (same
format as other checks) and return -1; otherwise insert into seenSymbols and
push_back into symbols. Update the block that iterates config["symbols"] (the
loop using entry, sym, symbols) to perform this duplicate check so case-variants
like "btcusdt" and "BTCUSDT" are treated as the same symbol and rejected.
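
The same check can be sketched as a stand-alone function, which makes it easy to unit test apart from config parsing. `canonicalizeSymbols` is a hypothetical name, and it throws instead of printing and returning -1 as the suggested fix in main.cpp does.

```cpp
#include <algorithm>
#include <cctype>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical helper: uppercases each symbol and rejects duplicates, so
// case variants such as "btcusdt" and "BTCUSDT" count as the same symbol.
inline std::vector<std::string> canonicalizeSymbols(const std::vector<std::string>& raw) {
    std::vector<std::string> symbols;
    std::unordered_set<std::string> seen;
    for (std::string sym : raw) {
        std::transform(sym.begin(), sym.end(), sym.begin(), [](unsigned char c) {
            return static_cast<char>(std::toupper(c));
        });
        // insert() reports via .second whether the key was newly added.
        if (!seen.insert(sym).second) {
            throw std::invalid_argument("duplicate symbol '" + sym + "' in symbols list");
        }
        symbols.push_back(std::move(sym));
    }
    return symbols;
}
```

Input order is preserved for the unique symbols, so the stream subscription order stays the same as in the config.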
src/feed_handler.cpp (1)

373-379: ⚠️ Potential issue | 🟠 Major

Stop partially-started resources before returning startup failure.

If any initial snapshot fails, start() returns false but leaves the WebSocket and resync worker running. That leaks a partially initialized adapter state to callers.

💡 Proposed fix
     bool allOk = true;
     for (auto& f : futures) {
         if (!f.get()) {
             allOk = false;
         }
     }
+    if (!allOk) {
+        stop();
+    }
     return allOk;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/feed_handler.cpp` around lines 373 - 379, If any future indicates
failure, stop any partially started components before returning false: after
collecting results from futures (the loop over futures and the allOk boolean in
start()), if allOk is false invoke the adapter's WebSocket and resync worker
shutdown routines (e.g., websocket->stop()/shutdown()/close() and
resyncWorker.stop()/join()/shutdown() — use the actual member names in this
file), wait for their termination (join threads or await completion) and then
return false so no partially initialized resources are left running.
🧹 Nitpick comments (2)
tests/test_prometheus_format.cpp (1)

85-154: Add a regression test for escaped label values.

Current tests only exercise plain alphanumeric exchange/symbol values. A targeted test with quotes/backslashes/newlines would lock in the escapeLabelValue behavior.

💡 Proposed test addition
 TEST_F(PrometheusFormatTest, SymbolLabelAppearsOnEveryDataLine) {
     const auto lines = dataLines(build());
     ASSERT_FALSE(lines.empty());
     for (const auto& line : lines) {
         EXPECT_NE(line.find("symbol=\"BTCUSDT\""), std::string::npos) << line;
     }
 }
+
+TEST_F(PrometheusFormatTest, LabelValuesAreEscapedInOutput) {
+    const std::string weirdSymbol = "BTC\"USDT\\X";
+    metrics[weirdSymbol] = std::make_unique<Metrics>();
+    books[weirdSymbol] = std::make_unique<OrderBook>();
+
+    const auto output = build("bin\"ance\n");
+    EXPECT_NE(output.find("exchange=\"bin\\\"ance\\n\""), std::string::npos);
+    EXPECT_NE(output.find("symbol=\"BTC\\\"USDT\\\\X\""), std::string::npos);
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_prometheus_format.cpp` around lines 85 - 154, Add a regression
test that verifies label values with special characters are escaped by
escapeLabelValue: create a new TEST_F (e.g., EscapedLabelValuesAreHandled) that
calls build(...) and/or dataLines(...) with an exchange and/or symbol containing
quotes, backslashes, and newlines (e.g., "ex\"\\\nch") and assert the output
contains the corresponding escaped label fragments (e.g.,
exchange="ex\\\"\\\\\\nch" or symbol="...") using EXPECT_NE(output.find(...),
std::string::npos); locate this around the other PrometheusFormatTest cases near
build() and dataLines() usages so it runs with the existing fixtures.
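
For context, the escaping that such a test would lock in might look like the sketch below. It assumes the Prometheus text exposition rules for label values (backslash, double quote, and line feed are escaped); `escapeLabelValueSketch` is a hypothetical stand-in, and the real `escapeLabelValue` in src/prometheus_format.hpp may differ.

```cpp
#include <string>
#include <string_view>

// Hypothetical sketch of label-value escaping for the Prometheus text format.
// Not the project's escapeLabelValue; shown only to illustrate the expected output.
inline std::string escapeLabelValueSketch(std::string_view raw) {
    std::string out;
    out.reserve(raw.size());
    for (char c : raw) {
        switch (c) {
            case '\\': out += "\\\\"; break;  // backslash -> two backslashes
            case '"':  out += "\\\""; break;  // quote     -> backslash + quote
            case '\n': out += "\\n";  break;  // line feed -> backslash + 'n'
            default:   out += c;      break;  // everything else passes through
        }
    }
    return out;
}
```

Under these rules the exchange value `bin"ance` followed by a line feed becomes the characters `bin\"ance\n`, which matches the fragment the proposed test searches for.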
src/prometheus_format.hpp (1)

82-85: Cache escaped exchange once per build.

exchange is invariant during a scrape, but it is re-escaped for every data line.

💡 Proposed refactor
+    const std::string escapedExchange = escapeLabelValue(exchange);
     auto writeLine = [&](const char* name, const std::string& symbol, double value) {
-        ss << name << "{exchange=\"" << escapeLabelValue(exchange) << "\",symbol=\""
+        ss << name << "{exchange=\"" << escapedExchange << "\",symbol=\""
            << escapeLabelValue(symbol) << "\"} " << value << "\n";
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prometheus_format.hpp` around lines 82 - 85, The code repeatedly
re-escapes the invariant exchange for every metric line via
escapeLabelValue(exchange); cache the escaped value once (e.g., compute
std::string escapedExchange = escapeLabelValue(exchange) before defining or
entering the loop and update the lambda writeLine to use escapedExchange instead
of calling escapeLabelValue(exchange) each time) so writeLine(name, symbol,
value) concatenates "{exchange=\"" << escapedExchange << "\",symbol=\"" <<
escapeLabelValue(symbol) << "\"} ..." thereby avoiding redundant work; reference
the writeLine lambda and the escapeLabelValue(exchange) call to locate the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/feed_handler.cpp`:
- Around line 46-50: BinanceAdapter::stop() can block because the resyncThread
worker sleeps through long, non-interruptible backoff delays (e.g., 429/418
retry waits); after stop() requests cancellation, it still has to wait for
resyncThread to join. Make the backoff sleeps stop-aware, or move the retry
logic out of fetchAndApplySnapshot so the caller can honor std::stop_token.
Update the resync worker loop and fetchAndApplySnapshot to accept a
std::stop_token (or check a shared atomic), and replace the long
std::this_thread::sleep_for calls with std::condition_variable_any::wait_until
or another stop_token-aware wait that returns early on stop. Also signal the
worker (resyncCv.notify_one) so any pending wait wakes and resyncThread can exit
promptly before resyncThread = {} in BinanceAdapter::stop().
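
An interruptible backoff of the kind this comment asks for could look roughly like the sketch below. It swaps in a plain flag with std::condition_variable rather than std::stop_token, so it also builds as C++17; in C++20 the stop_token overloads of std::condition_variable_any serve the same purpose. `StopSignal` and its members are hypothetical names, not part of the adapter.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: a sleep that wakes early when a stop is requested.
class StopSignal {
public:
    // Called from the shutdown path; wakes every thread sleeping in sleepFor().
    void requestStop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Returns true if the full delay elapsed, false if a stop cut it short.
    bool sleepFor(std::chrono::milliseconds delay) {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for returns the predicate's value: true means stopped_ was set.
        return !cv_.wait_for(lock, delay, [this] { return stopped_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stopped_ = false;
};
```

A retry loop would then replace each `std::this_thread::sleep_for(backoff)` with `if (!signal.sleepFor(backoff)) return;`, so a join in the shutdown path does not have to wait out the remaining backoff.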

---

Outside diff comments:
In `@src/feed_handler.cpp`:
- Around line 373-379: If any future indicates failure, stop any partially
started components before returning false: after collecting results from futures
(the loop over futures and the allOk boolean in start()), if allOk is false
invoke the adapter's WebSocket and resync worker shutdown routines (e.g.,
websocket->stop()/shutdown()/close() and resyncWorker.stop()/join()/shutdown() —
use the actual member names in this file), wait for their termination (join
threads or await completion) and then return false so no partially initialized
resources are left running.

In `@src/main.cpp`:
- Around line 80-90: After canonicalizing each entry from config["symbols"] into
sym and before pushing into the symbols vector, detect duplicates and reject the
config: maintain a std::unordered_set<std::string> (e.g., seenSymbols) and if
sym already exists in seenSymbols log an error (same format as other checks) and
return -1; otherwise insert into seenSymbols and push_back into symbols. Update
the block that iterates config["symbols"] (the loop using entry, sym, symbols)
to perform this duplicate check so case-variants like "btcusdt" and "BTCUSDT"
are treated as the same symbol and rejected.
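
The duplicate check described here might be sketched as follows. The sketch assumes canonicalization means upper-casing, which the prompt's "btcusdt"/"BTCUSDT" example suggests but does not state; `canonicalizeSymbols` is a hypothetical helper, whereas the real code works inline on config["symbols"] and returns -1 after logging.

```cpp
#include <algorithm>
#include <cctype>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical helper: upper-cases each entry (assumed canonical form) and
// returns std::nullopt as soon as a canonicalized symbol repeats.
inline std::optional<std::vector<std::string>> canonicalizeSymbols(
        const std::vector<std::string>& raw) {
    std::vector<std::string> symbols;
    std::unordered_set<std::string> seenSymbols;
    for (std::string sym : raw) {
        std::transform(sym.begin(), sym.end(), sym.begin(), [](unsigned char c) {
            return static_cast<char>(std::toupper(c));
        });
        // insert().second is false when the symbol was already present.
        if (!seenSymbols.insert(sym).second) {
            return std::nullopt;  // caller would log the error and return -1
        }
        symbols.push_back(std::move(sym));
    }
    return symbols;
}
```

Checking after canonicalization is what makes case variants collide; comparing the raw config strings would let "btcusdt" and "BTCUSDT" through as two separate entries.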

---

Nitpick comments:
In `@src/prometheus_format.hpp`:
- Around line 82-85: The code repeatedly re-escapes the invariant exchange for
every metric line via escapeLabelValue(exchange); cache the escaped value once
(e.g., compute std::string escapedExchange = escapeLabelValue(exchange) before
defining or entering the loop and update the lambda writeLine to use
escapedExchange instead of calling escapeLabelValue(exchange) each time) so
writeLine(name, symbol, value) concatenates "{exchange=\"" << escapedExchange <<
"\",symbol=\"" << escapeLabelValue(symbol) << "\"} ..." thereby avoiding
redundant work; reference the writeLine lambda and the
escapeLabelValue(exchange) call to locate the change.

In `@tests/test_prometheus_format.cpp`:
- Around line 85-154: Add a regression test that verifies label values with
special characters are escaped by escapeLabelValue: create a new TEST_F (e.g.,
EscapedLabelValuesAreHandled) that calls build(...) and/or dataLines(...) with
an exchange and/or symbol containing quotes, backslashes, and newlines (e.g.,
"ex\"\\\nch") and assert the output contains the corresponding escaped label
fragments (e.g., exchange="ex\\\"\\\\\\nch" or symbol="...") using
EXPECT_NE(output.find(...), std::string::npos); locate this around the other
PrometheusFormatTest cases near build() and dataLines() usages so it runs with
the existing fixtures.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a1e44a0a-9c22-4b1c-a025-cf6b671e3990

📥 Commits

Reviewing files that changed from the base of the PR and between 3deac56 and 7d104fd.

📒 Files selected for processing (8)
  • src/binance_utils.hpp
  • src/feed_handler.cpp
  • src/feed_handler.hpp
  • src/i_exchange_adapter.hpp
  • src/main.cpp
  • src/metrics_server.hpp
  • src/prometheus_format.hpp
  • tests/test_prometheus_format.cpp
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/i_exchange_adapter.hpp
  • src/binance_utils.hpp

Comment thread src/feed_handler.cpp