Skip to content

Commit

Permalink
feat(metrics): add additional spell metrics [fixes NET-437] (#1569)
Browse files Browse the repository at this point in the history
* feat(metrics): add additional spell metrics
  • Loading branch information
kmd-fl committed Apr 17, 2023
1 parent a9be656 commit ab851ac
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/peer-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use services_metrics::{
ServiceCallStats, ServiceMemoryStat, ServiceType, ServicesMetrics, ServicesMetricsBackend,
ServicesMetricsBuiltin, ServicesMetricsExternal,
};
pub use spell_metrics::SpellMetrics;
pub use vm_pool::VmPoolMetrics;

mod connection_pool;
Expand All @@ -22,6 +23,7 @@ mod info;
mod network_protocol;
mod particle_executor;
mod services_metrics;
mod spell_metrics;
mod vm_pool;

// TODO:
Expand Down
80 changes: 80 additions & 0 deletions crates/peer-metrics/src/spell_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::register;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

/// Handles to the spell-related metrics of the node.
///
/// The fields are private; values are updated through the `observe_*` methods.
#[derive(Clone)]
pub struct SpellMetrics {
/// How many spell _particles_ were created by the node.
spell_particles_created: Counter,
/// How many spells are scheduled to run _now_.
spell_scheduled_now: Gauge,
/// Distribution of the spells' scheduled periods.
spell_periods: Histogram,
}

impl SpellMetrics {
    /// Creates the spell metrics and registers them in a sub-registry of `registry`
    /// that carries the `spell` prefix.
    ///
    /// Three metrics are registered through the crate's `register` helper:
    /// `particles_created` (counter), `scheduled_now` (gauge) and `periods` (histogram).
    pub fn new(registry: &mut Registry) -> Self {
        let sub_registry = registry.sub_registry_with_prefix("spell");

        let spell_particles_created = register(
            sub_registry,
            Counter::default(),
            "particles_created",
            "Number of spell particles created",
        );

        // This gauge follows spells (see `observe_started_spell` / `observe_finished_spell`),
        // not the particles they produce, so the help text talks about spells.
        let spell_scheduled_now = register(
            sub_registry,
            Gauge::default(),
            "scheduled_now",
            "Number of spells scheduled to run now",
        );

        let spell_periods = register(
            sub_registry,
            Histogram::new(Self::periods_buckets()),
            "periods",
            "Distribution of spell scheduled periods in seconds",
        );

        Self {
            spell_particles_created,
            spell_scheduled_now,
            spell_periods,
        }
    }

    /// Bucket boundaries, in seconds, for the spell period histogram.
    ///
    /// Boundaries: 0 sec, 1 sec, 30 sec, 1 min, 5 min, 10 min, 1 hour, 12 hours, 1 day,
    /// 1 week, 1 month (30 days).
    /// A period of 0 means that the spell is oneshot or reacts only on events.
    fn periods_buckets() -> std::vec::IntoIter<f64> {
        const MINUTE: f64 = 60.0;
        const HOUR: f64 = 60.0 * MINUTE;
        const DAY: f64 = 24.0 * HOUR;

        vec![
            0.0,
            1.0,
            30.0,
            MINUTE,
            5.0 * MINUTE,
            10.0 * MINUTE,
            HOUR,
            12.0 * HOUR,
            DAY,
            7.0 * DAY,
            30.0 * DAY,
        ]
        .into_iter()
    }

    /// Records a newly scheduled spell: increments the `scheduled_now` gauge and adds
    /// `period` (the spell's period in seconds) to the `periods` histogram.
    pub fn observe_started_spell(&self, period: u32) {
        self.spell_scheduled_now.inc();
        // `u32 -> f64` is lossless, so use the infallible conversion instead of an `as` cast.
        self.spell_periods.observe(f64::from(period));
    }

    /// Records that a spell is no longer scheduled: decrements the `scheduled_now` gauge.
    pub fn observe_finished_spell(&self) {
        self.spell_scheduled_now.dec();
    }

    /// Records the creation of one spell particle: increments the `particles_created` counter.
    pub fn observe_spell_cast(&self) {
        self.spell_particles_created.inc();
    }
}
1 change: 1 addition & 0 deletions crates/spell-event-bus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ futures = { workspace = true }
thiserror = { workspace = true }
log = { workspace = true }
fluence-spell-dtos = { workspace = true }
peer-metrics = { workspace = true }

[dev-dependencies]
libp2p = { workspace = true }
Expand Down
19 changes: 13 additions & 6 deletions crates/spell-event-bus/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::config::{SpellTriggerConfigs, TriggerConfig};
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::{future, FutureExt};
use peer_metrics::SpellMetrics;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::pin::Pin;
Expand Down Expand Up @@ -162,10 +163,13 @@ pub struct SpellEventBus {
recv_cmd_channel: mpsc::UnboundedReceiver<Command>,
/// Notify when trigger happened
send_events: mpsc::UnboundedSender<TriggerEvent>,
/// Spell metrics
spell_metrics: Option<SpellMetrics>,
}

impl SpellEventBus {
pub fn new(
spell_metrics: Option<SpellMetrics>,
sources: Vec<BoxStream<'static, PeerEvent>>,
) -> (
Self,
Expand All @@ -181,6 +185,7 @@ impl SpellEventBus {
sources,
recv_cmd_channel,
send_events,
spell_metrics,
};
(this, api, recv_events)
}
Expand Down Expand Up @@ -258,6 +263,8 @@ impl SpellEventBus {
if let Some(rescheduled) = Scheduled::at(scheduled_spell.data, Instant::now()) {
log::trace!("Reschedule: {:?}", rescheduled);
state.scheduled.push(rescheduled);
} else if let Some(m) = &self.spell_metrics {
m.observe_finished_spell();
}
}
},
Expand Down Expand Up @@ -381,7 +388,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_one() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);

