
Commit

set stake_vote optional and integrate leader schedule and vote in send Tx
musitdev committed Dec 27, 2023
1 parent 8378d1c commit f0a12d5
Showing 11 changed files with 219 additions and 40 deletions.
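The key API change is in lite-rpc/src/bridge.rs: the stake/vote query channel becomes an Option, and when it is None (the stake/vote service is not running) getVoteAccounts falls back to the upstream RPC node. Below is a minimal, runnable sketch of that "optional sender with RPC fallback" shape; the type and field names are illustrative stand-ins, not the crate's actual API.

// Illustrative sketch only: the shape of the change, not the crate's real types.
use tokio::sync::{mpsc, oneshot};

type VoteAccounts = Vec<String>; // stand-in for RpcVoteAccountStatus

struct Bridge {
    // None when the gRPC stake/vote service is disabled; the bridge then
    // forwards the query to the upstream RPC node instead.
    vote_query_tx: Option<mpsc::Sender<oneshot::Sender<VoteAccounts>>>,
}

impl Bridge {
    async fn get_vote_accounts(&self) -> Result<VoteAccounts, String> {
        match &self.vote_query_tx {
            Some(tx) => {
                // Ask the in-process stake/vote service and await its reply.
                let (reply_tx, reply_rx) = oneshot::channel();
                tx.send(reply_tx).await.map_err(|e| e.to_string())?;
                reply_rx.await.map_err(|e| e.to_string())
            }
            // Fallback path: in the real bridge this forwards to the RPC client.
            None => Ok(vec!["fetched-from-upstream-rpc".to_string()]),
        }
    }
}

#[tokio::main]
async fn main() {
    let bridge = Bridge { vote_query_tx: None };
    println!("{:?}", bridge.get_vote_accounts().await);
}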
74 changes: 74 additions & 0 deletions cluster-endpoints/src/grpc_leaders_getter.rs
@@ -0,0 +1,74 @@
use anyhow::{bail, Error};
use async_trait::async_trait;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::{
structures::leader_data::LeaderData, traits::leaders_fetcher_interface::LeaderFetcherInterface,
};
use std::sync::Arc;
use tokio::sync::RwLock;

// Stores leaders for slots from oldest to newest in the leader schedule.
// Old leaders are regularly removed and new ones are added.
pub struct GrpcLeaderGetter {
epoch_data: EpochCache,
leader_schedule: Arc<RwLock<CalculatedSchedule>>,
}

impl GrpcLeaderGetter {
pub fn new(leader_schedule: Arc<RwLock<CalculatedSchedule>>, epoch_data: EpochCache) -> Self {
Self {
leader_schedule,
epoch_data,
}
}
}

#[async_trait]
impl LeaderFetcherInterface for GrpcLeaderGetter {
async fn get_slot_leaders(
&self,
from: solana_sdk::slot_history::Slot,
to: solana_sdk::slot_history::Slot,
) -> anyhow::Result<Vec<LeaderData>> {
//get the epoch of the from/to slots to check that they fall within the stored (current or next) epoch.
let from_epoch = self.epoch_data.get_epoch_at_slot(from).epoch;
let to_epoch = self.epoch_data.get_epoch_at_slot(to).epoch;
let leader_schedule_data = self.leader_schedule.read().await;
let current_epoch = leader_schedule_data
.current
.as_ref()
.map(|e| e.epoch)
.unwrap_or(from_epoch);
let next_epoch = leader_schedule_data
.next
.as_ref()
.map(|e| e.epoch)
.unwrap_or(to_epoch);
if from > to {
bail!("invalid arguments for get_slot_leaders");
}
if from_epoch < current_epoch || from_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
}
if to_epoch < current_epoch || to_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
}

let limit = to - from;

let schedule = leader_schedule_data
.get_slot_leaders(from, limit, self.epoch_data.get_epoch_schedule())
.await
.map_err(Error::msg)?;

Ok(schedule
.into_iter()
.enumerate()
.map(|(index, pubkey)| LeaderData {
leader_slot: from + index as u64,
pubkey,
})
.collect())
}
}
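Consumers of this type only see it through the LeaderFetcherInterface trait object that main.rs hands to the TPU service. The sketch below shows that calling pattern with a mock in place of GrpcLeaderGetter (constructing the real one needs an EpochCache bootstrapped from a live RPC node); the trait and LeaderData shapes are simplified from the code above, and everything else is hypothetical.

use async_trait::async_trait;
use std::sync::Arc;

type Slot = u64;

#[derive(Debug)]
struct LeaderData {
    leader_slot: Slot,
    pubkey: String, // the real type uses a Solana Pubkey
}

// Simplified stand-in for the crate's LeaderFetcherInterface.
#[async_trait]
trait LeaderFetcher: Send + Sync {
    async fn get_slot_leaders(&self, from: Slot, to: Slot) -> anyhow::Result<Vec<LeaderData>>;
}

struct MockLeaderGetter;

#[async_trait]
impl LeaderFetcher for MockLeaderGetter {
    async fn get_slot_leaders(&self, from: Slot, to: Slot) -> anyhow::Result<Vec<LeaderData>> {
        anyhow::ensure!(from <= to, "invalid arguments for get_slot_leaders");
        // The real getter resolves leaders from the stored CalculatedSchedule.
        Ok((from..to)
            .map(|slot| LeaderData {
                leader_slot: slot,
                pubkey: format!("leader-{slot}"),
            })
            .collect())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let getter: Arc<dyn LeaderFetcher> = Arc::new(MockLeaderGetter);
    for leader in getter.get_slot_leaders(100, 104).await? {
        println!("{leader:?}");
    }
    Ok(())
}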
1 change: 1 addition & 0 deletions cluster-endpoints/src/lib.rs
@@ -1,4 +1,5 @@
pub mod endpoint_stremers;
pub mod grpc_leaders_getter;
pub mod grpc_subscription;
pub mod json_rpc_leaders_getter;
pub mod json_rpc_subscription;
2 changes: 1 addition & 1 deletion cluster-endpoints/src/rpc_polling/poll_slots.rs
@@ -5,7 +5,7 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{structures::slot_notification::SlotNotification, AnyhowJoinHandle};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use tokio::sync::broadcast::Sender;
const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400);
const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(800);

pub async fn poll_commitment_slots(
rpc_client: Arc<RpcClient>,
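This hunk only doubles AVERAGE_SLOT_CHANGE_TIME from 400 ms (roughly Solana's target slot duration) to 800 ms, which halves the polling rate. A hypothetical sketch of how such a constant typically paces a slot-polling loop — the real poll_commitment_slots queries an RPC node, while here a local counter stands in for it:

// Hypothetical sketch: pacing a slot-polling loop with an average-slot-time
// constant. The "poll" below is a local stand-in for an RPC getSlot call.
use std::time::{Duration, Instant};
use tokio::sync::broadcast;

const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(800);

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = broadcast::channel::<u64>(16);
    let start = Instant::now();

    tokio::spawn(async move {
        let mut last_slot = 0u64;
        loop {
            // Stand-in for querying the current slot from an RPC node.
            let polled_slot = start.elapsed().as_millis() as u64 / 400;
            if polled_slot > last_slot {
                last_slot = polled_slot;
                let _ = sender.send(polled_slot);
            }
            // Sleeping roughly one average slot duration between polls keeps
            // RPC load low at the cost of noticing new slots slightly later.
            tokio::time::sleep(AVERAGE_SLOT_CHANGE_TIME).await;
        }
    });

    for _ in 0..3 {
        if let Ok(slot) = receiver.recv().await {
            println!("slot notification: {slot}");
        }
    }
}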
3 changes: 2 additions & 1 deletion config.example.json
@@ -1,7 +1,7 @@
{
"rpc_addr": "http://0.0.0.0:8899",
"ws_addr": "ws://0.0.0.0:8900",
"lite_rpc_http_addr": "http://0.0.0.0:8890",
"lite_rpc_http_addr": "[::]:8890",
"lite_rpc_ws_addr": "[::]:8891",
"fanout_size": 18,
"identity_keypair": null,
@@ -10,6 +10,7 @@
"transaction_retry_after_secs": 3,
"quic_proxy_addr": null,
"use_grpc": false,
"activate_leader_schedule": false,
"grpc_addr": "http://127.0.0.0:10000",
"grpc_x_token": null,
"postgres": {
16 changes: 16 additions & 0 deletions config.json
@@ -0,0 +1,16 @@
{
"rpc_addr": "http://localhost:8899",
"ws_addr": "ws://localhost:8900",
"lite_rpc_http_addr": "[::]:8890",
"lite_rpc_ws_addr": "[::]:8891",
"fanout_size": 18,
"identity_keypair": null,
"prometheus_addr": "[::]:9091",
"maximum_retries_per_tx": 40,
"transaction_retry_after_secs": 3,
"quic_proxy_addr": null,
"use_grpc": false,
"activate_leader_schedule": false,
"grpc_addr": "http://127.0.0.0:10000",
"grpc_x_token": null
}
43 changes: 27 additions & 16 deletions lite-rpc/src/bridge.rs
@@ -65,10 +65,12 @@ pub struct LiteBridge {
rpc_client: Arc<RpcClient>,
transaction_service: TransactionService,
history: History,
state_vote_sendder: tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
}

impl LiteBridge {
@@ -77,10 +79,12 @@ impl LiteBridge {
data_cache: DataCache,
transaction_service: TransactionService,
history: History,
state_vote_sendder: tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
oneshot::Sender<RpcVoteAccountStatus>,
)>,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
) -> Self {
Self {
rpc_client,
@@ -507,14 +511,21 @@ impl LiteRpcServer for LiteBridge {
) -> crate::rpc::Result<RpcVoteAccountStatus> {
let config: GetVoteAccountsConfig =
GetVoteAccountsConfig::try_from(config.unwrap_or_default()).unwrap_or_default();
let (tx, rx) = oneshot::channel();
if let Err(err) = self.state_vote_sendder.send((config, tx)).await {
return Err(jsonrpsee::core::Error::Custom(format!(
"error during query processing:{err}",
)));
if let Some(state_vote_sendder) = &self.state_vote_sendder {
let (tx, rx) = oneshot::channel();
if let Err(err) = state_vote_sendder.send((config, tx)).await {
return Err(jsonrpsee::core::Error::Custom(format!(
"error during query processing:{err}",
)));
}
rx.await.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
} else {
self.rpc_client
.get_vote_accounts()
.await
.map_err(|err| (jsonrpsee::core::Error::Custom(err.to_string())))
}
rx.await.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
}
}
2 changes: 2 additions & 0 deletions lite-rpc/src/cli.rs
@@ -42,6 +42,8 @@ pub struct Config {
pub quic_proxy_addr: Option<String>,
#[serde(default)]
pub use_grpc: bool,
#[serde(default)]
pub activate_leader_schedule: bool,
#[serde(default = "Config::default_grpc_addr")]
pub grpc_addr: String,
#[serde(default)]
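Because the new field is marked #[serde(default)], configs written before this change still deserialize — the flag simply defaults to false — and main.rs below only switches to the gRPC leader schedule when both use_grpc and activate_leader_schedule are true. A small illustrative sketch of that behavior, using a trimmed stand-in for the real Config struct:

// Sketch: why #[serde(default)] keeps older config files working.
// Field names mirror the diff; the struct is trimmed to the relevant flags.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Config {
    #[serde(default)]
    use_grpc: bool,
    #[serde(default)]
    activate_leader_schedule: bool,
}

fn main() -> Result<(), serde_json::Error> {
    // Older config without the new key: the flag defaults to false.
    let old: Config = serde_json::from_str(r#"{ "use_grpc": true }"#)?;
    assert!(!old.activate_leader_schedule);

    // New config opting in; main.rs uses the gRPC leader schedule only when
    // both flags are true.
    let new: Config = serde_json::from_str(
        r#"{ "use_grpc": true, "activate_leader_schedule": true }"#,
    )?;
    println!("{old:?} {new:?}");
    Ok(())
}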
69 changes: 52 additions & 17 deletions lite-rpc/src/main.rs
@@ -9,6 +9,7 @@ use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION};
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_leaders_getter::GrpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
@@ -26,6 +27,7 @@ use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::leaders_fetcher_interface::LeaderFetcherInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
@@ -92,6 +94,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
transaction_retry_after_secs,
quic_proxy_addr,
use_grpc,
activate_leader_schedule,
grpc_addr,
grpc_x_token,
..
@@ -127,6 +130,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;

let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?;
let slots_per_epoch = epoch_data.get_epoch_schedule().slots_per_epoch;

let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
@@ -144,13 +148,13 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};

let lata_cache_service = DataCachingService {
let data_cache_service = DataCachingService {
data_cache: data_cache.clone(),
clean_duration: Duration::from_secs(120),
};

// to avoid lagging we resubscribe to the block notification
let data_caching_service = lata_cache_service.listen(
let data_caching_service = data_cache_service.listen(
blocks_notifier.resubscribe(),
slot_notifier.resubscribe(),
cluster_info_notifier,
@@ -179,7 +183,52 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
prometheus_addr,
data_cache: data_cache.clone(),
};
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));

//init grpc leader schedule and vote account if configured.
let (leader_schedule, rpc_stakes_send): (Arc<dyn LeaderFetcherInterface>, Option<_>) =
if use_grpc && activate_leader_schedule {
//init leader schedule grpc process.
//1) get stored schedule and stakes
if let Some((leader_schedule, vote_stakes)) =
solana_lite_rpc_stakevote::bootstrap_leaderschedule_from_files(slots_per_epoch)
{
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes)
.await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
//2) start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(
data_cache.clone(),
slot_notifier.resubscribe(),
rpc_stakes_recv,
Arc::clone(&rpc_client),
grpc_addr,
)
.await?;

//
tokio::spawn(async move {
let err = stake_vote_jh.await;
log::error!("Vote and stake Services exit with error: {err:?}");
});

(
Arc::new(GrpcLeaderGetter::new(
Arc::clone(&data_cache.leader_schedule),
data_cache.epoch_data.clone(),
)),
Some(rpc_stakes_send),
)
} else {
(
Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128)),
None,
)
};

let tpu_service: TpuService = TpuService::new(
tpu_config,
@@ -201,17 +250,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
slot_notifier.resubscribe(),
);

//start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(
data_cache.clone(),
slot_notifier.resubscribe(),
rpc_stakes_recv,
Arc::clone(&rpc_client),
grpc_addr,
)
.await?;

drop(slot_notifier);

let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
@@ -231,9 +269,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
);
tokio::select! {
res = stake_vote_jh => {
anyhow::bail!("Stakes Votes Services {res:?}")
}
res = tx_service_jh => {
anyhow::bail!("Tx Services {res:?}")
}
