diff --git a/Cargo.toml b/Cargo.toml index 91d1557..07e2f3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ bench = false [features] default = ["sysinfo-monitor"] sysinfo-monitor = ["dep:sysinfo"] +metrics = ["dep:metrics"] profile = ["dep:pprof"] [dependencies] @@ -28,12 +29,15 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10" fastrand = "2" +metrics = { version = "0.24", optional = true } sysinfo = { version = "0.33", optional = true } pprof = { version = "0.13", features = ["flamegraph", "criterion"], optional = true } [dev-dependencies] tokio = { version = "1", features = ["full", "test-util"] } criterion = { version = "0.5", features = ["async_tokio"] } +metrics = "0.24" +metrics-util = "0.19" [[bench]] name = "scheduler" diff --git a/docs/configuration.md b/docs/configuration.md index c8f7ed9..698524c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -320,6 +320,7 @@ scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; | Feature | Default | Description | |---------|---------|-------------| | `sysinfo-monitor` | Enabled | Cross-platform CPU, disk IO, and network monitoring via `sysinfo`. Disable for mobile targets or when using a custom sampler. | +| `metrics` | Disabled | Emit counters, gauges, and histograms via the [`metrics`](https://crates.io/crates/metrics) crate facade. See [Metrics & Observability](metrics.md). | ```toml # Disable platform monitoring @@ -546,6 +547,9 @@ Scheduler::builder() | `group_minimum_slots(group, slots)` | Minimum guaranteed dispatch slots for a group, regardless of weight. | | `app_state(state)` | Register global state visible to all domains. | | `app_state_arc(arc)` | Register global state from a pre-existing `Arc`. | +| `metrics_prefix(prefix)` | Prefix for all `metrics` crate metric names (e.g. `"myapp"` → `myapp_taskmill_*`). See [Metrics](metrics.md). | +| `metrics_label(key, value)` | Global label applied to every emitted metric. 
| +| `disable_metric(name)` | Suppress emission of a specific metric by unprefixed name. | | `build()` | Build and return the `Scheduler`. | ### `Domain` builder methods diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 0c9d9c5..74b1c99 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -237,3 +237,5 @@ for (name, value) in &snap.pressure_breakdown { println!(" {}: {:.0}%", name, value * 100.0); } ``` + +For production monitoring beyond snapshots, see [Metrics & Observability](metrics.md) — the `metrics` crate integration exports pressure gauges, gate denial counters, and rate limit token availability as standard Prometheus/StatsD metrics. diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 0000000..5fed3c7 --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,206 @@ +# Observability Metrics + +Taskmill provides built-in observability through two complementary systems: + +1. **Always-on internal counters** — cheap `AtomicU64` counters maintained regardless of feature flags, exposed via `Scheduler::metrics_snapshot()`. +2. **`metrics` crate integration** (optional) — when the `metrics` Cargo feature is enabled, the scheduler emits counters, gauges, and histograms via the standard [`metrics`](https://crates.io/crates/metrics) facade. Consumers choose their exporter (Prometheus, StatsD, Datadog, etc.). + +## Quick Start + +### Without the `metrics` feature (default) + +```rust +let snap = scheduler.metrics_snapshot().await; +println!("submitted: {}, completed: {}, failed: {}", + snap.submitted, snap.completed, snap.failed); +println!("pending: {}, running: {}, pressure: {:.2}", + snap.pending, snap.running, snap.pressure); +``` + +### With the `metrics` feature + +```toml +[dependencies] +taskmill = { version = "0.6", features = ["metrics"] } +metrics-exporter-prometheus = "0.16" +``` + +```rust +// Install a Prometheus exporter (or any metrics recorder). 
+let builder = metrics_exporter_prometheus::PrometheusBuilder::new(); +builder.install().expect("failed to install Prometheus recorder"); + +// Build the scheduler — metrics are automatically emitted. +let scheduler = Scheduler::builder() + .store_path("tasks.db") + .domain(Domain::::new().task(MyExecutor)) + .metrics_prefix("myapp") // → myapp_taskmill_* + .metrics_label("service", "ingest") // global label on every metric + .build() + .await?; +``` + +## MetricsSnapshot + +`Scheduler::metrics_snapshot()` returns a `MetricsSnapshot` struct with: + +### Counters (cumulative since scheduler creation) + +| Field | Description | +|-------|-------------| +| `submitted` | Total tasks accepted into the queue | +| `dispatched` | Total tasks that entered Running state | +| `completed` | Total successful completions | +| `failed` | Total failures (retryable + permanent) | +| `failed_retryable` | Subset of `failed` that were retryable | +| `retried` | Total retry requeue attempts | +| `dead_lettered` | Tasks that exhausted retries | +| `superseded` | Tasks replaced by newer submissions with the same dedup key | +| `cancelled` | Explicit cancellations | +| `expired` | Tasks that hit TTL before dispatch | +| `preempted` | Tasks preempted by higher-priority work | +| `batches_submitted` | Total `submit_batch()` calls | +| `gate_denials` | Dispatch gate rejections (backpressure, IO budget, concurrency, rate limit) | +| `rate_limit_throttles` | Rate limit token exhaustion events | +| `group_pauses` | Group pause events | +| `group_resumes` | Group resume events | +| `dependency_failures` | Blocked tasks cancelled because a dependency failed | +| `recurring_skipped` | Recurring instances skipped due to pile-up prevention | + +### Gauges (point-in-time) + +| Field | Description | +|-------|-------------| +| `pending` | Current queue depth | +| `running` | Current running task count | +| `blocked` | Tasks waiting on unmet dependencies | +| `paused` | Tasks in pause state | +| 
`waiting` | Parent tasks waiting for children | +| `pressure` | Aggregate backpressure (0.0–1.0) | +| `max_concurrency` | Current concurrency cap | +| `groups_paused` | Number of currently paused groups | + +## `metrics` Crate Metrics Reference + +All metrics use the prefix `taskmill_` (customizable via `SchedulerBuilder::metrics_prefix`). + +### Counters + +| Metric | Labels | Description | +|--------|--------|-------------| +| `taskmill_tasks_submitted_total` | `type`, `module`, `group` | Total tasks accepted into the queue | +| `taskmill_tasks_dispatched_total` | `type`, `module`, `group` | Total tasks that entered Running state | +| `taskmill_tasks_completed_total` | `type`, `module`, `group` | Total successful completions | +| `taskmill_tasks_failed_total` | `type`, `module`, `group`, `retryable` | Total failures, split by retryability | +| `taskmill_tasks_retried_total` | `type`, `module`, `group` | Total retry requeue attempts | +| `taskmill_tasks_dead_lettered_total` | `type`, `module`, `group` | Tasks that exhausted retries | +| `taskmill_tasks_superseded_total` | `type`, `module`, `group` | Tasks replaced by newer submissions | +| `taskmill_tasks_cancelled_total` | `type`, `module`, `group` | Explicit cancellations | +| `taskmill_tasks_expired_total` | `type`, `module`, `group` | Tasks that hit TTL before dispatch | +| `taskmill_tasks_preempted_total` | `type`, `module` | Tasks preempted by higher-priority work | +| `taskmill_batches_submitted_total` | — | Total batch submission calls | +| `taskmill_gate_denials_total` | `reason` | Dispatch gate rejections by reason | +| `taskmill_rate_limit_throttles_total` | `scope_kind`, `scope` | Rate limit token exhaustion events | +| `taskmill_group_pauses_total` | `group` | Group pause events | +| `taskmill_group_resumes_total` | `group` | Group resume events | +| `taskmill_dependency_failures_total` | — | Blocked tasks cancelled because a dependency failed | +| `taskmill_recurring_skipped_total` | `type`, `module` 
| Recurring instances skipped | + +### Gauges + +Updated each dispatch cycle (~500ms default). + +| Metric | Labels | Description | +|--------|--------|-------------| +| `taskmill_tasks_pending` | — | Current queue depth | +| `taskmill_tasks_running` | — | Current running task count | +| `taskmill_tasks_blocked` | — | Tasks waiting on unmet dependencies | +| `taskmill_tasks_paused` | — | Tasks in pause state | +| `taskmill_tasks_waiting` | — | Parent tasks waiting for children | +| `taskmill_max_concurrency` | — | Current concurrency cap | +| `taskmill_pressure` | — | Aggregate backpressure (0.0–1.0) | +| `taskmill_pressure_source` | `source` | Per-source pressure level | +| `taskmill_groups_paused_count` | — | Number of currently paused groups | +| `taskmill_rate_limit_tokens_available` | `scope_kind`, `scope` | Current available tokens per rate-limit bucket | +| `taskmill_module_tasks_running` | `module` | Running tasks per registered module | + +### Histograms + +| Metric | Labels | Description | +|--------|--------|-------------| +| `taskmill_task_duration_seconds` | `type`, `module`, `status` | Wall-clock execution time (completed/failed) | +| `taskmill_task_queue_wait_seconds` | `type`, `module` | Time from submission to dispatch start | +| `taskmill_batch_size` | — | Number of tasks per batch submission call | +| `taskmill_completion_batch_size` | — | Number of completions coalesced per drain cycle | +| `taskmill_failure_batch_size` | — | Number of failures coalesced per drain cycle | + +## Recommended Dashboard Layout (Prometheus/Grafana) + +### Row 1 — Throughput & Queue Health + +| Panel | PromQL | Signal | +|-------|--------|--------| +| Submission rate | `rate(taskmill_tasks_submitted_total[5m])` | How fast work is arriving | +| Throughput | `rate(taskmill_tasks_completed_total[5m])` | How fast work is completing | +| Queue depth | `taskmill_tasks_pending` | Primary health signal | +| Absorption ratio | `rate(taskmill_tasks_dispatched_total[5m]) / 
rate(taskmill_tasks_submitted_total[5m])` | Values <1.0 = queue growing | + +### Row 2 — Failure & Retry Health + +| Panel | PromQL | Signal | +|-------|--------|--------| +| Failure rate | `rate(taskmill_tasks_failed_total[5m])` by `retryable` | Transient vs permanent | +| Retry ratio | `rate(taskmill_tasks_retried_total[5m]) / rate(taskmill_tasks_dispatched_total[5m])` | >10% warrants investigation | +| Dead letter rate | `rate(taskmill_tasks_dead_lettered_total[5m])` | Alert on any nonzero | +| Expiry rate | `rate(taskmill_tasks_expired_total[5m])` | Correlate with queue depth | + +### Row 3 — Latency Distributions + +| Panel | PromQL | Signal | +|-------|--------|--------| +| Execution p50/p95/p99 | `histogram_quantile(0.95, rate(taskmill_task_duration_seconds_bucket[5m]))` | Tail latency | +| Queue wait p50/p95/p99 | `histogram_quantile(0.95, rate(taskmill_task_queue_wait_seconds_bucket[5m]))` | Time in queue | + +### Row 4 — Capacity & Bottlenecks + +| Panel | PromQL | Signal | +|-------|--------|--------| +| Concurrency utilization | `taskmill_tasks_running / taskmill_max_concurrency` | Sustained 1.0 = at limit | +| Backpressure | `taskmill_pressure` | >0.8 = active throttling | +| Gate denials by reason | `rate(taskmill_gate_denials_total[5m])` by `reason` | Primary bottleneck | + +## Suggested Alert Rules + +| Alert | Condition | Severity | +|-------|-----------|----------| +| Queue growing | `taskmill_tasks_pending > ` for 10m | Warning | +| Dead letters | `rate(taskmill_tasks_dead_lettered_total[5m]) > 0` for 5m | Critical | +| High retry ratio | retry ratio > 0.1 for 15m | Warning | +| Sustained backpressure | `taskmill_pressure > 0.9` for 10m | Warning | +| Queue wait SLO breach | p95 queue wait > SLO for 5m | Warning | + +## Builder API + +```rust +Scheduler::builder() + // Prefix all metric names: "myapp_taskmill_*" + .metrics_prefix("myapp") + + // Add global labels to every metric + .metrics_label("service", "ingest") + .metrics_label("env", 
"production") + + // Suppress specific metrics (use unprefixed name) + .disable_metric("task_duration_seconds") + + .build() + .await?; +``` + +## Design Principles + +- **Zero-cost when unused** — no overhead when no `metrics` recorder is installed. Internal atomic counters cost a few cache lines. +- **Standard facade** — uses the `metrics` crate so consumers choose their exporter. +- **Source-level instrumentation** — metrics emitted where the event happens, not from a channel subscriber. +- **Bounded label cardinality** — only `type`, `module`, `group`, and `reason` appear as labels. Never `task_id`, `key`, or user-provided `tags`. +- **No allocations on the hot path** — label values are borrowed `&str` or small stack strings. diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md index 921142d..6f02016 100644 --- a/docs/progress-and-events.md +++ b/docs/progress-and-events.md @@ -194,3 +194,8 @@ async fn scheduler_status( scheduler.snapshot().await } ``` + +## See also + +- [Metrics & Observability](metrics.md) — for production monitoring with Prometheus, StatsD, or Datadog. The `metrics` crate integration emits counters, gauges, and histograms that complement the real-time event stream described above. +- [Query APIs](query-apis.md) — for polling-based dashboards and task inspection. diff --git a/docs/query-apis.md b/docs/query-apis.md index 60a0552..d6276e8 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -58,7 +58,7 @@ These methods operate across all domains and are available directly on `Schedule | `scheduler.task(id)` | `Option` | Look up any active task by ID, regardless of which domain owns it. | | `scheduler.snapshot()` | `SchedulerSnapshot` | Global aggregates: total running, pending, pressure, progress, and recurring schedules. | -See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-dashboard) for dashboard patterns using these APIs. 
+See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-dashboard) for dashboard patterns using these APIs, and [Metrics & Observability](metrics.md) for production-grade metrics export to Prometheus, StatsD, or Datadog. ## Cancellation diff --git a/docs/quick-start.md b/docs/quick-start.md index 1e3cd6c..c1bb49c 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -556,6 +556,7 @@ Work through the topic guides in order: 4. [Persistence & Recovery](persistence-and-recovery.md) — understand crash safety and deduplication 5. [Configuration](configuration.md) — tune for your workload 6. [Query APIs](query-apis.md) — build dashboards and debug stuck tasks -7. [Multi-Module Applications](multi-module-apps.md) — assemble multiple domains, cross-domain dependencies, tags, and dashboards -8. [Writing a Reusable Module](library-modules.md) — publish a domain as a library crate -9. [Design](design.md) — understand the architecture for advanced use +7. [Metrics & Observability](metrics.md) — internal counters, `metrics` crate integration, Prometheus dashboards +8. [Multi-Module Applications](multi-module-apps.md) — assemble multiple domains, cross-domain dependencies, tags, and dashboards +9. [Writing a Reusable Module](library-modules.md) — publish a domain as a library crate +10. [Design](design.md) — understand the architecture for advanced use diff --git a/src/lib.rs b/src/lib.rs index 7f86409..5a00b26 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -801,6 +801,28 @@ //! when providing a custom [`ResourceSampler`]. Without this feature, calling //! [`SchedulerBuilder::with_resource_monitoring`] requires a custom sampler //! via [`resource_sampler()`](SchedulerBuilder::resource_sampler). +//! - **`metrics`**: Enables integration with the [`metrics`](https://crates.io/crates/metrics) crate. +//! When enabled, the scheduler emits counters, gauges, and histograms via the `metrics` +//! 
facade — consumers choose their exporter (Prometheus, StatsD, etc.). Internal atomic +//! counters and [`MetricsSnapshot`] are always available regardless of this feature. +//! +//! # Metrics +//! +//! Taskmill provides two levels of observability: +//! +//! 1. **Always-on internal counters** — [`Scheduler::metrics_snapshot()`] returns a +//! [`MetricsSnapshot`] with cumulative counters (submitted, dispatched, completed, +//! failed, retried, dead-lettered, etc.) and point-in-time gauges (pending, running, +//! pressure, max_concurrency). Available without any feature flags. +//! +//! 2. **`metrics` crate integration** (feature-gated) — when the `metrics` feature is +//! enabled, the scheduler emits ~30 metrics via the standard `metrics` facade. All +//! metric names use the `taskmill_` prefix (customizable via +//! [`SchedulerBuilder::metrics_prefix`]). Labels include `type`, `module`, `group`, +//! and `reason` (bounded cardinality). Configure with: +//! - [`SchedulerBuilder::metrics_prefix`] — custom prefix +//! - [`SchedulerBuilder::metrics_label`] — global labels +//! 
- [`SchedulerBuilder::disable_metric`] — suppress specific metrics pub mod backpressure; pub mod domain; @@ -826,9 +848,10 @@ pub use resource::network_pressure::NetworkPressure; pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ - AgingConfig, EstimatedProgress, GroupAllocationInfo, GroupLimits, PausedGroupInfo, - ProgressReporter, RateLimit, RateLimitInfo, Scheduler, SchedulerBuilder, SchedulerConfig, - SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, TaskProgress, + AgingConfig, EstimatedProgress, GroupAllocationInfo, GroupLimits, MetricsSnapshot, + PausedGroupInfo, ProgressReporter, RateLimit, RateLimitInfo, Scheduler, SchedulerBuilder, + SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, + TaskProgress, }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 3f39663..4e1343b 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -60,6 +60,9 @@ pub struct SchedulerBuilder { group_weights: Vec<(String, u32)>, default_group_weight: u32, group_min_slots: Vec<(String, usize)>, + metrics_prefix: Option<String>, + metrics_global_labels: Vec<(String, String)>, + metrics_disabled: std::collections::HashSet<String>, } impl SchedulerBuilder { @@ -85,6 +88,9 @@ impl SchedulerBuilder { group_weights: Vec::new(), default_group_weight: 1, group_min_slots: Vec::new(), + metrics_prefix: None, + metrics_global_labels: Vec::new(), + metrics_disabled: std::collections::HashSet::new(), } } @@ -351,6 +357,34 @@ impl SchedulerBuilder { self } + /// Set a prefix prepended to all metric names. 
+ /// + /// Example: `metrics_prefix("myapp")` → + /// `myapp_taskmill_tasks_submitted_total` + pub fn metrics_prefix(mut self, prefix: impl Into<String>) -> Self { + self.metrics_prefix = Some(prefix.into()); + self + } + + /// Add a global label applied to all emitted metrics. + /// + /// Example: `metrics_label("service", "ingest")` adds + /// `service="ingest"` to every metric. + pub fn metrics_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self { + self.metrics_global_labels.push((key.into(), value.into())); + self + } + + /// Disable emission of a specific metric by name. + /// + /// Useful for suppressing high-cardinality or expensive metrics + /// (e.g. histograms in extremely hot paths). The metric name should + /// be the full name without prefix (e.g. `"task_duration_seconds"`). + pub fn disable_metric(mut self, name: impl Into<String>) -> Self { + self.metrics_disabled.insert(name.into()); + self + } + /// Build the scheduler. Opens the database and wires all components. /// /// # Errors @@ -498,7 +532,13 @@ impl SchedulerBuilder { .map_err(|e| StoreError::Database(e.into()))?; } - let scheduler = Scheduler::with_gate( + let metrics_config = super::MetricsConfig { + prefix: self.metrics_prefix, + global_labels: self.metrics_global_labels, + disabled: self.metrics_disabled, + }; + + let scheduler = Scheduler::with_gate_and_metrics( store, self.config, Arc::new(registry), @@ -506,6 +546,7 @@ impl SchedulerBuilder { app_state, module_registry, module_state, + metrics_config, ); // Load persisted group pause state (survives restarts). @@ -626,6 +667,10 @@ impl SchedulerBuilder { .store(true, std::sync::atomic::Ordering::Relaxed); } + // Register metric descriptions (once, at build time). 
+ #[cfg(feature = "metrics")] + scheduler.inner.emitter.describe_metrics(); + Ok(scheduler) } } diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index 9684631..21d1502 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -137,6 +137,13 @@ impl Scheduler { .pause_group(group_key, &self.inner.store, &self.inner.event_tx) .await; + self.inner + .counters + .group_pauses + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + #[cfg(feature = "metrics")] + self.inner.emitter.record_group_pause(group_key); + let _ = self.inner.event_tx.send(SchedulerEvent::GroupPaused { group: group_key.to_string(), pending_count: pending_paused as usize, @@ -195,6 +202,13 @@ impl Scheduler { self.inner.work_notify.notify_one(); } + self.inner + .counters + .group_resumes + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + #[cfg(feature = "metrics")] + self.inner.emitter.record_group_resume(group_key); + let _ = self.inner.event_tx.send(SchedulerEvent::GroupResumed { group: group_key.to_string(), resumed_count: resumed_count as usize, diff --git a/src/scheduler/counters.rs b/src/scheduler/counters.rs new file mode 100644 index 0000000..2f655f8 --- /dev/null +++ b/src/scheduler/counters.rs @@ -0,0 +1,142 @@ +//! Internal atomic counters for scheduler throughput metrics. +//! +//! Always maintained regardless of feature flags. Exposed via +//! [`MetricsSnapshot`] for consumers who don't use the `metrics` crate. + +use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; + +/// Internal atomic counters for scheduler throughput metrics. +/// +/// Always maintained regardless of feature flags. Counters are +/// incremented at the code path where the event happens (submit, +/// dispatch, completion, failure, etc.) and exposed via +/// [`Scheduler::metrics_snapshot()`](super::Scheduler::metrics_snapshot). 
+pub(crate) struct SchedulerCounters { + pub submitted: AtomicU64, + pub dispatched: AtomicU64, + pub completed: AtomicU64, + pub failed: AtomicU64, + pub failed_retryable: AtomicU64, + pub retried: AtomicU64, + pub dead_lettered: AtomicU64, + pub superseded: AtomicU64, + pub cancelled: AtomicU64, + pub expired: AtomicU64, + pub preempted: AtomicU64, + pub batches_submitted: AtomicU64, + pub gate_denials: AtomicU64, + pub rate_limit_throttles: AtomicU64, + pub group_pauses: AtomicU64, + pub group_resumes: AtomicU64, + pub dependency_failures: AtomicU64, + pub recurring_skipped: AtomicU64, +} + +impl SchedulerCounters { + pub(crate) fn new() -> Self { + Self { + submitted: AtomicU64::new(0), + dispatched: AtomicU64::new(0), + completed: AtomicU64::new(0), + failed: AtomicU64::new(0), + failed_retryable: AtomicU64::new(0), + retried: AtomicU64::new(0), + dead_lettered: AtomicU64::new(0), + superseded: AtomicU64::new(0), + cancelled: AtomicU64::new(0), + expired: AtomicU64::new(0), + preempted: AtomicU64::new(0), + batches_submitted: AtomicU64::new(0), + gate_denials: AtomicU64::new(0), + rate_limit_throttles: AtomicU64::new(0), + group_pauses: AtomicU64::new(0), + group_resumes: AtomicU64::new(0), + dependency_failures: AtomicU64::new(0), + recurring_skipped: AtomicU64::new(0), + } + } + + /// Take a snapshot of all counter values. 
+ pub(crate) fn snapshot(&self) -> CounterSnapshot { + CounterSnapshot { + submitted: self.submitted.load(Relaxed), + dispatched: self.dispatched.load(Relaxed), + completed: self.completed.load(Relaxed), + failed: self.failed.load(Relaxed), + failed_retryable: self.failed_retryable.load(Relaxed), + retried: self.retried.load(Relaxed), + dead_lettered: self.dead_lettered.load(Relaxed), + superseded: self.superseded.load(Relaxed), + cancelled: self.cancelled.load(Relaxed), + expired: self.expired.load(Relaxed), + preempted: self.preempted.load(Relaxed), + batches_submitted: self.batches_submitted.load(Relaxed), + gate_denials: self.gate_denials.load(Relaxed), + rate_limit_throttles: self.rate_limit_throttles.load(Relaxed), + group_pauses: self.group_pauses.load(Relaxed), + group_resumes: self.group_resumes.load(Relaxed), + dependency_failures: self.dependency_failures.load(Relaxed), + recurring_skipped: self.recurring_skipped.load(Relaxed), + } + } +} + +/// Snapshot of counter values only (no gauges). Used internally to build +/// [`MetricsSnapshot`]. +pub(crate) struct CounterSnapshot { + pub submitted: u64, + pub dispatched: u64, + pub completed: u64, + pub failed: u64, + pub failed_retryable: u64, + pub retried: u64, + pub dead_lettered: u64, + pub superseded: u64, + pub cancelled: u64, + pub expired: u64, + pub preempted: u64, + pub batches_submitted: u64, + pub gate_denials: u64, + pub rate_limit_throttles: u64, + pub group_pauses: u64, + pub group_resumes: u64, + pub dependency_failures: u64, + pub recurring_skipped: u64, +} + +/// Point-in-time counter and gauge snapshot for consumers who don't use +/// the `metrics` crate. +/// +/// All counter values are cumulative since scheduler creation. Gauge values +/// reflect the current instant. Available without any feature flags. 
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct MetricsSnapshot { + // Counters (cumulative) + pub submitted: u64, + pub dispatched: u64, + pub completed: u64, + pub failed: u64, + pub failed_retryable: u64, + pub retried: u64, + pub dead_lettered: u64, + pub superseded: u64, + pub cancelled: u64, + pub expired: u64, + pub preempted: u64, + pub batches_submitted: u64, + pub gate_denials: u64, + pub rate_limit_throttles: u64, + pub group_pauses: u64, + pub group_resumes: u64, + pub dependency_failures: u64, + pub recurring_skipped: u64, + // Gauges (point-in-time) + pub pending: i64, + pub running: usize, + pub blocked: i64, + pub paused: i64, + pub waiting: i64, + pub pressure: f32, + pub max_concurrency: usize, + pub groups_paused: usize, +} diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 55b02b5..c3d4c5d 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -61,6 +61,8 @@ pub struct GateContext<'a> { /// `dispatch_fair()` pass 1 where group slot budgets are already /// enforced by the allocation algorithm. pub skip_group_concurrency: bool, + /// Internal atomic counters for throughput metrics. + pub counters: &'a super::counters::SchedulerCounters, } // ── Dispatch Gate ────────────────────────────────────────────────── @@ -144,6 +146,8 @@ impl DispatchGate for DefaultDispatchGate { ctx: &'a GateContext<'a>, ) -> BoxFuture<'a, Result> { Box::pin(async move { + use std::sync::atomic::Ordering::Relaxed; + // Backpressure check. 
let current_pressure = self.pressure.lock().await.pressure(); if self.policy.should_throttle(task.priority, current_pressure) { @@ -152,6 +156,7 @@ impl DispatchGate for DefaultDispatchGate { pressure = current_pressure, "task throttled by backpressure — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } @@ -163,6 +168,7 @@ impl DispatchGate for DefaultDispatchGate { expected_write = task.expected_io.disk_write, "task deferred — disk IO budget exhausted — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } @@ -174,6 +180,7 @@ impl DispatchGate for DefaultDispatchGate { expected_tx = task.expected_io.net_tx, "task deferred — network IO budget exhausted — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } @@ -185,6 +192,7 @@ impl DispatchGate for DefaultDispatchGate { group = group_key, "task deferred — group paused — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } } @@ -204,6 +212,7 @@ impl DispatchGate for DefaultDispatchGate { limit, "task deferred — group concurrency saturated — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } } @@ -227,6 +236,7 @@ impl DispatchGate for DefaultDispatchGate { cap, "task deferred — module concurrency saturated — requeuing" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); return Ok(Admission::Deny); } } @@ -240,6 +250,8 @@ impl DispatchGate for DefaultDispatchGate { task_type = task.task_type, "task deferred — task-type rate limit" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); + ctx.counters.rate_limit_throttles.fetch_add(1, Relaxed); return Ok(Admission::RateLimited(next)); } @@ -251,6 +263,8 @@ impl DispatchGate for DefaultDispatchGate { group = group_key, "task deferred — group rate limit" ); + ctx.counters.gate_denials.fetch_add(1, Relaxed); + ctx.counters.rate_limit_throttles.fetch_add(1, 
Relaxed); return Ok(Admission::RateLimited(next)); } } diff --git a/src/scheduler/metrics_bridge.rs b/src/scheduler/metrics_bridge.rs new file mode 100644 index 0000000..ebd3ad0 --- /dev/null +++ b/src/scheduler/metrics_bridge.rs @@ -0,0 +1,477 @@ +//! `metrics` crate integration — feature-gated metric emission. +//! +//! This module provides [`MetricsEmitter`], a thin wrapper that emits metrics +//! via the `metrics` crate facade. All methods compile to nothing when no +//! recorder is installed (the `metrics` crate's built-in no-op path). + +use std::time::Duration; + +use metrics::{counter, gauge, histogram, Label}; + +/// Thin wrapper that formats metric names with an optional prefix and +/// attaches global labels. All methods are `#[inline]` for zero overhead +/// when the recorder is a no-op. +#[allow(dead_code)] +pub(crate) struct MetricsEmitter { + prefix: String, + global_labels: Vec