Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect metrics for spell particles separately [fixes NET-439] #1550

Merged
merged 4 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
HandlerMessage::InParticle(particle) => {
log::trace!(target: "network", "{}: received particle {} from {}; queue {}", self.peer_id, particle.id, from, self.queue.len());
self.meter(|m| {
m.particle_queue_size.set(self.queue.len() as i64 + 1);
m.received_particles.inc();
m.particle_sizes.observe(particle.data.len() as f64);
m.incoming_particle(
&particle.id,
self.queue.len() as i64 + 1,
particle.data.len() as f64,
)
});
self.queue.push_back(particle);
self.wake();
Expand Down
22 changes: 18 additions & 4 deletions crates/peer-metrics/src/connection_pool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;

#[derive(Clone)]
pub struct ConnectionPoolMetrics {
pub received_particles: Counter,
pub particle_sizes: Histogram,
pub received_particles: Family<ParticleLabel, Counter>,
pub particle_sizes: Family<ParticleLabel, Histogram>,
pub connected_peers: Gauge,
pub particle_queue_size: Gauge,
}
Expand All @@ -15,15 +17,16 @@ impl ConnectionPoolMetrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("connection_pool");

let received_particles = Counter::default();
let received_particles = Family::default();
sub_registry.register(
"received_particles",
"Number of particles received from the network (not unique)",
received_particles.clone(),
);

// from 100 bytes to 100 MB
let particle_sizes = Histogram::new(exponential_buckets(100.0, 10.0, 7));
let particle_sizes: Family<_, _> =
Family::new_with_constructor(|| Histogram::new(exponential_buckets(100.0, 10.0, 7)));
sub_registry.register(
"particle_sizes",
"Distribution of particle data sizes",
Expand Down Expand Up @@ -51,4 +54,15 @@ impl ConnectionPoolMetrics {
particle_queue_size,
}
}

pub fn incoming_particle(&self, particle_id: &str, queue_len: i64, particle_len: f64) {
self.particle_queue_size.set(queue_len);
let label = ParticleLabel {
particle_type: ParticleType::from_particle(particle_id),
};
self.received_particles.get_or_create(&label).inc();
self.particle_sizes
.get_or_create(&label)
.observe(particle_len);
}
}
26 changes: 22 additions & 4 deletions crates/peer-metrics/src/connectivity.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand All @@ -15,11 +16,12 @@ pub enum Resolution {
pub struct ResolutionLabel {
action: Resolution,
}

#[derive(Clone)]
pub struct ConnectivityMetrics {
contact_resolve: Family<ResolutionLabel, Counter>,
pub particle_send_success: Counter,
pub particle_send_failure: Counter,
pub particle_send_success: Family<ParticleLabel, Counter>,
pub particle_send_failure: Family<ParticleLabel, Counter>,
pub bootstrap_disconnected: Counter,
pub bootstrap_connected: Counter,
}
Expand All @@ -35,14 +37,14 @@ impl ConnectivityMetrics {
contact_resolve.clone(),
);

let particle_send_success = Counter::default();
let particle_send_success = Family::default();
sub_registry.register(
"particle_send_success",
"Number of sent particles",
particle_send_success.clone(),
);

let particle_send_failure = Counter::default();
let particle_send_failure = Family::default();
sub_registry.register(
"particle_send_failure",
"Number of errors on particle sending",
Expand Down Expand Up @@ -77,4 +79,20 @@ impl ConnectivityMetrics {
.get_or_create(&ResolutionLabel { action: resolution })
.inc();
}

pub fn send_particle_ok(&self, particle: &str) {
self.particle_send_success
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle),
})
.inc();
}

pub fn send_particle_failed(&self, particle: &str) {
self.particle_send_failure
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle),
})
.inc();
}
}
14 changes: 12 additions & 2 deletions crates/peer-metrics/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::{ParticleLabel, ParticleType};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;

#[derive(Clone)]
pub struct DispatcherMetrics {
pub expired_particles: Counter,
pub expired_particles: Family<ParticleLabel, Counter>,
}

impl DispatcherMetrics {
Expand All @@ -23,7 +25,7 @@ impl DispatcherMetrics {
// Box::new(parallelism),
// );

let expired_particles = Counter::default();
let expired_particles = Family::default();
sub_registry.register(
"particles_expired",
"Number of particles expired by TTL",
Expand All @@ -32,4 +34,12 @@ impl DispatcherMetrics {

DispatcherMetrics { expired_particles }
}

pub fn particle_expired(&self, particle_id: &str) {
self.expired_particles
.get_or_create(&ParticleLabel {
particle_type: ParticleType::from_particle(particle_id),
})
.inc();
}
}
23 changes: 22 additions & 1 deletion crates/peer-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use prometheus_client::encoding::EncodeMetric;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, EncodeMetric};
use prometheus_client::registry::Registry;

pub use connection_pool::ConnectionPoolMetrics;
Expand Down Expand Up @@ -31,6 +31,27 @@ mod vm_pool;
// - count 'Error processing inbound ProtocolMessage: unexpected end of file'
// - number of scheduled script executions

#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
pub enum ParticleType {
Spell,
Common,
}

impl ParticleType {
fn from_particle(particle_id: &str) -> Self {
if particle_id.starts_with("spell_") {
ParticleType::Spell
} else {
ParticleType::Common
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
pub struct ParticleLabel {
particle_type: ParticleType,
}

/// from 100 microseconds to 120 seconds
pub(self) fn execution_time_buckets() -> std::vec::IntoIter<f64> {
vec![
Expand Down
8 changes: 6 additions & 2 deletions particle-node/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,15 @@ impl Connectivity {
let sent = self.connection_pool.send(contact.clone(), particle).await;
match &sent {
SendStatus::Ok => {
metrics.map(|m| m.particle_send_success.inc());
if let Some(m) = metrics {
m.send_particle_ok(&id)
}
log::info!("Sent particle {} to {}", id, contact);
}
err => {
metrics.map(|m| m.particle_send_failure.inc());
if let Some(m) = metrics {
m.send_particle_failed(&id);
}
log::warn!(
"Failed to send particle {} to {}, reason: {:?}",
id,
Expand Down
4 changes: 3 additions & 1 deletion particle-node/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ impl Dispatcher {
let metrics = metrics.clone();

if particle.is_expired() {
metrics.map(|m| m.expired_particles.inc());
if let Some(m) = metrics {
m.particle_expired(&particle.id);
}
log::info!("Particle {} expired", particle.id);
return async {}.boxed();
}
Expand Down