Skip to content

Commit

Permalink
select multiple streams
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 8, 2023
1 parent b7ae926 commit c64511c
Showing 1 changed file with 104 additions and 94 deletions.
198 changes: 104 additions & 94 deletions cluster-endpoints/examples/grpc_using_streams.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::{Add, Deref};
use std::ops::{Add, Deref, Sub};
use std::path::PathBuf;
use std::pin::pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::thread;
use anyhow::{bail, Context};
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use futures::stream::FuturesUnordered;
use itertools::{ExactlyOneError, Itertools};

use log::{debug, error, info, warn};
Expand All @@ -20,97 +21,112 @@ use tokio::{select};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;

#[tokio::main]
// #[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() {
// RUST_LOG=info,grpc_using_streams=debug
tracing_subscriber::fmt::init();

// TODO remove
// console_subscriber::init();

// mango validator (mainnet)
// let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
// via toxiproxy
let grpc_addr_mainnet_triton = "http://127.0.0.1:10001".to_string();
// let grpc_addr_mainnet_triton = "http://127.0.0.1:10001".to_string();
// ams81 (mainnet)
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string();
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();


let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000);
create_multiplex(grpc_addr_mainnet_triton, grpc_addr_mainnet_ams81, block_sx).await;

let green_config = GrpcSourceConfig::new("green-toxiproxy".to_string(), grpc_addr_mainnet_triton, None);
let blue_config = GrpcSourceConfig::new("blue".to_string(), grpc_addr_mainnet_ams81, None);

create_multiplex(vec![green_config, blue_config], CommitmentConfig::confirmed(), block_sx);

start_example_consumer(blocks_notifier);

// "infinite" sleep
sleep(Duration::from_secs(1800)).await;

}

fn start_example_consumer(blocks_notifier: Receiver<ProducedBlock>) {
tokio::spawn(async move {
let mut blocks_notifier = blocks_notifier;
loop {
let block = blocks_notifier.recv().await.unwrap();
info!("received block #{} with {} txs", block.slot, block.transactions.len());
}
});


// "infinite" sleep
sleep(Duration::from_secs(1800)).await;

}

async fn create_multiplex(
grpc_addr_mainnet_triton: String,
grpc_addr_mainnet_ams81: String,
fn create_multiplex(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
block_sx: Sender<ProducedBlock>,
) -> JoinHandle<()> {

// TODO
let commitment_config = CommitmentConfig::confirmed();
if grpc_sources.len() < 1 {
panic!("Must have at least one source");
}

let jh = tokio::spawn(async move {
info!("Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources.iter().map(|source| source.label.clone()).join(", "));

let mut futures = futures::stream::SelectAll::new();
// pin_mut!(futures);
for grpc_source in grpc_sources {

let stream = create_geyser_reconnecting_stream(grpc_source.clone()).await;
// futures.push(stream);
futures.push(Box::pin(stream));
}

let mut green = create_geyser_stream2("green-toxiproxy".to_string(), grpc_addr_mainnet_triton.clone(), None).await;
let mut blue = create_geyser_stream2("blue".to_string(), grpc_addr_mainnet_ams81.clone(), None).await;
pin_mut!(green);
pin_mut!(blue);
let mut green = pin!(green.next());
let mut blue = pin!(blue.next());

let mut current_slot = 0 as Slot;

// pin_mut!(futures);


// for stream in streams {
// // pin!(stream);
// futures.push(stream);
// }


let mut current_slot: Slot = 0;

'main_loop: loop {

let block_cmd =
select!(
message = &mut green => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("must not close the stream");
}
}
// select streams

},
message = &mut blue => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("must not close the stream");
}
let block_cmd = select! {
message = futures.next() => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("must not close the stream");
}
}
);
}
};

match block_cmd {
BlockCmd::ForwardBlock(block) => {
Expand Down Expand Up @@ -141,7 +157,7 @@ enum BlockCmd {
SkipMessage,
}

fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, commitment_config: CommitmentConfig,) -> BlockCmd {
fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, commitment_config: CommitmentConfig) -> BlockCmd {
if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof {
if update_block_message.slot <= current_slot && current_slot != 0 {
// no progress - skip this
Expand All @@ -158,39 +174,30 @@ fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate,

}

async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> impl Stream<Item = Result<SubscribeUpdate, Status>> {
let mut client = GeyserGrpcClient::connect(grpc_addr, x_token, None).unwrap();

let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

let stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(CommitmentLevel::Confirmed),
Default::default(),
None,
).await.unwrap();
// TODO fix unwrap

// TODO pull tonic error handling inside this method
return stream;
#[derive(Clone, Debug)]
struct GrpcSourceConfig {
// symbolic name used in logs
label: String,
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
}

impl GrpcSourceConfig {
fn new(label: String, grpc_addr: String, grpc_x_token: Option<String>) -> Self {
Self {
label,
grpc_addr,
grpc_x_token,
tls_config: None,
}
}
}

async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option<String>) -> impl Stream<Item = SubscribeUpdate> {
// TODO use GrpcSource
// note: stream never terminates
async fn create_geyser_reconnecting_stream(grpc_source: GrpcSourceConfig) -> impl Stream<Item = SubscribeUpdate> {
let label = grpc_source.label.clone();
stream! {
let mut throttle_barrier = Instant::now();
'main_loop: loop {
Expand All @@ -199,17 +206,19 @@ async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option

// throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
// GeyserGrpcClientError
// TODO extract parameters
let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_addr.clone(), x_token.clone(), None,
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;

if let Err(client_connect_error) = connect_result {
// TODO identify non-recoverable errors and cancel stream
warn!("Connect failed on {} - retrying: {:?}", label, client_connect_error);
continue 'main_loop;
}

let mut client = connect_result.unwrap();
let mut client = match connect_result {
Ok(connected_client) => connected_client,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'main_loop;
}
};

let mut blocks_subs = HashMap::new();
blocks_subs.insert(
Expand All @@ -235,15 +244,16 @@ async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option
None,
).await;

if let Err(subscribe_error) = subscribe_result {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, subscribe_error);
continue 'main_loop;
}

let stream = subscribe_result.unwrap();
let geyser_stream = match subscribe_result {
Ok(subscribed_stream) => subscribed_stream,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'main_loop;
}
};

for await update_message in stream {
for await update_message in geyser_stream {
match update_message {
Ok(update_message) => {
info!(">message on {}", label);
Expand Down

0 comments on commit c64511c

Please sign in to comment.