Skip to content

Commit

Permalink
chore(msd): Improvements and cleanup (#188)
Browse files Browse the repository at this point in the history
* cleaning up some leftover mutable references and changing to RwLock

* cleaning up errors and fixmes

* adding missing metrics
  • Loading branch information
NikolaMilosa committed Feb 19, 2024
1 parent d605561 commit 9425eba
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 99 deletions.
124 changes: 60 additions & 64 deletions rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ic_registry_client::client::ThresholdSigPublicKey;
use serde::Deserialize;
use serde::Serialize;
use service_discovery::job_types::JobType;
use service_discovery::registry_sync::Interrupted;
use service_discovery::registry_sync::SyncError;
use service_discovery::IcServiceDiscovery;
use service_discovery::IcServiceDiscoveryError;
use service_discovery::TargetGroup;
Expand Down Expand Up @@ -145,32 +145,31 @@ impl TestDefinition {

/// Syncs the registry, updates the in-memory cache, then stops.
pub async fn sync_and_stop(&self, skip_update_local_registry: bool) {
if let Err(e) = if skip_update_local_registry {
// The function, when invoked with use_current_version=true, prioritizes utilizing the
// local registry and omits the synchronization step. If however the local registry is found to be empty,
// perhaps as a consequence of a prior execution error or a buggy version, the function will then proceed
// to synchronize the registry.
// This mechanism ensures clarity in handling scenarios where a comparison between two MSD versions starts
// from a baseline of an empty registry due to issues in earlier runs.
sync_local_registry(
self.running_def.definition.log.clone(),
self.running_def.definition.registry_path.join("targets"),
self.running_def.definition.nns_urls.clone(),
true,
self.running_def.definition.public_key,
&self.running_def.stop_signal,
)
.await
} else {
self.running_def.initial_registry_sync().await
} {
// If skip_update_local_registry is true, first try to use the existing local registry
if skip_update_local_registry {
match self.running_def.initial_registry_sync(true).await {
Ok(()) => return,
Err(e) => {
error!(
self.running_def.definition.log,
"Error while running initial sync with the registry for definition named '{}': {:?}",
self.running_def.definition.name,
e
);
self.running_def.metrics.observe_sync(self.running_def.name(), false);
}
}
}
// If skip_update_local_registry is false, or the initial sync failed, try to do a full initial sync
if let Err(e) = self.running_def.initial_registry_sync(false).await {
error!(
self.running_def.definition.log,
"Error while running initial sync for definition named '{}': {:?}", self.running_def.definition.name, e
"Error while running full initial sync with the registry for definition named '{}': {:?}",
self.running_def.definition.name,
e
);
return;
};

self.running_def.metrics.observe_sync(self.running_def.name(), false);
}
let _ = self
.running_def
.definition
Expand Down Expand Up @@ -210,7 +209,7 @@ impl Definition {
}

pub(crate) async fn run(self, rt: tokio::runtime::Handle, metrics: RunningDefinitionsMetrics) -> RunningDefinition {
fn wrap(mut definition: RunningDefinition, rt: tokio::runtime::Handle) -> impl FnMut() {
fn wrap(definition: RunningDefinition, rt: tokio::runtime::Handle) -> impl FnMut() {
move || {
rt.block_on(definition.run());
}
Expand All @@ -235,7 +234,7 @@ impl Definition {
}

impl RunningDefinition {
pub(crate) async fn end(&mut self) {
pub(crate) async fn end(&self) {
let mut ender = self.ender.lock().await;
if let Some(s) = ender.take() {
// We have pulled out the channel from its container. After this,
Expand Down Expand Up @@ -263,7 +262,7 @@ impl RunningDefinition {
.get_target_groups(job_type, self.definition.log.clone())
}

async fn initial_registry_sync(&self) -> Result<(), Interrupted> {
async fn initial_registry_sync(&self, use_current_version: bool) -> Result<(), SyncError> {
info!(
self.definition.log,
"Syncing local registry for {} started", self.definition.name
Expand All @@ -274,16 +273,11 @@ impl RunningDefinition {
self.definition.registry_path.display()
);

// FIXME: sync_local_registry() needs to update the metrics just
// as poll_loop() does. Otherwise an initially hung or failed
// sync_local_registry() is not going to be trackable via metrics.
// Right now, the callee simply says metrics sync successful once
// this function returns.
let r = sync_local_registry(
self.definition.log.clone(),
self.definition.registry_path.join("targets"),
self.definition.nns_urls.clone(),
false,
use_current_version,
self.definition.public_key,
&self.stop_signal,
)
Expand All @@ -293,17 +287,21 @@ impl RunningDefinition {
info!(
self.definition.log,
"Syncing local registry for {} completed", self.definition.name,
)
);
self.metrics.observe_sync(self.name(), true)
}
Err(_) => {
warn!(
self.definition.log,
"Interrupted initial sync of definition {}", self.definition.name
);
self.metrics.observe_sync(self.name(), false)
}
Err(_) => warn!(
self.definition.log,
"Interrupted initial sync of definition {}", self.definition.name
),
}
r
}

async fn poll_loop(&mut self) {
async fn poll_loop(&self) {
let interval = crossbeam::channel::tick(self.definition.poll_interval);
let mut tick = Instant::now();
loop {
Expand Down Expand Up @@ -343,8 +341,8 @@ impl RunningDefinition {

// Syncs the registry and keeps running, syncing as new
// registry versions come in.
async fn run(&mut self) {
if self.initial_registry_sync().await.is_err() {
async fn run(&self) {
if self.initial_registry_sync(false).await.is_err() {
// Initial sync was interrupted.
self.metrics.observe_end(self.name());
return;
Expand All @@ -359,11 +357,6 @@ impl RunningDefinition {
self.poll_loop().await;

self.metrics.observe_end(self.name());

// We used to delete storage here, but that was unsafe
// because another definition may be started in its name,
// so it is racy to delete the folder it will be using.
// So we no longer delete storage here.
}

pub(crate) async fn add_boundary_node(&mut self, target: BoundaryNode) -> Result<(), BoundaryNodeAlreadyExists> {
Expand Down Expand Up @@ -474,34 +467,34 @@ pub(super) struct DefinitionsSupervisor {
}

impl DefinitionsSupervisor {
pub(crate) fn new(rt: tokio::runtime::Handle, allow_mercury_deletion: bool, networks_state_file: Option<PathBuf>, log: Logger) -> Self {
pub(crate) fn new(
rt: tokio::runtime::Handle,
allow_mercury_deletion: bool,
networks_state_file: Option<PathBuf>,
log: Logger,
) -> Self {
DefinitionsSupervisor {
rt,
definitions: Arc::new(Mutex::new(BTreeMap::new())),
allow_mercury_deletion,
networks_state_file,
log
log,
}
}

// FIXME: if the file is corrupted, that may be a partial write from another
// MSD sharing the same directory. Retry the error.
// FIXME: definitions should be reloaded if the file is changed.
// FIXME: if the definitions loaded are the same as the currently-loaded
// definitions, no action should be taken on this MSD.
pub(crate) async fn load_or_create_defs(
&self,
metrics: RunningDefinitionsMetrics,
) -> Result<(), Box<dyn Error>> {
pub(crate) async fn load_or_create_defs(&self, metrics: RunningDefinitionsMetrics) -> Result<(), Box<dyn Error>> {
if let Some(networks_state_file) = self.networks_state_file.clone() {
if networks_state_file.exists() {
let file_content = fs::read_to_string(networks_state_file.clone())?;
let initial_definitions: Vec<FSDefinition> = serde_json::from_str(&file_content)?;
let names = initial_definitions.iter().map(|def| def.name.clone()).collect::<Vec<_>>();
let names = initial_definitions
.iter()
.map(|def| def.name.clone())
.collect::<Vec<_>>();
info!(
self.log,
"Definitions loaded from {:?}:\n{:?}",
networks_state_file.as_path(),
self.log,
"Definitions loaded from {:?}:\n{:?}",
networks_state_file.as_path(),
names
);
self.start(
Expand All @@ -518,7 +511,10 @@ impl DefinitionsSupervisor {
// FIXME: if the file contents on disk are the same as the contents about to
// be persisted, then the file should not be overwritten because it was
// already updated by another MSD sharing the same directory.
pub(crate) async fn persist_defs(&self, existing: &mut BTreeMap<String, RunningDefinition>) -> Result<(), Box<dyn Error>> {
pub(crate) async fn persist_defs(
&self,
existing: &mut BTreeMap<String, RunningDefinition>,
) -> Result<(), Box<dyn Error>> {
if let Some(networks_state_file) = self.networks_state_file.clone() {
retry::retry(retry::delay::Exponential::from_millis(10).take(5), || {
std::fs::OpenOptions::new()
Expand All @@ -531,7 +527,7 @@ impl DefinitionsSupervisor {
.cloned()
.map(|running_def| running_def.definition.into())
.collect::<Vec<_>>();

file.write_all(serde_json::to_string(&fs_def)?.as_bytes()).map(|_| file)
})
.and_then(|mut file| file.flush())
Expand Down Expand Up @@ -601,7 +597,7 @@ impl DefinitionsSupervisor {
// End them and join them all.
join_all(defs_to_end.iter_mut().map(|def| async { def.end().await })).await;
drop(defs_to_end);
drop(ic_names_to_end);
drop(ic_names_to_end);
// Now we add the incoming definitions.
for definition in definitions.into_iter() {
existing.insert(
Expand Down
12 changes: 5 additions & 7 deletions rs/ic-observability/multiservice-discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn main() {
shutdown_signal: impl futures_util::Future<Output = ()>,
) -> Option<RunningDefinition> {
let def = get_mainnet_definition(cli_args, log.clone());
let mut test_def = TestDefinition::new(def, RunningDefinitionsMetrics::new());
let test_def = TestDefinition::new(def, RunningDefinitionsMetrics::new());
let sync_fut = test_def.sync_and_stop(cli_args.skip_update_local_registry);
tokio::select! {
_ = sync_fut => {
Expand All @@ -70,8 +70,8 @@ fn main() {
}
} else {
let supervisor = DefinitionsSupervisor::new(
rt.handle().clone(),
cli_args.start_without_mainnet,
rt.handle().clone(),
cli_args.start_without_mainnet,
cli_args.networks_state_file.clone(),
make_logger(),
);
Expand All @@ -82,10 +82,8 @@ fn main() {
let metrics_layer = HttpMetricsLayerBuilder::new().build();
let metrics = MSDMetrics::new();

rt.block_on(
supervisor.load_or_create_defs(metrics.running_definition_metrics.clone()),
)
.unwrap();
rt.block_on(supervisor.load_or_create_defs(metrics.running_definition_metrics.clone()))
.unwrap();

// First check if we should start the mainnet definition so we can
// serve it right after the server starts.
Expand Down
24 changes: 13 additions & 11 deletions rs/ic-observability/multiservice-discovery/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

use opentelemetry::{global, metrics::Observer, KeyValue};
use std::sync::Mutex;

const NETWORK: &str = "network";
const AXUM_APP: &str = "axum-app";
Expand Down Expand Up @@ -47,12 +49,12 @@ type LatestValuesByNetwork = HashMap<String, LatestValues>;

#[derive(Clone)]
pub struct RunningDefinitionsMetrics {
latest_values_by_network: Arc<Mutex<LatestValuesByNetwork>>,
latest_values_by_network: Arc<RwLock<LatestValuesByNetwork>>,
}

impl RunningDefinitionsMetrics {
pub fn new() -> Self {
let latest_values_by_network = Arc::new(Mutex::new(LatestValuesByNetwork::new()));
let latest_values_by_network = Arc::new(RwLock::new(LatestValuesByNetwork::new()));
let meter = global::meter(AXUM_APP);
let load_new_targets_error = meter
.clone()
Expand Down Expand Up @@ -85,7 +87,7 @@ impl RunningDefinitionsMetrics {
// We take a blocking read lock because this is not async code, and this code
// does not need to be async, since it just needs to read local data.
// C.f. https://docs.rs/tokio/1.24.2/tokio/sync/struct.Mutex.html#method.blocking_lock
let latest_values_by_network = s.lock().unwrap();
let latest_values_by_network = s.read().unwrap();
for (network, latest_values) in latest_values_by_network.iter() {
let attrs = [KeyValue::new(NETWORK, network.clone())];
for (instrument, measurement) in [
Expand All @@ -107,8 +109,8 @@ impl RunningDefinitionsMetrics {
}
}

pub fn observe_sync(&mut self, network: String, success: bool) {
let mut s = self.latest_values_by_network.lock().unwrap();
pub fn observe_sync(&self, network: String, success: bool) {
let mut s = self.latest_values_by_network.write().unwrap();
let latest_values = s.entry(network).or_insert(LatestValues::new());
latest_values.definitions_sync_successful = match success {
true => 1,
Expand All @@ -119,8 +121,8 @@ impl RunningDefinitionsMetrics {
}
}

pub fn observe_load(&mut self, network: String, success: bool) {
let mut s = self.latest_values_by_network.lock().unwrap();
pub fn observe_load(&self, network: String, success: bool) {
let mut s = self.latest_values_by_network.write().unwrap();
let latest_values = s.entry(network).or_insert(LatestValues::new());
latest_values.definitions_load_successful = match success {
true => 1,
Expand All @@ -131,8 +133,8 @@ impl RunningDefinitionsMetrics {
};
}

pub fn observe_end(&mut self, network: String) {
let mut s = self.latest_values_by_network.lock().unwrap();
pub fn observe_end(&self, network: String) {
let mut s = self.latest_values_by_network.write().unwrap();
s.remove(&network);
}
}
Loading

0 comments on commit 9425eba

Please sign in to comment.