Skip to content

Commit

Permalink
fix(metrics): measure call time and wait time separately (#1858)
Browse files Browse the repository at this point in the history
* fix(metrics): always report mailbox size, actor count

* fix(metrics): measure actual call time, without queueing time

* fix(metrics): measure lock time, wait time & call time
  • Loading branch information
folex committed Oct 30, 2023
1 parent 4f50bb9 commit 73bab7e
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 25 deletions.
31 changes: 20 additions & 11 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ use crate::log::builtin_log_fn;
#[derive(Clone, Debug)]
/// Performance statistics about executed function call
pub struct SingleCallStat {
/// If execution happened, then how much time it took
pub run_time: Option<Duration>,
/// If execution happened, then how much time it took to execute the call
pub call_time: Option<Duration>,
/// If execution happened, then how much time call waited to be scheduled on blocking pool
pub wait_time: Option<Duration>,
pub success: bool,
/// Whether function call was to builtin functions (like op noop) or to services
pub kind: FunctionKind,
Expand Down Expand Up @@ -139,7 +141,8 @@ impl<F: ParticleFunctionStatic> Functions<F> {
call_id,
result,
stat: SingleCallStat {
run_time: None,
call_time: None,
wait_time: None,
success: false,
kind: FunctionKind::NotHappened,
},
Expand All @@ -155,19 +158,21 @@ impl<F: ParticleFunctionStatic> Functions<F> {
json!(&args.function_args)
);
let service_id = args.service_id.clone();
let start = Instant::now();

let params = self.particle.clone();
let builtins = self.builtins.clone();
let particle_function = self.particle_function.clone();
let span = tracing::span!(tracing::Level::INFO, "Function");
let schedule_wait_start = Instant::now();
let result = tokio::task::Builder::new()
.name(&format!(
"Call function {}:{}",
args.service_id, args.function_name
))
.spawn_blocking(|| {
.spawn_blocking(move || {
Handle::current().block_on(async move {
// How much time it took to start execution on blocking pool
let schedule_wait_time = schedule_wait_start.elapsed();
let outcome = builtins.call(args, params).await;
// record whether call was handled by builtin or not. needed for stats.
let mut call_kind = FunctionKind::Service;
Expand All @@ -187,14 +192,17 @@ impl<F: ParticleFunctionStatic> Functions<F> {
// Builtins were called, return their outcome
outcome => outcome,
};
(outcome, call_kind)
// How much time it took to execute the call
// TODO: Time for ParticleFunction includes lock time, which is not good. Low priority cuz ParticleFunctions are barely used.
let call_time = schedule_wait_start.elapsed() - schedule_wait_time;
(outcome, call_kind, call_time, schedule_wait_time)
})
})
.expect("Could not spawn task");

async move {
let (result, call_kind) = result.await.expect("Could not 'Call function' join");
let elapsed = start.elapsed();
let (result, call_kind, call_time, wait_time) =
result.await.expect("Could not 'Call function' join");

let result = match result {
FunctionOutcome::NotDefined { args, .. } => Err(JError::new(format!(
Expand All @@ -211,15 +219,16 @@ impl<F: ParticleFunctionStatic> Functions<F> {
particle_id = particle_id,
"Failed host call {} ({}): {}",
log_args,
pretty(elapsed),
pretty(call_time),
err
)
} else {
builtin_log_fn(&service_id, &log_args, pretty(elapsed), particle_id);
builtin_log_fn(&service_id, &log_args, pretty(call_time), particle_id);
};

let stats = SingleCallStat {
run_time: Some(elapsed),
call_time: Some(call_time),
wait_time: Some(wait_time),
success: result.is_ok(),
kind: call_kind,
};
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {

let time = stat.interpretation_time.as_secs_f64();
m.interpretation_time_sec.observe(time);
m.total_actors_mailbox.set(mailbox_size as i64);
m.alive_actors.set(self.actors.len() as i64);
}
m.total_actors_mailbox.set(mailbox_size as i64);
m.alive_actors.set(self.actors.len() as i64);

for stat in &stats {
m.service_call(stat.success, stat.kind, stat.run_time)
m.service_call(stat.success, stat.kind, stat.call_time)
}
});

Expand Down
1 change: 1 addition & 0 deletions crates/peer-metrics/src/services_metrics/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl Stats {
ServiceCallStats::Success {
memory_delta_bytes,
call_time_sec,
lock_wait_time_sec: _,
timestamp,
} => {
self.memory_deltas_bytes.update(
Expand Down
15 changes: 12 additions & 3 deletions crates/peer-metrics/src/services_metrics/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ pub struct ServicesMetricsExternal {
pub modules_in_services_count: Histogram,

/// Service call time
pub call_time_msec: Family<ServiceTypeLabel, Histogram>,
pub call_time_sec: Family<ServiceTypeLabel, Histogram>,
pub lock_wait_time_sec: Family<ServiceTypeLabel, Histogram>,
pub call_success_count: Family<ServiceTypeLabel, Counter>,
pub call_failed_count: Family<ServiceTypeLabel, Counter>,

Expand Down Expand Up @@ -182,13 +183,20 @@ impl ServicesMetricsExternal {
"number of modules per services",
);

let call_time_msec: Family<_, _> = register(
let call_time_sec: Family<_, _> = register(
sub_registry,
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"call_time_msec",
"how long it took to execute a call",
);

let lock_wait_time_sec: Family<_, _> = register(
sub_registry,
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"lock_wait_time_sec",
"how long a service waited for Mutex",
);

let memory_metrics = ServicesMemoryMetrics {
mem_max_bytes,
mem_max_per_module_bytes,
Expand Down Expand Up @@ -217,7 +225,8 @@ impl ServicesMetricsExternal {
removal_count,
creation_failure_count,
modules_in_services_count,
call_time_msec,
call_time_sec,
lock_wait_time_sec,
call_success_count,
call_failed_count,
memory_metrics,
Expand Down
1 change: 1 addition & 0 deletions crates/peer-metrics/src/services_metrics/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub enum ServiceCallStats {
Success {
memory_delta_bytes: f64,
call_time_sec: f64,
lock_wait_time_sec: f64,
timestamp: u64,
},
Fail {
Expand Down
18 changes: 12 additions & 6 deletions crates/peer-metrics/src/services_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ServicesMetrics {
service_type: ServiceType::Builtin,
};
external
.call_time_msec
.call_time_sec
.get_or_create(&label)
.observe(call_time);
if is_ok {
Expand All @@ -99,11 +99,17 @@ impl ServicesMetrics {
) {
self.observe_external(|external| {
let label = ServiceTypeLabel { service_type };
if let Success { call_time_sec, .. } = &stats {
external
.call_time_msec
.get_or_create(&label)
.observe(*call_time_sec);
if let Success {
call_time_sec,
lock_wait_time_sec,
..
} = &stats
{
let call_time_metric = external.call_time_sec.get_or_create(&label);
call_time_metric.observe(*call_time_sec);

let lock_time_metric = external.lock_wait_time_sec.get_or_create(&label);
lock_time_metric.observe(*lock_wait_time_sec);
}
external.call_success_count.get_or_create(&label).inc();
self.observe_service_mem(service_id.clone(), label.service_type, memory);
Expand Down
7 changes: 5 additions & 2 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ impl ParticleAppServices {
particle: ParticleParams,
create_vault: bool,
) -> FunctionOutcome {
let call_time_start = Instant::now();
let services = self.services.read();
let aliases = self.aliases.read();
let worker_id = particle.host_id;
Expand Down Expand Up @@ -509,10 +508,12 @@ impl ParticleAppServices {
};
let function_name = function_args.function_name;

let lock_acquire_start = Instant::now();
let mut service = service.lock();
let old_memory = service.module_memory_stats();
let old_mem_usage = ServicesMetricsBuiltin::get_used_memory(&old_memory);
// TODO: set execution timeout https://github.com/fluencelabs/fluence/issues/1212
let call_time_start = Instant::now();
let result = service
.call(
function_name.clone(),
Expand All @@ -539,15 +540,17 @@ impl ParticleAppServices {
ServiceError::Engine(e)
})?;

let call_time_sec = call_time_start.elapsed().as_secs_f64();
if let Some(metrics) = self.metrics.as_ref() {
let call_time_sec = call_time_start.elapsed().as_secs_f64();
let lock_wait_time_sec = lock_acquire_start.elapsed().as_secs_f64();
let new_memory = service.module_memory_stats();
let new_memory_usage = ServicesMetricsBuiltin::get_used_memory(&new_memory);

let memory_delta_bytes = new_memory_usage - old_mem_usage;
let stats = ServiceCallStats::Success {
memory_delta_bytes: memory_delta_bytes as f64,
call_time_sec,
lock_wait_time_sec,
timestamp,
};

Expand Down

0 comments on commit 73bab7e

Please sign in to comment.