Skip to content

Commit

Permalink
Merge branch 'rumenov/artifactpoo' into 'master'
Browse files Browse the repository at this point in the history
chore: instantiate the artifact pools only during the consensus and P2P construction

The consensus pool is the first object required by the IC stack initialization. 

See merge request dfinity-lab/public/ic!13084
  • Loading branch information
rumenov committed Jun 24, 2023
2 parents e6d1f34 + 1e05c0c commit 5317b1a
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ MACRO_DEPENDENCIES = [

DEV_DEPENDENCIES = [
"//rs/artifact_manager",
"//rs/artifact_pool",
"//rs/cycles_account_manager",
"//rs/execution_environment",
"//rs/https_outcalls/client",
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tower = "0.4.12"

[dev-dependencies]
ic-artifact-manager = { path = "../artifact_manager" }
ic-artifact-pool = { path = "../artifact_pool" }
ic-https-outcalls-adapter-client = { path = "../https_outcalls/client" }
ic-cycles-account-manager = { path = "../cycles_account_manager" }
ic-execution-environment = { path = "../execution_environment" }
Expand Down
41 changes: 23 additions & 18 deletions rs/p2p/tests/framework/p2p_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::framework::file_tree_artifact_mgr::ArtifactChunkingTestImpl;
use ic_artifact_pool::consensus_pool::ConsensusPoolImpl;
use ic_config::subnet_config::SubnetConfig;
use ic_cycles_account_manager::CyclesAccountManager;
use ic_execution_environment::IngressHistoryReaderImpl;
Expand All @@ -7,11 +8,10 @@ use ic_interfaces_registry::RegistryClient;
use ic_interfaces_transport::Transport;
use ic_logger::{debug, info, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_protobuf::types::v1 as pb;
use ic_registry_client::client::RegistryClientImpl;
use ic_registry_subnet_type::SubnetType;
use ic_replica_setup_ic_network::{
create_networking_stack, init_artifact_pools, P2PStateSyncClient,
};
use ic_replica_setup_ic_network::{setup_consensus_and_p2p, P2PStateSyncClient};
use ic_test_utilities::{
consensus::make_catch_up_package_with_empty_transcript,
crypto::fake_tls_handshake::FakeTlsHandshake,
Expand All @@ -30,7 +30,7 @@ use ic_test_utilities_metrics::fetch_int_gauge;
use ic_test_utilities_registry::FakeLocalStoreCertifiedTimeReader;
use ic_types::replica_config::ReplicaConfig;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tempfile::Builder;

Expand Down Expand Up @@ -95,19 +95,21 @@ fn execute_test(
subnet_config.cycles_account_manager_config,
));

let artifact_pools = init_artifact_pools(
node_id,
let cup = make_catch_up_package_with_empty_transcript(registry.clone(), subnet_id);

let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new(
subnet_id,
artifact_pool_config,
pb::CatchUpPackage::from(&cup),
artifact_pool_config.clone(),
metrics_registry.clone(),
log.clone(),
make_catch_up_package_with_empty_transcript(registry.clone(), subnet_id),
);
)));

let (_, p2p_runner) = create_networking_stack(
let (_, p2p_runner) = setup_consensus_and_p2p(
&metrics_registry,
log.clone(),
rt_handle,
artifact_pool_config,
transport_config,
Default::default(),
node_id,
Expand All @@ -117,6 +119,8 @@ fn execute_test(
sev_handshake,
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&state_manager) as Arc<_>,
consensus_pool,
cup,
no_state_sync_client,
xnet_payload_builder as Arc<_>,
self_validating_payload_builder as Arc<_>,
Expand All @@ -126,7 +130,6 @@ fn execute_test(
Arc::clone(&fake_crypto) as Arc<_>,
registry.clone(),
ingress_hist_reader,
artifact_pools,
cycles_account_manager,
fake_local_store_certified_time_reader,
Box::new(ic_https_outcalls_adapter_client::BrokenCanisterHttpClient {}),
Expand Down Expand Up @@ -258,19 +261,20 @@ fn execute_test_chunking_pool(
let fake_local_store_certified_time_reader =
Arc::new(FakeLocalStoreCertifiedTimeReader::new(time_source));

let artifact_pools = init_artifact_pools(
node_id,
let cup = make_catch_up_package_with_empty_transcript(registry.clone(), subnet_id);
let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new(
subnet_id,
artifact_pool_config,
pb::CatchUpPackage::from(&cup),
artifact_pool_config.clone(),
metrics_registry.clone(),
log.clone(),
make_catch_up_package_with_empty_transcript(registry.clone(), subnet_id),
);
)));

let (_, p2p_runner) = create_networking_stack(
let (_, p2p_runner) = setup_consensus_and_p2p(
&metrics_registry,
log.clone(),
rt_handle,
artifact_pool_config,
transport_config,
Default::default(),
node_id,
Expand All @@ -280,6 +284,8 @@ fn execute_test_chunking_pool(
sev_handshake,
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&state_manager) as Arc<_>,
consensus_pool,
cup,
state_sync_client,
xnet_payload_builder,
self_validating_payload_builder,
Expand All @@ -289,7 +295,6 @@ fn execute_test_chunking_pool(
Arc::clone(&fake_crypto) as Arc<_>,
registry.clone(),
ingress_hist_reader,
artifact_pools,
cycles_account_manager,
fake_local_store_certified_time_reader,
Box::new(ic_https_outcalls_adapter_client::BrokenCanisterHttpClient {}),
Expand Down
1 change: 1 addition & 0 deletions rs/replica/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("//bazel:defs.bzl", "rust_bench")
package(default_visibility = ["//visibility:public"])

DEPENDENCIES = [
"//rs/artifact_pool",
"//rs/async_utils",
"//rs/bitcoin/client",
"//rs/bitcoin/consensus",
Expand Down
1 change: 1 addition & 0 deletions rs/replica/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
base64 = "0.11.0"
clap = { version = "3.1.6", features = ["derive"] }
hex = "0.4.2"
ic-artifact-pool = { path = "../artifact_pool" }
ic-async-utils = { path = "../async_utils" }
ic-btc-adapter-client = { path = "../bitcoin/client" }
ic-btc-consensus = { path = "../bitcoin/consensus" }
Expand Down
52 changes: 23 additions & 29 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use ic_artifact_pool::{
consensus_pool::ConsensusPoolImpl,
dkg_pool::DkgPoolImpl,
ecdsa_pool::EcdsaPoolImpl,
ensure_persistent_pool_replica_version_compatibility,
ingress_pool::{IngressPoolImpl, IngressPrioritizer},
};
use ic_config::{artifact_pool::ArtifactPoolConfig, transport::TransportConfig};
Expand Down Expand Up @@ -50,7 +49,6 @@ use ic_interfaces_transport::Transport;
use ic_logger::{info, replica_logger::ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_p2p::{start_p2p, AdvertBroadcasterImpl, MAX_ADVERT_BUFFER};
use ic_protobuf::types::v1 as pb;
use ic_registry_client_helpers::subnet::SubnetRegistry;
use ic_replicated_state::ReplicatedState;
use ic_state_manager::state_sync::{StateSync, StateSyncArtifact};
Expand Down Expand Up @@ -92,7 +90,6 @@ pub enum P2PStateSyncClient {
/// The collection of all artifact pools.
pub struct ArtifactPools {
ingress_pool: Arc<RwLock<IngressPoolImpl>>,
pub consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,
certification_pool: Arc<RwLock<CertificationPoolImpl>>,
dkg_pool: Arc<RwLock<DkgPoolImpl>>,
ecdsa_pool: Arc<RwLock<EcdsaPoolImpl>>,
Expand All @@ -110,10 +107,11 @@ pub type CanisterHttpAdapterClient =
clippy::type_complexity,
clippy::new_ret_no_self
)]
pub fn create_networking_stack(
pub fn setup_consensus_and_p2p(
metrics_registry: &MetricsRegistry,
log: ReplicaLogger,
rt_handle: tokio::runtime::Handle,
artifact_pool_config: ArtifactPoolConfig,
transport_config: TransportConfig,
malicious_flags: MaliciousFlags,
node_id: NodeId,
Expand All @@ -125,6 +123,8 @@ pub fn create_networking_stack(
sev_handshake: Arc<dyn ValidateAttestedStream<Box<dyn TlsStream>> + Send + Sync>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,
catch_up_package: CatchUpPackage,
state_sync_client: P2PStateSyncClient,
xnet_payload_builder: Arc<dyn XNetPayloadBuilder>,
self_validating_payload_builder: Arc<dyn SelfValidatingPayloadBuilder>,
Expand All @@ -134,20 +134,27 @@ pub fn create_networking_stack(
ingress_sig_crypto: Arc<dyn IngressSigVerifier + Send + Sync>,
registry_client: Arc<dyn RegistryClient>,
ingress_history_reader: Box<dyn IngressHistoryReader>,
artifact_pools: ArtifactPools,
cycles_account_manager: Arc<CyclesAccountManager>,
local_store_time_reader: Arc<dyn LocalStoreCertifiedTimeReader>,
canister_http_adapter_client: CanisterHttpAdapterClient,
registry_poll_delay_duration_ms: u64,
) -> (IngressIngestionService, Vec<Box<dyn JoinGuard>>) {
let artifact_pools = init_artifact_pools(
node_id,
artifact_pool_config,
metrics_registry.clone(),
log.clone(),
catch_up_package,
);

let (advert_tx, advert_rx) = channel(MAX_ADVERT_BUFFER);
let advert_subscriber = Arc::new(AdvertBroadcasterImpl::new(
log.clone(),
metrics_registry,
advert_tx,
));
let ingress_pool = artifact_pools.ingress_pool.clone();
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();
let consensus_pool_cache = consensus_pool.read().unwrap().get_cache();
let oldest_registry_version_in_use = consensus_pool_cache.get_oldest_registry_version_in_use();
// Now we setup the Artifact Pools and the manager.
let (artifact_manager, join_handles) = setup_artifact_manager(
Expand All @@ -167,6 +174,7 @@ pub fn create_networking_stack(
message_router,
ingress_history_reader,
artifact_pools,
consensus_pool,
malicious_flags,
cycles_account_manager,
local_store_time_reader,
Expand Down Expand Up @@ -240,6 +248,8 @@ fn setup_artifact_manager(
message_router: Arc<dyn MessageRouting>,
ingress_history_reader: Box<dyn IngressHistoryReader>,
artifact_pools: ArtifactPools,
consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,

malicious_flags: MaliciousFlags,
cycles_account_manager: Arc<CyclesAccountManager>,
local_store_time_reader: Arc<dyn LocalStoreCertifiedTimeReader>,
Expand All @@ -252,17 +262,13 @@ fn setup_artifact_manager(
) {
// Initialize the time source.
let time_source = Arc::new(SysTimeSource::new());
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();
let consensus_pool_cache = consensus_pool.read().unwrap().get_cache();

let mut backends: HashMap<ArtifactTag, Box<dyn manager::ArtifactManagerBackend>> =
HashMap::new();
let mut join_handles = vec![];

let consensus_block_cache = artifact_pools
.consensus_pool
.read()
.unwrap()
.get_block_cache();
let consensus_block_cache = consensus_pool.read().unwrap().get_block_cache();

if let P2PStateSyncClient::TestChunkingPool(pool_reader, client_on_state_change) =
state_sync_client
Expand Down Expand Up @@ -346,7 +352,7 @@ fn setup_artifact_manager(
metrics_registry.clone(),
Arc::clone(&consensus_crypto),
replica_logger.clone(),
&PoolReader::new(&*artifact_pools.consensus_pool.read().unwrap()),
&PoolReader::new(&*consensus_pool.read().unwrap()),
)));

{
Expand Down Expand Up @@ -376,7 +382,7 @@ fn setup_artifact_manager(
registry_poll_delay_duration_ms,
),
Arc::clone(&time_source) as Arc<_>,
Arc::clone(&artifact_pools.consensus_pool),
Arc::clone(&consensus_pool),
metrics_registry.clone(),
);
join_handles.push(jh);
Expand Down Expand Up @@ -523,17 +529,13 @@ fn setup_artifact_manager(
)
}

/// The function initializes the artifact pools.
pub fn init_artifact_pools(
fn init_artifact_pools(
node_id: NodeId,
subnet_id: SubnetId,
config: ArtifactPoolConfig,
registry: MetricsRegistry,
log: ReplicaLogger,
cup: CatchUpPackage,
catch_up_package: CatchUpPackage,
) -> ArtifactPools {
ensure_persistent_pool_replica_version_compatibility(config.persistent_pool_db_path());

let ingress_pool = Arc::new(RwLock::new(IngressPoolImpl::new(
node_id,
config.clone(),
Expand All @@ -547,16 +549,9 @@ pub fn init_artifact_pools(
registry.clone(),
Box::new(ecdsa::EcdsaStatsImpl::new(registry.clone())),
);
ecdsa_pool.add_initial_dealings(&cup);
ecdsa_pool.add_initial_dealings(&catch_up_package);
let ecdsa_pool = Arc::new(RwLock::new(ecdsa_pool));

let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new(
subnet_id,
pb::CatchUpPackage::from(&cup),
config.clone(),
registry.clone(),
log.clone(),
)));
let certification_pool = Arc::new(RwLock::new(CertificationPoolImpl::new(
config,
log.clone(),
Expand All @@ -566,7 +561,6 @@ pub fn init_artifact_pools(
let canister_http_pool = Arc::new(RwLock::new(CanisterHttpPoolImpl::new(registry, log)));
ArtifactPools {
ingress_pool,
consensus_pool,
certification_pool,
dkg_pool,
ecdsa_pool,
Expand Down

0 comments on commit 5317b1a

Please sign in to comment.