Skip to content

Commit

Permalink
feat: add service metrics for spells [fixes NET-441] (#1565)
Browse files Browse the repository at this point in the history
* add service type for spells to collect service metrics
* add service type of a service to detect spell services fast
  • Loading branch information
kmd-fl committed Apr 12, 2023
1 parent f762b8b commit 947dfa8
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 54 deletions.
55 changes: 32 additions & 23 deletions crates/peer-metrics/src/services_metrics/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use crate::{execution_time_buckets, mem_buckets_4gib, mem_buckets_8gib, register
#[derive(Hash, Clone, Eq, PartialEq, Debug)]
pub enum ServiceType {
Builtin,
Spell(Option<String>),
Service(Option<String>),
}

impl EncodeLabelValue for ServiceType {
fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
let label = match self {
ServiceType::Builtin => "builtin",
ServiceType::Spell(Some(x)) => x,
ServiceType::Spell(_) => "spell",
ServiceType::Service(Some(x)) => x,
ServiceType::Service(_) => "non-aliased-services",
};
Expand Down Expand Up @@ -66,15 +69,15 @@ impl ServicesMemoryMetrics {
#[derive(Clone)]
pub struct ServicesMetricsExternal {
/// Number of currently running services
pub services_count: Gauge,
pub services_count: Family<ServiceTypeLabel, Gauge>,
/// How long it took to create a service
pub creation_time_msec: Histogram,
pub creation_time_msec: Family<ServiceTypeLabel, Histogram>,
/// How long it took to remove a service
pub removal_time_msec: Histogram,
pub removal_time_msec: Family<ServiceTypeLabel, Histogram>,
/// Number of (srv create) calls
pub creation_count: Counter,
pub creation_count: Family<ServiceTypeLabel, Counter>,
/// Number of (srv remove) calls
pub removal_count: Counter,
pub removal_count: Family<ServiceTypeLabel, Counter>,

/// Number of (srv create) failures
pub creation_failure_count: Counter,
Expand All @@ -95,37 +98,37 @@ impl ServicesMetricsExternal {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("services");

let services_count = register(
let services_count: Family<_, _> = register(
sub_registry,
Gauge::default(),
Family::new_with_constructor(Gauge::default),
"services_count",
"number of currently running services",
);

let creation_time_msec = register(
let creation_time_msec: Family<_, _> = register(
sub_registry,
Histogram::new(execution_time_buckets()),
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"creation_time_msec",
"how long it took to create a service",
);

let removal_time_msec = register(
let removal_time_msec: Family<_, _> = register(
sub_registry,
Histogram::new(execution_time_buckets()),
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"removal_time_msec",
"how long it took to remove a service",
);

let creation_count = register(
let creation_count: Family<_, _> = register(
sub_registry,
Counter::default(),
Family::new_with_constructor(Counter::default),
"creation_count",
"number of srv create calls",
);

let removal_count = register(
let removal_count: Family<_, _> = register(
sub_registry,
Counter::default(),
Family::new_with_constructor(Counter::default),
"removal_count",
"number of srv remove calls",
);
Expand Down Expand Up @@ -228,16 +231,22 @@ impl ServicesMetricsExternal {
}

/// Collect all metrics that are relevant on service removal.
pub fn observe_removed(&self, removal_time: f64) {
self.removal_count.inc();
self.services_count.dec();
self.removal_time_msec.observe(removal_time);
pub fn observe_removed(&self, service_type: ServiceType, removal_time: f64) {
let label = ServiceTypeLabel { service_type };
self.removal_count.get_or_create(&label).inc();
self.services_count.get_or_create(&label).dec();
self.removal_time_msec
.get_or_create(&label)
.observe(removal_time);
}

pub fn observe_created(&self, modules_num: f64, creation_time: f64) {
self.services_count.inc();
pub fn observe_created(&self, service_type: ServiceType, modules_num: f64, creation_time: f64) {
let label = ServiceTypeLabel { service_type };
self.services_count.get_or_create(&label).inc();
self.modules_in_services_count.observe(modules_num);
self.creation_count.inc();
self.creation_time_msec.observe(creation_time);
self.creation_count.get_or_create(&label).inc();
self.creation_time_msec
.get_or_create(&label)
.observe(creation_time);
}
}
10 changes: 7 additions & 3 deletions crates/peer-metrics/src/services_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ impl ServicesMetrics {
creation_time: f64,
) {
self.observe_external(|external| {
external.observe_created(stats.modules_stats.len() as f64, creation_time);
external.observe_created(
service_type.clone(),
stats.modules_stats.len() as f64,
creation_time,
);
self.observe_service_mem(service_id, service_type, stats);
});
}
Expand All @@ -161,9 +165,9 @@ impl ServicesMetrics {
});
}

pub fn observe_removed(&self, removal_time: f64) {
pub fn observe_removed(&self, service_type: ServiceType, removal_time: f64) {
self.observe_external(|external| {
external.observe_removed(removal_time);
external.observe_removed(service_type, removal_time);
});
}

Expand Down
82 changes: 54 additions & 28 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use particle_args::{Args, JError};
use particle_execution::{FunctionOutcome, ParticleParams, ParticleVault, VaultError};
use particle_modules::ModuleRepository;
use peer_metrics::{
ServiceCallStats, ServiceMemoryStat, ServiceType, ServicesMetrics, ServicesMetricsBuiltin,
ServiceCallStats, ServiceMemoryStat, ServiceType as MetricServiceType, ServicesMetrics,
ServicesMetricsBuiltin,
};
use server_config::ServicesConfig;
use uuid_utils::uuid;
Expand All @@ -51,6 +52,18 @@ type ServiceAlias = String;
type Services = HashMap<ServiceId, Service>;
type Aliases = HashMap<PeerId, HashMap<ServiceAlias, ServiceId>>;

#[derive(Debug)]
pub enum ServiceType {
Service,
Spell,
}

impl ServiceType {
fn is_spell(&self) -> bool {
matches!(self, ServiceType::Spell)
}
}

#[derive(Debug, Serialize)]
pub struct ServiceInfo {
pub id: String,
Expand All @@ -69,6 +82,8 @@ pub struct Service {
pub service: Mutex<AppService>,
pub service_id: String,
pub blueprint_id: String,
// temp hack to detect if the service is a spell
pub service_type: ServiceType,
pub owner_id: PeerId,
pub aliases: Vec<ServiceAlias>,
pub worker_id: PeerId,
Expand All @@ -79,6 +94,7 @@ impl Service {
service: Mutex<AppService>,
service_id: String,
blueprint_id: String,
service_type: ServiceType,
owner_id: PeerId,
aliases: Vec<ServiceAlias>,
worker_id: PeerId,
Expand All @@ -87,6 +103,7 @@ impl Service {
service,
service_id,
blueprint_id,
service_type,
owner_id,
aliases,
worker_id,
Expand Down Expand Up @@ -324,17 +341,13 @@ impl ParticleAppServices {
)?;

// tmp hack to forbid spell removal via srv.remove
let blueprint_name = self
.modules
.get_blueprint_from_cache(&service.blueprint_id)?
.name;
if blueprint_name == "spell" && !allow_remove_spell {
if service.service_type.is_spell() && !allow_remove_spell {
return Err(Forbidden {
user: init_peer_id,
function: "remove_service",
reason: "cannot remove a spell",
});
} else if blueprint_name != "spell" && allow_remove_spell {
} else if !service.service_type.is_spell() && allow_remove_spell {
return Err(Forbidden {
user: init_peer_id,
function: "remove_spell",
Expand Down Expand Up @@ -374,6 +387,7 @@ impl ParticleAppServices {
)
}
let service = self.services.write().remove(&service_id).unwrap();
let service_type = self.get_service_type(&service, &service.worker_id);

if let Some(aliases) = self.aliases.write().get_mut(&service.worker_id) {
for alias in service.aliases.iter() {
Expand All @@ -383,7 +397,7 @@ impl ParticleAppServices {

let removal_end_time = removal_start_time.elapsed().as_secs();
if let Some(metrics) = self.metrics.as_ref() {
metrics.observe_removed(removal_end_time as f64);
metrics.observe_removed(service_type, removal_end_time as f64);
}

Ok(())
Expand Down Expand Up @@ -424,21 +438,8 @@ impl ParticleAppServices {
// },
// ));
// }

// Metrics collection are enables for services with aliases which are installed on root worker or worker spells.
let allowed_alias = if worker_id == self.config.local_peer_id {
service.aliases.first().cloned()
} else if service
.aliases
.first()
.map(|alias| alias == "worker-spell")
.unwrap_or(false)
{
Some("worker-spell".to_string())
} else {
None
};
let service_type = ServiceType::Service(allowed_alias);
let service_type = self.get_service_type(service, &worker_id);

// TODO: move particle vault creation to aquamarine::particle_functions
self.create_vault(&particle.id)?;
Expand Down Expand Up @@ -841,26 +842,30 @@ impl ParticleAppServices {
})?;
let stats = service.module_memory_stats();
let stats = ServiceMemoryStat::new(&stats);
let allowed_alias = if worker_id == self.config.local_peer_id {
aliases.first().cloned()

// Would be nice to determine that we create a spell service, but we don't have a reliable way to detect it yet
// so for now temp hack
let blueprint_name = self.modules.get_blueprint_from_cache(&blueprint_id)?.name;
let service_type = if blueprint_name == "spell" {
ServiceType::Spell
} else {
None
ServiceType::Service
};
let service_type = ServiceType::Service(allowed_alias);
let service = Service::new(
Mutex::new(service),
service_id.clone(),
blueprint_id,
service_type,
owner_id,
aliases,
worker_id,
);
// Save created service to disk, so it is recreated on restart
service.persist(&self.config.services_dir)?;

let service_type = self.get_service_type(&service, &worker_id);
let replaced = self.services.write().insert(service_id.clone(), service);
let creation_end_time = creation_start_time.elapsed().as_secs();
if let Some(m) = self.metrics.as_ref() {
let creation_end_time = creation_start_time.elapsed().as_secs();
m.observe_created(service_id, service_type, stats, creation_end_time as f64);
}

Expand All @@ -870,6 +875,27 @@ impl ParticleAppServices {
fn create_vault(&self, particle_id: &str) -> Result<(), VaultError> {
self.vault.create(particle_id)
}

fn get_service_type(&self, service: &Service, worker_id: &PeerId) -> MetricServiceType {
let allowed_alias = if self.config.local_peer_id.eq(worker_id) {
service.aliases.first().cloned()
} else if service
.aliases
.first()
.map(|alias| alias == "worker-spell")
.unwrap_or(false)
{
Some("worker-spell".to_string())
} else {
None
};

if service.service_type.is_spell() {
MetricServiceType::Spell(allowed_alias)
} else {
MetricServiceType::Service(allowed_alias)
}
}
}

fn is_unknown_function(err: &AppServiceError) -> bool {
Expand Down

0 comments on commit 947dfa8

Please sign in to comment.