feat(transport): add mantis-transport crate with blocking WS feed threads#19
feat(transport): add mantis-transport crate with blocking WS feed threads#19
Conversation
|
PR Opened, @Milerias Proceeding with Phase B: What's NextPhase B is exchange-specific connections — wiring the generic FeedThread to actual Polymarket and Binance endpoints. The key work:
|
Transport Layer PR Design ReviewOverall Direction
What This PR Gets Right Architecturally
Main Design Risks Introduced By This PR
Compatibility With Future Instrument Registry
Polymarket Rotation Compatibility
What Should Be Corrected Now
What Can Safely Wait For Later Phases
Final Recommendation
|
Benchmark ReportCommit: LinuxCPU: Latency (ns/op, lower is better)Single Push+Pop
Burst 100
Burst 1000
Batch 100
Batch 1000
Full Drain
Instructions per Op (lower is better)Full results (all fields)
macOSCPU: Latency (ns/op, lower is better)Single Push+Pop
Burst 100
Burst 1000
Batch 100
Batch 1000
Full Drain
Instructions per Op (lower is better)Full results (all fields)
Fixed-Point Arithmetic (mantis-fixed)LinuxCPU: checked_add
checked_div
checked_mul_trunc
display
mul_round_vs_trunc
parse
rescale
macOSCPU: checked_add
checked_div
checked_mul_trunc
display
mul_round_vs_trunc
parse
rescale
|
The rest (enforce hot/control split in types, finalize the normalization API) are Phase C concerns. Phase A is intentionally minimal. |
📝 WalkthroughWalkthroughAdds two new workspace crates—mantis-transport (blocking, CPU‑pinned WebSocket ingest layer with venue adapters and feed threading) and mantis-registry (instrument registry with bindings and reverse indexes). Also updates workspace metadata, license allowlist, adds tests, a Polymarket probe script, and a .gitignore entry. Changes
Sequence DiagramsequenceDiagram
participant Config as Venue Config
participant Adapter as Venue Adapter
participant FeedThread as FeedThread\n(Pinned)
participant Ws as WsConnection\n(TLS / Heartbeat)
participant Venue as Venue WebSocket
participant Callback as User Callback
Config->>Adapter: build FeedConfig + callback
Adapter->>FeedThread: spawn(config, on_message)
FeedThread->>FeedThread: apply affinity / tuning
loop Reconnect Loop
FeedThread->>Ws: WsConnection::connect(WsConfig)
Ws->>Venue: TLS handshake + WS upgrade
alt connected
Ws->>Venue: send subscribe_msg (optional)
loop Read Loop
FeedThread->>Ws: read_text()
alt Text message
Ws-->>FeedThread: Ok(Some(text))
FeedThread->>FeedThread: msg_count += 1
FeedThread->>Callback: on_message(&str)
alt Callback -> true
Callback-->>FeedThread: continue
else Callback -> false
Callback-->>FeedThread: stop -> exit
end
else Control / PONG / Non-text
Ws-->>FeedThread: Ok(None)
else Closed
Ws-->>FeedThread: WsError::Closed -> reconnect
end
end
else connect error
FeedThread->>FeedThread: exponential backoff + reconnects += 1
end
end
FeedThread->>FeedThread: await shutdown signal / join
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 13
🧹 Nitpick comments (3)
.gitignore (1)
50-50: Consider adding a descriptive comment for consistency.Other ignore patterns in this file have explanatory comments (e.g., lines 1-2, 6-7, 12-13). Adding a brief comment above
.claude/datapipelinewould improve maintainability and keep the file consistent with the established pattern.📋 Example
+# Claude development pipeline artifacts .claude/datapipeline🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.gitignore at line 50, Add a short descriptive comment above the .claude/datapipeline ignore entry in .gitignore explaining what that directory contains or why it's ignored (e.g., "Claude local data pipeline cache" or "generated datapipeline artifacts for Claude - do not commit"). Locate the line with the pattern ".claude/datapipeline" and insert a one-line comment immediately above it to match the existing explanatory style used for other entries.scripts/polymarket_ws_probe.py (1)
294-312: Optional: Remove extraneous f-string prefixes.Ruff flags several f-strings without placeholders (lines 294, 303, 306, 310). While functionally harmless, removing the
fprefix improves clarity.Example fix
- print(f" -> 1 Trade event") + print(" -> 1 Trade event")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/polymarket_ws_probe.py` around lines 294 - 312, Several print statements in the event_type handling branches use f-strings but contain no placeholders; remove the unnecessary f-prefix to satisfy linters. Locate the prints inside the event_type branches (e.g., the "-> 1 Trade event" print, the literal prints in the "best_bid_ask" branch that print labels like " best_bid:" and " best_ask:", and the literal bracketed messages in the cold-path and unknown-event branches) and change those print(...) calls from f-strings to plain string literals (print("...")) while keeping any prints that do use msg.get(...) as f-strings intact.crates/transport/src/feed.rs (1)
58-72: Use an actual randomness source for reconnect jitter.
Instant::now().elapsed()on a freshly createdInstantjust measures call overhead, so this "jitter" is really scheduler noise rather than deliberate spread. If herd avoidance matters, feed this calculation from a real RNG or a per-thread PRNG.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/src/feed.rs` around lines 58 - 72, The delay() method currently derives jitter from Instant::now().elapsed() which is not true randomness; replace that with a proper RNG (e.g., thread-local or SmallRng) to compute jitter when jitter_factor > 0.0: use the feed struct fields (initial, max, jitter_factor) and the delay(attempt: u32) function to compute the base/capped duration as before, then draw a random value in [-jitter_range, +jitter_range] (where jitter_range = capped.as_millis() as f64 * jitter_factor) and add it to capped.as_millis(), finally returning Duration::from_millis(ms.max(100.0) as u64); ensure you import and seed the RNG appropriately (thread_rng() or per-thread PRNG) and avoid using Instant for randomness.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.gitignore:
- Line 50: The .gitignore currently ends without a trailing newline (the last
entry ".claude/datapipeline"), which breaks POSIX text-file conventions; open
the .gitignore and add a single newline character after the final line so the
file ends with a trailing newline, then save and commit the change so tools and
diffs no longer warn about the missing newline.
In `@crates/transport/README.md`:
- Around line 114-117: Clarify the contradictory safety statement: change the
wording to say that the crate root includes #![deny(unsafe_code)] so the default
build contains no unsafe code, and explicitly note that the optional "tuning"
feature enables a single, audited unsafe block used only to call
setsockopt(SO_BUSY_POLL); i.e., state that the guarantee is "no unsafe in the
default build" and that the lone unsafe call is behind the tuning feature and
has been audited. Reference the crate root attribute (#![deny(unsafe_code)]),
the "tuning" feature, and the setsockopt(SO_BUSY_POLL) call in the updated
sentence.
In `@crates/transport/src/binance/reference.rs`:
- Around line 58-67: spawn_reference_feed currently allows an empty
config.symbols which makes book_ticker_url(&[]) produce an invalid "/ws/" URL
and spawns a worker that will reconnect forever; validate the input early in
spawn_reference_feed (taking BinanceReferenceConfig and before calling
book_ticker_url) and return an Err(std::io::Error) (or appropriate error) when
config.symbols.is_empty(), so callers fail fast instead of returning
Ok(FeedHandle) and starting the background thread.
- Around line 17-24: book_ticker_url currently concatenates multiple streams
into /ws/stream1/stream2 which is invalid for Binance combined streams; update
the book_ticker_url function to emit a single-symbol URL using
"{WS_BASE_URL}/ws/{stream}" when symbols.len()==1 and to use the combined-stream
syntax "{WS_BASE_URL}/stream?streams=stream1/stream2/..." (or
"{WS_BASE_URL}/public/stream?streams=..." if you prefer the newer endpoint) when
symbols.len()>1, joining the same "{s}@bookTicker" pieces with '/' to build the
streams parameter.
In `@crates/transport/src/feed.rs`:
- Around line 172-189: The code calls config.tuning.apply_affinity() but never
applies busy-poll; after a successful WsConnection::connect(&config.ws) return
(inside the match arm that yields conn) call the tuning method that enables busy
polling (e.g., config.tuning.apply_busy_poll(&conn.socket) or the appropriate
SocketTuning::apply_busy_poll method) before resetting attempt to 0 and using
conn; ensure you reference the same tuning struct used earlier (config.tuning /
SocketTuning) and pass the connection's underlying socket or file descriptor so
busy_poll_us is actually applied on each successful reconnection.
- Around line 190-203: The reconnect path uses thread::sleep(delay) (with delay
= config.backoff.delay(attempt)) which is uninterruptible and lets shutdown()
block; replace the single long sleep with an interruptible wait: either loop in
small chunks (e.g., compute remaining = delay and while remaining > 0 { if
is_shutdown() { break } sleep(min(chunk, remaining)); remaining -= chunk })
checking the shared shutdown flag, or use a wait primitive
(Condvar::wait_timeout or parking_lot::Condvar or an AtomicBool +
tokio::sync::Notify) to wake early; update the places using thread::sleep(delay)
(the shown Err(e) branch and the similar block around lines 237-246) to use this
interruptible-wait pattern and ensure attempt and backoff logic remain unchanged
when interrupted.
- Around line 112-130: The public FeedThread::spawn signature currently exposes
FnMut(&str) as the durable transport boundary (via FeedThread, spawn,
FeedConfig, FeedHandle), which we must not stabilize; change this by marking the
API as provisional and preventing downstream use: add a clear doc comment on
FeedThread::spawn and the FnMut(&str) parameter stating it is
provisional/internal-only, annotate the function with #[doc(hidden)] (or
#[deprecated(note = "internal/provisional API - do not depend on")] if hiding is
inappropriate), and consider restricting visibility to pub(crate) for spawn (or
introduce a sealed callback trait) so parse/normalize/HotEvent emission stays on
the IO-thread side until a stable API is designed. Ensure the doc comment
references FeedThread, spawn, FeedConfig, and FeedHandle so callers see the
warning.
In `@crates/transport/src/polymarket/market.rs`:
- Around line 29-39: The subscribe_msg function currently hand-rolls JSON by
quoting token_ids (using token_ids and format!("\"{id}\"")), which fails to
escape special characters; instead build a proper JSON value and serialize it
with serde_json (e.g., construct a struct or serde_json::json! with assets_ids =
self.token_ids, type = "market", custom_feature_enabled = true) and return
serde_json::to_string(...) so all token_ids are correctly escaped and you can
drop the manual quote mapping.
In `@crates/transport/src/tuning.rs`:
- Line 44: The call to us.cast_signed() (producing val) uses a method stabilized
in Rust 1.87, but our workspace MSRV is 1.85; replace that call with an
MSRV-safe cast (e.g., cast the unsigned value `us` to the desired signed type
with `as` — matching the intended target type used later — or use an explicit
conversion helper) or alternatively bump the workspace `rust-version` to 1.87.
Locate the occurrence `let val = us.cast_signed();` in tuning.rs and either
change it to an explicit `as <signed_type>` cast that matches subsequent uses of
`val`, or update the workspace MSRV if you intend to rely on `cast_signed()`.
In `@crates/transport/src/ws.rs`:
- Around line 36-45: WsConfig::binance() currently sets a 30s text-ping
heartbeat and send_ping() always sends a text "PING", which incorrectly applies
Polymarket's heartbeat to Binance and relies on read_timeout for scheduling;
change the design so heartbeat behavior is venue-configurable: introduce a
heartbeat mode/interval on WsConfig (e.g., enum HeartbeatMode { None,
Text(String), Frame } plus Duration ping_interval or Option<Duration>) and
update WsConfig::binance() to set HeartbeatMode::None (or no ping_interval) for
Binance, while Polymarket config sets HeartbeatMode::Text("PING") with the 10s
cadence; modify send_ping() to branch on the new HeartbeatMode to send text
messages or rely on protocol-level pongs (or do nothing for None/Frame), and
rewrite the read loop to use an independent timer (tokio::time::interval or a
separate timeout task) that enforces the ping_interval and triggers send_ping()
on schedule rather than waiting on read()/read_timeout so heartbeats occur on
their own cadence.
In `@crates/transport/tests/feed_thread.rs`:
- Around line 118-123: Stop the feed worker before comparing the two counters:
call handle.shutdown() (or the worker join method if available) before loading
received and handle.msg_count so both samples are taken after the thread is
stopped; alternatively relax the equality check to
assert!(handle.msg_count.load(Ordering::Relaxed) >=
received.load(Ordering::Relaxed)) referencing the existing symbols received,
handle.msg_count, and handle.shutdown() to locate and update the test.
In `@crates/transport/tests/live_feeds.rs`:
- Around line 49-53: The test currently only asserts handle.is_running() while
subscribing with token_ids: vec![], which can pass during a perpetual reconnect
loop; update the test in live_feeds.rs to wait for a positive success signal
instead of mere liveness: subscribe with a known-valid token id (replace
token_ids: vec![] with a concrete token id), and assert a successful
subscription acknowledgement or the arrival of the first message for that token
(e.g. wait for a subscription ack event or a first-message callback from
FeedThread), or assert an explicit successful-connect metric emitted by the
feed; replace the single handle.is_running() check with a timed wait for that
ack/message and fail the test if it does not arrive within a short timeout.
In `@scripts/polymarket_ws_probe.py`:
- Around line 354-359: The timeout passed into asyncio.wait_for can become
negative if time advances between the while check and the call; in the loop
surrounding ws.recv() compute the timeout value once (e.g., timeout = deadline -
time.time()), clamp it to a non-negative value or skip/wrap if timeout <= 0, and
then call asyncio.wait_for(ws.recv(), timeout=min(5.0, timeout)) to ensure you
never pass a negative timeout to asyncio.wait_for; update the block using
ws.recv, asyncio.wait_for, and deadline accordingly.
---
Nitpick comments:
In @.gitignore:
- Line 50: Add a short descriptive comment above the .claude/datapipeline ignore
entry in .gitignore explaining what that directory contains or why it's ignored
(e.g., "Claude local data pipeline cache" or "generated datapipeline artifacts
for Claude - do not commit"). Locate the line with the pattern
".claude/datapipeline" and insert a one-line comment immediately above it to
match the existing explanatory style used for other entries.
In `@crates/transport/src/feed.rs`:
- Around line 58-72: The delay() method currently derives jitter from
Instant::now().elapsed() which is not true randomness; replace that with a
proper RNG (e.g., thread-local or SmallRng) to compute jitter when jitter_factor
> 0.0: use the feed struct fields (initial, max, jitter_factor) and the
delay(attempt: u32) function to compute the base/capped duration as before, then
draw a random value in [-jitter_range, +jitter_range] (where jitter_range =
capped.as_millis() as f64 * jitter_factor) and add it to capped.as_millis(),
finally returning Duration::from_millis(ms.max(100.0) as u64); ensure you import
and seed the RNG appropriately (thread_rng() or per-thread PRNG) and avoid using
Instant for randomness.
In `@scripts/polymarket_ws_probe.py`:
- Around line 294-312: Several print statements in the event_type handling
branches use f-strings but contain no placeholders; remove the unnecessary
f-prefix to satisfy linters. Locate the prints inside the event_type branches
(e.g., the "-> 1 Trade event" print, the literal prints in the "best_bid_ask"
branch that print labels like " best_bid:" and " best_ask:", and the literal
bracketed messages in the cold-path and unknown-event branches) and change those
print(...) calls from f-strings to plain string literals (print("...")) while
keeping any prints that do use msg.get(...) as f-strings intact.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 30fbe313-7dce-4971-bdac-c273973ff57d
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (16)
.gitignoreCargo.tomlcrates/transport/Cargo.tomlcrates/transport/README.mdcrates/transport/src/binance/mod.rscrates/transport/src/binance/reference.rscrates/transport/src/feed.rscrates/transport/src/lib.rscrates/transport/src/polymarket/market.rscrates/transport/src/polymarket/mod.rscrates/transport/src/tuning.rscrates/transport/src/ws.rscrates/transport/tests/feed_thread.rscrates/transport/tests/live_feeds.rsdeny.tomlscripts/polymarket_ws_probe.py
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/transport/tests/feed_thread.rs (1)
118-123:⚠️ Potential issue | 🟡 MinorPotential race: counter comparison while feed is still running.
The
receivedandhandle.msg_countare sampled while the feed thread is active. A message arriving between the twoload()calls can causeassert_eqto fail intermittently. Either shut down before comparing, or relax the assertion.🔧 Suggested fix: relax assertion or stop first
tokio::time::sleep(Duration::from_millis(500)).await; + handle.shutdown(); let count = received.load(Ordering::Relaxed); assert!(count >= 10, "expected >= 10 messages, got {count}"); - assert_eq!(handle.msg_count.load(Ordering::Relaxed), u64::from(count)); - handle.shutdown(); + // After shutdown, both counters are stable + assert_eq!(handle.msg_count.load(Ordering::Relaxed), u64::from(count));Alternatively, keep shutdown at the end but relax to
>=:- assert_eq!(handle.msg_count.load(Ordering::Relaxed), u64::from(count)); + assert!(handle.msg_count.load(Ordering::Relaxed) >= u64::from(count));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/tests/feed_thread.rs` around lines 118 - 123, The test samples two counters while the feed thread may still be running which can race: move handle.shutdown() to before reading/comparing counters (call handle.shutdown() then await any join if necessary), then load received and handle.msg_count and assert equality; alternatively relax the strict equality to assert!(handle.msg_count.load(Ordering::Relaxed) >= u64::from(received.load(Ordering::Relaxed))) if you prefer not to stop the feed. Ensure you reference the received atomic, handle.msg_count, and handle.shutdown() when making the change.
🧹 Nitpick comments (8)
crates/registry/src/types.rs (2)
6-13: Minor: Extra spaces in doc comments.Lines 6 and 10 have extra spaces before the opening parenthesis:
"( BTC/USDT)"and"( Polymarket Up/Down)".📝 Fix typos
- /// Spot market ( BTC/USDT). + /// Spot market (BTC/USDT). Spot, /// Perpetual futures. Perp, - /// Binary prediction market ( Polymarket Up/Down). + /// Binary prediction market (Polymarket Up/Down). PredictionBinary,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/registry/src/types.rs` around lines 6 - 13, The doc comments for the enum variants Spot and PredictionBinary contain stray spaces before the opening parentheses; update the comments in types.rs for the Spot variant to "Spot market (BTC/USDT)." and for PredictionBinary to "Binary prediction market (Polymarket Up/Down)." by removing the extra space before "(" so the parentheses are immediately adjacent to the preceding word.
81-91: Naming clarification:spot_referencecreatesOracleReferenceclass.The method is named
spot_referencebut setsclass: InstrumentClass::OracleReference. This is semantically correct (reference price from a spot market acting as oracle), but the dual terminology could confuse readers. Consider either renaming tooracle_referenceor adding a doc comment clarifying the relationship.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/registry/src/types.rs` around lines 81 - 91, The method spot_reference currently constructs an Instrument with class InstrumentClass::OracleReference which can be confusing; update the public API by adding a clarifying doc comment to the spot_reference function explaining that it returns an OracleReference derived from a spot market (or if you prefer a breaking change, rename the function to oracle_reference and update all call sites). Specifically, edit the spot_reference function's doc comment to state that spot_reference creates an OracleReference (reference price sourced from a spot market) so readers understand the dual terminology; include the function name spot_reference and the enum InstrumentClass::OracleReference in the comment so maintainers can find and review the change.crates/registry/src/bindings.rs (1)
21-34: Consider validatingwindow_start <= window_endinvariant.
PolymarketWindowBindingaccepts anyTimestampvalues without validation. Invalid windows wherewindow_start > window_endcould silently propagate. Consider adding a constructor that enforces this invariant, or document that validation occurs in the registry's bind methods.🛠️ Example constructor with validation
impl PolymarketWindowBinding { /// Creates a new window binding, validating that start <= end. pub fn new( token_id: String, market_slug: String, window_start: Timestamp, window_end: Timestamp, condition_id: Option<String>, ) -> Result<Self, &'static str> { if window_start > window_end { return Err("window_start must not exceed window_end"); } Ok(Self { token_id, market_slug, window_start, window_end, condition_id }) } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/registry/src/bindings.rs` around lines 21 - 34, PolymarketWindowBinding currently allows window_start > window_end, so add a validated constructor (e.g., impl PolymarketWindowBinding::new) that checks if window_start <= window_end and returns an Err on violation, or alternatively update any factory functions that create PolymarketWindowBinding to perform this check before constructing; reference the struct name PolymarketWindowBinding and implement a new(...) method that validates the invariant and returns Result<Self, &'static str> (or propagate a crate error type) to prevent invalid windows from being created.crates/transport/README.md (1)
9-16: Add language specifiers to ASCII diagram code blocks.The architecture diagram (lines 9-16) and lifecycle diagram (lines 77-83) lack language specifiers. Adding
textwould satisfy markdown linters.📝 Suggested fix
-``` +```text VENUE FEEDS MANTIS HOT PATHAlso applies to: 77-83
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/README.md` around lines 9 - 16, Add a markdown language specifier "text" to the ASCII diagram code fences so linters recognize them as plain text; locate the fenced blocks containing the diagram lines like "VENUE FEEDS MANTIS HOT PATH" and the lifecycle block around lines showing "Polymarket Market WS", "Polymarket User WS", "Binance Reference WS", "Timer" (and the second diagram at lines 77-83) and change the opening triple-backtick from ``` to ```text for both diagrams.crates/registry/README.md (1)
11-19: Add language specifiers to fenced code blocks for consistency.The ASCII diagram and workflow code blocks lack language specifiers. While these aren't actual code, adding
textor leaving empty with just triple backticks is cleaner for markdown linters.📝 Suggested fix
-``` +```text token_id "72160714677..." (changes every 15 min) ↓ registry.by_polymarket_token_id()Apply similar changes to the architecture diagram (line 23) and rotation workflow (line 75).
Also applies to: 23-31, 75-82
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/registry/README.md` around lines 11 - 19, Update the fenced code blocks in the README diagrams to include a language specifier (use "text") so markdown linters parse them consistently; specifically add ```text``` for the ASCII workflow block containing token_id "72160714677...", the block showing registry.by_polymarket_token_id(), registry.meta(), price_to_ticks(), qty_to_lots() (InstrumentId/InstrumentMeta/Ticks/Lots), and likewise add ```text``` to the architecture diagram and the rotation workflow blocks referenced around the other diagram sections.crates/registry/src/lib.rs (1)
96-100:active_polymarket_token_idsallocates a newVecon each call.For hot-path subscription scenarios, this could be optimized by returning an iterator. However, for WS subscription (control path, infrequent), this is acceptable.
♻️ Optional: Return an iterator for zero-allocation access
/// All currently active Polymarket token IDs (for WS subscription). - #[must_use] - pub fn active_polymarket_token_ids(&self) -> Vec<String> { - self.by_polymarket_token_id.keys().cloned().collect() - } + /// Returns an iterator over currently active Polymarket token IDs. + pub fn active_polymarket_token_ids(&self) -> impl Iterator<Item = &str> { + self.by_polymarket_token_id.keys().map(String::as_str) + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/registry/src/lib.rs` around lines 96 - 100, The current active_polymarket_token_ids method allocates a Vec on each call; to avoid that, add a zero-allocation iterator variant (e.g. active_polymarket_token_ids_iter) that returns impl Iterator<Item=&String> by forwarding to self.by_polymarket_token_id.keys() (or the concrete Keys type) so callers can iterate without collecting; keep the existing Vec-returning active_polymarket_token_ids for compatibility if needed.crates/transport/src/feed.rs (1)
106-110:Dropsets shutdown flag but doesn't join the thread.When a
FeedHandleis dropped without callingshutdown(), the flag is set but the thread continues running until it checks the flag. This is intentional (non-blocking drop), but the thread may outlive the handle briefly. Consider documenting this behavior explicitly.📝 Optional: Add doc comment clarifying drop behavior
impl Drop for FeedHandle { + /// Sets the shutdown flag but does not block waiting for the thread to exit. + /// Call [`FeedHandle::shutdown()`] explicitly if you need to wait for termination. fn drop(&mut self) { self.shutdown.store(true, Ordering::Release); } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/src/feed.rs` around lines 106 - 110, The Drop impl for FeedHandle currently sets the shutdown flag via shutdown.store(true, Ordering::Release) but does not join or block on the background thread, which can let the thread briefly outlive the handle; add a clear doc comment on the FeedHandle type (and optionally on the Drop impl) describing that dropping is non-blocking: it flips the internal shutdown flag, does not join the worker thread, and users should call shutdown() when they need deterministic thread termination. Reference the FeedHandle struct, its shutdown field and method shutdown(), and the Drop impl to ensure the documentation is placed where users will see the intended lifetime semantics.crates/transport/src/ws.rs (1)
79-90: Consider logging unhandledMaybeTlsStreamvariants instead of silently skipping TCP tuning.The
MaybeTlsStreamenum is#[non_exhaustive], so while currently onlyPlainandRustlsvariants are available with the enabled features, future versions of tungstenite or feature changes (e.g., addingnative-tls) could introduce new variants. The wildcard pattern silently drops these cases, making TCP timeouts and nodelay configuration fail without visibility.Add a debug log to track if variants are unexpectedly unhandled:
🔧 Suggested improvement
let tcp_result = match ws.get_ref() { MaybeTlsStream::Plain(tcp) => Some(tcp), MaybeTlsStream::Rustls(tls) => Some(tls.get_ref()), _ => { + debug!("unhandled MaybeTlsStream variant, skipping TCP tuning"); None } };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/src/ws.rs` around lines 79 - 90, The code silently ignores unhandled MaybeTlsStream variants when matching ws.get_ref(), so add a debug log when the match falls through to None to aid future debugging; specifically, after computing tcp_result (from MaybeTlsStream::Plain and ::Rustls) log a debug message (including the ws.get_ref() or a descriptive string) if tcp_result is None before returning, so callers see that TCP tuning via set_read_timeout and set_nodelay did not run; keep existing error handling for set_read_timeout and set_nodelay and reference WsError::Connect and config.read_timeout in the surrounding context to ensure logs tie to the same operation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/registry/src/error.rs`:
- Around line 39-45: The current byte-slice truncation in the Display arm for
DuplicatePolymarketTokenId (using &t[..t.len().min(20)]) can panic on multi-byte
UTF-8; replace it with a char-safe truncation: create a truncated string from t
via chars().take(20).collect::<String>() (optionally append "…" if
t.chars().count() > 20) and use that truncated value in the write! call inside
the DuplicatePolymarketTokenId match arm to avoid slicing by byte indices.
In `@crates/transport/src/ws.rs`:
- Around line 118-153: read_text can double-ping because maybe_send_ping() is
called at the top and the timeout branch unconditionally calls send_ping() when
ping_interval.is_some(); change the logic so a second ping is suppressed if a
ping was just sent — e.g. have maybe_send_ping() return a bool (or
update/send_ping to check a last_ping timestamp) and in read_text's timeout
branch only call send_ping() when maybe_send_ping() did not already send one (or
when last_ping is older than the configured interval), referencing read_text,
maybe_send_ping, send_ping and self.config.ping_interval.
---
Duplicate comments:
In `@crates/transport/tests/feed_thread.rs`:
- Around line 118-123: The test samples two counters while the feed thread may
still be running which can race: move handle.shutdown() to before
reading/comparing counters (call handle.shutdown() then await any join if
necessary), then load received and handle.msg_count and assert equality;
alternatively relax the strict equality to
assert!(handle.msg_count.load(Ordering::Relaxed) >=
u64::from(received.load(Ordering::Relaxed))) if you prefer not to stop the feed.
Ensure you reference the received atomic, handle.msg_count, and
handle.shutdown() when making the change.
---
Nitpick comments:
In `@crates/registry/README.md`:
- Around line 11-19: Update the fenced code blocks in the README diagrams to
include a language specifier (use "text") so markdown linters parse them
consistently; specifically add ```text``` for the ASCII workflow block
containing token_id "72160714677...", the block showing
registry.by_polymarket_token_id(), registry.meta(), price_to_ticks(),
qty_to_lots() (InstrumentId/InstrumentMeta/Ticks/Lots), and likewise add
```text``` to the architecture diagram and the rotation workflow blocks
referenced around the other diagram sections.
In `@crates/registry/src/bindings.rs`:
- Around line 21-34: PolymarketWindowBinding currently allows window_start >
window_end, so add a validated constructor (e.g., impl
PolymarketWindowBinding::new) that checks if window_start <= window_end and
returns an Err on violation, or alternatively update any factory functions that
create PolymarketWindowBinding to perform this check before constructing;
reference the struct name PolymarketWindowBinding and implement a new(...)
method that validates the invariant and returns Result<Self, &'static str> (or
propagate a crate error type) to prevent invalid windows from being created.
In `@crates/registry/src/lib.rs`:
- Around line 96-100: The current active_polymarket_token_ids method allocates a
Vec on each call; to avoid that, add a zero-allocation iterator variant (e.g.
active_polymarket_token_ids_iter) that returns impl Iterator<Item=&String> by
forwarding to self.by_polymarket_token_id.keys() (or the concrete Keys type) so
callers can iterate without collecting; keep the existing Vec-returning
active_polymarket_token_ids for compatibility if needed.
In `@crates/registry/src/types.rs`:
- Around line 6-13: The doc comments for the enum variants Spot and
PredictionBinary contain stray spaces before the opening parentheses; update the
comments in types.rs for the Spot variant to "Spot market (BTC/USDT)." and for
PredictionBinary to "Binary prediction market (Polymarket Up/Down)." by removing
the extra space before "(" so the parentheses are immediately adjacent to the
preceding word.
- Around line 81-91: The method spot_reference currently constructs an
Instrument with class InstrumentClass::OracleReference which can be confusing;
update the public API by adding a clarifying doc comment to the spot_reference
function explaining that it returns an OracleReference derived from a spot
market (or if you prefer a breaking change, rename the function to
oracle_reference and update all call sites). Specifically, edit the
spot_reference function's doc comment to state that spot_reference creates an
OracleReference (reference price sourced from a spot market) so readers
understand the dual terminology; include the function name spot_reference and
the enum InstrumentClass::OracleReference in the comment so maintainers can find
and review the change.
In `@crates/transport/README.md`:
- Around line 9-16: Add a markdown language specifier "text" to the ASCII
diagram code fences so linters recognize them as plain text; locate the fenced
blocks containing the diagram lines like "VENUE FEEDS
MANTIS HOT PATH" and the lifecycle block around lines showing "Polymarket Market
WS", "Polymarket User WS", "Binance Reference WS", "Timer" (and the second
diagram at lines 77-83) and change the opening triple-backtick from ``` to
```text for both diagrams.
In `@crates/transport/src/feed.rs`:
- Around line 106-110: The Drop impl for FeedHandle currently sets the shutdown
flag via shutdown.store(true, Ordering::Release) but does not join or block on
the background thread, which can let the thread briefly outlive the handle; add
a clear doc comment on the FeedHandle type (and optionally on the Drop impl)
describing that dropping is non-blocking: it flips the internal shutdown flag,
does not join the worker thread, and users should call shutdown() when they need
deterministic thread termination. Reference the FeedHandle struct, its shutdown
field and method shutdown(), and the Drop impl to ensure the documentation is
placed where users will see the intended lifetime semantics.
In `@crates/transport/src/ws.rs`:
- Around line 79-90: The code silently ignores unhandled MaybeTlsStream variants
when matching ws.get_ref(), so add a debug log when the match falls through to
None to aid future debugging; specifically, after computing tcp_result (from
MaybeTlsStream::Plain and ::Rustls) log a debug message (including the
ws.get_ref() or a descriptive string) if tcp_result is None before returning, so
callers see that TCP tuning via set_read_timeout and set_nodelay did not run;
keep existing error handling for set_read_timeout and set_nodelay and reference
WsError::Connect and config.read_timeout in the surrounding context to ensure
logs tie to the same operation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d99031cd-763b-44b8-9b90-679681e32afa
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (15)
Cargo.tomlcrates/registry/Cargo.tomlcrates/registry/README.mdcrates/registry/src/bindings.rscrates/registry/src/error.rscrates/registry/src/lib.rscrates/registry/src/record.rscrates/registry/src/types.rscrates/transport/README.mdcrates/transport/src/binance/reference.rscrates/transport/src/feed.rscrates/transport/src/polymarket/market.rscrates/transport/src/ws.rscrates/transport/tests/feed_thread.rscrates/transport/tests/live_feeds.rs
✅ Files skipped from review due to trivial changes (2)
- crates/registry/src/record.rs
- crates/registry/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (2)
- Cargo.toml
- crates/transport/src/polymarket/market.rs
There was a problem hiding this comment.
🧹 Nitpick comments (3)
crates/transport/README.md (2)
77-83: Add language specifier to the lifecycle diagram.Same as above — use
textfor the ASCII lifecycle diagram.📝 Suggested fix
-``` +```text spawn() → connect → subscribe → read loop ──> callback🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/README.md` around lines 77 - 83, The ASCII lifecycle diagram block lacks a language specifier; update the fenced code block that contains "spawn() → connect → subscribe → read loop ──> callback" (and mentions on error/backoff and shutdown()) to use the "text" language tag so it's treated as plain text (i.e., change the opening ``` to ```text for the diagram that includes spawn(), connect, subscribe, read loop, callback, and shutdown()).
9-16: Add language specifier to fenced code blocks.Static analysis flagged missing language specifiers. For ASCII diagrams, use
textorplaintextto satisfy linters while preserving readability.📝 Suggested fix
-``` +```text VENUE FEEDS MANTIS HOT PATH🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/README.md` around lines 9 - 16, The fenced code block containing the ASCII diagram in the README (the triple-backtick block that starts the VENUE FEEDS / MANTIS HOT PATH diagram) is missing a language specifier; update the opening fence from ``` to ```text (or ```plaintext) so linters recognize it as plain text while preserving the ASCII diagram formatting.crates/transport/src/ws.rs (1)
153-158: Consider documenting the bypass caveat.Exposing
inner_mut()allows callers to bypass the configuredread_timeoutand heartbeat scheduling if they callws.read()directly. This is probably intentional for subscription updates, but a brief doc note could prevent misuse on the hot path.📝 Suggested doc improvement
/// Get a mutable reference to the underlying tungstenite socket. /// /// Useful for sending messages (e.g., dynamic subscription updates). + /// + /// **Note:** Calling `read()` directly on the returned socket bypasses + /// heartbeat scheduling and timeout handling — use `read_text()` for the + /// normal read loop. pub fn inner_mut(&mut self) -> &mut WebSocket<MaybeTlsStream<TcpStream>> {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/transport/src/ws.rs` around lines 153 - 158, Add a brief doc note to the inner_mut() method warning that returning a mutable reference to the underlying tungstenite WebSocket (inner_mut) lets callers bypass the transport's read_timeout and heartbeat scheduling (e.g., by calling ws.read() directly), and recommend using provided higher-level helpers or ensuring the caller respects timeouts/heartbeats when operating on the raw socket; reference the function name inner_mut and the underlying field ws in the comment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@crates/transport/README.md`:
- Around line 77-83: The ASCII lifecycle diagram block lacks a language
specifier; update the fenced code block that contains "spawn() → connect →
subscribe → read loop ──> callback" (and mentions on error/backoff and
shutdown()) to use the "text" language tag so it's treated as plain text (i.e.,
change the opening ``` to ```text for the diagram that includes spawn(),
connect, subscribe, read loop, callback, and shutdown()).
- Around line 9-16: The fenced code block containing the ASCII diagram in the
README (the triple-backtick block that starts the VENUE FEEDS / MANTIS HOT PATH
diagram) is missing a language specifier; update the opening fence from ``` to
```text (or ```plaintext) so linters recognize it as plain text while preserving
the ASCII diagram formatting.
In `@crates/transport/src/ws.rs`:
- Around line 153-158: Add a brief doc note to the inner_mut() method warning
that returning a mutable reference to the underlying tungstenite WebSocket
(inner_mut) lets callers bypass the transport's read_timeout and heartbeat
scheduling (e.g., by calling ws.read() directly), and recommend using provided
higher-level helpers or ensuring the caller respects timeouts/heartbeats when
operating on the raw socket; reference the function name inner_mut and the
underlying field ws in the comment.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f091b326-0630-4997-97b3-83ecc2a76cc3
📒 Files selected for processing (3)
crates/transport/README.mdcrates/transport/src/ws.rscrates/transport/tests/feed_thread.rs
✅ Files skipped from review due to trivial changes (1)
- crates/transport/tests/feed_thread.rs
|
@Milerius please review and merge when ready |
Summary:
mantis-transportcrate — WebSocket transport ingest layerTest plan:
Summary by CodeRabbit
New Features
Documentation
Tests
Chores