diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 948026816..09a396c50 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -6,6 +6,7 @@ use core::{ task::{Context, Poll}, }; use futures::{Sink, SinkExt}; +use futures_util::StreamExt; use rollup_boost::FlashblocksPayloadV1; use std::{io, net::TcpListener, sync::Arc}; use tokio::{ @@ -209,6 +210,30 @@ async fn broadcast_loop( tracing::warn!("Broadcast channel lagged, some messages were dropped"); } }, + + // This handles channel closing and ping-pong logic + message = stream.next() => if let Some(message) = message { match message { + Ok(message) => { + match message { + Message::Ping(data) => { + if let Err(e) = stream.send(Message::Pong(data)).await { + tracing::warn!("Closing flashblocks subscription for {peer_addr}: {e}"); + break; // Exit the loop if sending fails + } + } + // We don't get any data from RB, so we won't handle closing frame + Message::Close(_) => { + tracing::info!("Closing frame received, stopping connection for {peer_addr}"); + break; + } + _ => (), + } + } + Err(e) => { + tracing::warn!("Received error. Closing flashblocks subscription for {peer_addr}: {e}"); + break; + } + } } } } }