Skip to content

Commit

Permalink
feat: Metric recording call durations
Browse files Browse the repository at this point in the history
  • Loading branch information
alin-at-dfinity committed Feb 15, 2024
1 parent 85d174e commit f61d37d
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 42 deletions.
2 changes: 2 additions & 0 deletions rs/execution_environment/src/execution/common.rs
Expand Up @@ -5,6 +5,7 @@ use ic_registry_subnet_type::SubnetType;
use lazy_static::lazy_static;
use prometheus::IntCounter;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use crate::execution_environment::ExecutionResponse;
use crate::{as_round_instructions, metrics::CallTreeMetrics, ExecuteMessageResult, RoundLimits};
Expand Down Expand Up @@ -566,6 +567,7 @@ pub(crate) fn finish_call_with_error(
response,
instructions_used,
heap_delta: NumBytes::from(0),
call_duration: Some(Duration::from_secs(0)),
}
}

Expand Down
3 changes: 3 additions & 0 deletions rs/execution_environment/src/execution/replicated_query.rs
Expand Up @@ -4,6 +4,8 @@
// A replicated query is a call to a `canister_query` function in update
// context.

use std::time::Duration;

use crate::execution::common::{
finish_call_with_error, validate_message, wasm_result_to_query_response,
};
Expand Down Expand Up @@ -167,5 +169,6 @@ pub fn execute_replicated_query(
response,
instructions_used,
heap_delta: NumBytes::from(0),
call_duration: Some(Duration::from_secs(0)),
}
}
6 changes: 5 additions & 1 deletion rs/execution_environment/src/execution/response.rs
Expand Up @@ -242,6 +242,7 @@ impl ResponseHelper {
heap_delta: NumBytes::from(0),
instructions_used: NumInstructions::from(0),
response: ExecutionResponse::Empty,
call_duration: Some(round.time.saturating_duration_since(call_context.time())),
});
}
// Since the call context has responded, passing `Ok(None)` will produce
Expand Down Expand Up @@ -516,7 +517,7 @@ impl ResponseHelper {
.get()
.saturating_sub(instructions_left.get()),
);
let action = self
let (action, call_context) = self
.canister
.system_state
.call_context_manager_mut()
Expand Down Expand Up @@ -571,6 +572,8 @@ impl ResponseHelper {
response,
instructions_used,
heap_delta,
call_duration: call_context
.map(|call_context| round.time.saturating_duration_since(call_context.time())),
}
}

Expand Down Expand Up @@ -875,6 +878,7 @@ pub fn execute_response(
instructions_used: NumInstructions::from(0),
heap_delta: NumBytes::from(0),
response: ExecutionResponse::Empty,
call_duration: None,
};
}
};
Expand Down
4 changes: 3 additions & 1 deletion rs/execution_environment/src/execution/update.rs
Expand Up @@ -440,7 +440,7 @@ impl UpdateHelper {
.get()
.saturating_sub(output.num_instructions_left.get()),
);
let action = self
let (action, call_context) = self
.canister
.system_state
.call_context_manager_mut()
Expand Down Expand Up @@ -485,6 +485,8 @@ impl UpdateHelper {
response,
instructions_used,
heap_delta,
call_duration: call_context
.map(|call_context| round.time.saturating_duration_since(call_context.time())),
}
}

Expand Down
17 changes: 14 additions & 3 deletions rs/execution_environment/src/execution_environment.rs
Expand Up @@ -79,12 +79,11 @@ use prometheus::IntCounter;
use rand::RngCore;
use std::{
collections::{BTreeMap, HashMap},
convert::Into,
convert::TryFrom,
convert::{Into, TryFrom},
fmt, mem,
str::FromStr,
sync::{Arc, Mutex},
time::Instant,
time::{Duration, Instant},
};
use strum::ParseError;

