Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions src/args.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,49 @@
use clap::{ArgAction, Parser};

use std::str::FromStr;

use crate::clients::beacon::types::BlockId;

#[derive(Debug, Clone)]
pub enum NumThreads {
Auto,
Number(u32),
}

impl FromStr for NumThreads {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("auto") {
Ok(NumThreads::Auto)
} else {
s.parse::<u32>()
.map(NumThreads::Number)
.map_err(|_| format!("Invalid value for num_threads: {}", s))
}
}
}

impl std::fmt::Display for NumThreads {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NumThreads::Auto => write!(f, "auto"),
NumThreads::Number(n) => write!(f, "{}", n),
}
}
}

impl NumThreads {
pub fn resolve(&self) -> u32 {
match self {
NumThreads::Auto => std::thread::available_parallelism()
.map(|n| n.get() as u32)
.unwrap_or(1),
NumThreads::Number(n) => *n,
}
}
}

/// Blobscan's indexer for the EIP-4844 upgrade.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand All @@ -14,13 +56,13 @@ pub struct Args {
#[arg(short, long)]
pub to_slot: Option<BlockId>,

/// Number of threads used for parallel indexing
#[arg(short, long)]
pub num_threads: Option<u32>,
/// Number of threads used for parallel indexing ("auto" or a number)
#[arg(short, long, default_value_t = NumThreads::Auto)]
pub num_threads: NumThreads,

/// Amount of slots to be processed before saving latest slot in the database
#[arg(short, long)]
pub slots_per_save: Option<u32>,
/// Amount of slots to be processed before saving latest synced slot in the db
#[arg(short, long, default_value_t = 1000)]
pub slots_per_save: u32,

/// Disable slot checkpoint saving when syncing
#[arg(short = 'c', long, action = ArgAction::SetTrue)]
Expand Down
27 changes: 20 additions & 7 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::{fmt, str::FromStr};

use alloy::primitives::{Bytes, B256};
use alloy::{consensus::Bytes48, eips::eip4844::HeapBlob, primitives::B256};
use serde::{Deserialize, Serialize};

use crate::clients::common::ClientError;

use super::CommonBeaconClient;

pub type KzgCommitment = Bytes48;

pub type Proof = Bytes48;

#[derive(Serialize, Debug, Clone, PartialEq)]
pub enum BlockId {
Head,
Expand All @@ -15,16 +19,25 @@ pub enum BlockId {
Hash(B256),
}

#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Topic {
Head,
FinalizedCheckpoint,
}

impl fmt::Display for Topic {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Topic::Head => write!(f, "head"),
Topic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
}
}
}

