diff --git a/Cargo.toml b/Cargo.toml index 47e3e9c..fd8a906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,14 +2,15 @@ resolver = "2" members = [ - "hypersync-client", - "hypersync-format", - "hypersync-net-types", - "hypersync-schema", - "examples/all_erc20", - "examples/wallet", - "examples/watch", - "examples/reverse_wallet", - "examples/call_watch", - "examples/call_decode_output", + "hypersync-client", + "hypersync-format", + "hypersync-net-types", + "hypersync-schema", + "examples/all_erc20", + "examples/wallet", + "examples/watch", + "examples/reverse_wallet", + "examples/call_watch", + "examples/call_decode_output", + "examples/height_stream", ] diff --git a/examples/height_stream/Cargo.toml b/examples/height_stream/Cargo.toml new file mode 100644 index 0000000..5616d3a --- /dev/null +++ b/examples/height_stream/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "height_stream" +version = "0.1.0" +edition = "2021" + +[dependencies] +hypersync-client = { path = "../../hypersync-client" } +tokio = { version = "1", features = ["full"] } +env_logger = "0.11" +anyhow = "1" diff --git a/examples/height_stream/README.md b/examples/height_stream/README.md new file mode 100644 index 0000000..a0f0c00 --- /dev/null +++ b/examples/height_stream/README.md @@ -0,0 +1,24 @@ +# Height Stream Example + +This example demonstrates using the `stream_height()` API to receive real-time height updates from a HyperSync server. 
+
+## Running the Example
+
+```bash
+# With connection events visible (no env_logger needed)
+cargo run -p height_stream
+
+# With debug logs (shows keepalive pings, etc)
+RUST_LOG=debug cargo run -p height_stream
+
+# With trace logs (shows all SSE bytes received)
+RUST_LOG=trace cargo run -p height_stream
+```
+
+## API Design
+
+The stream emits `HeightStreamEvent` enum variants:
+
+- **`Connected`**: Emitted when successfully connected/reconnected to the server
+- **`Height(u64)`**: Emitted for each height update received
+- **`Reconnecting { delay }`**: Emitted when connection is lost, before waiting to reconnect
diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs
new file mode 100644
index 0000000..d8db833
--- /dev/null
+++ b/examples/height_stream/src/main.rs
@@ -0,0 +1,32 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use hypersync_client::{Client, ClientConfig, HeightStreamEvent};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+
+    let client = Arc::new(Client::new(ClientConfig {
+        url: Some("https://arbitrum-sepolia.hypersync.xyz".parse()?),
+        ..Default::default()
+    })?);
+
+    // `stream_height` spawns a background task and hands back the receiving end of its channel.
+    let mut rx = client.stream_height();
+
+    println!("listening for height updates... (Ctrl+C to quit)");
+
+    // `recv` yields `None` once the background task has dropped its sender.
+    while let Some(event) = rx.recv().await {
+        match event {
+            HeightStreamEvent::Connected => println!("✓ Connected to stream"),
+            HeightStreamEvent::Height(height) => println!("height: {}", height),
+            HeightStreamEvent::Reconnecting { delay } => {
+                println!("⟳ Reconnecting in {:?}...", delay)
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/hypersync-client/Cargo.toml b/hypersync-client/Cargo.toml
index 00545bb..b1f3400 100644
--- a/hypersync-client/Cargo.toml
+++ b/hypersync-client/Cargo.toml
@@ -30,6 +30,7 @@ tokio = { version = "1", default-features = false, features = [
     "test-util",
     "rt",
     "macros",
+    "sync",
 ] }
 log = "0.4"
 fastrange-rs = "0.1"
@@ -50,11 +51,12 @@ alloy-primitives = "1.1"
 hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.3" }
 hypersync-format = { path = "../hypersync-format", version = "0.5.8" }
 hypersync-schema = { path = "../hypersync-schema", version = "0.3" }
+reqwest-eventsource = "0.6"
 
 [dependencies.reqwest]
 version = "0.12"
 default-features = false
-features = ["json", "rustls-tls"]
+features = ["json", "rustls-tls", "stream"]
 
 [dev-dependencies]
 maplit = "1"
diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs
index 091c505..c019885 100644
--- a/hypersync-client/src/lib.rs
+++ b/hypersync-client/src/lib.rs
@@ -3,9 +3,12 @@
 use std::{num::NonZeroU64, sync::Arc, time::Duration};
 
 use anyhow::{anyhow, Context, Result};
+use futures::StreamExt;
 use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
 use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
-use reqwest::Method;
+use reqwest::{header, Method};
+use reqwest_eventsource::retry::ExponentialBackoff;
+use reqwest_eventsource::{Event, EventSource};
 
 mod column_mapping;
 mod config;
@@ -692,6 +695,269 @@ impl Client {
     }
 }
 
+/// Delay before the first backed-off reconnect attempt (doubles per consecutive failure).
+const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
+/// Upper bound for the reconnect delay.
+const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
+/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
+/// so we timeout after 15s (3x the ping interval).
+const READ_TIMEOUT: Duration = Duration::from_secs(15);
+
+/// Events emitted by the height stream.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum HeightStreamEvent {
+    /// Successfully connected or reconnected to the SSE stream.
+    Connected,
+    /// Received a height update from the server.
+    Height(u64),
+    /// Connection lost, will attempt to reconnect after the specified delay.
+    Reconnecting {
+        /// Duration to wait before attempting reconnection.
+        delay: Duration,
+    },
+}
+
+/// Result of decoding one SSE frame, before deciding whether to forward it to the consumer.
+enum InternalStreamEvent {
+    /// An event that is forwarded to the channel returned by `stream_height`.
+    Publish(HeightStreamEvent),
+    /// Server keepalive; carries no payload for the consumer.
+    Ping,
+    /// An SSE event type this client does not recognise (carries a debug description).
+    Unknown(String),
+}
+
+impl Client {
+    /// Builds the SSE `EventSource` for `GET <base url>/height/sse`.
+    ///
+    /// Sets the `Accept: text/event-stream` header, attaches the bearer token when one is
+    /// configured, and installs an exponential-backoff retry policy on the event source.
+    ///
+    /// # Errors
+    /// Fails when the base URL cannot take path segments or the request cannot be turned
+    /// into an `EventSource`.
+    fn get_es_stream(self: Arc<Self>) -> Result<EventSource> {
+        // Build the GET /height/sse request
+        let mut url = self.url.clone();
+        url.path_segments_mut()
+            .ok()
+            .context("invalid base URL")?
+            .extend(&["height", "sse"]);
+
+        let mut req = self
+            .http_client
+            .get(url)
+            .header(header::ACCEPT, "text/event-stream");
+
+        if let Some(bearer) = &self.bearer_token {
+            req = req.bearer_auth(bearer);
+        }
+
+        // Configure exponential backoff for library-level retries
+        let retry_policy = ExponentialBackoff::new(
+            INITIAL_RECONNECT_DELAY,
+            2.0,
+            Some(MAX_RECONNECT_DELAY),
+            None, // unlimited retries
+        );
+
+        // Turn the request into an EventSource stream with retries
+        let mut es = reqwest_eventsource::EventSource::new(req)
+            .context("unexpected error creating EventSource")?;
+        es.set_retry_policy(Box::new(retry_policy));
+        Ok(es)
+    }
+
+    /// Waits for the next SSE frame and decodes it.
+    ///
+    /// Returns `Ok(None)` when the event source has ended, and an error when nothing arrives
+    /// within `READ_TIMEOUT`, the event source reports a failure, or a `height` payload is
+    /// not a valid `u64`.
+    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
+        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
+            .await
+            // `Elapsed` only prints "deadline has elapsed", so report the configured timeout
+            .map_err(|_| anyhow::anyhow!("stream timed out after {:?}", READ_TIMEOUT))?
+        else {
+            return Ok(None);
+        };
+
+        let e = match res.context("failed response")? {
+            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
+            Event::Message(event) => match event.event.as_str() {
+                "height" => {
+                    let height = event
+                        .data
+                        .trim()
+                        .parse::<u64>()
+                        .context("parsing height from event data")?;
+                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
+                }
+                "ping" => InternalStreamEvent::Ping,
+                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
+            },
+        };
+
+        Ok(Some(e))
+    }
+
+    /// Forwards decoded events to `tx` until the stream ends, an error occurs, or the
+    /// receiver is dropped.
+    ///
+    /// Returns whether at least one event was published, which the retry loop uses to
+    /// reset its backoff.
+    async fn stream_height_events(
+        es: &mut EventSource,
+        tx: &mpsc::Sender<HeightStreamEvent>,
+    ) -> Result<bool> {
+        let mut received_an_event = false;
+        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
+            match event {
+                InternalStreamEvent::Publish(event) => {
+                    received_an_event = true;
+                    if tx.send(event).await.is_err() {
+                        return Ok(received_an_event); // Receiver dropped, exit task
+                    }
+                }
+                InternalStreamEvent::Ping => (), // ignore pings
+                InternalStreamEvent::Unknown(event) => log::trace!("{event}"), // ignore unknown events
+            }
+        }
+        Ok(received_an_event)
+    }
+
+    /// Returns how long to wait before the next reconnect attempt.
+    ///
+    /// `0` consecutive failures means "retry immediately"; afterwards the delay doubles from
+    /// `INITIAL_RECONNECT_DELAY` (200ms, 400ms, 800ms, ...) and is capped at
+    /// `MAX_RECONNECT_DELAY`.
+    fn get_delay(consecutive_failures: u32) -> Duration {
+        if consecutive_failures > 0 {
+            /// Calculates 2^x, saturating at `u32::MAX`: a plain `1 << x` overflows once
+            /// `x >= 32` (panics in debug builds, wraps back to a short delay in release).
+            const fn two_to_pow(x: u32) -> u32 {
+                match 1u32.checked_shl(x) {
+                    Some(pow) => pow,
+                    None => u32::MAX,
+                }
+            }
+            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
+            INITIAL_RECONNECT_DELAY
+                .saturating_mul(two_to_pow(consecutive_failures - 1))
+                .min(MAX_RECONNECT_DELAY)
+        } else {
+            // On zero consecutive failures, 0 delay
+            Duration::from_millis(0)
+        }
+    }
+
+    /// Runs the connect → stream → reconnect loop until the receiver side of `tx` is gone.
+    ///
+    /// A `HeightStreamEvent::Reconnecting` event is sent before every wait. The failure
+    /// counter is reset whenever a connection published at least one event, so the backoff
+    /// only grows across consecutive unproductive attempts.
+    ///
+    /// # Errors
+    /// Only fails when the event source cannot be built at all.
+    async fn stream_height_events_with_retry(
+        self: Arc<Self>,
+        tx: &mpsc::Sender<HeightStreamEvent>,
+    ) -> Result<()> {
+        let mut consecutive_failures = 0u32;
+
+        loop {
+            // should always be able to create a new es stream
+            // something is wrong with the req builder otherwise
+            let mut es = self.clone().get_es_stream().context("get es stream")?;
+
+            match Self::stream_height_events(&mut es, tx).await {
+                Ok(received_an_event) => {
+                    if received_an_event {
+                        consecutive_failures = 0; // Reset after successful connection that then failed
+                    }
+                    log::trace!("Stream height exited");
+                }
+                Err(e) => {
+                    log::trace!("Stream height failed: {e:?}");
+                }
+            }
+
+            es.close();
+
+            // If the receiver is closed, exit the task
+            if tx.is_closed() {
+                break;
+            }
+
+            let delay = Self::get_delay(consecutive_failures);
+            log::trace!("Reconnecting in {:?}...", delay);
+
+            if tx
+                .send(HeightStreamEvent::Reconnecting { delay })
+                .await
+                .is_err()
+            {
+                return Ok(()); // Receiver dropped, exit task
+            }
+            tokio::time::sleep(delay).await;
+
+            // increment consecutive failures so that on the next try
+            // it will start using back offs
+            consecutive_failures = consecutive_failures.saturating_add(1);
+        }
+
+        Ok(())
+    }
+
+    /// Streams archive height updates from the server via Server-Sent Events.
+    ///
+    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
+    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
+    ///
+    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
+    /// and height updates. This allows applications to display connection status to users.
+    ///
+    /// # Returns
+    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
+    /// lifecycle and sends events through this channel.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use std::sync::Arc;
+    /// # use hypersync_client::{Client, ClientConfig, HeightStreamEvent};
+    /// # async fn example() -> anyhow::Result<()> {
+    /// let client = Arc::new(Client::new(ClientConfig {
+    ///     url: Some("https://eth.hypersync.xyz".parse()?),
+    ///     ..Default::default()
+    /// })?);
+    ///
+    /// let mut rx = client.stream_height();
+    ///
+    /// while let Some(event) = rx.recv().await {
+    ///     match event {
+    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
+    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
+    ///         HeightStreamEvent::Reconnecting { delay } => {
+    ///             println!("Reconnecting in {:?}...", delay)
+    ///         }
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn stream_height(self: Arc<Self>) -> mpsc::Receiver<HeightStreamEvent> {
+        let (tx, rx) = mpsc::channel(16);
+
+        tokio::spawn(async move {
+            if let Err(e) = self.stream_height_events_with_retry(&tx).await {
+                log::error!("Stream height failed unexpectedly: {e:?}");
+            }
+        });
+
+        rx
+    }
+}
+
 fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
     if config.event_signature.is_some() {
         return Err(anyhow!(
@@ -708,3 +974,67 @@ fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    #[test]
+    fn test_get_delay() {
+        assert_eq!(
+            Client::get_delay(0),
+            Duration::from_millis(0),
+            "starts with 0 delay"
+        );
+        // powers of 2 backoff
+        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
+        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
+        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
+        // maxes out at 30s
+        assert_eq!(
+            Client::get_delay(9),
+            Duration::from_secs(30),
+            "max delay is 30s"
+        );
+        assert_eq!(
+            Client::get_delay(10),
+            Duration::from_secs(30),
+            "max delay is 30s"
+        );
+        // large failure counts must not overflow the shift used for the backoff factor
+        assert_eq!(Client::get_delay(33), Duration::from_secs(30));
+        assert_eq!(Client::get_delay(u32::MAX), Duration::from_secs(30));
+    }
+
+    #[tokio::test]
+    #[ignore = "integration test with live hs server for height stream"]
+    async fn test_stream_height_events() -> anyhow::Result<()> {
+        let (tx, mut rx) = mpsc::channel(16);
+        let handle = tokio::spawn(async move {
+            let client = Arc::new(Client::new(ClientConfig {
+                url: Some("https://monad-testnet.hypersync.xyz".parse()?),
+                ..Default::default()
+            })?);
+            let mut es = client.get_es_stream().context("get es stream")?;
+            Client::stream_height_events(&mut es, &tx).await
+        });
+
+        let val = rx.recv().await;
+        assert!(val.is_some());
+        assert_eq!(val.unwrap(), HeightStreamEvent::Connected);
+        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
+            panic!("should have received height")
+        };
+        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
+            panic!("should have received height")
+        };
+        assert!(height2 > height);
+        drop(rx);
+
+        let res = handle.await.expect("should have joined");
+        let received_an_event =
+            res.expect("should have ended the stream gracefully after dropping rx");
+        assert!(received_an_event, "should have received an event");
+
+        Ok(())
+    }
+}