diff --git a/Cargo.lock b/Cargo.lock index c4d3e9813a402d..b49a4b7752f89d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derivative" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.6", + "syn 1.0.27", +] + [[package]] name = "dialoguer" version = "0.6.2" @@ -1230,7 +1241,7 @@ dependencies = [ "log 0.4.8", "slab", "tokio 0.2.22", - "tokio-util", + "tokio-util 0.3.1", ] [[package]] @@ -3245,6 +3256,45 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-banks-client" +version = "1.3.5" +dependencies = [ + "async-trait", + "bincode", + "futures 0.3.5", + "solana-banks-interface", + "solana-banks-server", + "solana-runtime", + "solana-sdk 1.3.5", + "tarpc", + "tokio 0.2.22", + "tokio-serde", +] + +[[package]] +name = "solana-banks-interface" +version = "1.3.5" +dependencies = [ + "serde", + "solana-sdk 1.3.5", + "tarpc", +] + +[[package]] +name = "solana-banks-server" +version = "1.3.5" +dependencies = [ + "bincode", + "futures 0.3.5", + "solana-banks-interface", + "solana-runtime", + "solana-sdk 1.3.5", + "tarpc", + "tokio 0.2.22", + "tokio-serde", +] + [[package]] name = "solana-bench-exchange" version = "1.3.5" @@ -3496,6 +3546,7 @@ dependencies = [ "serial_test", "serial_test_derive", "solana-account-decoder", + "solana-banks-server", "solana-bpf-loader-program", "solana-budget-program", "solana-clap-utils", @@ -4755,6 +4806,36 @@ dependencies = [ "xattr", ] +[[package]] +name = "tarpc" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7262a81ff505d04617aabee6f3e416eafd4d67f856832196c221ffd434efda47" +dependencies = [ + "fnv", + "futures 0.3.5", + "humantime 1.3.0", + "log 0.4.8", + "pin-project", + "rand 0.7.3", + "serde", + "tarpc-plugins", + "tokio 0.2.22", + "tokio-serde", + "tokio-util 0.2.0", +] + +[[package]] +name = "tarpc-plugins" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edbaf92ceea0a2ab555bea18a47a891e46ba2d6f930ec9506771662f4ab82bb7" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.6", + "syn 1.0.27", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -5081,6 +5162,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-serde" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebdd897b01021779294eb09bb3b52b6e11b0747f9f7e333a84bef532b656de99" +dependencies = [ + "bincode", + "bytes 0.5.4", + "derivative", + "futures 0.3.5", + "pin-project", + "serde", +] + [[package]] name = "tokio-sync" version = "0.1.8" @@ -5188,6 +5283,20 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tokio-util" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" +dependencies = [ + "bytes 0.5.4", + "futures-core", + "futures-sink", + "log 0.4.8", + "pin-project-lite", + "tokio 0.2.22", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -5232,7 +5341,7 @@ dependencies = [ "prost-derive", "tokio 0.2.22", "tokio-rustls 0.14.0", - "tokio-util", + "tokio-util 0.3.1", "tower", "tower-balance", "tower-load", diff --git a/Cargo.toml b/Cargo.toml index 181683eeb1093b..5e4ce8cfeee9ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,9 @@ members = [ "bench-tps", "accounts-bench", "banking-bench", + "banks-client", + "banks-interface", + "banks-server", "clap-utils", "cli-config", "client", diff --git a/banks-client/Cargo.toml b/banks-client/Cargo.toml new file mode 100644 index 00000000000000..c8414504233fcf --- /dev/null +++ b/banks-client/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "solana-banks-client" +version = "1.3.5" +description = "Solana banks client" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +async-trait = "0.1.36" +bincode = "1.3.1" +futures = "0.3" +solana-banks-interface = { path = "../banks-interface", version = "1.3.5" } +solana-sdk = { path = "../sdk", version = "1.3.5" } +tarpc = { version = "0.21.0", features = ["full"] } +tokio = "0.2" +tokio-serde = { version = "0.6", features = ["bincode"] } + +[dev-dependencies] +solana-runtime = { path = "../runtime", version = "1.3.5" } +solana-banks-server = { path = "../banks-server", version = "1.3.5" } + +[lib] +crate-type = ["lib"] +name = "solana_banks_client" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-client/src/lib.rs b/banks-client/src/lib.rs new file mode 100644 index 00000000000000..1ded4dfa43d2f8 --- /dev/null +++ b/banks-client/src/lib.rs @@ -0,0 +1,283 @@ +//! A client for the ledger state, from the perspective of an arbitrary validator. +//! +//! Use start_tcp_client() to create a client and then import BanksClientExt to +//! access its methods. Additional "*_with_context" methods are also available, +//! but they are undocumented, may change over time, and are generally more +//! cumbersome to use. + +use async_trait::async_trait; +use futures::future::join_all; +pub use solana_banks_interface::{BanksClient, TransactionStatus}; +use solana_banks_interface::{BanksRequest, BanksResponse}; +use solana_sdk::{ + account::Account, clock::Slot, commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, hash::Hash, pubkey::Pubkey, signature::Signature, + transaction::Transaction, transport, +}; +use std::io::{self, Error, ErrorKind}; +use tarpc::{ + client, context, + rpc::{transport::channel::UnboundedChannel, ClientMessage, Response}, + serde_transport::tcp, +}; +use tokio::{net::ToSocketAddrs, time::Duration}; +use tokio_serde::formats::Bincode; + +#[async_trait] +pub trait BanksClientExt { + /// Send a transaction and return immediately. The server will resend the + /// transaction until either it is accepted by the cluster or the transaction's + /// blockhash expires. + async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()>; + + /// Return a recent, rooted blockhash from the server. The cluster will only accept + /// transactions with a blockhash that has not yet expired. Use the `get_fees` + /// method to get both a blockhash and the blockhash's last valid slot. + async fn get_recent_blockhash(&mut self) -> io::Result; + + /// Return the fee parameters associated with a recent, rooted blockhash. The cluster + /// will use the transaction's blockhash to look up these same fee parameters and + /// use them to calculate the transaction fee. + async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)>; + + /// Send a transaction and return after the transaction has been rejected or + /// reached the given level of commitment. + async fn process_transaction_with_commitment( + &mut self, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> transport::Result<()>; + + /// Send a transaction and return after the transaction has been finalized or rejected. + async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()>; + + /// Return the status of a transaction with a signature matching the transaction's first + /// signature. Return None if the transaction is not found, which may be because the + /// blockhash was expired or the fee-paying account had insufficient funds to pay the + /// transaction fee. Note that servers rarely store the full transaction history. This + /// method may return None if the transaction status has been discarded. + async fn get_transaction_status( + &mut self, + signature: Signature, + ) -> io::Result>; + + /// Same as get_transaction_status, but for multiple transactions. + async fn get_transaction_statuses( + &mut self, + signatures: Vec, + ) -> io::Result>>; + + /// Return the most recent rooted slot height. All transactions at or below this height + /// are said to be finalized. The cluster will not fork to a higher slot height. + async fn get_root_slot(&mut self) -> io::Result; + + /// Return the account at the given address at the time of the most recent root slot. + /// If the account is not found, None is returned. + async fn get_account(&mut self, address: Pubkey) -> io::Result>; + + /// Return the balance in lamports of an account at the given address at the slot + /// corresponding to the given commitment level. + async fn get_balance_with_commitment( + &mut self, + address: Pubkey, + commitment: CommitmentLevel, + ) -> io::Result; + + /// Return the balance in lamports of an account at the given address at the time + /// of the most recent root slot. + async fn get_balance(&mut self, address: Pubkey) -> io::Result; +} + +#[async_trait] +impl BanksClientExt for BanksClient { + async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()> { + self.send_transaction_with_context(context::current(), transaction) + .await + } + + async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)> { + self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::Root) + .await + } + + async fn get_recent_blockhash(&mut self) -> io::Result { + Ok(self.get_fees().await?.1) + } + + async fn process_transaction_with_commitment( + &mut self, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> transport::Result<()> { + let mut ctx = context::current(); + ctx.deadline += Duration::from_secs(50); + let result = self + .process_transaction_with_commitment_and_context(ctx, transaction, commitment) + .await?; + match result { + None => Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into()), + Some(transaction_result) => Ok(transaction_result?), + } + } + + async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()> { + self.process_transaction_with_commitment(transaction, CommitmentLevel::default()) + .await + } + + async fn get_root_slot(&mut self) -> io::Result { + self.get_slot_with_context(context::current(), CommitmentLevel::Root) + .await + } + + async fn get_account(&mut self, address: Pubkey) -> io::Result> { + self.get_account_with_commitment_and_context( + context::current(), + address, + CommitmentLevel::default(), + ) + .await + } + + async fn get_balance_with_commitment( + &mut self, + address: Pubkey, + commitment: CommitmentLevel, + ) -> io::Result { + let account = self + .get_account_with_commitment_and_context(context::current(), address, commitment) + .await?; + Ok(account.map(|x| x.lamports).unwrap_or(0)) + } + + async fn get_balance(&mut self, address: Pubkey) -> io::Result { + self.get_balance_with_commitment(address, CommitmentLevel::default()) + .await + } + + async fn get_transaction_status( + &mut self, + signature: Signature, + ) -> io::Result> { + self.get_transaction_status_with_context(context::current(), signature) + .await + } + + async fn get_transaction_statuses( + &mut self, + signatures: Vec, + ) -> io::Result>> { + // tarpc futures oddly hold a mutable reference back to the client so clone the client upfront + let mut clients_and_signatures: Vec<_> = signatures + .into_iter() + .map(|signature| (self.clone(), signature)) + .collect(); + + let futs = clients_and_signatures + .iter_mut() + .map(|(client, signature)| client.get_transaction_status(*signature)); + + let statuses = join_all(futs).await; + + // Convert Vec> to Result> + statuses.into_iter().collect() + } +} + +pub async fn start_client( + transport: UnboundedChannel, ClientMessage>, +) -> io::Result { + BanksClient::new(client::Config::default(), transport).spawn() +} + +pub async fn start_tcp_client(addr: T) -> io::Result { + let transport = tcp::connect(addr, Bincode::default()).await?; + BanksClient::new(client::Config::default(), transport).spawn() +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_banks_server::banks_server::start_local_server; + use solana_runtime::{bank::Bank, bank_forks::BankForks, genesis_utils::create_genesis_config}; + use solana_sdk::{message::Message, pubkey::Pubkey, signature::Signer, system_instruction}; + use std::sync::{Arc, RwLock}; + use tarpc::transport; + use tokio::{runtime::Runtime, time::delay_for}; + + #[test] + fn test_banks_client_new() { + let (client_transport, _server_transport) = transport::channel::unbounded(); + BanksClient::new(client::Config::default(), client_transport); + } + + #[test] + fn test_banks_server_transfer_via_server() -> io::Result<()> { + // This test shows the preferred way to interact with BanksServer. + // It creates a runtime explicitly (no globals via tokio macros) and calls + // `runtime.block_on()` just once, to run all the async code. + + let genesis = create_genesis_config(10); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); + + let bob_pubkey = Pubkey::new_rand(); + let mint_pubkey = genesis.mint_keypair.pubkey(); + let instruction = system_instruction::transfer(&mint_pubkey, &bob_pubkey, 1); + let message = Message::new(&[instruction], Some(&mint_pubkey)); + + Runtime::new()?.block_on(async { + let client_transport = start_local_server(&bank_forks).await; + let mut banks_client = + BanksClient::new(client::Config::default(), client_transport).spawn()?; + + let recent_blockhash = banks_client.get_recent_blockhash().await?; + let transaction = Transaction::new(&[&genesis.mint_keypair], message, recent_blockhash); + banks_client.process_transaction(transaction).await.unwrap(); + assert_eq!(banks_client.get_balance(bob_pubkey).await?, 1); + Ok(()) + }) + } + + #[test] + fn test_banks_server_transfer_via_client() -> io::Result<()> { + // The caller may not want to hold the connection open until the transaction + // is processed (or blockhash expires). In this test, we verify the + // server-side functionality is available to the client. + + let genesis = create_genesis_config(10); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); + + let mint_pubkey = &genesis.mint_keypair.pubkey(); + let bob_pubkey = Pubkey::new_rand(); + let instruction = system_instruction::transfer(&mint_pubkey, &bob_pubkey, 1); + let message = Message::new(&[instruction], Some(&mint_pubkey)); + + Runtime::new()?.block_on(async { + let client_transport = start_local_server(&bank_forks).await; + let mut banks_client = + BanksClient::new(client::Config::default(), client_transport).spawn()?; + let (_, recent_blockhash, last_valid_slot) = banks_client.get_fees().await?; + let transaction = Transaction::new(&[&genesis.mint_keypair], message, recent_blockhash); + let signature = transaction.signatures[0]; + banks_client.send_transaction(transaction).await?; + + let mut status = banks_client.get_transaction_status(signature).await?; + + while status.is_none() { + let root_slot = banks_client.get_root_slot().await?; + if root_slot > last_valid_slot { + break; + } + delay_for(Duration::from_millis(100)).await; + status = banks_client.get_transaction_status(signature).await?; + } + assert!(status.unwrap().err.is_none()); + assert_eq!(banks_client.get_balance(bob_pubkey).await?, 1); + Ok(()) + }) + } +} diff --git a/banks-interface/Cargo.toml b/banks-interface/Cargo.toml new file mode 100644 index 00000000000000..c4b77780d12f2a --- /dev/null +++ b/banks-interface/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "solana-banks-interface" +version = "1.3.5" +description = "Solana banks RPC interface" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +serde = { version = "1.0.112", features = ["derive"] } +solana-sdk = { path = "../sdk", version = "1.3.5" } +tarpc = { version = "0.21.0", features = ["full"] } + +[lib] +crate-type = ["lib"] +name = "solana_banks_interface" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-interface/src/lib.rs b/banks-interface/src/lib.rs new file mode 100644 index 00000000000000..46640e91e538ae --- /dev/null +++ b/banks-interface/src/lib.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; +use solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction, TransactionError}, +}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TransactionStatus { + pub slot: Slot, + pub confirmations: Option, // None = rooted + pub err: Option, +} + +#[tarpc::service] +pub trait Banks { + async fn send_transaction_with_context(transaction: Transaction); + async fn get_fees_with_commitment_and_context( + commitment: CommitmentLevel, + ) -> (FeeCalculator, Hash, Slot); + async fn get_transaction_status_with_context(signature: Signature) + -> Option; + async fn get_slot_with_context(commitment: CommitmentLevel) -> Slot; + async fn process_transaction_with_commitment_and_context( + transaction: Transaction, + commitment: CommitmentLevel, + ) -> Option>; + async fn get_account_with_commitment_and_context( + address: Pubkey, + commitment: CommitmentLevel, + ) -> Option; +} + +#[cfg(test)] +mod tests { + use super::*; + use tarpc::{client, transport}; + + #[test] + fn test_banks_client_new() { + let (client_transport, _server_transport) = transport::channel::unbounded(); + BanksClient::new(client::Config::default(), client_transport); + } +} diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml new file mode 100644 index 00000000000000..075641e9a7028d --- /dev/null +++ b/banks-server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "solana-banks-server" +version = "1.3.5" +description = "Solana banks server" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +bincode = "1.3.1" +futures = "0.3" +solana-banks-interface = { path = "../banks-interface", version = "1.3.5" } +solana-runtime = { path = "../runtime", version = "1.3.5" } +solana-sdk = { path = "../sdk", version = "1.3.5" } +tarpc = { version = "0.21.0", features = ["full"] } +tokio = "0.2" +tokio-serde = { version = "0.6", features = ["bincode"] } + +[lib] +crate-type = ["lib"] +name = "solana_banks_server" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs new file mode 100644 index 00000000000000..912e0813dcb98f --- /dev/null +++ b/banks-server/src/banks_server.rs @@ -0,0 +1,272 @@ +use bincode::{deserialize, serialize}; +use futures::{ + future, + prelude::stream::{self, StreamExt}, +}; +use solana_banks_interface::{Banks, BanksRequest, BanksResponse, TransactionStatus}; +use solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + commitment::{BlockCommitmentCache, CommitmentSlots}, + send_transaction_service::{SendTransactionService, TransactionInfo}, +}; +use solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction}, +}; +use std::{ + collections::HashMap, + io, + net::SocketAddr, + sync::{ + atomic::AtomicBool, + mpsc::{channel, Receiver, Sender}, + Arc, RwLock, + }, + thread::Builder, + time::Duration, +}; +use tarpc::{ + context::Context, + rpc::{transport::channel::UnboundedChannel, ClientMessage, Response}, + serde_transport::tcp, + server::{self, Channel, Handler}, + transport, +}; +use tokio::time::delay_for; +use tokio_serde::formats::Bincode; + +#[derive(Clone)] +struct BanksServer { + bank_forks: Arc>, + block_commitment_cache: Arc>, + transaction_sender: Sender, +} + +impl BanksServer { + /// Return a BanksServer that forwards transactions to the + /// given sender. If unit-testing, those transactions can go to + /// a bank in the given BankForks. Otherwise, the receiver should + /// forward them to a validator in the leader schedule. + fn new( + bank_forks: Arc>, + block_commitment_cache: Arc>, + transaction_sender: Sender, + ) -> Self { + Self { + bank_forks, + block_commitment_cache, + transaction_sender, + } + } + + fn run(bank: &Bank, transaction_receiver: Receiver) { + while let Ok(info) = transaction_receiver.recv() { + let mut transaction_infos = vec![info]; + while let Ok(info) = transaction_receiver.try_recv() { + transaction_infos.push(info); + } + let transactions: Vec<_> = transaction_infos + .into_iter() + .map(|info| deserialize(&info.wire_transaction).unwrap()) + .collect(); + let _ = bank.process_transactions(&transactions); + } + } + + /// Useful for unit-testing + fn new_loopback(bank_forks: Arc>) -> Self { + let (transaction_sender, transaction_receiver) = channel(); + let bank = bank_forks.read().unwrap().working_bank(); + let slot = bank.slot(); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( + HashMap::default(), + 0, + CommitmentSlots { + slot, + root: 0, + highest_confirmed_slot: 0, + highest_confirmed_root: 0, + }, + ))); + Builder::new() + .name("solana-bank-forks-client".to_string()) + .spawn(move || Self::run(&bank, transaction_receiver)) + .unwrap(); + Self::new(bank_forks, block_commitment_cache, transaction_sender) + } + + fn slot(&self, commitment: CommitmentLevel) -> Slot { + self.block_commitment_cache + .read() + .unwrap() + .slot_with_commitment(commitment) + } + + fn bank(&self, commitment: CommitmentLevel) -> Arc { + self.bank_forks.read().unwrap()[self.slot(commitment)].clone() + } + + async fn poll_signature_status( + self, + signature: Signature, + last_valid_slot: Slot, + commitment: CommitmentLevel, + ) -> Option> { + let mut status = self.bank(commitment).get_signature_status(&signature); + while status.is_none() { + delay_for(Duration::from_millis(200)).await; + let bank = self.bank(commitment); + if bank.slot() > last_valid_slot { + break; + } + status = bank.get_signature_status(&signature); + } + status + } +} + +#[tarpc::server] +impl Banks for BanksServer { + async fn send_transaction_with_context(self, _: Context, transaction: Transaction) { + let blockhash = &transaction.message.recent_blockhash; + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); + } + + async fn get_fees_with_commitment_and_context( + self, + _: Context, + commitment: CommitmentLevel, + ) -> (FeeCalculator, Hash, Slot) { + let bank = self.bank(commitment); + let (blockhash, fee_calculator) = bank.last_blockhash_with_fee_calculator(); + let last_valid_slot = bank.get_blockhash_last_valid_slot(&blockhash).unwrap(); + (fee_calculator, blockhash, last_valid_slot) + } + + async fn get_transaction_status_with_context( + self, + _: Context, + signature: Signature, + ) -> Option { + let bank = self.bank(CommitmentLevel::Recent); + let (slot, status) = bank.get_signature_status_slot(&signature)?; + let r_block_commitment_cache = self.block_commitment_cache.read().unwrap(); + + let confirmations = if r_block_commitment_cache.root() >= slot { + None + } else { + r_block_commitment_cache + .get_confirmation_count(slot) + .or(Some(0)) + }; + Some(TransactionStatus { + slot, + confirmations, + err: status.err(), + }) + } + + async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot { + self.slot(commitment) + } + + async fn process_transaction_with_commitment_and_context( + self, + _: Context, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> Option> { + let blockhash = &transaction.message.recent_blockhash; + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); + self.poll_signature_status(signature, last_valid_slot, commitment) + .await + } + + async fn get_account_with_commitment_and_context( + self, + _: Context, + address: Pubkey, + commitment: CommitmentLevel, + ) -> Option { + let bank = self.bank(commitment); + bank.get_account(&address) + } +} + +pub async fn start_local_server( + bank_forks: &Arc>, +) -> UnboundedChannel, ClientMessage> { + let banks_server = BanksServer::new_loopback(bank_forks.clone()); + let (client_transport, server_transport) = transport::channel::unbounded(); + let server = server::new(server::Config::default()) + .incoming(stream::once(future::ready(server_transport))) + .respond_with(banks_server.serve()); + tokio::spawn(server); + client_transport +} + +pub async fn start_tcp_server( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, +) -> io::Result<()> { + // Note: These settings are copied straight from the tarpc example. + let server = tcp::listen(listen_addr, Bincode::default) + .await? + // Ignore accept errors. + .filter_map(|r| future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + // Limit channels to 1 per IP. + .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) + // serve is generated by the service attribute. It takes as input any type implementing + // the generated Banks trait. + .map(move |chan| { + let (sender, receiver) = channel(); + let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + + SendTransactionService::new( + tpu_addr, + &bank_forks, + &exit_send_transaction_service, + receiver, + ); + + let server = + BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); + chan.respond_with(server.serve()).execute() + }) + // Max 10 channels. + .buffer_unordered(10) + .for_each(|_| async {}); + + server.await; + Ok(()) +} diff --git a/banks-server/src/lib.rs b/banks-server/src/lib.rs new file mode 100644 index 00000000000000..a9acc11e5baaba --- /dev/null +++ b/banks-server/src/lib.rs @@ -0,0 +1,2 @@ +pub mod banks_server; +pub mod rpc_banks_service; diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs new file mode 100644 index 00000000000000..541133e64a2260 --- /dev/null +++ b/banks-server/src/rpc_banks_service.rs @@ -0,0 +1,116 @@ +//! The `rpc_banks_service` module implements the Solana Banks RPC API. + +use crate::banks_server::start_tcp_server; +use futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}; +use solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, +}; +use tokio::{ + runtime::Runtime, + time::{self, Duration}, +}; + +pub struct RpcBanksService { + thread_hdl: JoinHandle<()>, +} + +/// Run the TCP service until `exit` is set to true +async fn start_abortable_tcp_server( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, +) { + let server = start_tcp_server( + listen_addr, + tpu_addr, + bank_forks.clone(), + block_commitment_cache.clone(), + ) + .fuse(); + let interval = time::interval(Duration::from_millis(100)).fuse(); + pin_mut!(server, interval); + loop { + select! { + _ = server => {}, + _ = interval.select_next_some() => { + if exit.load(Ordering::Relaxed) { + break; + } + } + } + } +} + +impl RpcBanksService { + fn run( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, + ) { + let server = start_abortable_tcp_server( + listen_addr, + tpu_addr, + bank_forks, + block_commitment_cache, + exit, + ); + Runtime::new().unwrap().block_on(server); + } + + pub fn new( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: &Arc>, + block_commitment_cache: &Arc>, + exit: &Arc, + ) -> Self { + let bank_forks = bank_forks.clone(); + let block_commitment_cache = block_commitment_cache.clone(); + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-rpc-banks".to_string()) + .spawn(move || { + Self::run( + listen_addr, + tpu_addr, + bank_forks, + block_commitment_cache, + exit, + ) + }) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_runtime::bank::Bank; + + #[test] + fn test_rpc_banks_server_exit() { + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default()))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let exit = Arc::new(AtomicBool::new(false)); + let addr = "127.0.0.1:0".parse().unwrap(); + let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit); + exit.store(true, Ordering::Relaxed); + service.join().unwrap(); + } +} diff --git a/cli-config/src/config.rs b/cli-config/src/config.rs index f766f1dec6b246..c65d087ed41851 100644 --- a/cli-config/src/config.rs +++ b/cli-config/src/config.rs @@ -76,6 +76,17 @@ impl Config { ws_url.to_string() } + pub fn compute_rpc_banks_url(json_rpc_url: &str) -> String { + let json_rpc_url: Option = json_rpc_url.parse().ok(); + if json_rpc_url.is_none() { + return "".to_string(); + } + let mut url = json_rpc_url.unwrap(); + let port = url.port_or_known_default().unwrap_or(80); + url.set_port(Some(port + 2)).expect("unable to set port"); + url.to_string() + } + pub fn import_address_labels

(&mut self, filename: P) -> Result<(), io::Error> where P: AsRef, @@ -122,4 +133,28 @@ mod test { assert_eq!(Config::compute_websocket_url(&"garbage"), String::new()); } + + #[test] + fn compute_rpc_banks_url() { + assert_eq!( + Config::compute_rpc_banks_url(&"http://devnet.solana.com"), + "http://devnet.solana.com:82/".to_string() + ); + + assert_eq!( + Config::compute_rpc_banks_url(&"https://devnet.solana.com"), + "https://devnet.solana.com:445/".to_string() + ); + + assert_eq!( + Config::compute_rpc_banks_url(&"http://example.com:8899"), + "http://example.com:8901/".to_string() + ); + assert_eq!( + Config::compute_rpc_banks_url(&"https://example.com:1234"), + "https://example.com:1236/".to_string() + ); + + assert_eq!(Config::compute_rpc_banks_url(&"garbage"), String::new()); + } } diff --git a/core/Cargo.toml b/core/Cargo.toml index 368e3f2ad8c580..3eee7246d62a1b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,6 +44,7 @@ serde = "1.0.112" serde_derive = "1.0.103" serde_json = "1.0.56" solana-account-decoder = { path = "../account-decoder", version = "1.3.5" } +solana-banks-server = { path = "../banks-server", version = "1.3.5" } solana-bpf-loader-program = { path = "../programs/bpf_loader", version = "1.3.5" } solana-budget-program = { path = "../programs/budget", version = "1.3.5" } solana-clap-utils = { path = "../clap-utils", version = "1.3.5" } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e81d1a651fd0a8..777a8a2506f64c 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -358,7 +358,7 @@ pub fn make_accounts_hashes_message( } // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "6qRS1ZwydpdSqzeyRdDvn5uwfDdFYkuUz4K4jSkd1oFW")] +#[frozen_abi(digest = "CnN1gW2K2TRydGc84eYnQJwdTADPjQf6LJLZ4RP1QeoH")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] enum Protocol { @@ -558,7 +558,7 @@ impl ClusterInfo { } let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^15}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^15}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip) { ip_addr.to_string() } else { @@ -581,6 +581,7 @@ impl ClusterInfo { addr_to_string(&ip_addr, &node.serve_repair), addr_to_string(&ip_addr, &node.rpc), addr_to_string(&ip_addr, &node.rpc_pubsub), + addr_to_string(&ip_addr, &node.rpc_banks), node.shred_version, )) } @@ -2453,10 +2454,12 @@ impl Node { let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let rpc_pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port); + let rpc_banks_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); + let rpc_banks_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port); let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let unused = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let info = ContactInfo { id: *pubkey, @@ -2466,7 +2469,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - unused: unused.local_addr().unwrap(), + rpc_banks: rpc_banks_addr, rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), @@ -2547,7 +2550,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + rpc_banks: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 1d75a934b6f418..fce883008fecad 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -25,8 +25,8 @@ pub struct ContactInfo { pub tpu: SocketAddr, /// address to forward unprocessed transactions to pub tpu_forwards: SocketAddr, - /// unused address - pub unused: SocketAddr, + /// address to which to send bank state requests + pub rpc_banks: SocketAddr, /// address to which to send JSON-RPC requests pub rpc: SocketAddr, /// websocket for JSON-RPC push notifications @@ -95,7 +95,7 @@ impl Default for ContactInfo { repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), - unused: socketaddr_any!(), + rpc_banks: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: socketaddr_any!(), @@ -115,7 +115,7 @@ impl ContactInfo { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + rpc_banks: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -137,7 +137,7 @@ impl ContactInfo { repair: addr, tpu: addr, tpu_forwards: addr, - unused: addr, + rpc_banks: addr, rpc: addr, rpc_pubsub: addr, serve_repair: addr, @@ -162,6 +162,7 @@ impl ContactInfo { let repair = next_port(&bind_addr, 5); let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); + let rpc_banks = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_BANKS_PORT); let serve_repair = next_port(&bind_addr, 6); Self { id: *pubkey, @@ -171,7 +172,7 @@ impl ContactInfo { repair, tpu, tpu_forwards, - unused: "0.0.0.0:0".parse().unwrap(), + rpc_banks, rpc, rpc_pubsub, serve_repair, @@ -248,7 +249,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.rpc_banks.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -260,7 +261,7 @@ mod tests { assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); - assert!(ci.unused.ip().is_multicast()); + assert!(ci.rpc_banks.ip().is_multicast()); assert!(ci.serve_repair.ip().is_multicast()); } #[test] @@ -273,7 +274,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.rpc_banks.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -286,7 +287,7 @@ mod tests { assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert!(ci.unused.ip().is_unspecified()); + assert_eq!(ci.rpc_banks.port(), rpc_port::DEFAULT_RPC_BANKS_PORT); assert_eq!(ci.serve_repair.port(), 16); } @@ -310,6 +311,10 @@ mod tests { d1.rpc_pubsub, socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT)) ); + assert_eq!( + d1.rpc_banks, + socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_BANKS_PORT)) + ); assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 636c72eb2c3020..57f5f7f09fb40a 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -662,7 +662,7 @@ mod tests { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + rpc_banks: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -745,7 +745,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + rpc_banks: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr, @@ -773,7 +773,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + rpc_banks: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr2, diff --git a/core/src/validator.rs b/core/src/validator.rs index df87bd8e6a5a9d..a10abae721465e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -23,6 +23,7 @@ use crate::{ }; use crossbeam_channel::unbounded; use rand::{thread_rng, Rng}; +use solana_banks_server::rpc_banks_service::RpcBanksService; use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, @@ -72,7 +73,7 @@ pub struct ValidatorConfig { pub voting_disabled: bool, pub account_paths: Vec, pub rpc_config: JsonRpcConfig, - pub rpc_ports: Option<(u16, u16)>, // (API, PubSub) + pub rpc_ports: Option<(u16, u16, u16)>, // (JsonRpc, JsonRpcPubSub, Banks) pub snapshot_config: Option, pub max_ledger_shreds: Option, pub broadcast_stage_type: BroadcastStageType, @@ -148,7 +149,7 @@ struct TransactionHistoryServices { pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, - rpc_service: Option<(JsonRpcService, PubSubService)>, + rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, transaction_status_service: Option, rewards_recorder_service: Option, gossip_service: GossipService, @@ -282,36 +283,47 @@ impl Validator { )); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { - if ContactInfo::is_valid_address(&node.info.rpc) { - assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - assert_eq!(rpc_port, node.info.rpc.port()); - assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); - } else { - assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - } - ( - JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new( - &subscriptions, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - &exit, - ), - ) - }); + let rpc_service = config + .rpc_ports + .map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + let tpu_address = cluster_info.my_contact_info().tpu; + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + ), + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + RpcBanksService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port), + tpu_address, + &bank_forks, + &block_commitment_cache, + &exit, + ), + ) + }); info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", @@ -543,9 +555,10 @@ impl Validator { pub fn join(self) -> Result<()> { self.poh_service.join()?; drop(self.poh_recorder); - if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service { + if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service { rpc_service.join()?; rpc_pubsub_service.join()?; + rpc_banks_service.join()?; } if let Some(transaction_status_service) = self.transaction_status_service { transaction_status_service.join()?; @@ -869,7 +882,11 @@ impl TestValidator { let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config); let config = ValidatorConfig { - rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())), + rpc_ports: Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )), ..ValidatorConfig::default() }; let node = Validator::new( @@ -1037,6 +1054,7 @@ mod tests { rpc_ports: Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )), ..ValidatorConfig::default() }; @@ -1111,6 +1129,7 @@ mod tests { rpc_ports: Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )), ..ValidatorConfig::default() }; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index e299f08640b574..e2797cb155fe52 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -202,6 +202,7 @@ impl LocalCluster { leader_config.rpc_ports = Some(( leader_node.info.rpc.port(), leader_node.info.rpc_pubsub.port(), + leader_node.info.rpc_banks.port(), )); leader_config.account_paths = vec![leader_ledger_path.join("accounts")]; let leader_server = Validator::new( @@ -343,6 +344,7 @@ impl LocalCluster { config.rpc_ports = Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )); config.account_paths = vec![ledger_path.join("accounts")]; let voting_keypair = voting_keypair.unwrap(); @@ -613,8 +615,11 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(&pubkey); cluster_validator_info.info.contact_info = node.info.clone(); - cluster_validator_info.config.rpc_ports = - Some((node.info.rpc.port(), node.info.rpc_pubsub.port())); + cluster_validator_info.config.rpc_ports = Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )); let entry_point_info = { if *pubkey == self.entry_point_info.id { diff --git a/runtime/src/send_transaction_service.rs b/runtime/src/send_transaction_service.rs index 0f24fe8a774745..bab219b0e4f7a4 100644 --- a/runtime/src/send_transaction_service.rs +++ b/runtime/src/send_transaction_service.rs @@ -22,9 +22,9 @@ pub struct SendTransactionService { } pub struct TransactionInfo { - signature: Signature, - wire_transaction: Vec, - last_valid_slot: Slot, + pub signature: Signature, + pub wire_transaction: Vec, + pub last_valid_slot: Slot, } impl TransactionInfo { diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index dd4a5818bf0ee2..d609148d5fb250 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -26,7 +26,7 @@ default = [ "serde_json", "ed25519-dalek", "solana-logger", - "solana-crate-features" + "solana-crate-features", ] [dependencies] diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index 4ab7b2ca9e7900..d0f67e9fb0f050 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -46,11 +46,27 @@ impl CommitmentConfig { #[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] +/// An attribute of a slot. It describes how finalized a block is at some point in time. For example, a slot +/// is said to be at the max level immediately after the cluster recognizes the block at that slot as +/// finalized. When querying the ledger state, use lower levels of commitment to report progress and higher +/// levels to ensure state changes will not be rolled back. pub enum CommitmentLevel { + /// The highest slot having reached max vote lockout, as recognized by a supermajority of the cluster. Max, + + /// The highest slot of the heaviest fork. Ledger state at this slot is not derived from a finalized + /// block, but if multiple forks are present, is from the fork the validator believes is most likely + /// to finalize. Recent, + + /// The highest slot having reached max vote lockout. Root, + + /// The highest slot having reached 1 confirmation. Single, + + /// The highest slot having reached 1 confirmation via gossip votes; may occur before or after Single, + /// depending on gossip traffic. SingleGossip, } diff --git a/sdk/src/rpc_port.rs b/sdk/src/rpc_port.rs index 531be001b14003..cbdbcb5dce4787 100644 --- a/sdk/src/rpc_port.rs +++ b/sdk/src/rpc_port.rs @@ -3,3 +3,6 @@ pub const DEFAULT_RPC_PORT: u16 = 8899; /// Default port number for JSON RPC pubsub pub const DEFAULT_RPC_PUBSUB_PORT: u16 = 8900; + +/// Default port number for Banks RPC API +pub const DEFAULT_RPC_BANKS_PORT: u16 = 8901; diff --git a/validator/src/main.rs b/validator/src/main.rs index a401934b73e35b..1301f7fa6f189e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -599,7 +599,7 @@ pub fn main() { .value_name("PORT") .takes_value(true) .validator(port_validator) - .help("Use this port for JSON RPC, and the next port for the RPC websocket"), + .help("Use this port for JSON RPC, the next port for the RPC websocket, and the following for the RPC banks API"), ) .arg( Arg::with_name("private_rpc") @@ -960,7 +960,7 @@ pub fn main() { }, rpc_ports: value_t!(matches, "rpc_port", u16) .ok() - .map(|rpc_port| (rpc_port, rpc_port + 1)), + .map(|rpc_port| (rpc_port, rpc_port + 1, rpc_port + 2)), voting_disabled: matches.is_present("no_voting"), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators, @@ -1178,9 +1178,10 @@ pub fn main() { ); if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); + node.info.rpc_banks = SocketAddr::new(node.info.gossip.ip(), rpc_banks_port); } } @@ -1199,8 +1200,12 @@ pub fn main() { let mut tcp_listeners = vec![]; if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { - for (purpose, port) in &[("RPC", rpc_port), ("RPC pubsub", rpc_pubsub_port)] { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { + for (purpose, port) in &[ + ("RPC", rpc_port), + ("RPC pubsub", rpc_pubsub_port), + ("RPC banks", rpc_banks_port), + ] { tcp_listeners.push(( *port, TcpListener::bind(&SocketAddr::from((rpc_bind_address, *port)))