From 2332bc008857ffb76df6c4b449f72b2ebf096e6d Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 6 Dec 2023 17:27:11 +0100 Subject: [PATCH] add emit logic --- cluster-endpoints/examples/stream_via_grpc.rs | 119 ++++++++++++++---- 1 file changed, 97 insertions(+), 22 deletions(-) diff --git a/cluster-endpoints/examples/stream_via_grpc.rs b/cluster-endpoints/examples/stream_via_grpc.rs index a74cf623..6bbad6a5 100644 --- a/cluster-endpoints/examples/stream_via_grpc.rs +++ b/cluster-endpoints/examples/stream_via_grpc.rs @@ -1,12 +1,14 @@ +use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use std::thread; -use std::thread::sleep; use std::time::Duration; +use futures::StreamExt; +use itertools::{ExactlyOneError, Itertools}; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use serde::Serializer; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::clock::Slot; @@ -14,6 +16,7 @@ use tokio::select; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::RwLock; +use tokio::time::{sleep, timeout}; use yellowstone_grpc_proto::geyser::CommitmentLevel; use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_block_processing_task; @@ -66,34 +69,105 @@ pub async fn main() { let (tx_tip, mut rx_tip) = tokio::sync::watch::channel::(0); - let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::(100); start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone()); start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone()); + // test tokio::spawn(async move { // need to wait until channels reached slot beyond tip // tokio::time::sleep(Duration::from_secs(14)).await; + // see also start procedure! let mut current_tip = 0; + let mut slots_offered = HashSet::new(); loop { - let slot_offered = offer_block_notifier.recv().await.unwrap(); - info!("<< offered slot: {:?}", slot_offered); - - // do a dump move and send it back as tip - let OfferBlockMsg::NextSlot(_label, slot_offered) = slot_offered; - let new_tip = slot_offered; - tx_tip.send(new_tip).unwrap(); - info!("--> progressing tip to {}", new_tip); - } + let timeout_secs = if current_tip == 0 { 3 } else { 10 }; + + let msg_or_timeout = timeout(Duration::from_secs(timeout_secs), offer_block_notifier.recv()).await; + info!("- current_tip={}", current_tip); + + match msg_or_timeout { + Ok(Some(offer_block_msg)) => { + // collect the offered slots from the channels + if let OfferBlockMsg::NextSlot(label, slot_offered) = offer_block_msg { + info!("<< offered slot from {}: {:?}", label, slot_offered); + + // TOOD use .parent instead + if slot_offered == current_tip + 1 { + current_tip = slot_offered; + info!("<< take block from {} as new tip {}", label, current_tip); + assert_ne!(current_tip, 0, "must not see empty tip"); + tx_tip.send(current_tip).unwrap(); + slots_offered.clear(); + continue; + } + + slots_offered.insert(slot_offered); + + } + // TODO handle else + } + Ok(None) => { + // TODO double-check + // channel closed + info!("--> channel closed"); + break; + } + Err(_elapsed) => { + // timeout + info!("--> timeout: got these slots: {:?}", slots_offered); + + if current_tip == 0 { + let start_slot = slots_offered.iter().max().expect("need at least one slot to start"); + current_tip = *start_slot; + assert_ne!(current_tip, 0, "must not see empty tip"); + tx_tip.send(current_tip).unwrap(); + info!("--> starting with tip {}", current_tip); + slots_offered.clear(); + continue; + } + + match slots_offered.iter().filter(|s| **s == current_tip + 1).exactly_one() { + Ok(found_next) => { + current_tip = *found_next; + assert_ne!(current_tip, 0, "must not see empty tip"); + tx_tip.send(current_tip).unwrap(); + info!("--> progressing tip to {}", current_tip); + slots_offered.clear(); + } + Err(missed) => { + warn!("--> no slots offered - SHOULD ABORT - no hope to progress"); + } + } + + sleep(Duration::from_millis(500)).await; + } + } + } // -- recv loop }); - sleep(Duration::from_secs(1000)); + // emitter + tokio::spawn(async move { + let mut rx_tip = rx_tip; + loop { + let tip = *rx_tip.borrow_and_update(); + if tip != 0 { + info!("<<<< emit block: {}", tip); + } + if rx_tip.changed().await.is_err() { + break; + } + } + }); + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await } #[derive(Debug)] @@ -108,23 +182,24 @@ fn start_progressor(label: String, blocks_notifier: Receiver, mut // for test only // let start_slot = blocks_notifier.recv().await.unwrap().slot; - let mut tip = 0; - info!("starting at tip {}", tip); + // local copy of tip + let mut local_tip = 0; // block after tip offered by this stream let mut block_after_tip: Slot = 0; 'main_loop: loop { select! { _ = rx_tip.changed() => { - tip = rx_tip.borrow().clone(); - info!("++> {} tip changed to {}", label, tip); + local_tip = rx_tip.borrow_and_update().clone(); + info!("++> {} tip changed to {}", label, local_tip); + // TODO update local tip } - recv_result = blocks_notifier.recv(), if !(block_after_tip > tip) => { + recv_result = blocks_notifier.recv(), if !(block_after_tip > local_tip) => { match recv_result { Ok(block) => { info!("=> recv on {}: {}",label, format_block(&block)); - if block.slot > tip { - info!("==> {}: beyond tip ({} > {})", label, block.slot, tip); + if block.slot > local_tip { + info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip); block_after_tip = block.slot; offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip)).await.unwrap(); // this thread will sleep and not issue any recvs until we get tip.changed signal @@ -160,11 +235,11 @@ fn start_monkey_broadcast(capacity: usize) -> (Sender // failes if there are no receivers if counter % 3 == 0 { - info!("% delay value"); + debug!("% delay value"); tokio::time::sleep(Duration::from_millis(700)).await; } if counter % 5 == 0 { - info!("% drop value"); + debug!("% drop value"); continue 'recv_loop; }