Expand All @@ -405,7 +412,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_many() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);

Expand Down Expand Up @@ -438,7 +445,7 @@ mod tests {

#[tokio::test]
async fn test_subscribe_oneshot() {
let (bus, api, event_receiver) = SpellEventBus::new(vec![]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let event_stream = UnboundedReceiverStream::new(event_receiver);
let spell1_id = "spell1".to_string();
Expand Down Expand Up @@ -473,7 +480,7 @@ mod tests {
async fn test_subscribe_connect() {
let (send, recv) = mpsc::unbounded_channel();
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let mut event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();

Expand Down Expand Up @@ -503,7 +510,7 @@ mod tests {
async fn test_unsubscribe() {
let (send, recv) = mpsc::unbounded_channel();
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, mut event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]);
let bus = bus.start();

let spell1_id = "spell1".to_string();
Expand Down Expand Up @@ -532,7 +539,7 @@ mod tests {
async fn test_subscribe_many_spells_with_diff_event_types() {
let (recv, hdl) = emulate_connect(Duration::from_millis(10));
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, event_receiver) = SpellEventBus::new(vec![recv]);
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();

Expand Down
6 changes: 4 additions & 2 deletions particle-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use particle_execution::ParticleFunctionStatic;
use particle_protocol::Particle;
use peer_metrics::{
ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics,
ServicesMetricsBackend, VmPoolMetrics,
ServicesMetricsBackend, SpellMetrics, VmPoolMetrics,
};
use prometheus_client::registry::Registry;
use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig};
Expand Down Expand Up @@ -136,6 +136,7 @@ impl<RT: AquaRuntime> Node<RT> {
let connection_pool_metrics = metrics_registry.as_mut().map(ConnectionPoolMetrics::new);
let plumber_metrics = metrics_registry.as_mut().map(ParticleExecutorMetrics::new);
let vm_pool_metrics = metrics_registry.as_mut().map(VmPoolMetrics::new);
let spell_metrics = metrics_registry.as_mut().map(SpellMetrics::new);

#[allow(deprecated)]
let connection_limits = ConnectionLimits::default()
Expand Down Expand Up @@ -251,7 +252,7 @@ impl<RT: AquaRuntime> Node<RT> {
let sources = vec![recv_connection_pool_events.map(PeerEvent::from).boxed()];

let (spell_event_bus, spell_event_bus_api, spell_events_receiver) =
SpellEventBus::new(sources);
SpellEventBus::new(spell_metrics.clone(), sources);

let (sorcerer, mut custom_service_functions, spell_version) = Sorcerer::new(
builtins.services.clone(),
Expand All @@ -260,6 +261,7 @@ impl<RT: AquaRuntime> Node<RT> {
config.clone(),
spell_event_bus_api,
key_manager.clone(),
spell_metrics,
);

let node_info = NodeInfo {
Expand Down
1 change: 1 addition & 0 deletions sorcerer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ connection-pool = { workspace = true }
kademlia = { workspace = true }
fluence-libp2p = { workspace = true }
key-manager = { workspace = true }
peer-metrics = { workspace = true }

libp2p = { workspace = true }
fluence-keypair = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions sorcerer/src/script_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ impl Sorcerer {
let particle = self.make_spell_particle(event.spell_id.clone(), worker_id)?;

self.store_trigger(event.clone(), worker_id)?;
if let Some(m) = &self.spell_metrics {
m.observe_spell_cast();
}
self.aquamarine.clone().execute(particle, None).await?;
};

Expand Down
10 changes: 9 additions & 1 deletion sorcerer/src/sorcerer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use particle_builtins::{wrap, wrap_unit, CustomService};
use particle_execution::ServiceFunction;
use particle_modules::ModuleRepository;
use particle_services::ParticleAppServices;
use peer_metrics::SpellMetrics;
use serde_json::Value;
use server_config::ResolvedConfig;
use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent};
Expand All @@ -49,6 +50,7 @@ pub struct Sorcerer {
pub spell_event_bus_api: SpellEventBusApi,
pub spell_script_particle_ttl: Duration,
pub key_manager: KeyManager,
pub spell_metrics: Option<SpellMetrics>,
}

impl Sorcerer {
Expand All @@ -59,6 +61,7 @@ impl Sorcerer {
config: ResolvedConfig,
spell_event_bus_api: SpellEventBusApi,
key_manager: KeyManager,
spell_metrics: Option<SpellMetrics>,
) -> (Self, HashMap<String, CustomService>, String) {
let (spell_storage, spell_version) =
SpellStorage::create(&config.dir_config.spell_base_dir, &services, &modules)
Expand All @@ -71,6 +74,7 @@ impl Sorcerer {
spell_event_bus_api,
spell_script_particle_ttl: config.max_spell_particle_ttl,
key_manager,
spell_metrics,
};

let mut builtin_functions = sorcerer.make_spell_builtins();
Expand Down Expand Up @@ -104,11 +108,15 @@ impl Sorcerer {
spell_id,
"get_trigger_config",
)?;
let period = result.config.clock.period_sec;
let config = from_user_config(result.config)?;
if let Some(config) = config.and_then(|c| c.into_rescheduled()) {
self.spell_event_bus_api
.subscribe(spell_id.clone(), config.clone())
.subscribe(spell_id.clone(), config)
.await?;
if let Some(m) = &self.spell_metrics {
m.observe_started_spell(period);
}
} else {
log::warn!("Spell {spell_id} is not rescheduled since its config is either not found or not reschedulable");
}
Expand Down

0 comments on commit ab851ac

Please sign in to comment.