Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gixing websocket subscriptions for signatures and slot #403

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ fn create_grpc_multiplex_block_info_task(
.expect("block_time from geyser block meta")
.timestamp
as u64,
parent: block_meta.parent_slot,
};

let send_started_at = Instant::now();
Expand Down
1 change: 1 addition & 0 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,6 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
blockhash: produced_block.blockhash,
commitment_config: produced_block.commitment_config,
block_time: produced_block.block_time,
parent: produced_block.parent_slot,
}
}
2 changes: 1 addition & 1 deletion core/src/commitment_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(C)]
pub enum Commitment {
Processed = 0,
Expand Down
54 changes: 22 additions & 32 deletions core/src/stores/subscription_store.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
use crate::commitment_utils::Commitment;
use crate::{structures::produced_block::TransactionInfo, types::SubscptionHanderSink};
use dashmap::DashMap;
use solana_client::rpc_response::{ProcessedSignatureResult, RpcSignatureResult};
use solana_sdk::signature::Signature;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;

#[derive(Clone, Default)]
pub struct SubscriptionStore {
pub signature_subscribers:
Arc<DashMap<(Signature, CommitmentConfig), (SubscptionHanderSink, Instant)>>,
Arc<DashMap<(Signature, Commitment), (SubscptionHanderSink, Instant)>>,
}

impl SubscriptionStore {
#[allow(deprecated)]
pub fn get_supported_commitment_config(
commitment_config: CommitmentConfig,
) -> CommitmentConfig {
match commitment_config.commitment {
CommitmentLevel::Finalized | CommitmentLevel::Root | CommitmentLevel::Max => {
CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}
}
_ => CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
},
}
}

pub fn signature_subscribe(
&self,
signature: Signature,
commitment_config: CommitmentConfig,
sink: SubscptionHanderSink,
) {
let commitment_config = Self::get_supported_commitment_config(commitment_config);
self.signature_subscribers
.insert((signature, commitment_config), (sink, Instant::now()));
self.signature_subscribers.insert(
(signature, Commitment::from(commitment_config)),
(sink, Instant::now()),
);
}

pub fn signature_un_subscribe(
&self,
signature: Signature,
commitment_config: CommitmentConfig,
) {
let commitment_config = Self::get_supported_commitment_config(commitment_config);
self.signature_subscribers
.remove(&(signature, commitment_config));
.remove(&(signature, Commitment::from(commitment_config)));
}

pub async fn notify(
Expand All @@ -58,13 +41,20 @@ impl SubscriptionStore {
transaction_info: &TransactionInfo,
commitment_config: CommitmentConfig,
) {
if let Some((_sig, (sink, _))) = self
.signature_subscribers
.remove(&(transaction_info.signature, commitment_config))
{
if let Some((_sig, (sink, _))) = self.signature_subscribers.remove(&(
transaction_info.signature,
Commitment::from(commitment_config),
)) {
let signature_result =
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
err: transaction_info.err.clone(),
});
// none if transaction succeeded
sink.send(slot, serde_json::json!({ "err": transaction_info.err }))
.await;
sink.send(
slot,
serde_json::to_value(signature_result).expect("Should be serializable in json"),
)
.await;
}
}

Expand Down
1 change: 1 addition & 0 deletions core/src/structures/block_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use solana_sdk::hash::Hash;
#[derive(Clone, Debug)]
pub struct BlockInfo {
pub slot: u64,
pub parent: u64,
pub block_height: u64,
pub blockhash: Hash,
pub commitment_config: CommitmentConfig,
Expand Down
60 changes: 41 additions & 19 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use jsonrpsee::core::RpcResult;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_account_decoder::UiAccount;
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_core::encoding::{BASE58, BASE64};
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_prioritization_fees::prioritization_fee_calculation_method::PrioritizationFeeCalculationMethod;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcAccountInfoConfig;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcSendTransactionConfig};
use solana_rpc_client_api::response::{OptionalContext, RpcKeyedAccount};
use solana_rpc_client_api::{
config::{
Expand All @@ -21,28 +22,27 @@ use solana_rpc_client_api::{
},
};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::Signature;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use solana_transaction_status::{
TransactionBinaryEncoding, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding,
};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use solana_lite_rpc_blockstore::history::History;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::{
encoding,
stores::{block_information_store::BlockInformation, data_cache::DataCache},
use solana_lite_rpc_core::stores::{
block_information_store::BlockInformation, data_cache::DataCache,
};
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};

use crate::rpc_errors::RpcErrors;
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
rpc::LiteRpcServer,
};
use crate::{configs::IsBlockHashValidConfig, rpc::LiteRpcServer};
use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFeesStats};
use solana_lite_rpc_prioritization_fees::PrioFeesService;

