Skip to content

Commit

Permalink
Swapping notify channel with broadcast channel
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Apr 2, 2024
1 parent ca3fa46 commit 12a6832
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 35 deletions.
7 changes: 4 additions & 3 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::{broadcast, Notify};
use tracing::trace_span;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -278,7 +278,7 @@ pub fn create_block_processing_task(
mut exit_notify: broadcast::Receiver<()>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
Expand All @@ -293,7 +293,7 @@ pub fn create_block_processing_task(
// connect to grpc
let mut client =
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
let mut stream = tokio::select! {
let mut stream = tokio::select! {
res = client
.subscribe_once(
HashMap::new(),
Expand Down Expand Up @@ -354,6 +354,7 @@ pub fn create_block_processing_task(
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}

Expand Down
32 changes: 15 additions & 17 deletions services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

Expand All @@ -40,7 +40,6 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
Expand All @@ -51,7 +50,6 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
Expand All @@ -60,26 +58,29 @@ impl QuicConnection {
identity,
socket_address,
connection_params,
exit_notify,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
}

async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
async fn connect(
&self,
is_already_connected: bool,
exit_notify: broadcast::Receiver<()>,
) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
is_already_connected,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_notify.clone(),
exit_notify,
)
.await
}

pub async fn get_connection(&self) -> Option<Connection> {
pub async fn get_connection(&self, exit_notify: broadcast::Receiver<()>) -> Option<Connection> {
// get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await.clone();
Expand All @@ -95,7 +96,7 @@ impl QuicConnection {
Some(connection)
} else {
NB_QUIC_CONNECTION_RESET.inc();
let new_conn = self.connect(true).await;
let new_conn = self.connect(true, exit_notify).await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
Expand All @@ -116,25 +117,24 @@ impl QuicConnection {
// connection has recently been established/ just use it
return (*lk).clone();
}
let connection = self.connect(false).await;
let connection = self.connect(false, exit_notify).await;
*lk = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
}
}
}

pub async fn send_transaction(&self, tx: &Vec<u8>) {
pub async fn send_transaction(&self, tx: &Vec<u8>, mut exit_notify: broadcast::Receiver<()>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
let mut do_retry = false;
let exit_notify = self.exit_notify.clone();

let connection = tokio::select! {
conn = self.get_connection() => {
conn = self.get_connection(exit_notify.resubscribe()) => {
conn
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
Expand All @@ -149,7 +149,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
Expand All @@ -164,7 +164,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
Expand Down Expand Up @@ -247,7 +247,6 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
Expand All @@ -259,7 +258,6 @@ impl QuicConnectionPool {
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_notify.clone(),
));
}
Self {
Expand Down
8 changes: 4 additions & 4 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{sync::Notify, time::timeout};
use tokio::{sync::broadcast, time::timeout};

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
Expand Down Expand Up @@ -221,7 +221,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: Arc<Notify>,
mut exit_notified: broadcast::Receiver<()>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Expand All @@ -230,7 +230,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}
Expand All @@ -240,7 +240,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}
Expand Down
24 changes: 13 additions & 11 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{Receiver, Sender},
broadcast::{self, Receiver, Sender},
Notify,
};

Expand Down Expand Up @@ -48,7 +48,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
exit_notifier: broadcast::Sender<()>,
}

impl ActiveConnection {
Expand All @@ -59,13 +59,14 @@ impl ActiveConnection {
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) -> Self {
let (exit_notifier, _) = broadcast::channel(1);
Self {
endpoints,
tpu_address,
identity,
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
exit_notifier,
}
}

Expand All @@ -79,7 +80,6 @@ impl ActiveConnection {
let fill_notify = Arc::new(Notify::new());

let identity = self.identity;
let exit_notifier = self.exit_notifier.clone();

NB_QUIC_ACTIVE_CONNECTIONS.inc();

Expand All @@ -95,7 +95,6 @@ impl ActiveConnection {
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_notifier.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
Expand All @@ -109,7 +108,7 @@ impl ActiveConnection {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_notifier = exit_notifier.clone();
let mut exit_notifier = self.exit_notifier.subscribe();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
Expand All @@ -118,7 +117,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break;
}
};
Expand Down Expand Up @@ -165,12 +164,14 @@ impl ActiveConnection {
if let Ok(PooledConnection { connection, permit }) =
connection_pool.get_pooled_connection().await
{
let exit_notifier = self.exit_notifier.subscribe();
tokio::task::spawn(async move {
let _permit = permit;
connection.get_connection().await;
connection.get_connection(exit_notifier).await;
});
};

let mut exit_notifier = self.exit_notifier.subscribe();
'main_loop: loop {
tokio::select! {
_ = fill_notify.notified() => {
Expand All @@ -197,6 +198,7 @@ impl ActiveConnection {
break;
},
};
let exit_notifier = self.exit_notifier.subscribe();

tokio::spawn(async move {
// permit will be used to send all the transaction and then destroyed
Expand All @@ -205,13 +207,13 @@ impl ActiveConnection {

NB_QUIC_TASKS.inc();

connection.send_transaction(tx.transaction.as_ref()).await;
connection.send_transaction(tx.transaction.as_ref(), exit_notifier).await;
timer.observe_duration();
NB_QUIC_TASKS.dec();
});
}
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break 'main_loop;
}
}
Expand Down Expand Up @@ -289,7 +291,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
let _ = value.exit_notifier.send(());
false
} else {
true
Expand Down

0 comments on commit 12a6832

Please sign in to comment.