Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/extractors/solana-storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ pub type StoredExtendedRewards = Vec<StoredExtendedReward>;

#[derive(Serialize, Deserialize)]
pub struct StoredExtendedReward {
pubkey: String,
lamports: i64,
pub pubkey: String,
pub lamports: i64,
#[serde(deserialize_with = "default_on_eof")]
post_balance: u64,
pub post_balance: u64,
#[serde(deserialize_with = "default_on_eof")]
reward_type: Option<RewardType>,
pub reward_type: Option<RewardType>,
#[serde(deserialize_with = "default_on_eof")]
commission: Option<u8>,
pub commission: Option<u8>,
}

impl From<StoredExtendedReward> for Reward {
Expand Down
1 change: 1 addition & 0 deletions crates/extractors/solana/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ solana-rpc-client-api = "3.0.10"
solana-sdk = "3.0.0"
solana-storage-proto = { path = "../solana-storage-proto" }
solana-transaction-status-client-types = "3.0.10"
solana-reward-info = "3.0.0"
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions crates/extractors/solana/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ pub enum Of1StreamError {
#[error("expected '{expected}' node for cid '{cid}'")]
MissingNode { expected: &'static str, cid: String },

/// Block reward node slot does not match the expected slot.
///
/// When processing block reward nodes in the CAR file, the slot
/// recorded within the reward data must match the slot being processed.
#[error("reward slot mismatch: expected {expected}, found {found}")]
RewardSlotMismatch { expected: u64, found: u64 },

/// Failed to decompress data using Zstd.
///
/// CAR files and Solana data structures may be compressed with Zstd
Expand Down
61 changes: 37 additions & 24 deletions crates/extractors/solana/src/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ impl SolanaExtractor {
T: Stream<Item = Result<of1_client::DecodedSlot, BlockStreamError>>,
{
async_stream::stream! {
// Helper macro to simplify error handling and early returns in the stream.
macro_rules! ok_or_bail {
Comment on lines +109 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ok_or_bail! macro is a useful pattern for stream error handling. However, defining it inside the async_stream::stream! block means it's recreated for every stream instantiation.

Consider moving this to a module-level macro or extracting the error-and-return pattern into a helper. That said, macro hygiene within async_stream::stream! can be tricky, so the current approach is pragmatic if moving it causes issues.

($expr:expr) => {
match $expr {
Ok(val) => val,
Err(e) => {
yield Err(e);
return;
}
}
};
}

// Slots can be skipped, so we'll track the next expected slot for switching to
// JSON-RPC.
let mut expected_next_slot = start;
Expand All @@ -114,13 +127,7 @@ impl SolanaExtractor {
// Download historical blocks from the Old Faithful archive.
futures::pin_mut!(historical_block_stream);
while let Some(slot) = historical_block_stream.next().await {
let slot = match slot {
Ok(slot) => slot,
Err(e) => {
yield Err(e);
return;
},
};
let slot = ok_or_bail!(slot);

let current_slot = slot.slot;
if !requested_range.contains(&current_slot) {
Expand All @@ -132,7 +139,8 @@ impl SolanaExtractor {
}

// Don't emit rows for skipped slots.
yield tables::convert_slot_to_db_rows(non_empty_of1_slot(slot), &self.network).map_err(Into::into);
let non_empty_slot = ok_or_bail!(non_empty_of1_slot(slot).map_err(Into::into));
yield tables::convert_slot_to_db_rows(non_empty_slot, &self.network).map_err(Into::into);

if current_slot == end {
// Reached the end of the requested range.
Expand All @@ -158,13 +166,7 @@ impl SolanaExtractor {

match get_block_resp {
Ok(block) => {
let non_empty_slot = match non_empty_rpc_slot(slot, block) {
Ok(slot) => slot,
Err(e) => {
yield Err(e.into());
return;
},
};
let non_empty_slot = ok_or_bail!(non_empty_rpc_slot(slot, block).map_err(Into::into));
yield tables::convert_slot_to_db_rows(non_empty_slot, &self.network).map_err(Into::into);
}
Err(e) => {
Expand Down Expand Up @@ -330,9 +332,9 @@ impl BlockStreamer for SolanaExtractor {
}
}

/// Converts to [tables::NonEmptySlot]. This conversion cannot fail since the Old Faithful
/// CAR parser only produces non-empty slots.
fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot {
/// Converts [of1_client::DecodedSlot] to [tables::NonEmptySlot]. This conversion can fail if any
/// of the decoded fields do not match the expected format/values.
fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> anyhow::Result<tables::NonEmptySlot> {
let of1_client::DecodedSlot {
slot,
parent_slot,
Expand All @@ -342,7 +344,7 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot {
blocktime,
transactions,
transaction_metas,
block_rewards: _block_rewards,
block_rewards,
} = slot;

let mut txs = Vec::with_capacity(transactions.len());
Expand All @@ -357,14 +359,18 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot {

let tx = tables::transactions::Transaction::from_of1_transaction(
slot, tx_index, signatures, tx_meta,
);
)
.context("converting of1 transaction")?;
let message = tables::messages::Message::from_of1_message(slot, tx_index, message);

txs.push(tx);
msgs.push(message);
}

Comment on lines 368 to 369
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error context "missing of1 block rewards" is misleading. Looking at BlockRewards::from_of1_rewards, it accepts Option<DecodedBlockRewards> and returns an empty rewards list when None is passed.

The actual errors that could occur here are related to malformed reward data (e.g., invalid reward type values, unparseable commission strings). Consider a more accurate context:

let block_rewards = tables::block_rewards::BlockRewards::from_of1_rewards(slot, block_rewards)
    .context("parsing of1 block rewards")?;

tables::NonEmptySlot {
let block_rewards = tables::block_rewards::BlockRewards::from_of1_rewards(slot, block_rewards)
.context("converting of1 block rewards")?;

Ok(tables::NonEmptySlot {
slot,
parent_slot,
blockhash,
Expand All @@ -373,7 +379,8 @@ fn non_empty_of1_slot(slot: of1_client::DecodedSlot) -> tables::NonEmptySlot {
blocktime: Some(blocktime),
transactions: txs,
messages: msgs,
}
block_rewards,
})
}

/// Converts a JSON-RPC confirmed block into a [tables::NonEmptySlot]. This conversion
Expand All @@ -382,10 +389,13 @@ fn non_empty_rpc_slot(
slot: Slot,
confirmed_block: rpc_client::UiConfirmedBlock,
) -> anyhow::Result<tables::NonEmptySlot> {
// Transactions should be present since we requested them when fetching the block.
// Transactions and block rewards should be present since we requested them when fetching the block.
let transactions = confirmed_block
.transactions
.with_context(|| format!("missing transactions in confirmed block {slot}"))?;
let block_rewards = confirmed_block
.rewards
.with_context(|| format!("missing block rewards in confirmed block {slot}"))?;

let mut txs = Vec::with_capacity(transactions.len());
let mut msgs = Vec::with_capacity(transactions.len());
Expand Down Expand Up @@ -418,6 +428,8 @@ fn non_empty_rpc_slot(
msgs.push(msg);
}

let block_rewards = tables::block_rewards::BlockRewards::from_rpc_rewards(slot, block_rewards);

Ok(tables::NonEmptySlot {
slot,
parent_slot: confirmed_block.parent_slot,
Expand All @@ -427,6 +439,7 @@ fn non_empty_rpc_slot(
blocktime: confirmed_block.block_time,
transactions: txs,
messages: msgs,
block_rewards,
})
}

Expand Down Expand Up @@ -463,7 +476,7 @@ mod tests {
blocktime: 0,
transactions: Vec::new(),
transaction_metas: Vec::new(),
block_rewards: of1_client::DecodedBlockRewards::Empty,
block_rewards: None,
}
}
}
Expand Down
90 changes: 55 additions & 35 deletions crates/extractors/solana/src/of1_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub(crate) type DecodedBlockRewards = DecodedField<
>;

pub(crate) enum DecodedField<P, B> {
Empty,
Proto(P),
Bincode(B),
}
Expand All @@ -61,9 +60,8 @@ pub(crate) struct DecodedSlot {
pub(crate) block_height: Option<u64>,
pub(crate) blocktime: i64,
pub(crate) transactions: Vec<solana_sdk::transaction::VersionedTransaction>,
pub(crate) transaction_metas: Vec<DecodedTransactionStatusMeta>,
#[allow(dead_code)]
pub(crate) block_rewards: DecodedBlockRewards,
pub(crate) transaction_metas: Vec<Option<DecodedTransactionStatusMeta>>,
pub(crate) block_rewards: Option<DecodedBlockRewards>,
}

pub(crate) async fn car_file_manager(
Expand Down Expand Up @@ -536,48 +534,42 @@ async fn read_next_slot<R: tokio::io::AsyncRead + Unpin>(
P: prost::Message + Default,
B: serde::de::DeserializeOwned,
{
if data.is_empty() {
Ok(DecodedField::Empty)
} else {
// All fields that need to be decoded this way are ZSTD compressed in CAR files.
let decompressed = &*zstd::decode_all(data).map_err(|e| Of1StreamError::Zstd {
field_name,
error: e.to_string(),
})?;
match prost::Message::decode(decompressed).map(DecodedField::Proto) {
Ok(data_proto) => Ok(data_proto),
Err(prost_err) => {
match bincode::deserialize(decompressed).map(DecodedField::Bincode) {
Ok(data_bincode) => Ok(data_bincode),
Err(bincode_err) => {
let err = Of1StreamError::DecodeField {
field_name,
prost_err: prost_err.to_string(),
bincode_err: bincode_err.to_string(),
};
Err(err)
}
}
// All fields that need to be decoded this way are ZSTD compressed in CAR files.
let decompressed = &*zstd::decode_all(data).map_err(|e| Of1StreamError::Zstd {
field_name,
error: e.to_string(),
})?;
match prost::Message::decode(decompressed).map(DecodedField::Proto) {
Ok(data_proto) => Ok(data_proto),
Err(prost_err) => match bincode::deserialize(decompressed).map(DecodedField::Bincode) {
Ok(data_bincode) => Ok(data_bincode),
Err(bincode_err) => {
let err = Of1StreamError::DecodeField {
field_name,
prost_err: prost_err.to_string(),
bincode_err: bincode_err.to_string(),
};
Err(err)
}
}
},
}
}

// Once we reach `Node::Block`, the node map will contain all of the nodes needed to reassemble
// that block.
let mut nodes = car_parser::node::Nodes::read_until_block(node_reader)
let nodes = car_parser::node::Nodes::read_until_block(node_reader)
.await
.map_err(Of1StreamError::NodeParse)?;

let block = match nodes.nodes.pop() {
let block = match nodes.nodes.last() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential edge case here: nodes.nodes.last() returns a reference to the last element in iteration order of the HashMap (or whatever map type Nodes uses internally). This assumes the block node is always the last one added during read_until_block.

Previously with .pop(), the code was explicitly removing and consuming the last element from a mutable nodes. Now with .last(), you're relying on iteration order being consistent.

If Nodes uses a HashMap, the iteration order is not guaranteed and could vary. Could you confirm that Nodes::nodes preserves insertion order (e.g., uses IndexMap or similar)?

// Expected block node.
Some((_, car_parser::node::Node::Block(block))) => block,
// Reached end of CAR file.
None | Some((_, car_parser::node::Node::Epoch(_))) => return Ok(None),
Some((cid, node)) => {
return Err(Of1StreamError::UnexpectedNode {
kind: node.kind(),
cid: cid.into(),
cid: (*cid).into(),
});
}
};
Expand Down Expand Up @@ -608,13 +600,43 @@ async fn read_next_slot<R: tokio::io::AsyncRead + Unpin>(
.map_err(Of1StreamError::DataframeReassembly)?;

let tx = bincode::deserialize(&tx_df).map_err(Of1StreamError::Bincode)?;
let tx_meta = decode_proto_or_bincode("tx_status_meta", tx_meta_df.as_slice())?;

transactions.push(tx);

let tx_meta = if tx_meta_df.is_empty() {
None
} else {
decode_proto_or_bincode("tx_status_meta", tx_meta_df.as_slice()).map(Some)?
};
transaction_metas.push(tx_meta);
}
}

let block_rewards = nodes
.nodes
.get(&block.rewards)
.map(|rewards| {
let car_parser::node::Node::Rewards(rewards) = rewards else {
Comment on lines +617 to +618
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error variant for when the node exists but isn't a Rewards node reuses MissingNode, which is semantically incorrect - the node isn't missing, it's the wrong type. This could be confusing for debugging.

Consider using UnexpectedNode instead (similar to line 570-573), or adding a dedicated WrongNodeType error variant:

let Some(car_parser::node::Node::Rewards(rewards)) = nodes.nodes.get(&block.rewards) else {
    return Err(Of1StreamError::UnexpectedNode {
        kind: "non-rewards",
        cid: block.rewards.to_string(),
    });
};

return Err(Of1StreamError::UnexpectedNode {
kind: rewards.kind(),
cid: block.rewards.to_string(),
});
};
if rewards.slot != block.slot {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slot mismatch check is a good data integrity validation. However, the error message could be more actionable by including the CID or other context about which block/rewards node caused the mismatch:

#[error("reward slot mismatch for cid {cid}: expected {expected}, found {found}")]
RewardSlotMismatch { expected: u64, found: u64, cid: String },

This would help with debugging if this error occurs in production.

return Err(Of1StreamError::RewardSlotMismatch {
expected: block.slot,
found: rewards.slot,
});
}

nodes
.reassemble_dataframes(&rewards.data)
.map_err(Of1StreamError::DataframeReassembly)
.and_then(|rewards_df| {
decode_proto_or_bincode("block_rewards", rewards_df.as_slice())
})
})
.transpose()?;

let blockhash = {
// Hash of the last entry has the same value as that block's `blockhash` in
// CAR files.
Expand Down Expand Up @@ -648,9 +670,7 @@ async fn read_next_slot<R: tokio::io::AsyncRead + Unpin>(
blocktime,
transactions,
transaction_metas,
// TODO: Work with rewards?
#[allow(dead_code)]
block_rewards: DecodedField::Empty,
block_rewards,
};

Ok(Some(block))
Expand Down
7 changes: 2 additions & 5 deletions crates/extractors/solana/src/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::{num::NonZeroU32, sync::Arc, time::Instant};

use datasets_common::network_id::NetworkId;
pub use solana_client::{
rpc_config,
rpc_response::{RewardType, UiReturnDataEncoding},
};
pub use solana_client::{rpc_config, rpc_response::UiReturnDataEncoding};
use solana_clock::Slot;
pub use solana_transaction_status_client_types::{
EncodedTransaction, EncodedTransactionWithStatusMeta, TransactionStatusMeta,
EncodedTransaction, EncodedTransactionWithStatusMeta, Reward, TransactionStatusMeta,
TransactionTokenBalance, UiConfirmedBlock, UiInstruction, UiMessage, UiRawMessage,
UiTransaction, UiTransactionStatusMeta, UiTransactionTokenBalance,
};
Expand Down
Loading