diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index 0f8884f8..0ddc4775 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -1,5 +1,6 @@ use crate::grpc_stream_utils::channelize_stream; use crate::grpc_subscription::map_block_update; +use futures::StreamExt; use geyser_grpc_connector::grpc_subscription_autoreconnect::{ create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, }; @@ -7,7 +8,6 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; use log::info; -use merge_streams::MergeStreams; use solana_lite_rpc_core::structures::produced_block::ProducedBlock; use solana_lite_rpc_core::structures::slot_notification::SlotNotification; use solana_lite_rpc_core::AnyhowJoinHandle; @@ -36,6 +36,21 @@ impl FromYellowstoneExtractor for BlockExtractor { } } +struct BlockHashExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockHashExtractor { + type Target = String; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, String)> { + match update.update_oneof { + Some(UpdateOneof::Block(block)) => Some((block.slot, block.blockhash)), + Some(UpdateOneof::BlockMeta(block_meta)) => { + Some((block_meta.slot, block_meta.blockhash)) + } + _ => None, + } + } +} + pub fn create_grpc_multiplex_blocks_subscription( grpc_sources: Vec, ) -> (Receiver, AnyhowJoinHandle) { @@ -68,28 +83,57 @@ pub fn create_grpc_multiplex_blocks_subscription( create_multiplexed_stream(streams, BlockExtractor(commitment_config)) }; - let multiplex_stream_finalized = { + let (sender, multiplexed_merged_blocks) = + tokio::sync::broadcast::channel::(1000); + + let meta_stream_finalized = { let commitment_config = CommitmentConfig::finalized(); let mut streams = Vec::new(); for grpc_source in &grpc_sources { let stream = create_geyser_reconnecting_stream( grpc_source.clone(), - GeyserFilter(commitment_config).blocks_and_txs(), + GeyserFilter(commitment_config).blocks_meta(), ); streams.push(stream); } - - create_multiplexed_stream(streams, BlockExtractor(commitment_config)) + create_multiplexed_stream(streams, BlockHashExtractor(commitment_config)) + }; + let jh_channelizer = { + // spawn merged + tokio::task::spawn(async move { + let mut map_of_confimed_blocks = HashMap::::new(); + let mut multiplex_stream_confirmed = std::pin::pin!(multiplex_stream_confirmed); + let mut meta_stream_finalized = std::pin::pin!(meta_stream_finalized); + let sender = sender; + loop { + tokio::select! { + confirmed_block = multiplex_stream_confirmed.next() => { + if let Some(confirmed_block) = confirmed_block { + if let Err(e) = sender.send(confirmed_block.clone()) { + panic!("Confirmed block stream send gave error {e:?}"); + } + map_of_confimed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block); + } else { + panic!("Confirmed stream broke"); + } + }, + meta_finalized = meta_stream_finalized.next() => { + if let Some(blockhash) = meta_finalized { + if let Some(mut finalized_block) = map_of_confimed_blocks.remove(&blockhash) { + finalized_block.commitment_config = CommitmentConfig::finalized(); + if let Err(e) = sender.send(finalized_block.clone()) { + panic!("Finalized block stream send gave error {e:?}"); + } + } + } + } + } + } + }) }; - let merged_stream_confirmed_finalize = - (multiplex_stream_confirmed, multiplex_stream_finalized).merge(); - - let (multiplexed_finalized_blocks, jh_channelizer) = - channelize_stream(merged_stream_confirmed_finalize); - - (multiplexed_finalized_blocks, jh_channelizer) + (multiplexed_merged_blocks, jh_channelizer) } struct SlotExtractor {}