Expand Down Expand Up @@ -347,37 +347,59 @@ impl LiteRpcServer for LiteBridge {
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
send_transaction_config: Option<RpcSendTransactionConfig>,
) -> RpcResult<String> {
RPC_SEND_TX.inc();

// Copied these constants from solana labs code
const MAX_BASE58_SIZE: usize = 1683;
const MAX_BASE64_SIZE: usize = 1644;

let SendTransactionConfig {
let RpcSendTransactionConfig {
encoding,
max_retries,
..
} = send_transaction_config.unwrap_or_default();

let encoding = encoding.unwrap_or(UiTransactionEncoding::Base58);
let expected_size = match encoding {
encoding::BinaryEncoding::Base58 => MAX_BASE58_SIZE,
encoding::BinaryEncoding::Base64 => MAX_BASE64_SIZE,
UiTransactionEncoding::Base58 => MAX_BASE58_SIZE,
UiTransactionEncoding::Base64 => MAX_BASE64_SIZE,
_ => usize::MAX,
};
if tx.len() > expected_size {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}

let raw_tx = match encoding.decode(tx) {
Ok(raw_tx) => raw_tx,
Err(_) => {
return Err(jsonrpsee::types::error::ErrorCode::InvalidParams.into());
let binary_encoding = encoding
.into_binary_encoding()
.ok_or(jsonrpsee::types::error::ErrorCode::InvalidParams)?;

let wire_output = match binary_encoding {
TransactionBinaryEncoding::Base58 => {
if tx.len() > MAX_BASE58_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
BASE58
.decode(tx)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?
}
TransactionBinaryEncoding::Base64 => {
if tx.len() > MAX_BASE64_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
BASE64
.decode(tx)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?
}
};

if wire_output.len() > PACKET_DATA_SIZE {
return Err(jsonrpsee::types::error::ErrorCode::OversizedRequest.into());
}
let max_retries = max_retries.map(|x| x as u16);
match self
.transaction_service
.send_wire_transaction(raw_tx, max_retries)
.send_wire_transaction(wire_output, max_retries)
.await
{
Ok(sig) => {
Expand Down
29 changes: 16 additions & 13 deletions lite-rpc/src/bridge_pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_core::{
commitment_utils::Commitment, stores::data_cache::DataCache,
structures::account_data::AccountNotificationMessage, types::BlockStream,
commitment_utils::Commitment,
stores::data_cache::DataCache,
structures::account_data::AccountNotificationMessage,
types::{BlockInfoStream, BlockStream},
};
use std::{str::FromStr, sync::Arc, time::Duration};
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
Expand Down Expand Up @@ -48,7 +50,8 @@ pub struct LitePubSubBridge {
data_cache: DataCache,
prio_fees_service: PrioFeesService,
account_priofees_service: AccountPrioService,
block_stream: BlockStream,
_block_stream: BlockStream,
block_info_stream: BlockInfoStream,
accounts_service: Option<AccountService>,
}

Expand All @@ -58,13 +61,15 @@ impl LitePubSubBridge {
prio_fees_service: PrioFeesService,
account_priofees_service: AccountPrioService,
block_stream: BlockStream,
block_info_stream: BlockInfoStream,
accounts_service: Option<AccountService>,
) -> Self {
Self {
data_cache,
prio_fees_service,
account_priofees_service,
block_stream,
_block_stream: block_stream,
block_info_stream,
accounts_service,
}
}
Expand All @@ -74,17 +79,14 @@ impl LitePubSubBridge {
impl LiteRpcPubSubServer for LitePubSubBridge {
async fn slot_subscribe(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
let sink = pending.accept().await?;
let mut block_stream = self.block_stream.resubscribe();
let mut block_info_stream = self.block_info_stream.resubscribe();
tokio::spawn(async move {
loop {
match block_stream.recv().await {
Ok(produced_block) => {
if !produced_block.commitment_config.is_processed() {
continue;
}
match block_info_stream.recv().await {
Ok(block_info) => {
let slot_info = SlotInfo {
slot: produced_block.slot,
parent: produced_block.parent_slot,
slot: block_info.slot,
parent: block_info.parent,
root: 0,
};
let result_message = jsonrpsee::SubscriptionMessage::from_json(&slot_info);
Expand Down Expand Up @@ -137,10 +139,11 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
async fn signature_subscribe(
&self,
pending: PendingSubscriptionSink,
signature: Signature,
signature: String,
config: RpcSignatureSubscribeConfig,
) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
let signature = Signature::from_str(&signature)?;
let sink = pending.accept().await?;

let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink);
Expand Down
1 change: 1 addition & 0 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
block_priofees_service,
account_priofees_service,
blocks_notifier,
blockinfo_notifier,
accounts_service.clone(),
);

Expand Down
6 changes: 3 additions & 3 deletions lite-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use crate::configs::IsBlockHashValidConfig;
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use solana_account_decoder::UiAccount;
Expand All @@ -7,7 +7,7 @@ use solana_lite_rpc_prioritization_fees::rpc_data::{AccountPrioFeesStats, PrioFe
use solana_rpc_client_api::config::{
RpcAccountInfoConfig, RpcBlocksConfigWrapper, RpcContextConfig, RpcGetVoteAccountsConfig,
RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig,
RpcSignatureStatusConfig, RpcSignaturesForAddressConfig,
RpcSendTransactionConfig, RpcSignatureStatusConfig, RpcSignaturesForAddressConfig,
};
use solana_rpc_client_api::response::{
OptionalContext, Response as RpcResponse, RpcBlockhash,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub trait LiteRpc {
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
send_transaction_config: Option<RpcSendTransactionConfig>,
) -> RpcResult<String>;

// ***********************
Expand Down
Loading
Loading