#[derive(Deserialize, Debug)]
pub struct Block {
pub blob_kzg_commitments: Option<Vec<String>>,
pub blob_kzg_commitments: Option<Vec<KzgCommitment>>,
pub execution_payload: Option<ExecutionPayload>,
pub parent_root: B256,
#[serde(deserialize_with = "deserialize_number")]
Expand All @@ -41,7 +54,7 @@ pub struct ExecutionPayload {
#[derive(Deserialize, Debug)]
pub struct BlockBody {
pub execution_payload: Option<ExecutionPayload>,
pub blob_kzg_commitments: Option<Vec<String>>,
pub blob_kzg_commitments: Option<Vec<KzgCommitment>>,
}
#[derive(Deserialize, Debug)]
pub struct BlockMessage {
Expand All @@ -63,9 +76,9 @@ pub struct BlockResponse {

#[derive(Deserialize, Debug)]
pub struct Blob {
pub kzg_commitment: String,
pub kzg_proof: String,
pub blob: Bytes,
pub kzg_commitment: KzgCommitment,
pub kzg_proof: Proof,
pub blob: HeapBlob,
}

#[derive(Deserialize, Debug)]
Expand Down
61 changes: 29 additions & 32 deletions src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use core::fmt;

use alloy::consensus::Transaction as Consensus;
use alloy::primitives::{Address, BlockNumber, BlockTimestamp, Bytes, TxIndex, B256, U256};
use alloy::eips::eip4844::{kzg_to_versioned_hash, HeapBlob};
use alloy::primitives::{Address, BlockNumber, BlockTimestamp, TxIndex, B256, U256};
use alloy::rpc::types::{Block as ExecutionBlock, Transaction as ExecutionTransaction};
use anyhow::{Context, Result};

use serde::{Deserialize, Serialize};

use crate::{clients::beacon::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};
use crate::clients::beacon::types::{Blob as BeaconBlob, KzgCommitment, Proof};

#[derive(Serialize, Deserialize, Debug)]
pub struct BlobscanBlock {
Expand Down Expand Up @@ -44,9 +45,9 @@ pub struct Transaction {
#[serde(rename_all = "camelCase")]
pub struct Blob {
pub versioned_hash: B256,
pub commitment: String,
pub proof: String,
pub data: Bytes,
pub commitment: KzgCommitment,
pub proof: Proof,
pub data: HeapBlob,
pub tx_hash: B256,
pub index: u32,
}
Expand Down Expand Up @@ -223,38 +224,34 @@ impl<'a>
}
}

impl<'a> TryFrom<(&'a BeaconBlob, u32, B256)> for Blob {
type Error = anyhow::Error;

fn try_from(
(blob_data, index, tx_hash): (&'a BeaconBlob, u32, B256),
) -> Result<Self, Self::Error> {
Ok(Self {
tx_hash,
index,
commitment: blob_data.kzg_commitment.clone(),
proof: blob_data.kzg_proof.clone(),
data: blob_data.blob.clone(),
versioned_hash: calculate_versioned_hash(&blob_data.kzg_commitment)?,
})
}
}

impl<'a> From<(&'a BeaconBlob, &'a B256, usize, &'a B256)> for Blob {
fn from(
(blob_data, versioned_hash, index, tx_hash): (&'a BeaconBlob, &'a B256, usize, &'a B256),
) -> Self {
impl<'a> From<(&'a BeaconBlob, u32, &B256)> for Blob {
fn from((blob, index, tx_hash): (&'a BeaconBlob, u32, &B256)) -> Self {
Self {
tx_hash: *tx_hash,
index: index as u32,
commitment: blob_data.kzg_commitment.clone(),
proof: blob_data.kzg_proof.clone(),
data: blob_data.blob.clone(),
versioned_hash: *versioned_hash,
tx_hash: tx_hash.clone(),
index,
commitment: blob.kzg_commitment,
proof: blob.kzg_proof,
data: blob.blob.clone(),
versioned_hash: kzg_to_versioned_hash(blob.kzg_commitment.as_ref()),
}
}
}

// impl<'a> From<(&'a BeaconBlob, &'a B256, usize, &'a B256)> for Blob {
// fn from(
// (blob_data, versioned_hash, index, tx_hash): (&'a BeaconBlob, &'a B256, usize, &'a B256),
// ) -> Self {
// Self {
// tx_hash: *tx_hash,
// index: index as u32,
// commitment: blob_data.kzg_commitment.clone(),
// proof: blob_data.kzg_proof.clone(),
// data: blob_data.blob.clone(),
// versioned_hash: *versioned_hash,
// }
// }
// }

impl From<BlockchainSyncStateResponse> for BlockchainSyncState {
fn from(response: BlockchainSyncStateResponse) -> Self {
Self {
Expand Down
45 changes: 36 additions & 9 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,56 @@ use backoff::ExponentialBackoffBuilder;
use dyn_clone::DynClone;

use crate::{
args::Args,
clients::{
beacon::{BeaconClient, CommonBeaconClient, Config as BeaconClientConfig},
blobscan::{BlobscanClient, CommonBlobscanClient, Config as BlobscanClientConfig},
},
env::Environment,
};

pub struct Config {
pub blobscan_api_endpoint: String,
pub beacon_node_url: String,
pub execution_node_endpoint: String,
pub secret_key: String,
pub syncing_settings: SyncingSettings,
}

pub struct SyncingSettings {
pub concurrency: u32,
pub checkpoint_size: u32,
pub disable_checkpoints: bool,
}

impl From<&Args> for SyncingSettings {
fn from(args: &Args) -> Self {
SyncingSettings {
concurrency: args.num_threads.resolve(),
checkpoint_size: args.slots_per_save,
disable_checkpoints: args.disable_sync_checkpoint_save,
}
}
}

// #[cfg(test)]
// use crate::clients::{beacon::MockCommonBeaconClient, blobscan::MockCommonBlobscanClient};

pub trait CommonContext: Send + Sync + DynClone {
fn beacon_client(&self) -> &dyn CommonBeaconClient;
fn blobscan_client(&self) -> &dyn CommonBlobscanClient;
fn provider(&self) -> &dyn Provider<Ethereum>;
fn syncing_settings(&self) -> &SyncingSettings;
}

dyn_clone::clone_trait_object!(CommonContext);
// dyn_clone::clone_trait_object!(CommonContext<MockProvider>);

pub struct Config {
pub blobscan_api_endpoint: String,
pub beacon_node_url: String,
pub execution_node_endpoint: String,
pub secret_key: String,
}

struct ContextRef {
pub beacon_client: Box<dyn CommonBeaconClient>,
pub blobscan_client: Box<dyn CommonBlobscanClient>,
pub provider: Box<dyn Provider<Ethereum>>,
pub syncing_settings: SyncingSettings,
}

#[derive(Clone)]
Expand All @@ -53,6 +73,7 @@ impl Context {
beacon_node_url,
execution_node_endpoint,
secret_key,
syncing_settings,
} = config;
let exp_backoff = Some(ExponentialBackoffBuilder::default().build());

Expand All @@ -65,6 +86,7 @@ impl Context {

Ok(Self {
inner: Arc::new(ContextRef {
syncing_settings,
blobscan_client: Box::new(BlobscanClient::try_with_client(
client.clone(),
BlobscanClientConfig {
Expand Down Expand Up @@ -99,15 +121,20 @@ impl CommonContext for Context {
fn provider(&self) -> &dyn Provider<Ethereum> {
self.inner.provider.as_ref()
}

fn syncing_settings(&self) -> &SyncingSettings {
&self.inner.syncing_settings
}
}

impl From<&Environment> for Config {
fn from(env: &Environment) -> Self {
impl From<(&Environment, &Args)> for Config {
fn from((env, args): (&Environment, &Args)) -> Self {
Self {
blobscan_api_endpoint: env.blobscan_api_endpoint.clone(),
beacon_node_url: env.beacon_node_endpoint.clone(),
execution_node_endpoint: env.execution_node_endpoint.clone(),
secret_key: env.secret_key.clone(),
syncing_settings: args.into(),
}
}
}
Expand Down
Loading