Skip to content

Commit

Permalink
test(http-endpoint): Refactored helper function to start endpoint for…
Browse files Browse the repository at this point in the history
… integration tests
  • Loading branch information
DSharifi committed Feb 7, 2024
1 parent a0ede14 commit 1251260
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 295 deletions.
153 changes: 102 additions & 51 deletions rs/http_endpoints/public/tests/common/mod.rs
Expand Up @@ -21,7 +21,7 @@ use ic_interfaces_state_manager::{CertifiedStateSnapshot, Labeled, StateReader};
use ic_interfaces_state_manager_mocks::MockStateManager;
use ic_logger::replica_logger::no_op_logger;
use ic_metrics::MetricsRegistry;
use ic_pprof::PprofCollector;
use ic_pprof::{Pprof, PprofCollector};
use ic_protobuf::registry::{
crypto::v1::{AlgorithmId as AlgorithmIdProto, PublicKey as PublicKeyProto},
provisional_whitelist::v1::ProvisionalWhitelist as ProvisionalWhitelistProto,
Expand Down Expand Up @@ -220,7 +220,7 @@ pub fn default_latest_certified_height() -> Height {
}

/// Basic state manager with one subnet (nns) at height 1.
pub fn basic_state_manager_mock() -> MockStateManager {
fn basic_state_manager_mock() -> MockStateManager {
let mut mock_state_manager = MockStateManager::new();

mock_state_manager
Expand All @@ -247,7 +247,7 @@ pub fn basic_state_manager_mock() -> MockStateManager {
}

// Basic mock consensus pool cache at height 1.
pub fn basic_consensus_pool_cache() -> MockConsensusPoolCache {
fn basic_consensus_pool_cache() -> MockConsensusPoolCache {
let mut mock_consensus_cache = MockConsensusPoolCache::new();
mock_consensus_cache
.expect_finalized_block()
Expand Down Expand Up @@ -396,58 +396,109 @@ mock! {
fn exceeds_threshold(&self) -> bool;
}
}
pub fn start_http_endpoint(
rt: tokio::runtime::Handle,

pub struct HttpEndpointBuilder {
rt_handle: tokio::runtime::Handle,
config: Config,
state_manager: Arc<dyn StateReader<State = ReplicatedState>>,
consensus_cache: Arc<dyn ConsensusPoolCache>,
registry_client: Arc<dyn RegistryClient>,
delegation_from_nns: Option<CertificateDelegation>,
pprof_collector: Arc<dyn PprofCollector>,
) -> (
IngressFilterHandle,
Receiver<UnvalidatedArtifactMutation<IngressArtifact>>,
QueryExecutionHandle,
) {
let metrics = MetricsRegistry::new();
let (ingress_filter, ingress_filter_handle) = setup_ingress_filter_mock();
let (query_exe, query_exe_handler) = setup_query_execution_mock();
// Run test on "nns" to avoid fetching root delegation
let subnet_id = subnet_test_id(1);
let nns_subnet_id = subnet_test_id(1);
let node_id = node_test_id(1);

let tls_handshake = Arc::new(MockTlsHandshake::new());
let sig_verifier = Arc::new(temp_crypto_component_with_fake_registry(node_test_id(0)));
let crypto = Arc::new(CryptoReturningOk::default());

let (ingress_tx, ingress_rx) = crossbeam::channel::unbounded();
let mut ingress_pool_throtller = MockIngressPoolThrottler::new();
ingress_pool_throtller
.expect_exceeds_threshold()
.returning(|| false);
start_server(
rt,
&metrics,
config,
ingress_filter,
query_exe,
Arc::new(RwLock::new(ingress_pool_throtller)),
ingress_tx,
state_manager,
crypto as Arc<_>,
registry_client,
tls_handshake,
sig_verifier,
node_id,
subnet_id,
nns_subnet_id,
no_op_logger(),
consensus_cache,
SubnetType::Application,
MaliciousFlags::default(),
delegation_from_nns,
pprof_collector,
);
(ingress_filter_handle, ingress_rx, query_exe_handler)
}

impl HttpEndpointBuilder {
pub fn new(rt_handle: tokio::runtime::Handle, config: Config) -> Self {
Self {
rt_handle,
config,
state_manager: Arc::new(basic_state_manager_mock()),
consensus_cache: Arc::new(basic_consensus_pool_cache()),
registry_client: Arc::new(basic_registry_client()),
delegation_from_nns: None,
pprof_collector: Arc::new(Pprof),
}
}

pub fn with_state_manager(
mut self,
state_manager: impl StateReader<State = ReplicatedState> + 'static,
) -> Self {
self.state_manager = Arc::new(state_manager);
self
}

pub fn with_consensus_cache(
mut self,
consensus_cache: impl ConsensusPoolCache + 'static,
) -> Self {
self.consensus_cache = Arc::new(consensus_cache);
self
}

pub fn with_registry_client(mut self, registry_client: impl RegistryClient + 'static) -> Self {
self.registry_client = Arc::new(registry_client);
self
}

pub fn with_delegation_from_nns(mut self, delegation_from_nns: CertificateDelegation) -> Self {
self.delegation_from_nns.replace(delegation_from_nns);
self
}

pub fn with_pprof_collector(mut self, pprof_collector: impl PprofCollector + 'static) -> Self {
self.pprof_collector = Arc::new(pprof_collector);
self
}

pub fn run(
self,
) -> (
IngressFilterHandle,
Receiver<UnvalidatedArtifactMutation<IngressArtifact>>,
QueryExecutionHandle,
) {
let metrics = MetricsRegistry::new();
let (ingress_filter, ingress_filter_handle) = setup_ingress_filter_mock();
let (query_exe, query_exe_handler) = setup_query_execution_mock();
// Run test on "nns" to avoid fetching root delegation
let subnet_id = subnet_test_id(1);
let nns_subnet_id = subnet_test_id(1);
let node_id = node_test_id(1);

let tls_handshake = Arc::new(MockTlsHandshake::new());
let sig_verifier = Arc::new(temp_crypto_component_with_fake_registry(node_test_id(0)));
let crypto = Arc::new(CryptoReturningOk::default());

let (ingress_tx, ingress_rx) = crossbeam::channel::unbounded();
let mut ingress_pool_throtller = MockIngressPoolThrottler::new();
ingress_pool_throtller
.expect_exceeds_threshold()
.returning(|| false);

start_server(
self.rt_handle,
&metrics,
self.config,
ingress_filter,
query_exe,
Arc::new(RwLock::new(ingress_pool_throtller)),
ingress_tx,
self.state_manager,
crypto as Arc<_>,
self.registry_client,
tls_handshake,
sig_verifier,
node_id,
subnet_id,
nns_subnet_id,
no_op_logger(),
self.consensus_cache,
SubnetType::Application,
MaliciousFlags::default(),
self.delegation_from_nns,
self.pprof_collector,
);
(ingress_filter_handle, ingress_rx, query_exe_handler)
}
}
67 changes: 12 additions & 55 deletions rs/http_endpoints/public/tests/load_shed_test.rs
@@ -1,10 +1,9 @@
pub mod common;

use crate::common::{
basic_consensus_pool_cache, basic_registry_client, basic_state_manager_mock,
default_certified_state_reader, default_get_latest_state, default_latest_certified_height,
default_read_certified_state, get_free_localhost_socket_addr, start_http_endpoint,
wait_for_status_healthy,
default_read_certified_state, get_free_localhost_socket_addr, wait_for_status_healthy,
HttpEndpointBuilder,
};
use async_trait::async_trait;
use hyper::{Body, Client, Method, Request, StatusCode};
Expand All @@ -17,7 +16,7 @@ use ic_agent::{
};
use ic_config::http_handler::Config;
use ic_interfaces_state_manager_mocks::MockStateManager;
use ic_pprof::{Error, Pprof, PprofCollector};
use ic_pprof::{Error, PprofCollector};
use ic_types::{
messages::{Blob, HttpQueryResponse, HttpQueryResponseReply},
time::current_time,
Expand All @@ -44,21 +43,9 @@ fn test_load_shedding_query() {
..Default::default()
};

let mock_state_manager = basic_state_manager_mock();
let mock_consensus_cache = basic_consensus_pool_cache();
let mock_registry_client = basic_registry_client();

let canister = Principal::from_text("223xb-saaaa-aaaaf-arlqa-cai").unwrap();

let (_, _, mut query_handler) = start_http_endpoint(
rt.handle().clone(),
config,
Arc::new(mock_state_manager),
Arc::new(mock_consensus_cache),
Arc::new(mock_registry_client),
None,
Arc::new(Pprof),
);
let (_, _, mut query_handler) = HttpEndpointBuilder::new(rt.handle().clone(), config).run();

let query_exec_running = Arc::new(Notify::new());
let load_shedder_returned = Arc::new(Notify::new());
Expand Down Expand Up @@ -203,21 +190,12 @@ fn test_load_shedding_read_state() {
default_certified_state_reader()
});

let mock_consensus_cache = basic_consensus_pool_cache();
let mock_registry_client = basic_registry_client();
let _ = HttpEndpointBuilder::new(rt.handle().clone(), config)
.with_state_manager(mock_state_manager)
.run();

let canister = Principal::from_text("223xb-saaaa-aaaaf-arlqa-cai").unwrap();

let _ = start_http_endpoint(
rt.handle().clone(),
config,
Arc::new(mock_state_manager),
Arc::new(mock_consensus_cache),
Arc::new(mock_registry_client),
None,
Arc::new(Pprof),
);

let ok_agent = Agent::builder()
.with_transport(ReqwestHttpReplicaV2Transport::create(format!("http://{}", addr)).unwrap())
.build()
Expand Down Expand Up @@ -322,19 +300,9 @@ fn test_load_shedding_pprof() {
load_shedded_responses_finished.clone(),
);

let mock_state_manager = basic_state_manager_mock();
let mock_consensus_cache = basic_consensus_pool_cache();
let mock_registry_client = basic_registry_client();

let _ = start_http_endpoint(
rt.handle().clone(),
config,
Arc::new(mock_state_manager),
Arc::new(mock_consensus_cache),
Arc::new(mock_registry_client),
None,
Arc::new(mock_pprof),
);
let _ = HttpEndpointBuilder::new(rt.handle().clone(), config)
.with_pprof_collector(mock_pprof)
.run();

let flame_graph_req = move || {
Request::builder()
Expand Down Expand Up @@ -407,21 +375,10 @@ fn test_load_shedding_update_call() {
..Default::default()
};

let mock_state_manager = basic_state_manager_mock();
let mock_consensus_cache = basic_consensus_pool_cache();
let mock_registry_client = basic_registry_client();

let canister = Principal::from_text("223xb-saaaa-aaaaf-arlqa-cai").unwrap();

let (mut ingress_filter, _ingress_rx, _) = start_http_endpoint(
rt.handle().clone(),
config,
Arc::new(mock_state_manager),
Arc::new(mock_consensus_cache),
Arc::new(mock_registry_client),
None,
Arc::new(Pprof),
);
let (mut ingress_filter, _ingress_rx, _) =
HttpEndpointBuilder::new(rt.handle().clone(), config).run();

let ingress_filter_running = Arc::new(Notify::new());
let load_shedder_returned = Arc::new(Notify::new());
Expand Down

0 comments on commit 1251260

Please sign in to comment.