Skip to content

Commit

Permalink
feat: collect metrics for spell particles separately [fixes NET-439] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kmd-fl committed Apr 6, 2023
1 parent 460446b commit 5711171
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 17 deletions.
8 changes: 5 additions & 3 deletions connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
HandlerMessage::InParticle(particle) => {
log::trace!(target: "network", "{}: received particle {} from {}; queue {}", self.peer_id, particle.id, from, self.queue.len());
self.meter(|m| {
m.particle_queue_size.set(self.queue.len() as i64 + 1);
m.received_particles.inc();
m.particle_sizes.observe(particle.data.len() as f64);
m.incoming_particle(
&particle.id,
self.queue.len() as i64 + 1,
particle.data.len() as f64,
)
});
self.queue.push_back(particle);
self.wake();
Expand Down
22 changes: 18 additions & 4 deletions crates/peer-metrics/src/connection_pool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;

#[derive(Clone)]
pub struct ConnectionPoolMetrics {
pub received_particles: Counter,
pub particle_sizes: Histogram,
pub received_particles: Family<ParticleLabel, Counter>,
pub particle_sizes: Family<ParticleLabel, Histogram>,
pub connected_peers: Gauge,
pub particle_queue_size: Gauge,
}
Expand All @@ -15,15 +17,16 @@ impl ConnectionPoolMetrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("connection_pool");

let received_particles = Counter::default();
let received_particles = Family::default();
sub_registry.register(
"received_particles",
"Number of particles received from the network (not unique)",
received_particles.clone(),
);

// from 100 bytes to 100 MB
let particle_sizes = Histogram::new(exponential_buckets(100.0, 10.0, 7));
let particle_sizes: Family<_, _> =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(100.0, 10.0, 7)));
sub_registry.register(
"particle_sizes",
"Distribution of particle data sizes",
Expand Down Expand Up @@ -51,4 +54,15 @@ impl ConnectionPoolMetrics {
particle_queue_size,
}
}

pub fn incoming_particle(&self, particle_id: &str, queue_len: i64, particle_len: f64) {
self.particle_queue_size.set(queue_len);
let label = ParticleLabel {
particle_type: ParticleType::from_particle(particle_id),
};
self.received_particles.get_or_create(&label).inc();
self.particle_sizes
.get_or_create(&label)
.observe(particle_len);
}
}
26 changes: 22 additions & 4 deletions crates/peer-metrics/src/connectivity.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand All @@ -15,11 +16,12 @@ pub enum Resolution {
pub struct ResolutionLabel {
action: Resolution,
}

#[derive(Clone)]
pub struct ConnectivityMetrics {
contact_resolve: Family<ResolutionLabel, Counter>,
pub particle_send_success: Counter,
pub particle_send_failure: Counter,
pub particle_send_success: Family<ParticleLabel, Counter>,
pub particle_send_failure: Family<ParticleLabel, Counter>,
pub bootstrap_disconnected: Counter,
pub bootstrap_connected: Counter,
}
Expand All @@ -35,14 +37,14 @@ impl ConnectivityMetrics {
contact_resolve.clone(),
);

let particle_send_success = Counter::default();
let particle_send_success = Family::default();
sub_registry.register(
"particle_send_success",
"Number of sent particles",
particle_send_success.clone(),
);

let particle_send_failure = Counter::default();
let particle_send_failure = Family::default();
sub_registry.register(
"particle_send_failure",
"Number of errors on particle sending",
Expand Down Expand Up @@ -77,4 +79,20 @@ impl ConnectivityMetrics {
.get_or_create(&ResolutionLabel { action: resolution })
.inc();
}

pub fn send_particle_ok(&self, particle: &str) {
self.particle_send_success
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle),
})
.inc();
}

pub fn send_particle_failed(&self, particle: &str) {
self.particle_send_failure
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle),
})
.inc();
}
}
14 changes: 12 additions & 2 deletions crates/peer-metrics/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;

#[derive(Clone)]
pub struct DispatcherMetrics {
pub expired_particles: Counter,
pub expired_particles: Family<ParticleLabel, Counter>,
}

impl DispatcherMetrics {
Expand All @@ -23,7 +25,7 @@ impl DispatcherMetrics {
// Box::new(parallelism),
// );

let expired_particles = Counter::default();
let expired_particles = Family::default();
sub_registry.register(
"particles_expired",
"Number of particles expired by TTL",
Expand All @@ -32,4 +34,12 @@ impl DispatcherMetrics {

DispatcherMetrics { expired_particles }
}

pub fn particle_expired(&self, particle_id: &str) {
self.expired_particles
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle_id),
})
.inc();
}
}
23 changes: 22 additions & 1 deletion crates/peer-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use prometheus_client::encoding::EncodeMetric;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, EncodeMetric};
use prometheus_client::registry::Registry;

pub use connection_pool::ConnectionPoolMetrics;
Expand Down Expand Up @@ -31,6 +31,27 @@ mod vm_pool;
// - count 'Error processing inbound ProtocolMessage: unexpected end of file'
// - number of scheduled script executions

#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
pub enum ParticleType {
Spell,
Common,
}

impl ParticleType {
fn from_particle(particle_id: &str) -> Self {
if particle_id.starts_with("spell_") {
ParticleType::Spell
} else {
ParticleType::Common
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
pub struct ParticleLabel {
particle_type: ParticleType,
}

/// from 100 microseconds to 120 seconds
pub(self) fn execution_time_buckets() -> std::vec::IntoIter<f64> {
vec![
Expand Down
8 changes: 6 additions & 2 deletions particle-node/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,15 @@ impl Connectivity {
let sent = self.connection_pool.send(contact.clone(), particle).await;
match &sent {
SendStatus::Ok => {
metrics.map(|m| m.particle_send_success.inc());
if let Some(m) = metrics {
m.send_particle_ok(&id)
}
log::info!("Sent particle {} to {}", id, contact);
}
err => {
metrics.map(|m| m.particle_send_failure.inc());
if let Some(m) = metrics {
m.send_particle_failed(&id);
}
log::warn!(
"Failed to send particle {} to {}, reason: {:?}",
id,
Expand Down
4 changes: 3 additions & 1 deletion particle-node/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ impl Dispatcher {
let metrics = metrics.clone();

if particle.is_expired() {
metrics.map(|m| m.expired_particles.inc());
if let Some(m) = metrics {
m.particle_expired(&particle.id);
}
log::info!("Particle {} expired", particle.id);
return async {}.boxed();
}
Expand Down

0 comments on commit 5711171

Please sign in to comment.