Skip to content

Commit

Permalink
Merge pull request #194 from blockworks-foundation/removing_unwanted_…
Browse files Browse the repository at this point in the history
…code

using semaphore in tpu connection manager and removing unwanted code
  • Loading branch information
godmodegalactus authored Sep 15, 2023
2 parents da94bec + 41d36a8 commit 7d36768
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 249 deletions.
309 changes: 142 additions & 167 deletions core/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
structures::rotating_queue::RotatingQueue,
};
use anyhow::bail;
use anyhow::Context;
use futures::FutureExt;
use log::warn;
use quinn::{Connection, Endpoint};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::VecDeque,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

#[derive(Clone)]
#[warn(clippy::rc_clone_in_vec_init)]
pub struct QuicConnection {
connection: Arc<RwLock<Connection>>,
connection: Arc<RwLock<Option<Connection>>>,
last_stable_id: Arc<AtomicU64>,
endpoint: Endpoint,
identity: Pubkey,
Expand All @@ -31,150 +32,135 @@ pub struct QuicConnection {
}

impl QuicConnection {
pub async fn new(
pub fn new(
identity: Pubkey,
endpoints: EndpointPool,
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<Self> {
let endpoint = endpoints
.get()
.await
.expect("endpoint pool is not suppose to be empty");
let connection = QuicConnectionUtils::connect(
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
false,
endpoint.clone(),
socket_address,
connection_params.connection_timeout,
connection_params.connection_retry_count,
exit_signal.clone(),
)
.await;

match connection {
Some(connection) => Ok(Self {
connection: Arc::new(RwLock::new(connection)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
socket_address,
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
}),
None => {
bail!("Could not establish connection");
}
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
}
}

async fn connect(&self) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
true,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
)
.await
}

async fn get_connection(&self) -> Option<Connection> {
// get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await;
if conn.stable_id() == last_stable_id {
let current_stable_id = conn.stable_id();
// problematic connection
drop(conn);
let mut conn = self.connection.write().await;
// check may be already written by another thread
if conn.stable_id() != current_stable_id {
Some(conn.clone())
} else {
let new_conn = QuicConnectionUtils::connect(
self.identity,
true,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
)
.await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
Some(conn.clone())
let conn = self.connection.read().await.clone();
match conn {
Some(connection) => {
if connection.stable_id() == last_stable_id {
let current_stable_id = connection.stable_id();
// problematic connection
let mut conn = self.connection.write().await;
let connection = conn.clone().expect("Connection cannot be None here");
// check may be already written by another thread
if connection.stable_id() != current_stable_id {
Some(connection)
} else {
let new_conn = self.connect().await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
} else {
// could not connect
None
}
}
} else {
// could not connect
None
Some(connection.clone())
}
}
} else {
Some(conn.clone())
None => {
let connection = self.connect().await;
*self.connection.write().await = connection.clone();
connection
}
}
}

pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
pub async fn send_transaction(&self, tx: Vec<u8>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
if queue.is_empty() || self.exit_signal.load(Ordering::Relaxed) {
if self.exit_signal.load(Ordering::Relaxed) {
// return
return;
}

let mut do_retry = false;
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let connection = self.get_connection().await;
let connection = self.get_connection().await;

if self.exit_signal.load(Ordering::Relaxed) {
return;
}
if self.exit_signal.load(Ordering::Relaxed) {
return;
}

if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
)
.await
{
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
)
.await
{
Ok(()) => {
// do nothing
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
)
.await
{
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
)
.await
{
Ok(()) => {
// do nothing
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
if do_retry {
self.last_stable_id
.store(current_stable_id, Ordering::Relaxed);
queue.push_back(tx);
break;
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
}
} else {
warn!(
"Could not establish connection with {}",
self.identity.to_string()
);
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
if do_retry {
self.last_stable_id
.store(current_stable_id, Ordering::Relaxed);
break;
}
} else {
warn!(
"Could not establish connection with {}",
self.identity.to_string()
);
break;
}
if !do_retry {
break;
Expand All @@ -193,12 +179,15 @@ impl QuicConnection {

#[derive(Clone)]
pub struct QuicConnectionPool {
connections: RotatingQueue<QuicConnection>,
connection_parameters: QuicConnectionParameters,
endpoints: EndpointPool,
identity: Pubkey,
socket_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
connections: Vec<QuicConnection>,
// counting semaphore is ideal way to manage backpressure on the connection
// because a connection can create only N unistream connections
transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
}

pub struct PooledConnection {
pub connection: QuicConnection,
pub permit: OwnedSemaphorePermit,
}

impl QuicConnectionPool {
Expand All @@ -208,60 +197,46 @@ impl QuicConnectionPool {
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
let connections = RotatingQueue::new_empty();
let mut connections = vec![];
// should not clone connection each time but create a new one
for _ in 0..nb_connection {
connections.push(QuicConnection::new(
identity,
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_signal.clone(),
));
}
Self {
connections,
identity,
endpoints,
socket_address,
connection_parameters,
exit_signal,
transactions_in_sending_semaphore: {
// should create a new semaphore each time so avoid vec[elem;count]
let mut v = Vec::with_capacity(nb_connection);
(0..nb_connection).for_each(|_| {
v.push(Arc::new(Semaphore::new(max_number_of_unistream_connection)))
});
v
},
}
}

pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
let connection = match self.connections.get().await {
Some(connection) => connection,
None => {
let new_connection = QuicConnection::new(
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
)
.await;
if new_connection.is_err() {
return;
}
let new_connection = new_connection.expect("Cannot establish a connection");
self.connections.add(new_connection.clone()).await;
new_connection
}
};

connection.send_transaction_batch(txs).await;
}

pub async fn add_connection(&self) {
let new_connection = QuicConnection::new(
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
pub async fn get_pooled_connection(&self) -> anyhow::Result<PooledConnection> {
let (permit, index, _others) = futures::future::select_all(
self.transactions_in_sending_semaphore
.iter()
.map(|x| x.clone().acquire_owned().boxed()),
)
.await;
if let Ok(new_connection) = new_connection {
self.connections.add(new_connection).await;
}
}

pub async fn remove_connection(&self) {
if !self.connections.is_empty() {
self.connections.remove().await;
}
drop(_others);
let permit = permit.context("Cannot aquire permit, connection pool erased")?;
Ok(PooledConnection {
connection: self.connections[index].clone(),
permit,
})
}

pub fn len(&self) -> usize {
Expand Down
Loading

0 comments on commit 7d36768

Please sign in to comment.