From c6b02c2ebb631f85fb07fc827dea7848743ac2f4 Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 25 Sep 2025 17:48:58 +0200 Subject: [PATCH 01/10] feat: add height stream example and enhance hypersync-client with streaming capabilities * Introduced a new example for streaming height updates in `examples/height_stream`. * Updated `hypersync-client` to support streaming from the `/height/stream` SSE endpoint. * Modified dependencies in `Cargo.toml` to include necessary libraries for the new functionality. --- Cargo.toml | 21 ++--- examples/height_stream/Cargo.toml | 10 +++ examples/height_stream/src/main.rs | 30 +++++++ hypersync-client/Cargo.toml | 2 +- hypersync-client/src/lib.rs | 136 +++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+), 11 deletions(-) create mode 100644 examples/height_stream/Cargo.toml create mode 100644 examples/height_stream/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 47e3e9c..fd8a906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,14 +2,15 @@ resolver = "2" members = [ - "hypersync-client", - "hypersync-format", - "hypersync-net-types", - "hypersync-schema", - "examples/all_erc20", - "examples/wallet", - "examples/watch", - "examples/reverse_wallet", - "examples/call_watch", - "examples/call_decode_output", + "hypersync-client", + "hypersync-format", + "hypersync-net-types", + "hypersync-schema", + "examples/all_erc20", + "examples/wallet", + "examples/watch", + "examples/reverse_wallet", + "examples/call_watch", + "examples/call_decode_output", + "examples/height_stream", ] diff --git a/examples/height_stream/Cargo.toml b/examples/height_stream/Cargo.toml new file mode 100644 index 0000000..5616d3a --- /dev/null +++ b/examples/height_stream/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "height_stream" +version = "0.1.0" +edition = "2021" + +[dependencies] +hypersync-client = { path = "../../hypersync-client" } +tokio = { version = "1", features = ["full"] } +env_logger = "0.11" +anyhow = "1" diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs new file mode 100644 index 0000000..9f6c27d --- /dev/null +++ b/examples/height_stream/src/main.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use anyhow::Result; +use hypersync_client::{Client, ClientConfig}; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + + let client = Arc::new(Client::new(ClientConfig { + url: Some("http://localhost:1131".parse().unwrap()), + ..Default::default() + })?); + + let mut rx = client.clone().stream_height().await?; + + println!("listening for height updates... (Ctrl+C to quit)"); + + while let Some(msg) = rx.recv().await { + match msg { + Ok(height) => println!("height: {}", height), + Err(e) => { + eprintln!("stream error: {e:?}"); + break; + } + } + } + + Ok(()) +} diff --git a/hypersync-client/Cargo.toml b/hypersync-client/Cargo.toml index 00545bb..65b4c62 100644 --- a/hypersync-client/Cargo.toml +++ b/hypersync-client/Cargo.toml @@ -54,7 +54,7 @@ hypersync-schema = { path = "../hypersync-schema", version = "0.3" } [dependencies.reqwest] version = "0.12" default-features = false -features = ["json", "rustls-tls"] +features = ["json", "rustls-tls", "stream"] [dev-dependencies] maplit = "1" diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 091c505..a78d2af 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -3,9 +3,12 @@ use std::{num::NonZeroU64, sync::Arc, time::Duration}; use anyhow::{anyhow, Context, Result}; +use futures::StreamExt; use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query}; use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk}; +use reqwest::header; use reqwest::Method; +use serde::Deserialize; mod column_mapping; mod config; @@ -692,6 +695,139 @@ impl Client { } } +#[derive(Debug, Deserialize)] +struct HeightSsePayloadJson { + height: Option, +} + +impl Client { + /// Streams latest archive height updates from the server using the `/height/stream` SSE endpoint. + /// + /// Returns a channel receiver that yields `u64` heights. The sender task runs in the background + /// and closes the channel if the connection drops or an error occurs. Messages that cannot be + /// parsed are ignored. + pub async fn stream_height(self: Arc) -> Result>> { + let mut url = self.url.clone(); + let mut segments = url.path_segments_mut().ok().context("get path segments")?; + segments.push("height"); + segments.push("sse"); + std::mem::drop(segments); + + let mut req = self.http_client.request(Method::GET, url); + + if let Some(bearer_token) = &self.bearer_token { + req = req.bearer_auth(bearer_token); + } + + req = req + .header(header::ACCEPT, "text/event-stream") + // SSE is a long-lived request; use a long timeout to avoid body timeouts. + // TODO: Make this configurable - and much shorter! + .timeout(Duration::from_secs(24 * 60 * 60)); + + let res = req.send().await.context("execute http req")?; + + let status = res.status(); + if !status.is_success() { + return Err(anyhow!("http response status code {}", status)); + } + + let (tx, rx) = mpsc::channel(16); + let mut byte_stream = res.bytes_stream(); + + tokio::spawn(async move { + let mut buf = String::new(); + + while let Some(item) = byte_stream.next().await { + let bytes = match item { + Ok(b) => b, + Err(e) => { + let _ = tx.send(Err(anyhow!("sse stream error: {:?}", e))).await; + return; + } + }; + + use std::fmt::Write as _; + let chunk_str = match std::str::from_utf8(&bytes) { + Ok(s) => s, + Err(_) => { + let mut tmp = String::with_capacity(bytes.len() * 2); + for &b in bytes.as_ref() { + // lossily map non-utf8 to replacement + let _ = write!(&mut tmp, "{}", char::from(b)); + } + buf.push_str(&tmp); + continue; + } + }; + buf.push_str(chunk_str); + + // Normalize Windows CRLF to LF to simplify parsing + if buf.contains("\r\n") { + buf = buf.replace("\r\n", "\n"); + } + + // Process complete events separated by blank line + loop { + if let Some(idx) = buf.find("\n\n") { + let event_block = buf[..idx].to_string(); + buf.drain(..idx + 2); + + // Parse a single SSE event block + let mut event_name: Option<&str> = None; + let mut data_lines: Vec<&str> = Vec::new(); + + for line in event_block.lines() { + if line.is_empty() { + continue; + } + if line.starts_with(':') { + // comment/keep-alive + continue; + } + if let Some(rest) = line.strip_prefix("event:") { + event_name = Some(rest.trim()); + continue; + } + if let Some(rest) = line.strip_prefix("data:") { + data_lines.push(rest.trim()); + continue; + } + // ignore other fields like id + } + + let name = event_name.unwrap_or(""); + if name == "height" { + let data = data_lines.join("\n"); + // Preferred: parse plain integer body + if let Ok(h) = data.trim().parse::() { + if tx.send(Ok(h)).await.is_err() { + return; + } + } else { + // Backward compatibility: parse {"height": N} + if let Ok(payload) = + serde_json::from_str::(&data) + { + if let Some(h) = payload.height { + if tx.send(Ok(h)).await.is_err() { + return; + } + } + } + } + } + } else { + break; + } + } + } + }); + + Ok(rx) + } +} + fn check_simple_stream_params(config: &StreamConfig) -> Result<()> { if config.event_signature.is_some() { return Err(anyhow!( From b195efe2560f90d8c8e58bb933ed2ade4f821961 Mon Sep 17 00:00:00 2001 From: Jason Date: Mon, 27 Oct 2025 22:27:58 +0200 Subject: [PATCH 02/10] update client to reconnect correctly --- Cargo.toml | 1 + examples/height_stream/src/main.rs | 8 +- hypersync-client/src/lib.rs | 343 ++++++++++++++++++++--------- 3 files changed, 246 insertions(+), 106 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fd8a906..d2663f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ members = [ "examples/call_watch", "examples/call_decode_output", "examples/height_stream", + "examples/height_stream_reconnect", ] diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs index 9f6c27d..83e907f 100644 --- a/examples/height_stream/src/main.rs +++ b/examples/height_stream/src/main.rs @@ -8,7 +8,11 @@ async fn main() -> Result<()> { env_logger::init(); let client = Arc::new(Client::new(ClientConfig { - url: Some("http://localhost:1131".parse().unwrap()), + url: Some( + "https://arbitrum-sepolia.zone1.hypersync.xyz" + .parse() + .unwrap(), + ), ..Default::default() })?); @@ -20,7 +24,7 @@ async fn main() -> Result<()> { match msg { Ok(height) => println!("height: {}", height), Err(e) => { - eprintln!("stream error: {e:?}"); + eprintln!("stream error - will automatically reconnect: {e:?}"); break; } } diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index a78d2af..6e0a31c 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -700,127 +700,262 @@ struct HeightSsePayloadJson { height: Option, } +const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); +const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30); +const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60); + impl Client { - /// Streams latest archive height updates from the server using the `/height/stream` SSE endpoint. + /// Streams latest archive height updates from the server using the `/height/sse` SSE endpoint. + /// + /// # Overview + /// This function establishes a long-lived Server-Sent Events (SSE) connection to continuously + /// receive height updates from the hypersync server. The connection is resilient and will + /// automatically reconnect if it drops due to network issues, server restarts, or shutdowns. + /// + /// # Returns + /// Returns a channel receiver that yields `Result` heights. A background task manages + /// the connection lifecycle and sends height updates through this channel. + /// + /// # Connection Management + /// - **Automatic Reconnection**: If the connection drops, the client automatically attempts + /// to reconnect with exponential backoff (1s → 2s → 4s → ... → max 30s) + /// - **Graceful Shutdown**: When the server closes the stream (e.g., during restart), the + /// client detects it and reconnects immediately + /// - **Error Handling**: Connection errors are logged and don't terminate the stream + /// + /// # SSE Protocol Details + /// The function parses SSE messages according to the W3C EventSource spec: + /// - Messages are separated by blank lines (`\n\n`) + /// - Each message can have `event:` and `data:` fields + /// - Keep-alive comments (`:ping`) are ignored + /// - Only `event:height` messages are processed + /// + /// # Example + /// ```no_run + /// # use std::sync::Arc; + /// # use hypersync_client::{Client, ClientConfig}; + /// # async fn example() -> anyhow::Result<()> { + /// let client = Arc::new(Client::new(ClientConfig { + /// url: Some("https://eth.hypersync.xyz".parse()?), + /// ..Default::default() + /// })?); + /// + /// let mut rx = client.stream_height().await?; /// - /// Returns a channel receiver that yields `u64` heights. The sender task runs in the background - /// and closes the channel if the connection drops or an error occurs. Messages that cannot be - /// parsed are ignored. + /// while let Some(result) = rx.recv().await { + /// match result { + /// Ok(height) => println!("Height: {}", height), + /// Err(e) => eprintln!("Error: {}", e), + /// } + /// } + /// # Ok(()) + /// # } + /// ``` pub async fn stream_height(self: Arc) -> Result>> { - let mut url = self.url.clone(); - let mut segments = url.path_segments_mut().ok().context("get path segments")?; - segments.push("height"); - segments.push("sse"); - std::mem::drop(segments); - - let mut req = self.http_client.request(Method::GET, url); - - if let Some(bearer_token) = &self.bearer_token { - req = req.bearer_auth(bearer_token); - } - - req = req - .header(header::ACCEPT, "text/event-stream") - // SSE is a long-lived request; use a long timeout to avoid body timeouts. - // TODO: Make this configurable - and much shorter! - .timeout(Duration::from_secs(24 * 60 * 60)); - - let res = req.send().await.context("execute http req")?; - - let status = res.status(); - if !status.is_success() { - return Err(anyhow!("http response status code {}", status)); - } - + // Create a channel for sending height updates from the background task to the caller. + // Buffer size of 16 allows for some burst handling without blocking the sender. let (tx, rx) = mpsc::channel(16); - let mut byte_stream = res.bytes_stream(); + let client = self.clone(); + // Spawn a background task that manages the SSE connection lifecycle. + // This task runs indefinitely, handling reconnections automatically. tokio::spawn(async move { - let mut buf = String::new(); - - while let Some(item) = byte_stream.next().await { - let bytes = match item { - Ok(b) => b, - Err(e) => { - let _ = tx.send(Err(anyhow!("sse stream error: {:?}", e))).await; - return; - } - }; - - use std::fmt::Write as _; - let chunk_str = match std::str::from_utf8(&bytes) { - Ok(s) => s, - Err(_) => { - let mut tmp = String::with_capacity(bytes.len() * 2); - for &b in bytes.as_ref() { - // lossily map non-utf8 to replacement - let _ = write!(&mut tmp, "{}", char::from(b)); - } - buf.push_str(&tmp); - continue; - } - }; - buf.push_str(chunk_str); - - // Normalize Windows CRLF to LF to simplify parsing - if buf.contains("\r\n") { - buf = buf.replace("\r\n", "\n"); + // Reconnection delay starts at 1 second and doubles on each failure (exponential backoff). + // This prevents hammering the server when it's down or restarting. + let mut reconnect_delay = INITIAL_RECONNECT_DELAY; + + // Main reconnection loop - runs forever, attempting to maintain a connection. + loop { + // === STEP 1: Build the SSE endpoint URL === + // Construct the full URL path: /height/sse + let mut url = client.url.clone(); + let mut segments = url.path_segments_mut().ok().unwrap(); + segments.push("height"); + segments.push("sse"); + std::mem::drop(segments); // Release the mutable borrow on url + + // === STEP 2: Prepare the HTTP GET request === + let mut req = client.http_client.request(Method::GET, url); + + // Add bearer token authentication if configured + if let Some(bearer_token) = &client.bearer_token { + req = req.bearer_auth(bearer_token); } - // Process complete events separated by blank line - loop { - if let Some(idx) = buf.find("\n\n") { - let event_block = buf[..idx].to_string(); - buf.drain(..idx + 2); - - // Parse a single SSE event block - let mut event_name: Option<&str> = None; - let mut data_lines: Vec<&str> = Vec::new(); - - for line in event_block.lines() { - if line.is_empty() { - continue; - } - if line.starts_with(':') { - // comment/keep-alive - continue; - } - if let Some(rest) = line.strip_prefix("event:") { - event_name = Some(rest.trim()); - continue; - } - if let Some(rest) = line.strip_prefix("data:") { - data_lines.push(rest.trim()); - continue; - } - // ignore other fields like id + // Configure request headers and timeout. + // SSE connections are long-lived, so we use a 24-hour timeout to prevent + // the HTTP client from terminating the connection prematurely. + req = req + .header(header::ACCEPT, "text/event-stream") + .timeout(MAX_CONNECTION_AGE); + + // === STEP 3: Attempt to establish the SSE connection === + match req.send().await { + Ok(res) => { + let status = res.status(); + + // Check for HTTP errors (non-2xx status codes) + if !status.is_success() { + log::warn!("❌ HTTP error: status code {}", status); + + // Wait before retrying with exponential backoff + tokio::time::sleep(reconnect_delay).await; + reconnect_delay = + std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); + continue; // Retry the connection } - let name = event_name.unwrap_or(""); - if name == "height" { - let data = data_lines.join("\n"); - // Preferred: parse plain integer body - if let Ok(h) = data.trim().parse::() { - if tx.send(Ok(h)).await.is_err() { - return; - } - } else { - // Backward compatibility: parse {"height": N} - if let Ok(payload) = - serde_json::from_str::(&data) - { - if let Some(h) = payload.height { - if tx.send(Ok(h)).await.is_err() { - return; + // Successfully connected! + log::info!("✅ Connected to height SSE stream"); + + // Reset reconnection delay after successful connection + reconnect_delay = INITIAL_RECONNECT_DELAY; + + // === STEP 4: Process the SSE byte stream === + // Get the response body as a stream of bytes + let mut byte_stream = res.bytes_stream(); + + // Buffer for accumulating incomplete SSE messages. + // SSE messages are text-based and separated by blank lines (\n\n). + let mut buf = String::new(); + + // Flag to track if the connection is still active + let mut connection_active = true; + + // Main message processing loop - runs until the connection drops + while connection_active { + match byte_stream.next().await { + // === Successfully received bytes from the stream === + Some(Ok(bytes)) => { + log::trace!( + "📦 Received {} bytes from SSE stream", + bytes.len() + ); + + use std::fmt::Write as _; + + // Convert bytes to UTF-8 string + let chunk_str = match std::str::from_utf8(&bytes) { + Ok(s) => s, + Err(_) => { + // Handle invalid UTF-8 by doing lossy conversion. + // This is rare but can happen with network corruption. + let mut tmp = String::with_capacity(bytes.len() * 2); + for &b in bytes.as_ref() { + let _ = write!(&mut tmp, "{}", char::from(b)); + } + buf.push_str(&tmp); + continue; + } + }; + + // Append the new chunk to our buffer + buf.push_str(chunk_str); + + // === STEP 5: Parse complete SSE messages === + // SSE messages are separated by blank lines (\n\n). + // Process all complete messages currently in the buffer. + loop { + if let Some(idx) = buf.find("\n\n") { + // Extract one complete SSE message + let event_block = buf[..idx].to_string(); + buf.drain(..idx + 2); // Remove message + blank line from buffer + + // Parse the SSE event fields according to the W3C spec + let mut event_name: Option<&str> = None; + let mut data_lines: Vec<&str> = Vec::new(); + + // Process each line in the event block + for line in event_block.lines() { + if line.is_empty() { + continue; + } + if line.starts_with(':') { + // Comment line (used for keep-alive pings). + // Format: ": ping" or ": " + continue; + } + if let Some(rest) = line.strip_prefix("event:") { + // Event type field. + // Format: "event: height" + event_name = Some(rest.trim()); + continue; + } + if let Some(rest) = line.strip_prefix("data:") { + // Data field (can be multiple per event). + // Format: "data: 12345" + data_lines.push(rest.trim()); + continue; + } + // Ignore other SSE fields like "id:" and "retry:" + } + + // === STEP 6: Process height events === + let name = event_name.unwrap_or(""); + if name == "height" { + // Combine multiple data lines (though typically just one) + let data = data_lines.join("\n"); + + // Try parsing as plain integer (preferred format). + // Server sends: event:height\ndata:12345\n\n + if let Ok(h) = data.trim().parse::() { + log::debug!("📈 Height update: {}", h); + + // Send the height through the channel. + // If the receiver is dropped, exit the task gracefully. + if tx.send(Ok(h)).await.is_err() { + log::info!( + "Receiver dropped, exiting stream task" + ); + return; + } + } else { + log::warn!( + "❌ Failed to parse height: {}", + data + ); + connection_active = false; + continue; + } + } + } else { + // No complete message in buffer yet, wait for more data + break; } } } + + // === Stream error occurred (network issue, timeout, etc.) === + Some(Err(e)) => { + log::warn!("⚠️ SSE stream error: {:?}", e); + connection_active = false; // Exit loop and reconnect + } + + // === Stream ended (server closed the connection) === + // This happens during server restarts, shutdowns, or SIGTERM simulation + None => { + log::info!("🔌 SSE stream closed by server, will reconnect"); + connection_active = false; // Exit loop and reconnect + } } } - } else { - break; + } + + // === Failed to establish HTTP connection === + Err(e) => { + log::warn!("❌ Failed to connect to height stream: {:?}", e); } } + + // === STEP 7: Wait before reconnecting === + // After any disconnection (graceful or error), wait before attempting to reconnect. + // This implements exponential backoff to avoid overwhelming the server. + log::info!("⏳ Reconnecting in {:?}...", reconnect_delay); + tokio::time::sleep(reconnect_delay).await; + + // Double the delay for the next attempt, up to the maximum. + // Pattern: 0.5s → 1s → 2s → 4s → 8s → 16s → 30s (max) → 30s → ... + reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); } }); From 9b96da63e5c45c6ecf09281fd98c3f1f83dcf35e Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 6 Nov 2025 15:52:01 +0200 Subject: [PATCH 03/10] Disconnect if no pings or messages for 15 seconds --- hypersync-client/src/lib.rs | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 6e0a31c..3077edb 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -703,6 +703,12 @@ struct HeightSsePayloadJson { const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30); const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60); +/// Timeout for detecting dead connections. Server sends keepalive pings every 5s (when idle), +/// so we timeout after 15s (3x the ping interval) if no data is received. +/// This allows the client to detect server crashes that don't trigger graceful shutdown. +/// Note: The server uses smart keepalive - it only pings when no height updates occur, +/// so active chains won't generate unnecessary pings. +const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); impl Client { /// Streams latest archive height updates from the server using the `/height/sse` SSE endpoint. @@ -824,9 +830,24 @@ impl Client { // Main message processing loop - runs until the connection drops while connection_active { - match byte_stream.next().await { + // Wait for next chunk with timeout to detect dead connections. + // Server sends keepalive pings every 20s, so if we don't receive + // ANY data (including pings) within 60s, the connection is dead. + let next_chunk = + tokio::time::timeout(READ_TIMEOUT, byte_stream.next()).await; + + match next_chunk { + // === Timeout: no data received within READ_TIMEOUT === + Err(_) => { + log::warn!( + "⏱️ No data received for {:?}, connection appears dead", + READ_TIMEOUT + ); + connection_active = false; + } + // === Successfully received bytes from the stream === - Some(Ok(bytes)) => { + Ok(Some(Ok(bytes))) => { log::trace!( "📦 Received {} bytes from SSE stream", bytes.len() @@ -926,14 +947,14 @@ impl Client { } // === Stream error occurred (network issue, timeout, etc.) === - Some(Err(e)) => { + Ok(Some(Err(e))) => { log::warn!("⚠️ SSE stream error: {:?}", e); connection_active = false; // Exit loop and reconnect } // === Stream ended (server closed the connection) === // This happens during server restarts, shutdowns, or SIGTERM simulation - None => { + Ok(None) => { log::info!("🔌 SSE stream closed by server, will reconnect"); connection_active = false; // Exit loop and reconnect } From 34acac751016312b5002f7b4337c9b10d5e4b4b6 Mon Sep 17 00:00:00 2001 From: Jason Date: Fri, 7 Nov 2025 13:50:08 +0200 Subject: [PATCH 04/10] Update client, cleanup and get ready for prod --- Cargo.toml | 1 - examples/height_stream/src/main.rs | 2 +- hypersync-client/src/lib.rs | 228 +++++++---------------------- 3 files changed, 51 insertions(+), 180 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d2663f9..fd8a906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,5 +13,4 @@ members = [ "examples/call_watch", "examples/call_decode_output", "examples/height_stream", - "examples/height_stream_reconnect", ] diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs index 83e907f..8baf56f 100644 --- a/examples/height_stream/src/main.rs +++ b/examples/height_stream/src/main.rs @@ -9,7 +9,7 @@ async fn main() -> Result<()> { let client = Arc::new(Client::new(ClientConfig { url: Some( - "https://arbitrum-sepolia.zone1.hypersync.xyz" + "https://arbitrum-sepolia.hypersync.xyz" .parse() .unwrap(), ), diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 3077edb..b0c16c9 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -8,7 +8,6 @@ use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Que use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk}; use reqwest::header; use reqwest::Method; -use serde::Deserialize; mod column_mapping; mod config; @@ -695,46 +694,22 @@ impl Client { } } -#[derive(Debug, Deserialize)] -struct HeightSsePayloadJson { - height: Option, -} - const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30); const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60); -/// Timeout for detecting dead connections. Server sends keepalive pings every 5s (when idle), -/// so we timeout after 15s (3x the ping interval) if no data is received. -/// This allows the client to detect server crashes that don't trigger graceful shutdown. -/// Note: The server uses smart keepalive - it only pings when no height updates occur, -/// so active chains won't generate unnecessary pings. +/// Timeout for detecting dead connections. Server sends keepalive pings every 5s, +/// so we timeout after 15s (3x the ping interval). const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); impl Client { - /// Streams latest archive height updates from the server using the `/height/sse` SSE endpoint. + /// Streams archive height updates from the server via Server-Sent Events. /// - /// # Overview - /// This function establishes a long-lived Server-Sent Events (SSE) connection to continuously - /// receive height updates from the hypersync server. The connection is resilient and will - /// automatically reconnect if it drops due to network issues, server restarts, or shutdowns. + /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects + /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s). /// /// # Returns - /// Returns a channel receiver that yields `Result` heights. A background task manages - /// the connection lifecycle and sends height updates through this channel. - /// - /// # Connection Management - /// - **Automatic Reconnection**: If the connection drops, the client automatically attempts - /// to reconnect with exponential backoff (1s → 2s → 4s → ... → max 30s) - /// - **Graceful Shutdown**: When the server closes the stream (e.g., during restart), the - /// client detects it and reconnects immediately - /// - **Error Handling**: Connection errors are logged and don't terminate the stream - /// - /// # SSE Protocol Details - /// The function parses SSE messages according to the W3C EventSource spec: - /// - Messages are separated by blank lines (`\n\n`) - /// - Each message can have `event:` and `data:` fields - /// - Keep-alive comments (`:ping`) are ignored - /// - Only `event:height` messages are processed + /// Channel receiver yielding `Result` heights. The background task handles connection + /// lifecycle and sends updates through this channel. /// /// # Example /// ```no_run @@ -758,224 +733,120 @@ impl Client { /// # } /// ``` pub async fn stream_height(self: Arc) -> Result>> { - // Create a channel for sending height updates from the background task to the caller. - // Buffer size of 16 allows for some burst handling without blocking the sender. let (tx, rx) = mpsc::channel(16); let client = self.clone(); - // Spawn a background task that manages the SSE connection lifecycle. - // This task runs indefinitely, handling reconnections automatically. tokio::spawn(async move { - // Reconnection delay starts at 1 second and doubles on each failure (exponential backoff). - // This prevents hammering the server when it's down or restarting. let mut reconnect_delay = INITIAL_RECONNECT_DELAY; - // Main reconnection loop - runs forever, attempting to maintain a connection. loop { - // === STEP 1: Build the SSE endpoint URL === - // Construct the full URL path: /height/sse let mut url = client.url.clone(); - let mut segments = url.path_segments_mut().ok().unwrap(); - segments.push("height"); - segments.push("sse"); - std::mem::drop(segments); // Release the mutable borrow on url + url.path_segments_mut() + .expect("base URL cannot be a cannot-be-a-base URL") + .extend(&["height", "sse"]); - // === STEP 2: Prepare the HTTP GET request === let mut req = client.http_client.request(Method::GET, url); - - // Add bearer token authentication if configured if let Some(bearer_token) = &client.bearer_token { req = req.bearer_auth(bearer_token); } - - // Configure request headers and timeout. - // SSE connections are long-lived, so we use a 24-hour timeout to prevent - // the HTTP client from terminating the connection prematurely. req = req .header(header::ACCEPT, "text/event-stream") .timeout(MAX_CONNECTION_AGE); - // === STEP 3: Attempt to establish the SSE connection === match req.send().await { Ok(res) => { let status = res.status(); - - // Check for HTTP errors (non-2xx status codes) if !status.is_success() { - log::warn!("❌ HTTP error: status code {}", status); - - // Wait before retrying with exponential backoff + log::warn!("HTTP error: status code {}", status); tokio::time::sleep(reconnect_delay).await; reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); - continue; // Retry the connection + continue; } - // Successfully connected! - log::info!("✅ Connected to height SSE stream"); - - // Reset reconnection delay after successful connection + log::info!("Connected to height SSE stream"); reconnect_delay = INITIAL_RECONNECT_DELAY; - // === STEP 4: Process the SSE byte stream === - // Get the response body as a stream of bytes let mut byte_stream = res.bytes_stream(); - - // Buffer for accumulating incomplete SSE messages. - // SSE messages are text-based and separated by blank lines (\n\n). let mut buf = String::new(); - - // Flag to track if the connection is still active let mut connection_active = true; - // Main message processing loop - runs until the connection drops while connection_active { - // Wait for next chunk with timeout to detect dead connections. - // Server sends keepalive pings every 20s, so if we don't receive - // ANY data (including pings) within 60s, the connection is dead. let next_chunk = tokio::time::timeout(READ_TIMEOUT, byte_stream.next()).await; match next_chunk { - // === Timeout: no data received within READ_TIMEOUT === Err(_) => { log::warn!( - "⏱️ No data received for {:?}, connection appears dead", + "No data received for {:?}, connection appears dead", READ_TIMEOUT ); connection_active = false; } - - // === Successfully received bytes from the stream === Ok(Some(Ok(bytes))) => { - log::trace!( - "📦 Received {} bytes from SSE stream", - bytes.len() - ); + log::trace!("Received {} bytes from SSE stream", bytes.len()); + + let chunk_str = String::from_utf8_lossy(&bytes); + buf.push_str(&chunk_str); + + // Process complete SSE messages (delimited by \n\n) + while let Some(idx) = buf.find("\n\n") { + let event_block = buf[..idx].to_string(); + buf.drain(..idx + 2); + + let mut event_name: Option<&str> = None; + let mut data_lines: Vec<&str> = Vec::new(); - use std::fmt::Write as _; - - // Convert bytes to UTF-8 string - let chunk_str = match std::str::from_utf8(&bytes) { - Ok(s) => s, - Err(_) => { - // Handle invalid UTF-8 by doing lossy conversion. - // This is rare but can happen with network corruption. - let mut tmp = String::with_capacity(bytes.len() * 2); - for &b in bytes.as_ref() { - let _ = write!(&mut tmp, "{}", char::from(b)); + for line in event_block.lines() { + if line.is_empty() || line.starts_with(':') { + continue; } - buf.push_str(&tmp); - continue; - } - }; - - // Append the new chunk to our buffer - buf.push_str(chunk_str); - - // === STEP 5: Parse complete SSE messages === - // SSE messages are separated by blank lines (\n\n). - // Process all complete messages currently in the buffer. - loop { - if let Some(idx) = buf.find("\n\n") { - // Extract one complete SSE message - let event_block = buf[..idx].to_string(); - buf.drain(..idx + 2); // Remove message + blank line from buffer - - // Parse the SSE event fields according to the W3C spec - let mut event_name: Option<&str> = None; - let mut data_lines: Vec<&str> = Vec::new(); - - // Process each line in the event block - for line in event_block.lines() { - if line.is_empty() { - continue; - } - if line.starts_with(':') { - // Comment line (used for keep-alive pings). - // Format: ": ping" or ": " - continue; - } - if let Some(rest) = line.strip_prefix("event:") { - // Event type field. - // Format: "event: height" - event_name = Some(rest.trim()); - continue; - } - if let Some(rest) = line.strip_prefix("data:") { - // Data field (can be multiple per event). - // Format: "data: 12345" - data_lines.push(rest.trim()); - continue; - } - // Ignore other SSE fields like "id:" and "retry:" + if let Some(rest) = line.strip_prefix("event:") { + event_name = Some(rest.trim()); + } else if let Some(rest) = line.strip_prefix("data:") { + data_lines.push(rest.trim()); } + } - // === STEP 6: Process height events === - let name = event_name.unwrap_or(""); - if name == "height" { - // Combine multiple data lines (though typically just one) + match event_name.unwrap_or("") { + "height" => { let data = data_lines.join("\n"); - - // Try parsing as plain integer (preferred format). - // Server sends: event:height\ndata:12345\n\n if let Ok(h) = data.trim().parse::() { - log::debug!("📈 Height update: {}", h); - - // Send the height through the channel. - // If the receiver is dropped, exit the task gracefully. + log::debug!("Height update: {}", h); if tx.send(Ok(h)).await.is_err() { - log::info!( - "Receiver dropped, exiting stream task" - ); + log::info!("Receiver dropped, exiting"); return; } } else { - log::warn!( - "❌ Failed to parse height: {}", - data - ); + log::warn!("Failed to parse height: {}", data); connection_active = false; - continue; } } - } else { - // No complete message in buffer yet, wait for more data - break; + "ping" => { + log::trace!("Received keepalive ping"); + } + _ => {} } } } - - // === Stream error occurred (network issue, timeout, etc.) === Ok(Some(Err(e))) => { - log::warn!("⚠️ SSE stream error: {:?}", e); - connection_active = false; // Exit loop and reconnect + log::warn!("SSE stream error: {:?}", e); + connection_active = false; } - - // === Stream ended (server closed the connection) === - // This happens during server restarts, shutdowns, or SIGTERM simulation Ok(None) => { - log::info!("🔌 SSE stream closed by server, will reconnect"); - connection_active = false; // Exit loop and reconnect + log::info!("SSE stream closed by server, will reconnect"); + connection_active = false; } } } } - - // === Failed to establish HTTP connection === Err(e) => { - log::warn!("❌ Failed to connect to height stream: {:?}", e); + log::warn!("Failed to connect to height stream: {:?}", e); } } - // === STEP 7: Wait before reconnecting === - // After any disconnection (graceful or error), wait before attempting to reconnect. - // This implements exponential backoff to avoid overwhelming the server. - log::info!("⏳ Reconnecting in {:?}...", reconnect_delay); + log::info!("Reconnecting in {:?}...", reconnect_delay); tokio::time::sleep(reconnect_delay).await; - - // Double the delay for the next attempt, up to the maximum. - // Pattern: 0.5s → 1s → 2s → 4s → 8s → 16s → 30s (max) → 30s → ... reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); } }); @@ -1000,3 +871,4 @@ fn check_simple_stream_params(config: &StreamConfig) -> Result<()> { Ok(()) } + From 7e52d156a73040ec9350fa4a3c3e5f1959e466f7 Mon Sep 17 00:00:00 2001 From: Jason Date: Fri, 7 Nov 2025 15:22:51 +0200 Subject: [PATCH 05/10] Expose to the client information about the stream - so they can react appropriately when connection is lost. --- examples/height_stream/README.md | 24 ++++++++++++ examples/height_stream/src/main.rs | 14 +++---- hypersync-client/src/lib.rs | 63 +++++++++++++++++++++++++----- 3 files changed, 85 insertions(+), 16 deletions(-) create mode 100644 examples/height_stream/README.md diff --git a/examples/height_stream/README.md b/examples/height_stream/README.md new file mode 100644 index 0000000..a0f0c00 --- /dev/null +++ b/examples/height_stream/README.md @@ -0,0 +1,24 @@ +# Height Stream Example + +This example demonstrates using the `stream_height()` API to receive real-time height updates from a HyperSync server. + +## Running the Example + +```bash +# With connection events visible (no env_logger needed) +cargo run -p height_stream + +# With debug logs (shows keepalive pings, etc) +RUST_LOG=debug cargo run -p height_stream + +# With trace logs (shows all SSE bytes received) +RUST_LOG=trace cargo run -p height_stream +``` + +## API Design + +The stream emits `HeightStreamEvent` enum variants: + +- **`Connected`**: Emitted when successfully connected/reconnected to the server +- **`Height(u64)`**: Emitted for each height update received +- **`Reconnecting { delay }`**: Emitted when connection is lost, before waiting to reconnect diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs index 8baf56f..68fc3d9 100644 --- a/examples/height_stream/src/main.rs +++ b/examples/height_stream/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use hypersync_client::{Client, ClientConfig}; +use hypersync_client::{Client, ClientConfig, HeightStreamEvent}; #[tokio::main] async fn main() -> Result<()> { @@ -20,12 +20,12 @@ async fn main() -> Result<()> { println!("listening for height updates... (Ctrl+C to quit)"); - while let Some(msg) = rx.recv().await { - match msg { - Ok(height) => println!("height: {}", height), - Err(e) => { - eprintln!("stream error - will automatically reconnect: {e:?}"); - break; + while let Some(event) = rx.recv().await { + match event { + HeightStreamEvent::Connected => println!("✓ Connected to stream"), + HeightStreamEvent::Height(height) => println!("height: {}", height), + HeightStreamEvent::Reconnecting { delay } => { + println!("⟳ Reconnecting in {:?}...", delay) } } } diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index b0c16c9..e397bac 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -701,20 +701,37 @@ const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(2 /// so we timeout after 15s (3x the ping interval). const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); +/// Events emitted by the height stream. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum HeightStreamEvent { + /// Successfully connected or reconnected to the SSE stream. + Connected, + /// Received a height update from the server. + Height(u64), + /// Connection lost, will attempt to reconnect after the specified delay. + Reconnecting { + /// Duration to wait before attempting reconnection. + delay: Duration, + }, +} + impl Client { /// Streams archive height updates from the server via Server-Sent Events. /// /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s). /// + /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes + /// and height updates. This allows applications to display connection status to users. + /// /// # Returns - /// Channel receiver yielding `Result` heights. The background task handles connection - /// lifecycle and sends updates through this channel. + /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection + /// lifecycle and sends events through this channel. /// /// # Example /// ```no_run /// # use std::sync::Arc; - /// # use hypersync_client::{Client, ClientConfig}; + /// # use hypersync_client::{Client, ClientConfig, HeightStreamEvent}; /// # async fn example() -> anyhow::Result<()> { /// let client = Arc::new(Client::new(ClientConfig { /// url: Some("https://eth.hypersync.xyz".parse()?), @@ -723,16 +740,19 @@ impl Client { /// /// let mut rx = client.stream_height().await?; /// - /// while let Some(result) = rx.recv().await { - /// match result { - /// Ok(height) => println!("Height: {}", height), - /// Err(e) => eprintln!("Error: {}", e), + /// while let Some(event) = rx.recv().await { + /// match event { + /// HeightStreamEvent::Connected => println!("Connected to stream"), + /// HeightStreamEvent::Height(h) => println!("Height: {}", h), + /// HeightStreamEvent::Reconnecting { delay } => { + /// println!("Reconnecting in {:?}...", delay) + /// } /// } /// } /// # Ok(()) /// # } /// ``` - pub async fn stream_height(self: Arc) -> Result>> { + pub async fn stream_height(self: Arc) -> Result> { let (tx, rx) = mpsc::channel(16); let client = self.clone(); @@ -758,6 +778,15 @@ impl Client { let status = res.status(); if !status.is_success() { log::warn!("HTTP error: status code {}", status); + if tx + .send(HeightStreamEvent::Reconnecting { + delay: reconnect_delay, + }) + .await + .is_err() + { + return; + } tokio::time::sleep(reconnect_delay).await; reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); @@ -765,6 +794,9 @@ impl Client { } log::info!("Connected to height SSE stream"); + if tx.send(HeightStreamEvent::Connected).await.is_err() { + return; + } reconnect_delay = INITIAL_RECONNECT_DELAY; let mut byte_stream = res.bytes_stream(); @@ -813,7 +845,11 @@ impl Client { let data = data_lines.join("\n"); if let Ok(h) = data.trim().parse::() { log::debug!("Height update: {}", h); - if tx.send(Ok(h)).await.is_err() { + if tx + .send(HeightStreamEvent::Height(h)) + .await + .is_err() + { log::info!("Receiver dropped, exiting"); return; } @@ -846,6 +882,15 @@ impl Client { } log::info!("Reconnecting in {:?}...", reconnect_delay); + if tx + .send(HeightStreamEvent::Reconnecting { + delay: reconnect_delay, + }) + .await + .is_err() + { + return; + } tokio::time::sleep(reconnect_delay).await; reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); } From 0b3e7e4ad7fada7f66347fd2fad743d2737ad428 Mon Sep 17 00:00:00 2001 From: Jono Prest Date: Tue, 11 Nov 2025 13:07:11 +0200 Subject: [PATCH 06/10] Run fmt --- examples/height_stream/src/main.rs | 6 +----- hypersync-client/src/lib.rs | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs index 68fc3d9..7a8543f 100644 --- a/examples/height_stream/src/main.rs +++ b/examples/height_stream/src/main.rs @@ -8,11 +8,7 @@ async fn main() -> Result<()> { env_logger::init(); let client = Arc::new(Client::new(ClientConfig { - url: Some( - "https://arbitrum-sepolia.hypersync.xyz" - .parse() - .unwrap(), - ), + url: Some("https://arbitrum-sepolia.hypersync.xyz".parse().unwrap()), ..Default::default() })?); diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index e397bac..20517d9 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -916,4 +916,3 @@ fn check_simple_stream_params(config: &StreamConfig) -> Result<()> { Ok(()) } - From 3efd5cb137dad13b7bdb8bed5b4f88c08ca0f51c Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 12 Nov 2025 13:36:00 +0200 Subject: [PATCH 07/10] Update stream to use library --- hypersync-client/Cargo.toml | 2 + hypersync-client/src/lib.rs | 257 +++++++++++++++++++----------------- 2 files changed, 140 insertions(+), 119 deletions(-) diff --git a/hypersync-client/Cargo.toml b/hypersync-client/Cargo.toml index 65b4c62..b1f3400 100644 --- a/hypersync-client/Cargo.toml +++ b/hypersync-client/Cargo.toml @@ -30,6 +30,7 @@ tokio = { version = "1", default-features = false, features = [ "test-util", "rt", "macros", + "sync", ] } log = "0.4" fastrange-rs = "0.1" @@ -50,6 +51,7 @@ alloy-primitives = "1.1" hypersync-net-types = { path = "../hypersync-net-types", version = "0.11.0-rc.3" } hypersync-format = { path = "../hypersync-format", version = "0.5.8" } hypersync-schema = { path = "../hypersync-schema", version = "0.3" } +reqwest-eventsource = "0.6" [dependencies.reqwest] version = "0.12" diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 20517d9..b82b09b 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -6,8 +6,9 @@ use anyhow::{anyhow, Context, Result}; use futures::StreamExt; use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query}; use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk}; -use reqwest::header; -use reqwest::Method; +use reqwest::{header, Method}; +use reqwest_eventsource::retry::ExponentialBackoff; +use reqwest_eventsource::Event; mod column_mapping; mod config; @@ -694,12 +695,11 @@ impl Client { } } -const INITIAL_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_millis(200); -const MAX_RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(30); -const MAX_CONNECTION_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60); +const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200); +const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); /// Timeout for detecting dead connections. Server sends keepalive pings every 5s, /// so we timeout after 15s (3x the ping interval). -const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); +const READ_TIMEOUT: Duration = Duration::from_secs(15); /// Events emitted by the height stream. #[derive(Debug, Clone, PartialEq, Eq)] @@ -757,142 +757,161 @@ impl Client { let client = self.clone(); tokio::spawn(async move { - let mut reconnect_delay = INITIAL_RECONNECT_DELAY; + let mut consecutive_failures = 0u32; loop { + // Build the GET /height/sse request let mut url = client.url.clone(); url.path_segments_mut() - .expect("base URL cannot be a cannot-be-a-base URL") + .expect("valid base URL") .extend(&["height", "sse"]); - let mut req = client.http_client.request(Method::GET, url); - if let Some(bearer_token) = &client.bearer_token { - req = req.bearer_auth(bearer_token); + let mut req = client + .http_client + .get(url) + .header(header::ACCEPT, "text/event-stream"); + + if let Some(bearer) = &client.bearer_token { + req = req.bearer_auth(bearer); } - req = req - .header(header::ACCEPT, "text/event-stream") - .timeout(MAX_CONNECTION_AGE); - - match req.send().await { - Ok(res) => { - let status = res.status(); - if !status.is_success() { - log::warn!("HTTP error: status code {}", status); - if tx - .send(HeightStreamEvent::Reconnecting { - delay: reconnect_delay, - }) - .await - .is_err() - { - return; - } - tokio::time::sleep(reconnect_delay).await; - reconnect_delay = - std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); - continue; - } - log::info!("Connected to height SSE stream"); - if tx.send(HeightStreamEvent::Connected).await.is_err() { - return; + // Configure exponential backoff for library-level retries + let retry_policy = ExponentialBackoff::new( + INITIAL_RECONNECT_DELAY, + 2.0, + Some(MAX_RECONNECT_DELAY), + None, // unlimited retries + ); + + // Turn the request into an EventSource stream with retries + let mut es = match reqwest_eventsource::EventSource::new(req) { + Ok(es) => { + let mut es = es; + es.set_retry_policy(Box::new(retry_policy)); + es + } + Err(err) => { + log::error!("Failed to create EventSource: {:?}", err); + + let delay = if consecutive_failures == 0 { + Duration::from_millis(0) // immediate retry on first failure + } else { + // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s + INITIAL_RECONNECT_DELAY + .checked_mul(1 << (consecutive_failures - 1).min(7)) + .unwrap_or(MAX_RECONNECT_DELAY) + .min(MAX_RECONNECT_DELAY) + }; + + if delay > Duration::from_millis(0) { + let _ = tx.send(HeightStreamEvent::Reconnecting { delay }).await; + tokio::time::sleep(delay).await; + } + consecutive_failures += 1; + continue; + } + }; + + let mut connection_successful = false; + + loop { + // Wrap the stream in a timeout to detect dead connections + let next_event = tokio::time::timeout(READ_TIMEOUT, es.next()).await; + + match next_event { + Err(_) => { + // Timeout - no data received (including pings) for READ_TIMEOUT + log::warn!( + "No data received for {:?}, connection appears dead", + READ_TIMEOUT + ); + es.close(); + break; // Break inner loop to reconnect } - reconnect_delay = INITIAL_RECONNECT_DELAY; - - let mut byte_stream = res.bytes_stream(); - let mut buf = String::new(); - let mut connection_active = true; - - while connection_active { - let next_chunk = - tokio::time::timeout(READ_TIMEOUT, byte_stream.next()).await; - - match next_chunk { - Err(_) => { - log::warn!( - "No data received for {:?}, connection appears dead", - READ_TIMEOUT - ); - connection_active = false; + Ok(Some(ev)) => { + match ev { + Ok(Event::Open) => { + // Successfully connected + log::info!("Connected to height SSE stream"); + connection_successful = true; + consecutive_failures = 0; // Reset failure count on successful connection + if tx.send(HeightStreamEvent::Connected).await.is_err() { + return; // Receiver dropped, exit task + } } - Ok(Some(Ok(bytes))) => { - log::trace!("Received {} bytes from SSE stream", bytes.len()); - - let chunk_str = String::from_utf8_lossy(&bytes); - buf.push_str(&chunk_str); - - // Process complete SSE messages (delimited by \n\n) - while let Some(idx) = buf.find("\n\n") { - let event_block = buf[..idx].to_string(); - buf.drain(..idx + 2); - - let mut event_name: Option<&str> = None; - let mut data_lines: Vec<&str> = Vec::new(); - - for line in event_block.lines() { - if line.is_empty() || line.starts_with(':') { - continue; - } - if let Some(rest) = line.strip_prefix("event:") { - event_name = Some(rest.trim()); - } else if let Some(rest) = line.strip_prefix("data:") { - data_lines.push(rest.trim()); - } - } - - match event_name.unwrap_or("") { - "height" => { - let data = data_lines.join("\n"); - if let Ok(h) = data.trim().parse::() { - log::debug!("Height update: {}", h); - if tx - .send(HeightStreamEvent::Height(h)) - .await - .is_err() - { - log::info!("Receiver dropped, exiting"); - return; - } - } else { - log::warn!("Failed to parse height: {}", data); - connection_active = false; + Ok(Event::Message(msg)) => { + let evt = msg.event.as_str(); + match evt { + // our server emits "height" events with data as integer text + "height" | "" => { + if let Ok(h) = msg.data.trim().parse::() { + log::debug!("Height update: {}", h); + if tx + .send(HeightStreamEvent::Height(h)) + .await + .is_err() + { + return; // Receiver dropped, exit task } + } else { + // malformed => close and reconnect + log::warn!("Failed to parse height: {}", msg.data); + es.close(); + break; // Break inner loop to reconnect } - "ping" => { - log::trace!("Received keepalive ping"); - } - _ => {} } + // server keepalives detect connection liveness via timeout + "ping" => { + log::trace!("Received keepalive ping"); + } + _ => { /* ignore unknown event types */ } } } - Ok(Some(Err(e))) => { - log::warn!("SSE stream error: {:?}", e); - connection_active = false; - } - Ok(None) => { - log::info!("SSE stream closed by server, will reconnect"); - connection_active = false; + Err(err) => { + // Stream error - library may or may not retry depending on error type + log::warn!("SSE stream error: {:?}", err); + es.close(); + break; // Break inner loop to reconnect } } } + Ok(None) => { + // Stream ended normally + log::info!("SSE stream closed"); + break; // Break inner loop to reconnect + } } - Err(e) => { - log::warn!("Failed to connect to height stream: {:?}", e); + } + + // Connection closed, calculate backoff delay + let delay = if !connection_successful || consecutive_failures == 0 { + // Immediate retry if we never connected, or first failure after success + Duration::from_millis(0) + } else { + // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s + INITIAL_RECONNECT_DELAY + .checked_mul(1 << (consecutive_failures - 1).min(7)) + .unwrap_or(MAX_RECONNECT_DELAY) + .min(MAX_RECONNECT_DELAY) + }; + + if delay > Duration::from_millis(0) { + log::info!("Reconnecting in {:?}...", delay); + if tx + .send(HeightStreamEvent::Reconnecting { delay }) + .await + .is_err() + { + return; // Receiver dropped, exit task } + tokio::time::sleep(delay).await; } - log::info!("Reconnecting in {:?}...", reconnect_delay); - if tx - .send(HeightStreamEvent::Reconnecting { - delay: reconnect_delay, - }) - .await - .is_err() - { - return; + if connection_successful { + consecutive_failures = 0; // Reset after successful connection that then failed + } else { + consecutive_failures += 1; } - tokio::time::sleep(reconnect_delay).await; - reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); } }); From 96f7c930bd1f4e5a3b731493b81168e87903353f Mon Sep 17 00:00:00 2001 From: Jono Prest Date: Thu, 13 Nov 2025 13:27:11 +0200 Subject: [PATCH 08/10] Refactor stream event --- hypersync-client/src/lib.rs | 378 +++++++++++++++++++++--------------- 1 file changed, 219 insertions(+), 159 deletions(-) diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index b82b09b..0b6bd69 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -8,7 +8,7 @@ use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Que use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk}; use reqwest::{header, Method}; use reqwest_eventsource::retry::ExponentialBackoff; -use reqwest_eventsource::Event; +use reqwest_eventsource::{Event, EventSource}; mod column_mapping; mod config; @@ -695,6 +695,7 @@ impl Client { } } +/// 200ms const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200); const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); /// Timeout for detecting dead connections. Server sends keepalive pings every 5s, @@ -715,7 +716,159 @@ pub enum HeightStreamEvent { }, } +enum InternalStreamEvent { + Publish(HeightStreamEvent), + Ping, + Unknown(String), +} + impl Client { + fn get_es_stream(self: Arc) -> Result { + // Build the GET /height/sse request + let mut url = self.url.clone(); + url.path_segments_mut() + .ok() + .context("invalid base URL")? + .extend(&["height", "sse"]); + + let mut req = self + .http_client + .get(url) + .header(header::ACCEPT, "text/event-stream"); + + if let Some(bearer) = &self.bearer_token { + req = req.bearer_auth(bearer); + } + + // Configure exponential backoff for library-level retries + let retry_policy = ExponentialBackoff::new( + INITIAL_RECONNECT_DELAY, + 2.0, + Some(MAX_RECONNECT_DELAY), + None, // unlimited retries + ); + + // Turn the request into an EventSource stream with retries + let mut es = reqwest_eventsource::EventSource::new(req) + .context("unexpected error creating EventSource")?; + es.set_retry_policy(Box::new(retry_policy)); + Ok(es) + } + + async fn next_height(event_source: &mut EventSource) -> Result> { + let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next()) + .await + .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))? + else { + return Ok(None); + }; + + let e = match res.context("failed response")? { + Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected), + Event::Message(event) => match event.event.as_str() { + "height" => { + let height = event + .data + .trim() + .parse::() + .context("parsing height from event data")?; + InternalStreamEvent::Publish(HeightStreamEvent::Height(height)) + } + "ping" => InternalStreamEvent::Ping, + _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)), + }, + }; + + Ok(Some(e)) + } + + async fn stream_height_events( + es: &mut EventSource, + tx: &mpsc::Sender, + ) -> Result { + let mut received_an_event = false; + while let Some(event) = Self::next_height(es).await.context("failed next height")? { + match event { + InternalStreamEvent::Publish(event) => { + received_an_event = true; + if tx.send(event).await.is_err() { + return Ok(received_an_event); // Receiver dropped, exit task + } + } + InternalStreamEvent::Ping => (), // ignore pings + InternalStreamEvent::Unknown(_event) => (), // ignore unknown events + } + } + Ok(received_an_event) + } + + fn get_delay(consecutive_failures: u32) -> Duration { + if consecutive_failures > 0 { + /// helper function to calculate 2^x + /// optimization using bit shifting + const fn two_to_pow(x: u32) -> u32 { + 1 << x + } + // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s + INITIAL_RECONNECT_DELAY + .saturating_mul(two_to_pow(consecutive_failures - 1)) + .min(MAX_RECONNECT_DELAY) + } else { + // On zero consecutive failures, 0 delay + Duration::from_millis(0) + } + } + + async fn stream_height_events_with_retry( + self: Arc, + tx: &mpsc::Sender, + ) -> Result<()> { + let mut consecutive_failures = 0u32; + + loop { + // should always be able to creat a new es stream + // something is wrong with the req builder otherwise + let mut es = self.clone().get_es_stream().context("get es stream")?; + + match Self::stream_height_events(&mut es, tx).await { + Ok(received_an_event) => { + if received_an_event { + consecutive_failures = 0; // Reset after successful connection that then failed + } + log::trace!("Stream height exited"); + } + Err(e) => { + log::trace!("Stream height failed: {e:?}"); + } + } + + es.close(); + + // If the receiver is closed, exit the task + if tx.is_closed() { + break; + } + + let delay = Self::get_delay(consecutive_failures); + log::trace!("Reconnecting in {:?}...", delay); + + if tx + .send(HeightStreamEvent::Reconnecting { delay }) + .await + .is_err() + { + return Ok(()); // Receiver dropped, exit task + } + tokio::time::sleep(delay).await; + + // increment consecutive failures so that on the next try + // it will start using back offs + consecutive_failures += 1; + } + + Ok(()) + } + /// Streams archive height updates from the server via Server-Sent Events. /// /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects @@ -752,170 +905,16 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub async fn stream_height(self: Arc) -> Result> { + pub fn stream_height(self: Arc) -> mpsc::Receiver { let (tx, rx) = mpsc::channel(16); - let client = self.clone(); tokio::spawn(async move { - let mut consecutive_failures = 0u32; - - loop { - // Build the GET /height/sse request - let mut url = client.url.clone(); - url.path_segments_mut() - .expect("valid base URL") - .extend(&["height", "sse"]); - - let mut req = client - .http_client - .get(url) - .header(header::ACCEPT, "text/event-stream"); - - if let Some(bearer) = &client.bearer_token { - req = req.bearer_auth(bearer); - } - - // Configure exponential backoff for library-level retries - let retry_policy = ExponentialBackoff::new( - INITIAL_RECONNECT_DELAY, - 2.0, - Some(MAX_RECONNECT_DELAY), - None, // unlimited retries - ); - - // Turn the request into an EventSource stream with retries - let mut es = match reqwest_eventsource::EventSource::new(req) { - Ok(es) => { - let mut es = es; - es.set_retry_policy(Box::new(retry_policy)); - es - } - Err(err) => { - log::error!("Failed to create EventSource: {:?}", err); - - let delay = if consecutive_failures == 0 { - Duration::from_millis(0) // immediate retry on first failure - } else { - // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s - INITIAL_RECONNECT_DELAY - .checked_mul(1 << (consecutive_failures - 1).min(7)) - .unwrap_or(MAX_RECONNECT_DELAY) - .min(MAX_RECONNECT_DELAY) - }; - - if delay > Duration::from_millis(0) { - let _ = tx.send(HeightStreamEvent::Reconnecting { delay }).await; - tokio::time::sleep(delay).await; - } - consecutive_failures += 1; - continue; - } - }; - - let mut connection_successful = false; - - loop { - // Wrap the stream in a timeout to detect dead connections - let next_event = tokio::time::timeout(READ_TIMEOUT, es.next()).await; - - match next_event { - Err(_) => { - // Timeout - no data received (including pings) for READ_TIMEOUT - log::warn!( - "No data received for {:?}, connection appears dead", - READ_TIMEOUT - ); - es.close(); - break; // Break inner loop to reconnect - } - Ok(Some(ev)) => { - match ev { - Ok(Event::Open) => { - // Successfully connected - log::info!("Connected to height SSE stream"); - connection_successful = true; - consecutive_failures = 0; // Reset failure count on successful connection - if tx.send(HeightStreamEvent::Connected).await.is_err() { - return; // Receiver dropped, exit task - } - } - Ok(Event::Message(msg)) => { - let evt = msg.event.as_str(); - match evt { - // our server emits "height" events with data as integer text - "height" | "" => { - if let Ok(h) = msg.data.trim().parse::() { - log::debug!("Height update: {}", h); - if tx - .send(HeightStreamEvent::Height(h)) - .await - .is_err() - { - return; // Receiver dropped, exit task - } - } else { - // malformed => close and reconnect - log::warn!("Failed to parse height: {}", msg.data); - es.close(); - break; // Break inner loop to reconnect - } - } - // server keepalives detect connection liveness via timeout - "ping" => { - log::trace!("Received keepalive ping"); - } - _ => { /* ignore unknown event types */ } - } - } - Err(err) => { - // Stream error - library may or may not retry depending on error type - log::warn!("SSE stream error: {:?}", err); - es.close(); - break; // Break inner loop to reconnect - } - } - } - Ok(None) => { - // Stream ended normally - log::info!("SSE stream closed"); - break; // Break inner loop to reconnect - } - } - } - - // Connection closed, calculate backoff delay - let delay = if !connection_successful || consecutive_failures == 0 { - // Immediate retry if we never connected, or first failure after success - Duration::from_millis(0) - } else { - // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s - INITIAL_RECONNECT_DELAY - .checked_mul(1 << (consecutive_failures - 1).min(7)) - .unwrap_or(MAX_RECONNECT_DELAY) - .min(MAX_RECONNECT_DELAY) - }; - - if delay > Duration::from_millis(0) { - log::info!("Reconnecting in {:?}...", delay); - if tx - .send(HeightStreamEvent::Reconnecting { delay }) - .await - .is_err() - { - return; // Receiver dropped, exit task - } - tokio::time::sleep(delay).await; - } - - if connection_successful { - consecutive_failures = 0; // Reset after successful connection that then failed - } else { - consecutive_failures += 1; - } + if let Err(e) = self.stream_height_events_with_retry(&tx).await { + log::error!("Stream height failed unexpectedly: {e:?}"); } }); - Ok(rx) + rx } } @@ -935,3 +934,64 @@ fn check_simple_stream_params(config: &StreamConfig) -> Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_get_delay() { + assert_eq!( + Client::get_delay(0), + Duration::from_millis(0), + "starts with 0 delay" + ); + // powers of 2 backoff + assert_eq!(Client::get_delay(1), Duration::from_millis(200)); + assert_eq!(Client::get_delay(2), Duration::from_millis(400)); + assert_eq!(Client::get_delay(3), Duration::from_millis(800)); + // maxes out at 30s + assert_eq!( + Client::get_delay(9), + Duration::from_secs(30), + "max delay is 30s" + ); + assert_eq!( + Client::get_delay(10), + Duration::from_secs(30), + "max delay is 30s" + ); + } + + #[tokio::test] + #[ignore = "integration test with live hs server for height stream"] + async fn test_stream_height_events() -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel(16); + let handle = tokio::spawn(async move { + let client = Arc::new(Client::new(ClientConfig { + url: Some("https://monad-testnet.hypersync.xyz".parse()?), + ..Default::default() + })?); + let mut es = client.get_es_stream().context("get es stream")?; + Client::stream_height_events(&mut es, &tx).await + }); + + let val = rx.recv().await; + assert!(val.is_some()); + assert_eq!(val.unwrap(), HeightStreamEvent::Connected); + let Some(HeightStreamEvent::Height(height)) = rx.recv().await else { + panic!("should have received height") + }; + let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else { + panic!("should have received height") + }; + assert!(height2 > height); + drop(rx); + + let res = handle.await.expect("should have joined"); + let received_an_event = + res.expect("should have ended the stream gracefully after dropping rx"); + assert!(received_an_event, "should have received an event"); + + Ok(()) + } +} From 331139020cca3db8fb6cb3a5d0c0215953483248 Mon Sep 17 00:00:00 2001 From: Jono Prest Date: Thu, 13 Nov 2025 13:33:34 +0200 Subject: [PATCH 09/10] Fix example --- examples/height_stream/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/height_stream/src/main.rs b/examples/height_stream/src/main.rs index 7a8543f..d8db833 100644 --- a/examples/height_stream/src/main.rs +++ b/examples/height_stream/src/main.rs @@ -12,7 +12,7 @@ async fn main() -> Result<()> { ..Default::default() })?); - let mut rx = client.clone().stream_height().await?; + let mut rx = client.clone().stream_height(); println!("listening for height updates... (Ctrl+C to quit)"); From c2e3f092791f32b38c9b9abccb104bde7f12f3ad Mon Sep 17 00:00:00 2001 From: Jono Prest Date: Thu, 13 Nov 2025 13:44:17 +0200 Subject: [PATCH 10/10] Fix doc test --- hypersync-client/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 0b6bd69..c019885 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -891,7 +891,7 @@ impl Client { /// ..Default::default() /// })?); /// - /// let mut rx = client.stream_height().await?; + /// let mut rx = client.stream_height(); /// /// while let Some(event) = rx.recv().await { /// match event {