Skip to content

Commit

Permalink
chore: build read state service with builder and expose it
Browse files Browse the repository at this point in the history
  • Loading branch information
tthebst committed Jan 16, 2024
1 parent 0f1dc5e commit 9c4c774
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 47 deletions.
39 changes: 24 additions & 15 deletions rs/http_endpoints/public/src/lib.rs
Expand Up @@ -18,6 +18,8 @@ mod status;
mod threads;
mod types;

pub use read_state::canister::{CanisterReadStateService, CanisterReadStateServiceBuilder};

cfg_if::cfg_if! {
if #[cfg(feature = "fuzzing_code")] {
pub mod validator_executor;
Expand Down Expand Up @@ -46,7 +48,7 @@ use crate::{
},
pprof::{PprofFlamegraphService, PprofHomeService, PprofProfileService},
query::QueryService,
read_state::{canister::CanisterReadStateService, subnet::SubnetReadStateService},
read_state::subnet::SubnetReadStateService,
state_reader_executor::StateReaderExecutor,
status::StatusService,
types::*,
Expand Down Expand Up @@ -294,6 +296,7 @@ pub fn start_server(

let delegation_from_nns = Arc::new(RwLock::new(delegation_from_nns));
let health_status = Arc::new(AtomicCell::new(ReplicaHealthStatus::Starting));
let state_reader_clone = state_reader.clone();
let state_reader_executor = StateReaderExecutor::new(state_reader);
let call_service = CallService::new_service(
config.clone(),
Expand Down Expand Up @@ -329,21 +332,27 @@ pub fn start_server(
Arc::clone(&registry_client),
query_execution_service,
);
let canister_read_state_service = CanisterReadStateService::new_service(
config.clone(),
log.clone(),
metrics.clone(),
Arc::clone(&health_status),
Arc::clone(&delegation_from_nns),
state_reader_executor.clone(),
ValidatorExecutor::new(
Arc::clone(&registry_client),
ingress_verifier.clone(),
&malicious_flags,
log.clone(),
),
Arc::clone(&registry_client),

let canister_read_state_service = BoxCloneService::new(
ServiceBuilder::new()
.layer(GlobalConcurrencyLimitLayer::new(
config.max_read_state_concurrent_requests,
))
.service(
CanisterReadStateServiceBuilder::builder(
state_reader_clone,
registry_client.clone(),
ingress_verifier,
delegation_from_nns.clone(),
)
.with_logger(log.clone())
.with_health_status(health_status.clone())
.with_metrics(metrics.clone())
.with_malicious_flags(malicious_flags)
.build(),
),
);

let subnet_read_state_service = SubnetReadStateService::new_service(
config.clone(),
log.clone(),
Expand Down
111 changes: 79 additions & 32 deletions rs/http_endpoints/public/src/read_state/canister.rs
Expand Up @@ -5,18 +5,21 @@ use crate::{
state_reader_executor::StateReaderExecutor,
types::ApiReqType,
validator_executor::ValidatorExecutor,
EndpointService, HttpError, HttpHandlerMetrics, ReplicaHealthStatus,
HttpError, HttpHandlerMetrics, ReplicaHealthStatus,
};
use bytes::Bytes;
use crossbeam::atomic::AtomicCell;
use http::Request;
use hyper::{Body, Response, StatusCode};
use ic_config::http_handler::Config;
use ic_crypto_interfaces_sig_verification::IngressSigVerifier;
use ic_crypto_tree_hash::{sparse_labeled_tree_from_paths, Label, Path, TooLongPathError};
use ic_interfaces_registry::RegistryClient;
use ic_logger::{error, ReplicaLogger};
use ic_interfaces_state_manager::StateReader;
use ic_logger::{error, replica_logger::no_op_logger, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_replicated_state::{canister_state::execution_state::CustomSectionType, ReplicatedState};
use ic_types::{
malicious_flags::MaliciousFlags,
messages::{
Blob, Certificate, CertificateDelegation, HttpReadStateContent, HttpReadStateResponse,
HttpRequest, HttpRequestEnvelope, MessageId, ReadState, SignedRequestBytes,
Expand All @@ -30,12 +33,10 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use tower::{
limit::concurrency::GlobalConcurrencyLimitLayer, util::BoxCloneService, Service, ServiceBuilder,
};
use tower::Service;

#[derive(Clone)]
pub(crate) struct CanisterReadStateService {
pub struct CanisterReadStateService {
log: ReplicaLogger,
metrics: HttpHandlerMetrics,
health_status: Arc<AtomicCell<ReplicaHealthStatus>>,
Expand All @@ -45,34 +46,80 @@ pub(crate) struct CanisterReadStateService {
registry_client: Arc<dyn RegistryClient>,
}

impl CanisterReadStateService {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_service(
config: Config,
log: ReplicaLogger,
metrics: HttpHandlerMetrics,
health_status: Arc<AtomicCell<ReplicaHealthStatus>>,
delegation_from_nns: Arc<RwLock<Option<CertificateDelegation>>>,
state_reader_executor: StateReaderExecutor,
validator_executor: ValidatorExecutor<ReadState>,
pub struct CanisterReadStateServiceBuilder {
log: Option<ReplicaLogger>,
metrics: Option<HttpHandlerMetrics>,
health_status: Option<Arc<AtomicCell<ReplicaHealthStatus>>>,
malicious_flags: Option<MaliciousFlags>,
delegation_from_nns: Arc<RwLock<Option<CertificateDelegation>>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
ingress_verifier: Arc<dyn IngressSigVerifier + Send + Sync>,
registry_client: Arc<dyn RegistryClient>,
}

impl CanisterReadStateServiceBuilder {
pub fn builder(
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
registry_client: Arc<dyn RegistryClient>,
) -> EndpointService {
let base_service = Self {
log,
metrics,
health_status,
ingress_verifier: Arc<dyn IngressSigVerifier + Send + Sync>,
delegation_from_nns: Arc<RwLock<Option<CertificateDelegation>>>,
) -> Self {
Self {
log: None,
metrics: None,
health_status: None,
malicious_flags: None,
delegation_from_nns,
state_reader_executor,
validator_executor,
state_reader,
ingress_verifier,
registry_client,
};
BoxCloneService::new(
ServiceBuilder::new()
.layer(GlobalConcurrencyLimitLayer::new(
config.max_read_state_concurrent_requests,
))
.service(base_service),
)
}
}

pub fn with_logger(mut self, log: ReplicaLogger) -> Self {
self.log = Some(log);
self
}

pub(crate) fn with_malicious_flags(mut self, malicious_flags: MaliciousFlags) -> Self {
self.malicious_flags = Some(malicious_flags);
self
}

pub fn with_health_status(
mut self,
health_status: Arc<AtomicCell<ReplicaHealthStatus>>,
) -> Self {
self.health_status = Some(health_status);
self
}

pub(crate) fn with_metrics(mut self, metrics: HttpHandlerMetrics) -> Self {
self.metrics = Some(metrics);
self
}

pub fn build(self) -> CanisterReadStateService {
let log = self.log.unwrap_or(no_op_logger());
let default_metrics_registry = MetricsRegistry::default();
CanisterReadStateService {
log: log.clone(),
metrics: self
.metrics
.unwrap_or_else(|| HttpHandlerMetrics::new(&default_metrics_registry)),
health_status: self
.health_status
.unwrap_or_else(|| Arc::new(AtomicCell::new(ReplicaHealthStatus::Healthy))),
delegation_from_nns: self.delegation_from_nns,
state_reader_executor: StateReaderExecutor::new(self.state_reader),
validator_executor: ValidatorExecutor::new(
self.registry_client.clone(),
self.ingress_verifier,
&self.malicious_flags.unwrap_or_default(),
log,
),
registry_client: self.registry_client,
}
}
}

Expand Down

0 comments on commit 9c4c774

Please sign in to comment.