Expand Down Expand Up @@ -119,6 +118,9 @@ pub enum ExecuteMessageResult {

/// The size of the heap delta the canister produced
heap_delta: NumBytes,

/// The call duration, if the call context completed.
call_duration: Option<Duration>,
},
Paused {
/// The old state of the canister before execution
Expand Down Expand Up @@ -1391,8 +1393,12 @@ impl ExecutionEnvironment {
response: ExecutionResponse::Request(response),
instructions_used: _,
heap_delta: _,
call_duration,
} = &result
{
if let Some(duration) = call_duration {
self.metrics.call_durations.observe(duration.as_secs_f64());
}
debug_assert_eq!(request_cycles, response.refund);
}
result
Expand Down Expand Up @@ -2807,6 +2813,7 @@ impl ExecutionEnvironment {
response,
instructions_used,
heap_delta,
call_duration,
} => {
let ingress_status = match response {
ExecutionResponse::Ingress(ingress_status) => Some(ingress_status),
Expand All @@ -2821,6 +2828,10 @@ impl ExecutionEnvironment {
}
ExecutionResponse::Empty => None,
};
if let Some(duration) = call_duration {
self.metrics.call_durations.observe(duration.as_secs_f64());
}

(
canister,
Some(instructions_used),
Expand Down
11 changes: 9 additions & 2 deletions rs/execution_environment/src/execution_environment_metrics.rs
Expand Up @@ -4,11 +4,11 @@ use ic_cycles_account_manager::{
use ic_error_types::ErrorCode;
use ic_logger::{error, ReplicaLogger};
use ic_management_canister_types as ic00;
use ic_metrics::buckets::decimal_buckets;
use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero};
use ic_metrics::MetricsRegistry;
use ic_replicated_state::metadata_state::subnet_call_context_manager::InstallCodeCallId;
use ic_types::CanisterId;
use prometheus::{HistogramVec, IntCounter};
use prometheus::{Histogram, HistogramVec, IntCounter};
use std::str::FromStr;

pub const FINISHED_OUTCOME_LABEL: &str = "finished";
Expand All @@ -26,6 +26,7 @@ pub(crate) struct ExecutionEnvironmentMetrics {
pub(crate) compute_allocation_in_install_code_total: IntCounter,
pub(crate) memory_allocation_in_install_code_total: IntCounter,
pub(crate) controller_in_update_settings_total: IntCounter,
pub(crate) call_durations: Histogram,

/// Critical error for responses above the maximum allowed size.
pub(crate) response_cycles_refund_error: IntCounter,
Expand Down Expand Up @@ -85,6 +86,12 @@ impl ExecutionEnvironmentMetrics {
"execution_controller_in_update_settings_total",
"Total number of times controller used in update_settings requests",
),
call_durations: metrics_registry.histogram(
"execution_call_duration_seconds",
"Call durations, measured as call context age when completed / dropped.",
// Buckets: 0s, 0.1s, 0.2s, 0.5s, ..., 5M seconds
decimal_buckets_with_zero(-1, 6),
),
response_cycles_refund_error: metrics_registry
.error_counter(CRITICAL_ERROR_RESPONSE_CYCLES_REFUND),
execution_cycles_refund_error: metrics_registry
Expand Down
Expand Up @@ -485,6 +485,7 @@ impl<'a> QueryContext<'a> {
// This `unwrap()` cannot fail because of the non-optional `call_context_id`.
.unwrap()
.on_canister_result(call_context_id, callback_id, result, instructions_used)
.0
}

/// Observe System API call counters in the corresponding metrics.
Expand Down
Expand Up @@ -426,14 +426,14 @@ impl CallContextManager {
}

/// Accepts a canister result and produces an action that should be taken
/// by the caller.
/// by the caller; and the call context, if completed.
pub fn on_canister_result(
&mut self,
call_context_id: CallContextId,
callback_id: Option<CallbackId>,
result: Result<Option<WasmResult>, HypervisorError>,
instructions_used: NumInstructions,
) -> CallContextAction {
) -> (CallContextAction, Option<CallContext>) {
enum OutstandingCalls {
Yes,
No,
Expand Down Expand Up @@ -473,50 +473,60 @@ impl CallContextManager {
// the compiler to tell us if we handled all the possible cases.
match (result, responded, outstanding_calls) {
(Ok(None), Responded::No, OutstandingCalls::Yes)
| (Err(_), Responded::No, OutstandingCalls::Yes) => CallContextAction::NotYetResponded,
| (Err(_), Responded::No, OutstandingCalls::Yes) => {
(CallContextAction::NotYetResponded, None)
}

(Ok(None), Responded::Yes, OutstandingCalls::Yes)
| (Err(_), Responded::Yes, OutstandingCalls::Yes) => {
CallContextAction::AlreadyResponded
(CallContextAction::AlreadyResponded, None)
}
(Ok(None), Responded::Yes, OutstandingCalls::No)
| (Err(_), Responded::Yes, OutstandingCalls::No) => {
self.call_contexts.remove(&call_context_id);
CallContextAction::AlreadyResponded
}
| (Err(_), Responded::Yes, OutstandingCalls::No) => (
CallContextAction::AlreadyResponded,
self.call_contexts.remove(&call_context_id),
),

(Ok(None), Responded::No, OutstandingCalls::No) => {
let refund = context.available_cycles;
self.call_contexts.remove(&call_context_id);
CallContextAction::NoResponse { refund }
(
CallContextAction::NoResponse { refund },
self.call_contexts.remove(&call_context_id),
)
}

(Ok(Some(WasmResult::Reply(payload))), Responded::No, OutstandingCalls::No) => {
let refund = context.available_cycles;
self.call_contexts.remove(&call_context_id);
CallContextAction::Reply { payload, refund }
(
CallContextAction::Reply { payload, refund },
self.call_contexts.remove(&call_context_id),
)
}
(Ok(Some(WasmResult::Reply(payload))), Responded::No, OutstandingCalls::Yes) => {
let refund = context.available_cycles;
context.mark_responded();
CallContextAction::Reply { payload, refund }
(CallContextAction::Reply { payload, refund }, None)
}

(Ok(Some(WasmResult::Reject(payload))), Responded::No, OutstandingCalls::No) => {
let refund = context.available_cycles;
self.call_contexts.remove(&call_context_id);
CallContextAction::Reject { payload, refund }
(
CallContextAction::Reject { payload, refund },
self.call_contexts.remove(&call_context_id),
)
}
(Ok(Some(WasmResult::Reject(payload))), Responded::No, OutstandingCalls::Yes) => {
let refund = context.available_cycles;
context.mark_responded();
CallContextAction::Reject { payload, refund }
(CallContextAction::Reject { payload, refund }, None)
}

(Err(error), Responded::No, OutstandingCalls::No) => {
let refund = context.available_cycles;
self.call_contexts.remove(&call_context_id);
CallContextAction::Fail { error, refund }
(
CallContextAction::Fail { error, refund },
self.call_contexts.remove(&call_context_id),
)
}

// The following can never happen since we handle at the SystemApi level if a canister
Expand Down
Expand Up @@ -50,10 +50,13 @@ fn call_context_handling() {
// Call context 3 was not responded and does not have outstanding calls,
// so we should generate the response ourselves.
assert_eq!(
call_context_manager.on_canister_result(call_context_id3, None, Ok(None), 0.into()),
CallContextAction::NoResponse {
refund: Cycles::zero(),
}
(
CallContextAction::NoResponse {
refund: Cycles::zero(),
},
call_context_manager.call_context(call_context_id3).cloned()
),
call_context_manager.on_canister_result(call_context_id3, None, Ok(None), 0.into())
);

// First they're unanswered
Expand Down Expand Up @@ -157,10 +160,13 @@ fn call_context_handling() {
Ok(Some(WasmResult::Reply(vec![1]))),
0.into()
),
CallContextAction::Reply {
payload: vec![1],
refund: Cycles::zero(),
}
(
CallContextAction::Reply {
payload: vec![1],
refund: Cycles::zero(),
},
None
)
);

assert_eq!(call_context_manager.callbacks().len(), 2);
Expand Down Expand Up @@ -206,16 +212,19 @@ fn call_context_handling() {
// We mark the CallContext 2 as responded and it is deleted as it has no
// outstanding calls
assert_eq!(
(
CallContextAction::Reply {
payload: vec![],
refund: Cycles::zero(),
},
call_context_manager.call_context(call_context_id2).cloned()
),
call_context_manager.on_canister_result(
call_context_id2,
Some(callback_id3),
Ok(Some(WasmResult::Reply(vec![]))),
0.into()
),
CallContextAction::Reply {
payload: vec![],
refund: Cycles::zero(),
}
)
);
assert_eq!(call_context_manager.callbacks().len(), 1);
assert_eq!(call_context_manager.call_contexts().len(), 1);
Expand All @@ -237,13 +246,16 @@ fn call_context_handling() {
}
);
assert_eq!(
(
CallContextAction::AlreadyResponded,
call_context_manager.call_context(call_context_id1).cloned()
),
call_context_manager.on_canister_result(
call_context_id1,
Some(callback_id2),
Ok(None),
0.into()
),
CallContextAction::AlreadyResponded
)
);

// Since CallContext 1 was already responded, make sure we're in a clean state
Expand Down Expand Up @@ -319,7 +331,7 @@ fn test_call_context_instructions_executed_is_updated() {
// Finish a successful execution with 1K instructions.
assert_eq!(
call_context_manager.on_canister_result(call_context_id, None, Ok(None), 1_000.into()),
CallContextAction::NotYetResponded
(CallContextAction::NotYetResponded, None)
);
assert_eq!(
call_context_manager
Expand All @@ -338,7 +350,7 @@ fn test_call_context_instructions_executed_is_updated() {
Err(HypervisorError::InstructionLimitExceeded),
2_000.into()
),
CallContextAction::NotYetResponded
(CallContextAction::NotYetResponded, None)
);

// Now there should be 1K + 2K instructions_executed in the call context.
Expand Down
1 change: 1 addition & 0 deletions rs/test_utilities/execution_environment/src/lib.rs
Expand Up @@ -1085,6 +1085,7 @@ impl ExecutionTest {
response,
instructions_used,
heap_delta,
call_duration: _,
} => (canister, response, instructions_used, heap_delta),
ExecuteMessageResult::Paused { .. } => {
unreachable!("Unexpected paused execution")
Expand Down

0 comments on commit f61d37d

Please sign in to comment.