Skip to content

Commit

Permalink
link via .parent
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 6, 2023
1 parent 2332bc0 commit 3e57410
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions cluster-endpoints/examples/stream_via_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ pub async fn main() {
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();

// let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<ProducedBlock>(1000);
let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
// let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<ProducedBlock>(1000);
let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000);

let grpc_x_token = None;
Expand Down Expand Up @@ -82,7 +82,7 @@ pub async fn main() {

// see also start procedure!
let mut current_tip = 0;
let mut slots_offered = HashSet::new();
let mut blocks_offered = Vec::<BlockRef>::new();
loop {

let timeout_secs = if current_tip == 0 { 3 } else { 10 };
Expand All @@ -93,20 +93,20 @@ pub async fn main() {
match msg_or_timeout {
Ok(Some(offer_block_msg)) => {
// collect the offered slots from the channels
if let OfferBlockMsg::NextSlot(label, slot_offered) = offer_block_msg {
info!("<< offered slot from {}: {:?}", label, slot_offered);
if let OfferBlockMsg::NextSlot(label, block_offered) = offer_block_msg {
info!("<< offered slot from {}: {:?}", label, block_offered);

// TOOD use .parent instead
if slot_offered == current_tip + 1 {
current_tip = slot_offered;
if block_offered.parent_slot == current_tip {
current_tip = block_offered.slot;
info!("<< take block from {} as new tip {}", label, current_tip);
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
slots_offered.clear();
blocks_offered.clear();
continue;
}

slots_offered.insert(slot_offered);
blocks_offered.push(block_offered);

}
// TODO handle else
Expand All @@ -119,25 +119,25 @@ pub async fn main() {
}
Err(_elapsed) => {
// timeout
info!("--> timeout: got these slots: {:?}", slots_offered);
info!("--> timeout: got these slots: {:?}", blocks_offered);

if current_tip == 0 {
let start_slot = slots_offered.iter().max().expect("need at least one slot to start");
current_tip = *start_slot;
let start_slot = blocks_offered.iter().max_by(|lhs,rhs| lhs.slot.cmp(&rhs.slot)).expect("need at least one slot to start");
current_tip = start_slot.slot;
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
info!("--> starting with tip {}", current_tip);
slots_offered.clear();
blocks_offered.clear();
continue;
}

match slots_offered.iter().filter(|s| **s == current_tip + 1).exactly_one() {
match blocks_offered.iter().filter(|b| b.parent_slot == current_tip).exactly_one() {
Ok(found_next) => {
current_tip = *found_next;
current_tip = found_next.slot;
assert_ne!(current_tip, 0, "must not see empty tip");
tx_tip.send(current_tip).unwrap();
info!("--> progressing tip to {}", current_tip);
slots_offered.clear();
blocks_offered.clear();
}
Err(missed) => {
warn!("--> no slots offered - SHOULD ABORT - no hope to progress");
Expand Down Expand Up @@ -170,9 +170,24 @@ pub async fn main() {
sleep(Duration::from_secs(1800)).await
}

#[derive(Clone, Debug)]
struct BlockRef {
pub slot: Slot,
pub parent_slot: Slot,
}

impl From<ProducedBlock> for BlockRef {
fn from(block: ProducedBlock) -> Self {
BlockRef {
slot: block.slot,
parent_slot: block.parent_slot,
}
}
}

#[derive(Debug)]
enum OfferBlockMsg {
NextSlot(String, Slot),
NextSlot(String, BlockRef),
}

fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut rx_tip: tokio::sync::watch::Receiver<Slot>,
Expand All @@ -186,22 +201,25 @@ fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut
let mut local_tip = 0;

// block after tip offered by this stream
let mut block_after_tip: Slot = 0;
let mut block_after_tip: BlockRef = BlockRef {
slot: 0,
parent_slot: 0,
};
'main_loop: loop {
select! {
_ = rx_tip.changed() => {
local_tip = rx_tip.borrow_and_update().clone();
info!("++> {} tip changed to {}", label, local_tip);
// TODO update local tip
}
recv_result = blocks_notifier.recv(), if !(block_after_tip > local_tip) => {
recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip) => {
match recv_result {
Ok(block) => {
info!("=> recv on {}: {}",label, format_block(&block));
if block.slot > local_tip {
info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip);
block_after_tip = block.slot;
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip)).await.unwrap();
block_after_tip = BlockRef::from(block);
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap();
// this thread will sleep and not issue any recvs until we get tip.changed signal
continue 'main_loop;
}
Expand Down

0 comments on commit 3e57410

Please sign in to comment.