Skip to content

Commit

Permalink
change(state): Add an init function for a standalone `ReadStateServic…
Browse files Browse the repository at this point in the history
…e` (#8595)

* Adds an init_read_only() fn in zebra-state

* moves elasticsearch initialization to `FinalizedState::new_with_debug()`

* Updates callers of `FinalizedState::{new, new_with_debug}` to pass a bool to try enabling elasticsearch

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
  • Loading branch information
arya2 and oxarbitrage committed Jun 12, 2024
1 parent 139e1c3 commit 1b5f942
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 82 deletions.
21 changes: 0 additions & 21 deletions zebra-state/src/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,6 @@ where
}
}

impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
fn from(prepared: SemanticallyVerifiedBlock) -> Self {
let SemanticallyVerifiedBlock {
block,
hash,
height,
new_outputs: _,
transaction_hashes,
} = prepared;

Self {
hash,
height,
time: block.header.time,
transactions: block.transactions.clone(),
transaction_hashes,
previous_block_hash: block.header.previous_block_hash,
}
}
}

impl SemanticallyVerifiedBlock {
/// Returns a [`ContextuallyVerifiedBlock`] created from this block,
/// with fake zero-valued spent UTXOs.
Expand Down
9 changes: 5 additions & 4 deletions zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ pub use request::{
};
pub use response::{KnownBlock, MinedTx, ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
check, init, spawn_init,
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip, TipAction},
check, init, init_read_only,
non_finalized_state::NonFinalizedState,
spawn_init, spawn_init_read_only,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
};
Expand Down Expand Up @@ -76,7 +78,6 @@ pub use response::GetBlockTemplateChainInfo;
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
chain_tip::{ChainTipBlock, ChainTipSender},
finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT},
init_test, init_test_services,
};
Expand All @@ -96,4 +97,4 @@ pub(crate) use config::hidden::{
write_database_format_version_to_disk, write_state_database_format_version_to_disk,
};

pub(crate) use request::ContextuallyVerifiedBlock;
pub use request::ContextuallyVerifiedBlock;
90 changes: 55 additions & 35 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ use std::{
time::{Duration, Instant},
};

#[cfg(feature = "elasticsearch")]
use elasticsearch::{
auth::Credentials::Basic,
cert::CertificateValidation,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
http::Url,
Elasticsearch,
};

