Skip to content

Commit

Permalink
add emit logic
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 6, 2023
1 parent 46d450e commit 2332bc0
Showing 1 changed file with 97 additions and 22 deletions.
119 changes: 97 additions & 22 deletions cluster-endpoints/examples/stream_via_grpc.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::collections::HashSet;
use std::fmt::{Display, Formatter};

Check warning on line 2 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused imports: `Display`, `Formatter`
use std::ops::Deref;

Check warning on line 3 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `std::ops::Deref`
use std::path::PathBuf;

Check warning on line 4 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `std::path::PathBuf`
use std::sync::Arc;
use std::thread;

Check warning on line 6 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `std::thread`
use std::thread::sleep;
use std::time::Duration;
use futures::StreamExt;
use itertools::{ExactlyOneError, Itertools};

Check warning on line 9 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `ExactlyOneError`

use log::{debug, error, info};
use log::{debug, error, info, warn};
use serde::Serializer;

Check warning on line 12 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `serde::Serializer`
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::clock::Slot;
use tokio::select;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::error::TryRecvError;

Check warning on line 17 in cluster-endpoints/examples/stream_via_grpc.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `tokio::sync::broadcast::error::TryRecvError`
use tokio::sync::RwLock;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_proto::geyser::CommitmentLevel;

use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_block_processing_task;
Expand Down Expand Up @@ -66,34 +69,105 @@ pub async fn main() {

let (tx_tip, mut rx_tip) = tokio::sync::watch::channel::<Slot>(0);


let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::<OfferBlockMsg>(100);

start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone());
start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone());


// test
tokio::spawn(async move {
// need to wait until channels reached slot beyond tip
// tokio::time::sleep(Duration::from_secs(14)).await;

// see also start procedure!
let mut current_tip = 0;
let mut slots_offered = HashSet::new();
loop {
let slot_offered = offer_block_notifier.recv().await.unwrap();
info!("<< offered slot: {:?}", slot_offered);

// do a dump move and send it back as tip
let OfferBlockMsg::NextSlot(_label, slot_offered) = slot_offered;
let new_tip = slot_offered;
tx_tip.send(new_tip).unwrap();
info!("--> progressing tip to {}", new_tip);
}

let timeout_secs = if current_tip == 0 { 3 } else { 10 };

let msg_or_timeout = timeout(Duration::from_secs(timeout_secs), offer_block_notifier.recv()).await;
info!("- current_tip={}", current_tip);

match msg_or_timeout {
Ok(Some(offer_block_msg)) => {
// collect the offered slots from the channels
if let OfferBlockMsg::NextSlot(label, slot_offered) = offer_block_msg {
info!("<< offered slot from {}: {:?}", label, slot_offered);

// TOOD use .parent instead
if slot_offered == current_tip + 1 {
current_tip = slot_offered;
info!("<< take block from {} as new tip {}", label, current_tip);
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
slots_offered.clear();
continue;
}

slots_offered.insert(slot_offered);

}
// TODO handle else
}
Ok(None) => {
// TODO double-check
// channel closed
info!("--> channel closed");
break;
}
Err(_elapsed) => {
// timeout
info!("--> timeout: got these slots: {:?}", slots_offered);

if current_tip == 0 {
let start_slot = slots_offered.iter().max().expect("need at least one slot to start");
current_tip = *start_slot;
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
info!("--> starting with tip {}", current_tip);
slots_offered.clear();
continue;
}

match slots_offered.iter().filter(|s| **s == current_tip + 1).exactly_one() {
Ok(found_next) => {
current_tip = *found_next;
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
info!("--> progressing tip to {}", current_tip);
slots_offered.clear();
}
Err(missed) => {
warn!("--> no slots offered - SHOULD ABORT - no hope to progress");
}
}

sleep(Duration::from_millis(500)).await;
}
}
} // -- recv loop

});

sleep(Duration::from_secs(1000));

// emitter
tokio::spawn(async move {
let mut rx_tip = rx_tip;
loop {
let tip = *rx_tip.borrow_and_update();
if tip != 0 {
info!("<<<< emit block: {}", tip);
}
if rx_tip.changed().await.is_err() {
break;
}
}
});

// "infinite" sleep
sleep(Duration::from_secs(1800)).await
}

#[derive(Debug)]
Expand All @@ -108,23 +182,24 @@ fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut
// for test only
// let start_slot = blocks_notifier.recv().await.unwrap().slot;

let mut tip = 0;
info!("starting at tip {}", tip);
// local copy of tip
let mut local_tip = 0;

// block after tip offered by this stream
let mut block_after_tip: Slot = 0;
'main_loop: loop {
select! {
_ = rx_tip.changed() => {
tip = rx_tip.borrow().clone();
info!("++> {} tip changed to {}", label, tip);
local_tip = rx_tip.borrow_and_update().clone();
info!("++> {} tip changed to {}", label, local_tip);
// TODO update local tip
}
recv_result = blocks_notifier.recv(), if !(block_after_tip > tip) => {
recv_result = blocks_notifier.recv(), if !(block_after_tip > local_tip) => {
match recv_result {
Ok(block) => {
info!("=> recv on {}: {}",label, format_block(&block));
if block.slot > tip {
info!("==> {}: beyond tip ({} > {})", label, block.slot, tip);
if block.slot > local_tip {
info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip);
block_after_tip = block.slot;
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip)).await.unwrap();
// this thread will sleep and not issue any recvs until we get tip.changed signal
Expand Down Expand Up @@ -160,11 +235,11 @@ fn start_monkey_broadcast<T: Clone + Send + 'static>(capacity: usize) -> (Sender
// failes if there are no receivers

if counter % 3 == 0 {
info!("% delay value");
debug!("% delay value");
tokio::time::sleep(Duration::from_millis(700)).await;
}
if counter % 5 == 0 {
info!("% drop value");
debug!("% drop value");
continue 'recv_loop;
}

Expand Down

0 comments on commit 2332bc0

Please sign in to comment.