-
Notifications
You must be signed in to change notification settings - Fork 6
feat: add height stream example and enhance hypersync-client with streaming capabilities #79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds a new example crate Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as Example binary
participant Client as Arc<Client>
participant Req as reqwest/EventSource
participant Server as Hypersync SSE
participant Chan as mpsc::Sender/Receiver
App->>Client: stream_height()
activate Client
Client->>Req: GET /height/sse (Accept: text/event-stream, Bearer?)
Req->>Server: HTTP request
Server-->>Req: 200 OK + SSE stream
Client->>Chan: spawn background task -> forward parsed events
deactivate Client
loop SSE events
Req-->>Client: SSE event (data: "123" or {"height":123} / ping / open)
Client->>Client: parse & normalize
alt height parsed
Client-->>Chan: Height(u64)
else connection opened
Client-->>Chan: Connected
else keep-alive / ping
Client-->>Client: ignore
end
end
alt connection lost / read timeout
Client->>Client: compute backoff delay
Client-->>Chan: Reconnecting{delay}
Client->>Req: reconnect after delay
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
Cargo.toml(1 hunks)examples/height_stream/Cargo.toml(1 hunks)examples/height_stream/src/main.rs(1 hunks)hypersync-client/Cargo.toml(1 hunks)hypersync-client/src/lib.rs(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
examples/height_stream/src/main.rs (1)
hypersync-client/src/lib.rs (1)
new(71-93)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test_release
- GitHub Check: test_dev
🔇 Additional comments (4)
hypersync-client/Cargo.toml (1)
57-57: Enabling reqwest streaming support looks good.Adding the
streamfeature is necessary for accessingbytes_stream()and similar SSE helpers, so this change is spot on.Cargo.toml (1)
4-15: Thanks for registering the new example in the workspace.Including
examples/height_streamhere keeps the workspace consistent and ensures the example builds with the rest of the project.examples/height_stream/Cargo.toml (1)
1-10: Example crate manifest looks solid.The dependencies line up with the runtime you’re using, and the path link back to
hypersync-clientkeeps everything in sync.examples/height_stream/src/main.rs (1)
6-30: Nice, concise example.This is a clear illustration of the new API, and the error handling is straightforward.
hypersync-client/src/lib.rs
Outdated
| /// | ||
| /// Returns a channel receiver that yields `u64` heights. The sender task runs in the background | ||
| /// and closes the channel if the connection drops or an error occurs. Messages that cannot be | ||
| /// parsed are ignored. | ||
| pub async fn stream_height(self: Arc<Self>) -> Result<mpsc::Receiver<Result<u64>>> { | ||
| let mut url = self.url.clone(); | ||
| let mut segments = url.path_segments_mut().ok().context("get path segments")?; | ||
| segments.push("height"); | ||
| segments.push("sse"); | ||
| std::mem::drop(segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the /height/stream endpoint path.
The doc comment (and PR objective) call out /height/stream, but the code currently appends "sse", so the client will receive a 404. Please point the request at the documented path instead.
- segments.push("height");
- segments.push("sse");
+ segments.push("height");
+ segments.push("stream");🤖 Prompt for AI Agents
In hypersync-client/src/lib.rs around lines 555 to 564 the path segments
appended for the height streaming endpoint use "sse" causing requests to hit the
wrong URL; change the appended segment from "sse" to "stream" so the client
targets "/height/stream" as documented (update the segments.push call
accordingly and keep the rest of the function unchanged).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (2)
hypersync-client/src/lib.rs (2)
628-631: Fix the endpoint path to use/height/stream.The code still appends
"sse"(line 630) instead of"stream", causing requests to hit the wrong URL. The PR objectives specify the/height/streamendpoint.Apply this diff:
let mut url = client.url.clone(); let mut segments = url.path_segments_mut().ok().unwrap(); segments.push("height"); - segments.push("sse"); + segments.push("stream"); std::mem::drop(segments);Also update the doc comment to match:
- /// Streams latest archive height updates from the server using the `/height/sse` SSE endpoint. + /// Streams latest archive height updates from the server using the `/height/stream` SSE endpoint.
553-561: Fix inconsistent timeout and delay documentation.Multiple documentation mismatches:
INITIAL_RECONNECT_DELAYis 200ms, but the comment at line 619 says "1 second" and line 828 shows pattern "0.5s → 1s → 2s..." which doesn't match (actual: 200ms → 400ms → 800ms → 1.6s → ...).The constant documentation says server pings every 5s with 15s timeout (3x), but the implementation comment at lines 684-685 says "Server sends keepalive pings every 20s" with "60s" timeout.
Apply this diff to align the documentation:
-const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); +/// Initial delay before first reconnection attempt after connection loss +const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30); const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60); /// Timeout for detecting dead connections. Server sends keepalive pings every 5s (when idle), /// so we timeout after 15s (3x the ping interval) if no data is received.And update the implementation comment:
while connection_active { // Wait for next chunk with timeout to detect dead connections. - // Server sends keepalive pings every 20s, so if we don't receive - // ANY data (including pings) within 60s, the connection is dead. + // Server sends keepalive pings every 5s (when idle), so if we don't receive + // ANY data (including pings) within 15s, the connection is dead. let next_chunk = tokio::time::timeout(READ_TIMEOUT, byte_stream.next()).await;And fix the reconnection pattern comment:
// Double the delay for the next attempt, up to the maximum. - // Pattern: 0.5s → 1s → 2s → 4s → 8s → 16s → 30s (max) → 30s → ... + // Pattern: 200ms → 400ms → 800ms → 1.6s → 3.2s → 6.4s → 12.8s → 25.6s → 30s (max) reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY);
🧹 Nitpick comments (1)
hypersync-client/src/lib.rs (1)
783-790: Consider handling persistent parse failures.Setting
connection_active = falseon parse failure triggers reconnection, but if the server consistently sends malformed data, this creates an infinite reconnect loop.Consider either:
- Sending parse errors through the channel so the caller can decide:
} else { log::warn!( "❌ Failed to parse height: {}", data ); - connection_active = false; - continue; + // Optionally send parse error to caller + let _ = tx.send(Err(anyhow!("Failed to parse height: {}", data))).await; + // Continue processing other events }
- Or tracking consecutive failures:
let mut consecutive_parse_failures = 0; const MAX_PARSE_FAILURES: usize = 10; // In parse failure branch: consecutive_parse_failures += 1; if consecutive_parse_failures >= MAX_PARSE_FAILURES { log::error!("❌ Too many consecutive parse failures, reconnecting"); connection_active = false; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
hypersync-client/src/lib.rs(2 hunks)
🔇 Additional comments (2)
hypersync-client/src/lib.rs (2)
6-6: LGTM! Imports are correctly used.The new imports support the SSE streaming functionality:
StreamExtfor byte stream iteration,headerfor the Accept header, andDeserializefor payload parsing.Also applies to: 9-9, 11-11
610-834: Well-designed SSE streaming implementation.The overall architecture is solid:
- Proper use of
Arc<Self>and background task spawning for concurrent operation- Channel-based communication pattern is appropriate for streaming data
- SSE protocol parsing correctly follows W3C EventSource specification (event/data fields, blank line delimiters, comment handling)
- Exponential backoff reconnection strategy prevents server overload
- Read timeout detection handles silent connection failures
- Comprehensive logging aids debugging
The main issues flagged in other comments are implementation details that need fixing (endpoint path, unused code, documentation consistency).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
hypersync-client/src/lib.rs (2)
557-557: Fix the documentation to reference the correct endpoint path.The doc comment references
/height/sse, but based on the PR objectives and past review feedback, this should be/height/streamto match the intended API endpoint.- /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects + /// Establishes a long-lived SSE connection to `/height/stream` that automatically reconnects
594-596: Replace.expect()with proper error handling and fix the endpoint path.Two critical issues here:
Panic risk: The
.expect()will panic if the base URL is a cannot-be-a-base URL, silently terminating the background task.Wrong endpoint: The code pushes
"sse"but should push"stream"to match the documented/height/streamendpoint per the PR objectives.let mut url = client.url.clone(); - url.path_segments_mut() - .expect("base URL cannot be a cannot-be-a-base URL") - .extend(&["height", "sse"]); + match url.path_segments_mut() { + Some(mut segments) => { + segments.extend(&["height", "stream"]); + } + None => { + log::error!("❌ Invalid URL configuration: cannot append path segments"); + tokio::time::sleep(reconnect_delay).await; + reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); + continue; + } + }
🧹 Nitpick comments (1)
hypersync-client/src/lib.rs (1)
664-673: Consider continuing on parse errors rather than forcing reconnection.A single malformed height value triggers a full reconnect by setting
connection_active = false. This is overly aggressive—the server might send a valid height next. Consider logging the error and continuing to process subsequent events.if let Ok(h) = data.trim().parse::<u64>() { log::debug!("Height update: {}", h); if tx.send(Ok(h)).await.is_err() { log::info!("Receiver dropped, exiting"); return; } } else { log::warn!("Failed to parse height: {}", data); - connection_active = false; + // Continue processing; don't force reconnect on single parse error }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
Cargo.toml(1 hunks)examples/height_stream/src/main.rs(1 hunks)hypersync-client/src/lib.rs(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
examples/height_stream/src/main.rs (1)
hypersync-client/src/lib.rs (1)
new(70-92)
🔇 Additional comments (9)
Cargo.toml (1)
5-15: LGTM!The addition of the
examples/height_streamworkspace member is properly integrated into the workspace manifest.examples/height_stream/src/main.rs (2)
10-17: LGTM!The client initialization is clear and demonstrates the API effectively. The
.unwrap()on the hardcoded URL is acceptable for example code.
19-21: LGTM!Clean usage of the
stream_height()API with appropriate error propagation.hypersync-client/src/lib.rs (6)
6-6: LGTM!The new imports (
futures::StreamExtandreqwest::header) appropriately support the SSE streaming implementation.Also applies to: 9-9
547-552: LGTM!The timeout and reconnection constants are well-chosen: exponential backoff (200ms → 30s), 3× safety margin for keepalive detection, and reasonable 24-hour max connection age.
585-588: LGTM!The function signature and channel setup are well-designed:
Arc<Self>enables safe background task spawning, and the 16-message buffer provides reasonable backpressure.
639-640: LGTM!The UTF-8 handling correctly uses
String::from_utf8_lossy, which safely handles invalid UTF-8 sequences without panicking.
617-691: Well-structured SSE stream processing.The implementation correctly:
- Detects dead connections via READ_TIMEOUT (15s)
- Parses SSE event blocks delimited by
\n\n- Handles both
heightandpingevents- Gracefully handles stream errors and server closures
698-700: LGTM!The reconnection logic properly implements exponential backoff with a maximum delay cap, ensuring the client doesn't overwhelm the server while also not waiting too long to reconnect.
hypersync-client/src/lib.rs
Outdated
| } | ||
| } | ||
|
|
||
| const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should make this 0, and only if initial reconnect fails then back off.
…eaming capabilities * Introduced a new example for streaming height updates in `examples/height_stream`. * Updated `hypersync-client` to support streaming from the `/height/stream` SSE endpoint. * Modified dependencies in `Cargo.toml` to include necessary libraries for the new functionality.
… appropriately when connection is lost.
f388202 to
0b3e7e4
Compare
hypersync-client/src/lib.rs
Outdated
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| pub async fn stream_height(self: Arc<Self>) -> Result<mpsc::Receiver<HeightStreamEvent>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we just make this a watch channel? Since that's what's actually coming from the events. Not guaranteeing intermediary updates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 - There are events about connecting and triggering a reconnect so maybe that wouldn't be ideal.
I get you that we only care about the latest height but in practice these will come through at a steady rate so the queue should never fill up.
hypersync-client/src/lib.rs
Outdated
| while let Some(idx) = buf.find("\n\n") { | ||
| let event_block = buf[..idx].to_string(); | ||
| buf.drain(..idx + 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little concerned about how low level and unsafe this could be without breaking it up as a standalone function with unit tests.
I don't know all the lower level details about how chunks of these byte streams could be formed. So I'm hesitant to be confident in this.
Did you consider using a lib with tests like https://github.com/jpopesculian/eventsource-stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up trying: https://github.com/jpopesculian/reqwest-eventsource (by the same author, but newer/more maintained it seems).
I also looked at: https://github.com/launchdarkly/rust-eventsource-client - but the other one looked nicer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JasoonS perfect. reqwest-eventsource uses eventsource-stream internally. It's just an additional abstraction 👍🏼 perfectly happy with that for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
Cargo.toml(1 hunks)examples/height_stream/Cargo.toml(1 hunks)examples/height_stream/README.md(1 hunks)examples/height_stream/src/main.rs(1 hunks)hypersync-client/Cargo.toml(2 hunks)hypersync-client/src/lib.rs(3 hunks)
✅ Files skipped from review due to trivial changes (1)
- examples/height_stream/README.md
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/height_stream/Cargo.toml
- hypersync-client/Cargo.toml
- Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (1)
examples/height_stream/src/main.rs (1)
hypersync-client/src/lib.rs (2)
new(74-78)event(770-773)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test_release
| fn get_delay(consecutive_failures: u32) -> Duration { | ||
| if consecutive_failures > 0 { | ||
| /// helper function to calculate 2^x | ||
| /// optimization using bit shifting | ||
| const fn two_to_pow(x: u32) -> u32 { | ||
| 1 << x | ||
| } | ||
| // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s | ||
| INITIAL_RECONNECT_DELAY | ||
| .saturating_mul(two_to_pow(consecutive_failures - 1)) | ||
| .min(MAX_RECONNECT_DELAY) | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix exponential backoff overflow panic.
two_to_pow(consecutive_failures - 1) shifts 1u32 by the retry count. Once consecutive_failures reaches 33 the shift amount becomes 32, triggering Rust’s runtime panic (“attempt to shift left with overflow”) and killing the background task. Please clamp the exponent (or switch to a wider checked shift) before multiplying so the stream can retry indefinitely without crashing.
- /// helper function to calculate 2^x
- /// optimization using bit shifting
- const fn two_to_pow(x: u32) -> u32 {
- 1 << x
- }
- // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
- INITIAL_RECONNECT_DELAY
- .saturating_mul(two_to_pow(consecutive_failures - 1))
- .min(MAX_RECONNECT_DELAY)
+ // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
+ let exponent = consecutive_failures.saturating_sub(1).min(8);
+ let multiplier = 1u32 << exponent;
+ INITIAL_RECONNECT_DELAY
+ .saturating_mul(multiplier)
+ .min(MAX_RECONNECT_DELAY)🤖 Prompt for AI Agents
In hypersync-client/src/lib.rs around lines 805 to 816, the exponential backoff
computes two_to_pow(consecutive_failures - 1) which will panic when shifting by
>=32; clamp the exponent before shifting (e.g. let exp =
(consecutive_failures.saturating_sub(1)).min(31)) or use a checked/wider shift
(cast to u64 and use checked_shl) and then compute the multiplier from that
clamped/checked value so the shift cannot overflow and the Duration
multiplication uses a safe u32 multiplier (or is cast safely) to preserve the
saturating/min behavior up to MAX_RECONNECT_DELAY.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
hypersync-client/src/lib.rs (2)
725-756: Fix the endpoint path to/height/stream.Line 732 still uses
"sse"but should use"stream"per the PR objectives and the unresolved past review. The current path will result in 404 errors.Apply this diff:
url.path_segments_mut() .ok() .context("invalid base URL")? - .extend(&["height", "sse"]); + .extend(&["height", "stream"]);
805-820: Fix exponential backoff overflow panic.The
two_to_pow(consecutive_failures - 1)will panic whenconsecutive_failures >= 33because1 << 32overflowsu32. This will crash the background streaming task. The test only checks up toconsecutive_failures = 10, missing this edge case.Apply this diff to clamp the exponent:
fn get_delay(consecutive_failures: u32) -> Duration { if consecutive_failures > 0 { - /// helper function to calculate 2^x - /// optimization using bit shifting - const fn two_to_pow(x: u32) -> u32 { - 1 << x - } // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s + let exponent = (consecutive_failures - 1).min(31); + let multiplier = 1u32 << exponent; INITIAL_RECONNECT_DELAY - .saturating_mul(two_to_pow(consecutive_failures - 1)) + .saturating_mul(multiplier) .min(MAX_RECONNECT_DELAY) } else { // On zero consecutive failures, 0 delay Duration::from_millis(0) } }
🧹 Nitpick comments (2)
hypersync-client/src/lib.rs (2)
822-870: Fix typo in comment.Line 829: "creat" should be "create".
Apply this diff:
- // should always be able to creat a new es stream + // should always be able to create a new es stream // something is wrong with the req builder otherwise let mut es = self.clone().get_es_stream().context("get es stream")?;
872-918: Update doc comment to reference correct endpoint.Line 874 doc comment says
/height/sse, but this should match the actual endpoint path (likely/height/streamonce Line 732 is fixed).Update the doc comment after fixing the endpoint path:
/// Streams archive height updates from the server via Server-Sent Events. /// - /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects + /// Establishes a long-lived SSE connection to `/height/stream` that automatically reconnects /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
hypersync-client/src/lib.rs(3 hunks)
🔇 Additional comments (5)
hypersync-client/src/lib.rs (5)
698-703: LGTM: Well-documented timeout constants.The constants are sensible, and the READ_TIMEOUT documentation clearly explains the 3× keepalive-interval rationale.
705-717: LGTM: Clean event enum design.The public
HeightStreamEventAPI is well-structured and provides clear connection lifecycle visibility to consumers.
758-783: LGTM: Proper event parsing with timeout protection.The timeout wrapping and event parsing logic correctly handle SSE events, including the keepalive pings.
785-803: LGTM: Clean event loop with proper receiver lifecycle handling.The event processing correctly tracks received events and exits gracefully when the receiver drops.
938-997: LGTM: Tests cover the happy path.The unit tests verify basic delay calculations and the integration test (appropriately ignored) validates the event stream lifecycle.
examples/height_stream.hypersync-clientto support streaming from the/height/streamSSE endpoint.Cargo.tomlto include necessary libraries for the new functionality.Summary by CodeRabbit
New Features
Examples
Chores