Skip to content

Commit

Permalink
bugfixes multiplexing grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Sep 22, 2023
1 parent 2278e32 commit a84db9f
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ async-channel = { workspace = true }
solana-lite-rpc-core = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
itertools = {workspace = true}
itertools = {workspace = true}
lazy_static = { workspace = true }
prometheus = { workspace = true }
19 changes: 10 additions & 9 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub fn create_block_processing_task(
}

pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
rpc_client: Option<Arc<RpcClient>>,
grpc_addr: String,
expected_grpc_version: String,
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
Expand Down Expand Up @@ -380,21 +380,22 @@ pub fn create_grpc_subscription(
let block_finalized_task: AnyhowJoinHandle =

Check warning on line 380 in cluster-endpoints/src/grpc_subscription.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_subscription.rs

Check warning on line 380 in cluster-endpoints/src/grpc_subscription.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_subscription.rs
create_block_processing_task(grpc_addr, block_sx, CommitmentLevel::Finalized);

let cluster_info_polling =
let mut endpoint_tasks = vec![
slot_task,
block_confirmed_task,
block_finalized_task,
];
if let Some(rpc_client) = rpc_client {
let cluster_info_polling =
poll_vote_accounts_and_cluster_info(rpc_client, cluster_info_sx, va_sx);
endpoint_tasks.push(cluster_info_polling);
}

let streamers = EndpointStreaming {
blocks_notifier,
slot_notifier,
cluster_info_notifier,
vote_account_notifier,
};

let endpoint_tasks = vec![
slot_task,
block_confirmed_task,
block_finalized_task,
cluster_info_polling,
];
Ok((streamers, endpoint_tasks))
}
34 changes: 27 additions & 7 deletions cluster-endpoints/src/multiplexing_subscriptions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use crate::endpoint_stremers::EndpointStreaming;
use anyhow::Context;
use prometheus::{opts, register_gauge, Gauge};
use solana_lite_rpc_core::{commitment_utils::Commitment, AnyhowJoinHandle};
use solana_sdk::slot_history::Slot;
use std::collections::BTreeSet;

lazy_static::lazy_static! {
static ref GRPC_SLOT_UPDATE: Gauge =
register_gauge!(opts!("literpc_rpc_slot_update_from_grpc", "Is slot updated by grpc notification")).unwrap();

static ref GRPC_BLOCK_UPDATE: Gauge =
register_gauge!(opts!("literpc_rpc_block_update_from_grpc", "Is block updated by grpc notification")).unwrap();
}

const NB_BLOCKS_TO_CACHE: usize = 1024;

pub fn multiplexing_endstreams(
Expand All @@ -20,17 +29,17 @@ pub fn multiplexing_endstreams(
let mut processed_slot = 0;
let mut estimated_slot = 0;
loop {
let notification = tokio::select! {
let (notification, is_grpc_update) = tokio::select! {
rpc_slot = rpc_slot_notifier.recv() => {
if let Ok(slot_notification) = rpc_slot {
slot_notification
(slot_notification, false)
} else {
continue;
}
},
grpc_slot = grpc_slot_notifier.recv() => {
if let Ok(slot_notification) = grpc_slot {
slot_notification
(slot_notification, true)
} else {
continue;
}
Expand All @@ -40,6 +49,11 @@ pub fn multiplexing_endstreams(
if notification.processed_slot > processed_slot
|| notification.estimated_processed_slot > estimated_slot
{
if is_grpc_update {
GRPC_SLOT_UPDATE.set(1.0);
} else {
GRPC_SLOT_UPDATE.set(0.0);
}
processed_slot = notification.processed_slot;
estimated_slot = notification.estimated_processed_slot;
slot_sx.send(notification).context("send channel broken")?;
Expand All @@ -52,24 +66,30 @@ pub fn multiplexing_endstreams(
let block_multiplexer: AnyhowJoinHandle = tokio::spawn(async move {
let mut block_notified = BTreeSet::<(Slot, Commitment)>::new();
loop {
let block = tokio::select! {
let (block, is_grpc_update) = tokio::select! {
block_notification = rpc_block_notifier.recv() => {
if let Ok(block) = block_notification {
block
(block, false)
} else {
continue;
}
},
block_notification = grpc_block_notifier.recv() => {
if let Ok(block) = block_notification {
block
(block, true)
} else {
continue;
}
}
};
let key = (block.slot, block.commitment_config.into());
if block_notified.contains(&key) {
if !block_notified.contains(&key) {
if is_grpc_update {
GRPC_BLOCK_UPDATE.set(1.0);
} else {
GRPC_BLOCK_UPDATE.set(0.0);
}

block_notified.insert(key);
if block_notified.len() > NB_BLOCKS_TO_CACHE {
block_notified.pop_first();
Expand Down
2 changes: 1 addition & 1 deletion cluster-endpoints/src/rpc_polling/poll_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub fn poll_slots(
}
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
log::warn!("failed to receive slot update: {err}");
log::trace!("failed to receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
Expand Down
6 changes: 3 additions & 3 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);

Check warning on line 108 in lite-rpc/src/main.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/main.rs

Check warning on line 108 in lite-rpc/src/main.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/lite-rpc/src/main.rs

let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?
create_grpc_subscription(Some(rpc_client.clone()), grpc_addr, GRPC_VERSION.to_string())?
} else if multiplex_rpc_grpc {
let (grpc_subscription, mut grpc_tasks) =
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?;
let (rpc_subscription, mut rpc_tasks) =
create_json_rpc_polling_subscription(rpc_client.clone())?;
let (grpc_subscription, mut grpc_tasks) =
create_grpc_subscription(None, grpc_addr, GRPC_VERSION.to_string())?;
let (multiplexed_endpoint, mut tasks) =
multiplexing_endstreams(rpc_subscription, grpc_subscription)?;
tasks.append(&mut grpc_tasks);
Expand Down

0 comments on commit a84db9f

Please sign in to comment.