diff --git a/Cargo.lock b/Cargo.lock index 06b4a39cb..ab886db5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9980,6 +9980,7 @@ dependencies = [ "serde_with", "solana-client", "solana-clock", + "solana-reward-info", "solana-rpc-client-api", "solana-sdk", "solana-storage-proto", diff --git a/crates/extractors/solana-storage-proto/src/lib.rs b/crates/extractors/solana-storage-proto/src/lib.rs index bfe713c48..6ca12dec7 100644 --- a/crates/extractors/solana-storage-proto/src/lib.rs +++ b/crates/extractors/solana-storage-proto/src/lib.rs @@ -20,14 +20,14 @@ pub type StoredExtendedRewards = Vec; #[derive(Serialize, Deserialize)] pub struct StoredExtendedReward { - pubkey: String, - lamports: i64, + pub pubkey: String, + pub lamports: i64, #[serde(deserialize_with = "default_on_eof")] - post_balance: u64, + pub post_balance: u64, #[serde(deserialize_with = "default_on_eof")] - reward_type: Option, + pub reward_type: Option, #[serde(deserialize_with = "default_on_eof")] - commission: Option, + pub commission: Option, } impl From for Reward { diff --git a/crates/extractors/solana/Cargo.toml b/crates/extractors/solana/Cargo.toml index 5ff263339..5d5da06a7 100644 --- a/crates/extractors/solana/Cargo.toml +++ b/crates/extractors/solana/Cargo.toml @@ -31,6 +31,7 @@ solana-rpc-client-api = "3.0.10" solana-sdk = "3.0.0" solana-storage-proto = { path = "../solana-storage-proto" } solana-transaction-status-client-types = "3.0.10" +solana-reward-info = "3.0.0" thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/extractors/solana/src/error.rs b/crates/extractors/solana/src/error.rs index d724d357f..5578d4e51 100644 --- a/crates/extractors/solana/src/error.rs +++ b/crates/extractors/solana/src/error.rs @@ -107,6 +107,13 @@ pub enum Of1StreamError { #[error("expected '{expected}' node for cid '{cid}'")] MissingNode { expected: &'static str, cid: String }, + /// Block reward node slot does not match the expected slot. + /// + /// When processing block reward nodes in the CAR file, the slot + /// recorded within the reward data must match the slot being processed. + #[error("reward slot mismatch: expected {expected}, found {found}")] + RewardSlotMismatch { expected: u64, found: u64 }, + /// Failed to decompress data using Zstd. /// /// CAR files and Solana data structures may be compressed with Zstd diff --git a/crates/extractors/solana/src/extractor.rs b/crates/extractors/solana/src/extractor.rs index 86163eae9..bd69c94d8 100644 --- a/crates/extractors/solana/src/extractor.rs +++ b/crates/extractors/solana/src/extractor.rs @@ -106,6 +106,19 @@ impl SolanaExtractor { T: Stream>, { async_stream::stream! { + // Helper macro to simplify error handling and early returns in the stream. + macro_rules! ok_or_bail { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(e) => { + yield Err(e); + return; + } + } + }; + } + // Slots can be skipped, so we'll track the next expected slot for switching to // JSON-RPC. let mut expected_next_slot = start; @@ -114,13 +127,7 @@ impl SolanaExtractor { // Download historical blocks from the Old Faithful archive. futures::pin_mut!(historical_block_stream); while let Some(slot) = historical_block_stream.next().await { - let slot = match slot { - Ok(slot) => slot, - Err(e) => { - yield Err(e); - return; - }, - }; + let slot = ok_or_bail!(slot); let current_slot = slot.slot; if !requested_range.contains(¤t_slot) { @@ -132,7 +139,8 @@ impl SolanaExtractor { } // Don't emit rows for skipped slots. - yield tables::convert_slot_to_db_rows(non_empty_of1_slot(slot), &self.network).map_err(Into::into); + let non_empty_slot = ok_or_bail!(non_empty_of1_slot(slot).map_err(Into::into)); + yield tables::convert_slot_to_db_rows(non_empty_slot, &self.network).map_err(Into::into); if current_slot == end { // Reached the end of the requested range. @@ -158,13 +166,7 @@ impl SolanaExtractor { match get_block_resp { Ok(block) => { - let non_empty_slot = match non_empty_rpc_slot(slot, block) { - Ok(slot) => slot, - Err(e) => { - yield Err(e.into()); - return; - }, - }; + let non_empty_slot = ok_or_bail!(non_empty_rpc_slot(slot, block).map_err(Into::into)); yield tables::convert_slot_to_db_rows(non_empty_slot, &self.network).map_err(Into::into); } Err(e) => { @@ -330,9 +332,9 @@ impl BlockStreamer for SolanaExtractor { } } -/// Converts to [tables::NonEmptySlot]. This conversion cannot fail since the Old Faithful -/// CAR parser only produces non-empty slots. -fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot { +/// Converts [of1_client::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any +/// of the decoded fields do not match the expected format/values. +fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> anyhow::Result { let of1_client::DecodedSlot { slot, parent_slot, @@ -342,7 +344,7 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot { blocktime, transactions, transaction_metas, - block_rewards: _block_rewards, + block_rewards, } = slot; let mut txs = Vec::with_capacity(transactions.len()); @@ -357,14 +359,18 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot { let tx = tables::transactions::Transaction::from_of1_transaction( slot, tx_index, signatures, tx_meta, - ); + ) + .context("converting of1 transaction")?; let message = tables::messages::Message::from_of1_message(slot, tx_index, message); txs.push(tx); msgs.push(message); } - tables::NonEmptySlot { + let block_rewards = tables::block_rewards::BlockRewards::from_of1_rewards(slot, block_rewards) + .context("converting of1 block rewards")?; + + Ok(tables::NonEmptySlot { slot, parent_slot, blockhash, @@ -373,7 +379,8 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot { blocktime: Some(blocktime), transactions: txs, messages: msgs, - } + block_rewards, + }) } /// Converts a JSON-RPC confirmed block into a [tables::NonEmptySlot]. This conversion @@ -382,10 +389,13 @@ fn non_empty_rpc_slot( slot: Slot, confirmed_block: rpc_client::UiConfirmedBlock, ) -> anyhow::Result { - // Transactions should be present since we requested them when fetching the block. + // Transactions and block rewards should be present since we requested them when fetching the block. let transactions = confirmed_block .transactions .with_context(|| format!("missing transactions in confirmed block {slot}"))?; + let block_rewards = confirmed_block + .rewards + .with_context(|| format!("missing block rewards in confirmed block {slot}"))?; let mut txs = Vec::with_capacity(transactions.len()); let mut msgs = Vec::with_capacity(transactions.len()); @@ -418,6 +428,8 @@ fn non_empty_rpc_slot( msgs.push(msg); } + let block_rewards = tables::block_rewards::BlockRewards::from_rpc_rewards(slot, block_rewards); + Ok(tables::NonEmptySlot { slot, parent_slot: confirmed_block.parent_slot, @@ -427,6 +439,7 @@ fn non_empty_rpc_slot( blocktime: confirmed_block.block_time, transactions: txs, messages: msgs, + block_rewards, }) } @@ -463,7 +476,7 @@ mod tests { blocktime: 0, transactions: Vec::new(), transaction_metas: Vec::new(), - block_rewards: of1_client::DecodedBlockRewards::Empty, + block_rewards: None, } } } diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index 08d8c1250..e79012fb6 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -48,7 +48,6 @@ pub(crate) type DecodedBlockRewards = DecodedField< >; pub(crate) enum DecodedField { - Empty, Proto(P), Bincode(B), } @@ -61,9 +60,8 @@ pub(crate) struct DecodedSlot { pub(crate) block_height: Option, pub(crate) blocktime: i64, pub(crate) transactions: Vec, - pub(crate) transaction_metas: Vec, - #[allow(dead_code)] - pub(crate) block_rewards: DecodedBlockRewards, + pub(crate) transaction_metas: Vec>, + pub(crate) block_rewards: Option, } pub(crate) async fn car_file_manager( @@ -536,40 +534,34 @@ async fn read_next_slot( P: prost::Message + Default, B: serde::de::DeserializeOwned, { - if data.is_empty() { - Ok(DecodedField::Empty) - } else { - // All fields that need to be decoded this way are ZSTD compressed in CAR files. - let decompressed = &*zstd::decode_all(data).map_err(|e| Of1StreamError::Zstd { - field_name, - error: e.to_string(), - })?; - match prost::Message::decode(decompressed).map(DecodedField::Proto) { - Ok(data_proto) => Ok(data_proto), - Err(prost_err) => { - match bincode::deserialize(decompressed).map(DecodedField::Bincode) { - Ok(data_bincode) => Ok(data_bincode), - Err(bincode_err) => { - let err = Of1StreamError::DecodeField { - field_name, - prost_err: prost_err.to_string(), - bincode_err: bincode_err.to_string(), - }; - Err(err) - } - } + // All fields that need to be decoded this way are ZSTD compressed in CAR files. + let decompressed = &*zstd::decode_all(data).map_err(|e| Of1StreamError::Zstd { + field_name, + error: e.to_string(), + })?; + match prost::Message::decode(decompressed).map(DecodedField::Proto) { + Ok(data_proto) => Ok(data_proto), + Err(prost_err) => match bincode::deserialize(decompressed).map(DecodedField::Bincode) { + Ok(data_bincode) => Ok(data_bincode), + Err(bincode_err) => { + let err = Of1StreamError::DecodeField { + field_name, + prost_err: prost_err.to_string(), + bincode_err: bincode_err.to_string(), + }; + Err(err) } - } + }, } } // Once we reach `Node::Block`, the node map will contain all of the nodes needed to reassemble // that block. - let mut nodes = car_parser::node::Nodes::read_until_block(node_reader) + let nodes = car_parser::node::Nodes::read_until_block(node_reader) .await .map_err(Of1StreamError::NodeParse)?; - let block = match nodes.nodes.pop() { + let block = match nodes.nodes.last() { // Expected block node. Some((_, car_parser::node::Node::Block(block))) => block, // Reached end of CAR file. @@ -577,7 +569,7 @@ async fn read_next_slot( Some((cid, node)) => { return Err(Of1StreamError::UnexpectedNode { kind: node.kind(), - cid: cid.into(), + cid: (*cid).into(), }); } }; @@ -608,13 +600,43 @@ async fn read_next_slot( .map_err(Of1StreamError::DataframeReassembly)?; let tx = bincode::deserialize(&tx_df).map_err(Of1StreamError::Bincode)?; - let tx_meta = decode_proto_or_bincode("tx_status_meta", tx_meta_df.as_slice())?; - transactions.push(tx); + + let tx_meta = if tx_meta_df.is_empty() { + None + } else { + decode_proto_or_bincode("tx_status_meta", tx_meta_df.as_slice()).map(Some)? + }; transaction_metas.push(tx_meta); } } + let block_rewards = nodes + .nodes + .get(&block.rewards) + .map(|rewards| { + let car_parser::node::Node::Rewards(rewards) = rewards else { + return Err(Of1StreamError::UnexpectedNode { + kind: rewards.kind(), + cid: block.rewards.to_string(), + }); + }; + if rewards.slot != block.slot { + return Err(Of1StreamError::RewardSlotMismatch { + expected: block.slot, + found: rewards.slot, + }); + } + + nodes + .reassemble_dataframes(&rewards.data) + .map_err(Of1StreamError::DataframeReassembly) + .and_then(|rewards_df| { + decode_proto_or_bincode("block_rewards", rewards_df.as_slice()) + }) + }) + .transpose()?; + let blockhash = { // Hash of the last entry has the same value as that block's `blockhash` in // CAR files. @@ -648,9 +670,7 @@ async fn read_next_slot( blocktime, transactions, transaction_metas, - // TODO: Work with rewards? - #[allow(dead_code)] - block_rewards: DecodedField::Empty, + block_rewards, }; Ok(Some(block)) diff --git a/crates/extractors/solana/src/rpc_client.rs b/crates/extractors/solana/src/rpc_client.rs index d2fd2c74a..2b589bda6 100644 --- a/crates/extractors/solana/src/rpc_client.rs +++ b/crates/extractors/solana/src/rpc_client.rs @@ -1,13 +1,10 @@ use std::{num::NonZeroU32, sync::Arc, time::Instant}; use datasets_common::network_id::NetworkId; -pub use solana_client::{ - rpc_config, - rpc_response::{RewardType, UiReturnDataEncoding}, -}; +pub use solana_client::{rpc_config, rpc_response::UiReturnDataEncoding}; use solana_clock::Slot; pub use solana_transaction_status_client_types::{ - EncodedTransaction, EncodedTransactionWithStatusMeta, TransactionStatusMeta, + EncodedTransaction, EncodedTransactionWithStatusMeta, Reward, TransactionStatusMeta, TransactionTokenBalance, UiConfirmedBlock, UiInstruction, UiMessage, UiRawMessage, UiTransaction, UiTransactionStatusMeta, UiTransactionTokenBalance, }; diff --git a/crates/extractors/solana/src/tables.rs b/crates/extractors/solana/src/tables.rs index e7afe555a..ae54d75c8 100644 --- a/crates/extractors/solana/src/tables.rs +++ b/crates/extractors/solana/src/tables.rs @@ -5,6 +5,7 @@ use solana_clock::Slot; use crate::error::RowConversionError; pub mod block_headers; +pub mod block_rewards; pub mod instructions; pub mod messages; pub mod transactions; @@ -15,6 +16,7 @@ pub(crate) const BASE58_ENCODED_HASH_LEN: usize = 44; pub fn all(network: &NetworkId) -> Vec { vec![ block_headers::table(network.clone()), + block_rewards::table(network.clone()), transactions::table(network.clone()), messages::table(network.clone()), instructions::table(network.clone()), @@ -31,6 +33,7 @@ pub(crate) struct NonEmptySlot { pub(crate) blocktime: Option, pub(crate) transactions: Vec, pub(crate) messages: Vec, + pub(crate) block_rewards: block_rewards::BlockRewards, } pub(crate) fn convert_slot_to_db_rows( @@ -46,6 +49,7 @@ pub(crate) fn convert_slot_to_db_rows( blocktime, transactions, messages, + block_rewards, } = non_empty_slot; let range = BlockRange { @@ -74,6 +78,15 @@ pub(crate) fn convert_slot_to_db_rows( .map_err(RowConversionError::TableBuild)? }; + let block_rewards_row = { + let mut builder = + block_rewards::BlockRewardsRowsBuilder::with_capacity(block_rewards.rewards.len()); + builder.append(&block_rewards); + builder + .build(range.clone()) + .map_err(RowConversionError::TableBuild)? + }; + let transactions_row = { let mut builder = transactions::TransactionRowsBuilder::with_capacity(transactions.len()); for tx in &transactions { @@ -123,6 +136,7 @@ pub(crate) fn convert_slot_to_db_rows( Ok(Rows::new(vec![ block_headers_row, + block_rewards_row, transactions_row, messages_row, instructions_row, diff --git a/crates/extractors/solana/src/tables/block_rewards.rs b/crates/extractors/solana/src/tables/block_rewards.rs new file mode 100644 index 000000000..121e205b7 --- /dev/null +++ b/crates/extractors/solana/src/tables/block_rewards.rs @@ -0,0 +1,264 @@ +use std::sync::{Arc, LazyLock}; + +use anyhow::Context; +use datasets_common::{ + block_range::BlockRange, + dataset::{SPECIAL_BLOCK_NUM, Table}, + network_id::NetworkId, +}; +use datasets_raw::{ + arrow::{ + ArrayRef, DataType, Field, Int64Builder, Schema, SchemaRef, StringBuilder, UInt64Builder, + }, + rows::{TableRowError, TableRows}, +}; +use solana_clock::Slot; + +use crate::{of1_client, rpc_client, tables}; + +pub const TABLE_NAME: &str = "block_rewards"; + +static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); + +pub fn table(network: NetworkId) -> Table { + let name = TABLE_NAME.parse().expect("table name is valid"); + Table::new(name, SCHEMA.clone(), network, vec!["slot".to_string()]) +} + +/// Prefer using the pre-computed [SCHEMA]. +fn schema() -> Schema { + let fields = vec![ + Field::new(SPECIAL_BLOCK_NUM, DataType::UInt64, false), + Field::new("slot", DataType::UInt64, false), + Field::new("pubkey", DataType::Utf8, false), + Field::new("lamports", DataType::Int64, false), + Field::new("post_balance", DataType::UInt64, false), + Field::new("reward_type", DataType::Utf8, true), + Field::new("commission", DataType::Utf8, true), + ]; + + Schema::new(fields) +} + +#[derive(Debug, Default, Clone)] +pub(crate) struct Reward { + pub(crate) pubkey: String, + pub(crate) lamports: i64, + pub(crate) post_balance: u64, + pub(crate) reward_type: Option, + pub(crate) commission: Option, +} + +#[derive(Debug, Clone)] +pub(crate) enum RewardType { + /// The `Unspecified` variant only in the protobuf version of + /// RewardType, it is not defined by the Solana spec. + Unspecified, + Fee, + Rent, + Staking, + Voting, +} + +impl From for Reward { + fn from(value: solana_storage_proto::StoredExtendedReward) -> Self { + Self { + pubkey: value.pubkey, + lamports: value.lamports, + post_balance: value.post_balance, + reward_type: value.reward_type.map(Into::into), + commission: value.commission, + } + } +} + +impl TryFrom for Reward { + type Error = anyhow::Error; + + fn try_from(value: solana_storage_proto::confirmed_block::Reward) -> Result { + let reward = Self { + pubkey: value.pubkey, + lamports: value.lamports, + post_balance: value.post_balance, + reward_type: value + .reward_type + .try_into() + .context("parsing proto reward type") + .map(|reward| match reward { + RewardType::Unspecified => None, + reward => Some(reward), + })?, + commission: if value.commission.is_empty() { + None + } else { + value + .commission + .parse() + .map(Some) + .context("parsing proto commission")? + }, + }; + + Ok(reward) + } +} + +impl From for Reward { + fn from(value: rpc_client::Reward) -> Self { + Self { + pubkey: value.pubkey, + lamports: value.lamports, + post_balance: value.post_balance, + reward_type: value.reward_type.map(Into::into), + commission: value.commission, + } + } +} + +impl std::fmt::Display for RewardType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RewardType::Unspecified => write!(f, "unspecified"), + RewardType::Fee => write!(f, "fee"), + RewardType::Rent => write!(f, "rent"), + RewardType::Staking => write!(f, "staking"), + RewardType::Voting => write!(f, "voting"), + } + } +} + +impl TryFrom for RewardType { + type Error = anyhow::Error; + + fn try_from(value: i32) -> Result { + let typ = match value { + 0 => Self::Unspecified, + 1 => Self::Fee, + 2 => Self::Rent, + 3 => Self::Staking, + 4 => Self::Voting, + _ => anyhow::bail!("invalid reward type: {value}"), + }; + + Ok(typ) + } +} + +impl From for RewardType { + fn from(value: solana_reward_info::RewardType) -> Self { + match value { + solana_reward_info::RewardType::Fee => Self::Fee, + solana_reward_info::RewardType::Rent => Self::Rent, + solana_reward_info::RewardType::Staking => Self::Staking, + solana_reward_info::RewardType::Voting => Self::Voting, + } + } +} + +/// Solana block rewards. +#[derive(Debug, Default, Clone)] +pub(crate) struct BlockRewards { + pub(crate) slot: Slot, + pub(crate) rewards: Vec, +} + +impl BlockRewards { + pub(crate) fn from_of1_rewards( + slot: Slot, + rewards: Option, + ) -> anyhow::Result { + let rewards: Vec = rewards + .map(|rewards| { + let rewards = match rewards { + of1_client::DecodedField::Proto(proto_rewards) => proto_rewards + .rewards + .into_iter() + .map(TryInto::try_into) + .collect::>() + .context("converting of1 block rewards")?, + of1_client::DecodedField::Bincode(bincode_rewards) => { + bincode_rewards.into_iter().map(Into::into).collect() + } + }; + + anyhow::Ok(rewards) + }) + .transpose()? + .unwrap_or_default(); + + Ok(Self { slot, rewards }) + } + + pub(crate) fn from_rpc_rewards(slot: Slot, rewards: Vec) -> Self { + let rewards = rewards.into_iter().map(Into::into).collect(); + Self { slot, rewards } + } +} + +/// A builder for converting [BlockRewards] into [TableRows]. +pub(crate) struct BlockRewardsRowsBuilder { + special_block_num: UInt64Builder, + slot: UInt64Builder, + pubkey: StringBuilder, + lamports: Int64Builder, + post_balance: UInt64Builder, + reward_type: StringBuilder, + commission: StringBuilder, +} + +impl BlockRewardsRowsBuilder { + /// Creates a new [BlockRewardsRowsBuilder]. + pub(crate) fn with_capacity(capacity: usize) -> Self { + Self { + special_block_num: UInt64Builder::with_capacity(capacity), + slot: UInt64Builder::with_capacity(capacity), + pubkey: StringBuilder::with_capacity(capacity, tables::BASE58_ENCODED_HASH_LEN), + lamports: Int64Builder::with_capacity(capacity), + post_balance: UInt64Builder::with_capacity(capacity), + reward_type: StringBuilder::with_capacity(capacity, capacity * 8), + commission: StringBuilder::with_capacity(capacity, capacity * 4), + } + } + + /// Appends a [BlockRewards] to the builder. + pub(crate) fn append(&mut self, rewards: &BlockRewards) { + let BlockRewards { slot, rewards } = rewards; + + for reward in rewards { + self.special_block_num.append_value(*slot); + self.slot.append_value(*slot); + self.pubkey.append_value(&reward.pubkey); + self.lamports.append_value(reward.lamports); + self.post_balance.append_value(reward.post_balance); + self.reward_type + .append_option(reward.reward_type.as_ref().map(|t| t.to_string())); + self.commission + .append_option(reward.commission.map(|c| c.to_string())); + } + } + + /// Builds the [TableRows] from the appended data. + pub(crate) fn build(self, range: BlockRange) -> Result { + let Self { + mut special_block_num, + mut slot, + mut pubkey, + mut lamports, + mut post_balance, + mut reward_type, + mut commission, + } = self; + + let columns = vec![ + Arc::new(special_block_num.finish()) as ArrayRef, + Arc::new(slot.finish()), + Arc::new(pubkey.finish()), + Arc::new(lamports.finish()), + Arc::new(post_balance.finish()), + Arc::new(reward_type.finish()), + Arc::new(commission.finish()), + ]; + + TableRows::new(table(range.network.clone()), range, columns) + } +} diff --git a/crates/extractors/solana/src/tables/transactions.rs b/crates/extractors/solana/src/tables/transactions.rs index b0e0a38ee..a5cb372a7 100644 --- a/crates/extractors/solana/src/tables/transactions.rs +++ b/crates/extractors/solana/src/tables/transactions.rs @@ -15,10 +15,9 @@ use datasets_raw::{ }, rows::{TableRowError, TableRows}, }; -use serde::Deserialize; use solana_clock::Slot; -use crate::{of1_client, rpc_client}; +use crate::{of1_client, rpc_client, tables}; pub const TABLE_NAME: &str = "transactions"; @@ -135,28 +134,27 @@ impl Transaction { slot: Slot, tx_index: u32, of1_tx_signatures: Vec, - of1_tx_meta: of1_client::DecodedTransactionStatusMeta, - ) -> Self { + of1_tx_meta: Option, + ) -> anyhow::Result { let signatures = of1_tx_signatures.iter().map(|s| s.to_string()).collect(); - let transaction_status_meta = match of1_tx_meta { - of1_client::DecodedTransactionStatusMeta::Empty => None, - of1_client::DecodedTransactionStatusMeta::Proto(proto_meta) => { - let tx_meta = TransactionStatusMeta::from_proto_meta(slot, tx_index, proto_meta); - Some(tx_meta) - } + let transaction_status_meta = of1_tx_meta + .map(|meta| match meta { + of1_client::DecodedTransactionStatusMeta::Proto(proto_meta) => { + TransactionStatusMeta::from_proto_meta(slot, tx_index, proto_meta) + } - of1_client::DecodedTransactionStatusMeta::Bincode(stored_meta) => { - let tx_meta = TransactionStatusMeta::from_stored_meta(slot, tx_index, stored_meta); - Some(tx_meta) - } - }; + of1_client::DecodedTransactionStatusMeta::Bincode(stored_meta) => Ok( + TransactionStatusMeta::from_stored_meta(slot, tx_index, stored_meta), + ), + }) + .transpose()?; - Self { + Ok(Self { slot, index: tx_index, signatures, transaction_status_meta, - } + }) } pub(crate) fn from_rpc_transaction( @@ -185,11 +183,11 @@ pub(crate) struct TransactionStatusMeta { pub(crate) fee: u64, pub(crate) pre_balances: Vec, pub(crate) post_balances: Vec, - pub(crate) inner_instructions: Option>>, + pub(crate) inner_instructions: Option>>, pub(crate) log_messages: Option>, pub(crate) pre_token_balances: Option>, pub(crate) post_token_balances: Option>, - pub(crate) rewards: Option>, + pub(crate) rewards: Option>, pub(crate) loaded_addresses: Option, pub(crate) return_data: Option, pub(crate) compute_units_consumed: Option, @@ -212,7 +210,7 @@ impl TransactionStatusMeta { inner_instructions .instructions .into_iter() - .map(|inst| super::instructions::Instruction { + .map(|inst| tables::instructions::Instruction { slot, tx_index, program_id_index: inst.instruction.program_id_index, @@ -233,18 +231,9 @@ impl TransactionStatusMeta { .post_token_balances .map(|post_token_balances| post_token_balances.into_iter().map(From::from).collect()); - let rewards = rpc_tx_meta.rewards.map(|rewards| { - rewards - .into_iter() - .map(|reward| Reward { - pubkey: reward.pubkey, - lamports: reward.lamports, - post_balance: reward.post_balance, - reward_type: reward.reward_type.map(From::from), - commission: reward.commission, - }) - .collect() - }); + let rewards = rpc_tx_meta + .rewards + .map(|rewards| rewards.into_iter().map(Into::into).collect()); let loaded_addresses = LoadedAddresses { writable: rpc_tx_meta @@ -289,7 +278,7 @@ impl TransactionStatusMeta { slot: Slot, tx_index: u32, of_tx_meta: solana_storage_proto::confirmed_block::TransactionStatusMeta, - ) -> Self { + ) -> anyhow::Result { let inner_instructions = of_tx_meta .inner_instructions .into_iter() @@ -297,7 +286,7 @@ impl TransactionStatusMeta { inner .instructions .into_iter() - .map(|inst| super::instructions::Instruction { + .map(|inst| tables::instructions::Instruction { slot, tx_index, program_id_index: inst.program_id_index as u8, @@ -321,21 +310,12 @@ impl TransactionStatusMeta { .map(TransactionTokenBalance::from) .collect(); - let rewards = of_tx_meta + let rewards: Vec = of_tx_meta .rewards .into_iter() - .map(|reward| Reward { - pubkey: reward.pubkey, - lamports: reward.lamports, - post_balance: reward.post_balance, - reward_type: Some(reward.reward_type.into()), - commission: if reward.commission.is_empty() { - None - } else { - Some(reward.commission.parse().expect("commission parsing error")) - }, - }) - .collect(); + .map(TryInto::try_into) + .collect::>() + .context("converting of1 tx rewards")?; let loaded_addresses = LoadedAddresses { writable: of_tx_meta @@ -358,7 +338,7 @@ impl TransactionStatusMeta { } }); - TransactionStatusMeta { + Ok(TransactionStatusMeta { status: of_tx_meta.err.is_none(), fee: of_tx_meta.fee, pre_balances: of_tx_meta.pre_balances, @@ -372,7 +352,7 @@ impl TransactionStatusMeta { return_data, compute_units_consumed: of_tx_meta.compute_units_consumed, cost_units: of_tx_meta.cost_units, - } + }) } pub(crate) fn from_rpc_meta( @@ -397,7 +377,7 @@ impl TransactionStatusMeta { let data = bs58::decode(&compiled.data) .into_vec() .context("decoding base58 inner instruction data")?; - let instruction = super::instructions::Instruction { + let instruction = tables::instructions::Instruction { slot, tx_index, program_id_index: compiled.program_id_index, @@ -430,18 +410,9 @@ impl TransactionStatusMeta { .collect() }); - let rewards = rpc_meta.rewards.map(|rewards| { - rewards - .into_iter() - .map(|reward| Reward { - pubkey: reward.pubkey, - lamports: reward.lamports, - post_balance: reward.post_balance, - reward_type: reward.reward_type.map(Into::into), - commission: reward.commission, - }) - .collect() - }); + let rewards = rpc_meta + .rewards + .map(|rewards| rewards.into_iter().map(Into::into).collect()); let loaded_addresses = rpc_meta .loaded_addresses @@ -484,7 +455,7 @@ impl TransactionStatusMeta { } } -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Clone)] pub(crate) struct TransactionTokenBalance { account_index: u8, mint: String, @@ -550,7 +521,7 @@ impl From for TransactionTo } } -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Clone)] struct TokenAmount { ui_amount: Option, decimals: u8, @@ -558,64 +529,13 @@ struct TokenAmount { ui_amount_string: String, } -#[derive(Debug, Default, Deserialize, Clone)] -pub(crate) struct Reward { - pubkey: String, - lamports: i64, - post_balance: u64, - reward_type: Option, - commission: Option, -} - -#[derive(Debug, Deserialize, Clone)] -enum RewardType { - Fee, - Rent, - Staking, - Voting, -} - -impl std::fmt::Display for RewardType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RewardType::Fee => write!(f, "fee"), - RewardType::Rent => write!(f, "rent"), - RewardType::Staking => write!(f, "staking"), - RewardType::Voting => write!(f, "voting"), - } - } -} - -impl From for RewardType { - fn from(value: i32) -> Self { - match value { - 0 => Self::Fee, - 1 => Self::Rent, - 2 => Self::Staking, - 3 => Self::Voting, - _ => panic!("invalid reward type value: {}", value), - } - } -} - -impl From for RewardType { - fn from(value: rpc_client::RewardType) -> Self { - match value { - rpc_client::RewardType::Fee => Self::Fee, - rpc_client::RewardType::Rent => Self::Rent, - rpc_client::RewardType::Staking => Self::Staking, - rpc_client::RewardType::Voting => Self::Voting, - } - } -} - -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Clone)] pub(crate) struct LoadedAddresses { writable: Vec, readonly: Vec, } -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Clone)] pub(crate) struct TransactionReturnData { program_id: String, data: Vec, diff --git a/docs/schemas/solana.md b/docs/schemas/solana.md index 4f522a1db..3a144643c 100644 --- a/docs/schemas/solana.md +++ b/docs/schemas/solana.md @@ -15,6 +15,20 @@ Auto-generated file. See `to_markdown` in `crates/core/datasets-raw/src/schema.r | block_time | Int64 | YES | +---------------------+-----------+-------------+ ```` +## block_rewards +```` ++--------------+-----------+-------------+ +| column_name | data_type | is_nullable | ++--------------+-----------+-------------+ +| _block_num | UInt64 | NO | +| slot | UInt64 | NO | +| pubkey | Utf8 | NO | +| lamports | Int64 | NO | +| post_balance | UInt64 | NO | +| reward_type | Utf8 | YES | +| commission | Utf8 | YES | ++--------------+-----------+-------------+ +```` ## transactions ```` +---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ diff --git a/tests/config/manifests/solana.json b/tests/config/manifests/solana.json index 033a954da..c4986cdcb 100644 --- a/tests/config/manifests/solana.json +++ b/tests/config/manifests/solana.json @@ -48,6 +48,50 @@ }, "network": "mainnet" }, + "block_rewards": { + "schema": { + "arrow": { + "fields": [ + { + "name": "_block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "slot", + "type": "UInt64", + "nullable": false + }, + { + "name": "pubkey", + "type": "Utf8", + "nullable": false + }, + { + "name": "lamports", + "type": "Int64", + "nullable": false + }, + { + "name": "post_balance", + "type": "UInt64", + "nullable": false + }, + { + "name": "reward_type", + "type": "Utf8", + "nullable": true + }, + { + "name": "commission", + "type": "Utf8", + "nullable": true + } + ] + } + }, + "network": "mainnet" + }, "instructions": { "schema": { "arrow": {