use futures::future::FutureExt;
use tokio::sync::{oneshot, watch};
use tower::{util::BoxService, Service, ServiceExt};
Expand Down Expand Up @@ -319,29 +310,12 @@ impl StateService {
checkpoint_verify_concurrency_limit: usize,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();

#[cfg(feature = "elasticsearch")]
let finalized_state = {
let conn_pool = SingleNodeConnectionPool::new(
Url::parse(config.elasticsearch_url.as_str())
.expect("configured elasticsearch url is invalid"),
);
let transport = TransportBuilder::new(conn_pool)
.cert_validation(CertificateValidation::None)
.auth(Basic(
config.clone().elasticsearch_username,
config.clone().elasticsearch_password,
))
.build()
.expect("elasticsearch transport builder should not fail");
let elastic_db = Some(Elasticsearch::new(transport));

FinalizedState::new(&config, network, elastic_db)
};

#[cfg(not(feature = "elasticsearch"))]
let finalized_state = { FinalizedState::new(&config, network) };

let finalized_state = FinalizedState::new(
&config,
network,
#[cfg(feature = "elasticsearch")]
true,
);
timer.finish(module_path!(), line!(), "opening finalized state database");

let timer = CodeTimer::start();
Expand Down Expand Up @@ -387,7 +361,7 @@ impl StateService {

let read_service = ReadStateService::new(
&finalized_state,
block_write_task,
Some(block_write_task),
non_finalized_state_receiver,
);

Expand Down Expand Up @@ -828,14 +802,14 @@ impl ReadStateService {
/// and a watch channel for updating the shared recent non-finalized chain.
pub(crate) fn new(
finalized_state: &FinalizedState,
block_write_task: Arc<std::thread::JoinHandle<()>>,
block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
) -> Self {
let read_service = Self {
network: finalized_state.network(),
db: finalized_state.db.clone(),
non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
block_write_task: Some(block_write_task),
block_write_task,
};

tracing::debug!("created new read-only state service");
Expand Down Expand Up @@ -1945,6 +1919,52 @@ pub fn init(
)
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service,
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
pub fn init_read_only(
config: Config,
network: &Network,
) -> (
ReadStateService,
ZebraDb,
tokio::sync::watch::Sender<NonFinalizedState>,
) {
let finalized_state = FinalizedState::new_with_debug(
&config,
network,
true,
#[cfg(feature = "elasticsearch")]
false,
true,
);
let (non_finalized_state_sender, non_finalized_state_receiver) =
tokio::sync::watch::channel(NonFinalizedState::new(network));

(
ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
finalized_state.db.clone(),
non_finalized_state_sender,
)
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
pub fn spawn_init_read_only(
config: Config,
network: &Network,
) -> tokio::task::JoinHandle<(
ReadStateService,
ZebraDb,
tokio::sync::watch::Sender<NonFinalizedState>,
)> {
let network = network.clone();
tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
Expand Down
16 changes: 11 additions & 5 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
}
}

impl From<CheckpointVerifiedBlock> for ChainTipBlock {
fn from(finalized: CheckpointVerifiedBlock) -> Self {
let CheckpointVerifiedBlock(SemanticallyVerifiedBlock {
impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
fn from(prepared: SemanticallyVerifiedBlock) -> Self {
let SemanticallyVerifiedBlock {
block,
hash,
height,
new_outputs: _,
transaction_hashes,
..
}) = finalized;
} = prepared;

Self {
hash,
Expand All @@ -128,6 +128,12 @@ impl From<CheckpointVerifiedBlock> for ChainTipBlock {
}
}

impl From<CheckpointVerifiedBlock> for ChainTipBlock {
fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
prepared.into()
}
}

/// A sender for changes to the non-finalized and finalized chain tips.
#[derive(Debug)]
pub struct ChainTipSender {
Expand Down
34 changes: 31 additions & 3 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ impl FinalizedState {
pub fn new(
config: &Config,
network: &Network,
#[cfg(feature = "elasticsearch")] elastic_db: Option<elasticsearch::Elasticsearch>,
#[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
) -> Self {
Self::new_with_debug(
config,
network,
false,
#[cfg(feature = "elasticsearch")]
elastic_db,
enable_elastic_db,
false,
)
}
Expand All @@ -162,9 +162,37 @@ impl FinalizedState {
config: &Config,
network: &Network,
debug_skip_format_upgrades: bool,
#[cfg(feature = "elasticsearch")] elastic_db: Option<elasticsearch::Elasticsearch>,
#[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
read_only: bool,
) -> Self {
#[cfg(feature = "elasticsearch")]
let elastic_db = if enable_elastic_db {
use elasticsearch::{
auth::Credentials::Basic,
cert::CertificateValidation,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
http::Url,
Elasticsearch,
};

let conn_pool = SingleNodeConnectionPool::new(
Url::parse(config.elasticsearch_url.as_str())
.expect("configured elasticsearch url is invalid"),
);
let transport = TransportBuilder::new(conn_pool)
.cert_validation(CertificateValidation::None)
.auth(Basic(
config.clone().elasticsearch_username,
config.clone().elasticsearch_password,
))
.build()
.expect("elasticsearch transport builder should not fail");

Some(Elasticsearch::new(transport))
} else {
None
};

let db = ZebraDb::new(
config,
STATE_DATABASE_KIND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn test_raw_rocksdb_column_families_with_network(network: Network) {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

// Snapshot the column family names
Expand Down
4 changes: 2 additions & 2 deletions zebra-state/src/service/finalized_state/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn blocks_with_v5_transactions() -> Result<()> {
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|((chain, count, network, _history_tree) in PreparedChain::default())| {
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);
let mut height = Height(0);
// use `count` to minimize test failures, so they are easier to diagnose
for block in chain.iter().take(count) {
Expand Down Expand Up @@ -65,7 +65,7 @@ fn all_upgrades_and_wrong_commitments_with_fake_activation_heights() -> Result<(
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|((chain, _count, network, _history_tree) in PreparedChain::default().with_valid_commitments().no_shrink())| {

let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);
let mut height = Height(0);
let heartwood_height = NetworkUpgrade::Heartwood.activation_height(&network).unwrap();
let heartwood_height_plus1 = (heartwood_height + 1).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn test_block_and_transaction_data_with_network(network: Network) {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

// Assert that empty databases are the same, regardless of the network.
Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/service/non_finalized_state/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ fn rejection_restores_internal_state_genesis() -> Result<()> {
}
))| {
let mut state = NonFinalizedState::new(&network);
let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
finalized_state.set_finalized_value_pool(fake_value_pool);
Expand Down
16 changes: 8 additions & 8 deletions zebra-state/src/service/non_finalized_state/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn best_chain_wins_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state.commit_new_chain(block2.prepare(), &finalized_state)?;
Expand Down Expand Up @@ -194,7 +194,7 @@ fn finalize_pops_from_best_chain_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -245,7 +245,7 @@ fn commit_block_extending_best_chain_doesnt_drop_worst_chains_for_network(
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -289,7 +289,7 @@ fn shorter_chain_can_be_best_chain_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -334,7 +334,7 @@ fn longer_chain_with_more_work_wins_for_network(network: Network) -> Result<()>
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -378,7 +378,7 @@ fn equal_length_goes_to_more_work_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -426,7 +426,7 @@ fn history_tree_is_updated_for_network_upgrade(
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state
Expand Down Expand Up @@ -525,7 +525,7 @@ fn commitment_is_validated_for_network_upgrade(network: Network, network_upgrade
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state
Expand Down
Loading

0 comments on commit 1b5f942

Please sign in to comment.