Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bench = false
[features]
default = ["sysinfo-monitor"]
sysinfo-monitor = ["dep:sysinfo"]
metrics = ["dep:metrics"]
profile = ["dep:pprof"]

[dependencies]
Expand All @@ -28,12 +29,15 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
fastrand = "2"
metrics = { version = "0.24", optional = true }
sysinfo = { version = "0.33", optional = true }
pprof = { version = "0.13", features = ["flamegraph", "criterion"], optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
criterion = { version = "0.5", features = ["async_tokio"] }
metrics = "0.24"
metrics-util = "0.19"

[[bench]]
name = "scheduler"
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await;
| Feature | Default | Description |
|---------|---------|-------------|
| `sysinfo-monitor` | Enabled | Cross-platform CPU, disk IO, and network monitoring via `sysinfo`. Disable for mobile targets or when using a custom sampler. |
| `metrics` | Disabled | Emit counters, gauges, and histograms via the [`metrics`](https://crates.io/crates/metrics) crate facade. See [Metrics & Observability](metrics.md). |

```toml
# Disable platform monitoring
Expand Down Expand Up @@ -546,6 +547,9 @@ Scheduler::builder()
| `group_minimum_slots(group, slots)` | Minimum guaranteed dispatch slots for a group, regardless of weight. |
| `app_state(state)` | Register global state visible to all domains. |
| `app_state_arc(arc)` | Register global state from a pre-existing `Arc`. |
| `metrics_prefix(prefix)` | Prefix for all `metrics` crate metric names (e.g. `"myapp"` → `myapp_taskmill_*`). See [Metrics](metrics.md). |
| `metrics_label(key, value)` | Global label applied to every emitted metric. |
| `disable_metric(name)` | Suppress emission of a specific metric by unprefixed name. |
| `build()` | Build and return the `Scheduler`. |

### `Domain<D>` builder methods
Expand Down
2 changes: 2 additions & 0 deletions docs/io-and-backpressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,5 @@ for (name, value) in &snap.pressure_breakdown {
println!(" {}: {:.0}%", name, value * 100.0);
}
```

For production monitoring beyond snapshots, see [Metrics & Observability](metrics.md) — the `metrics` crate integration exports pressure gauges, gate denial counters, and rate limit token availability as standard Prometheus/StatsD metrics.
206 changes: 206 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Observability Metrics

Taskmill provides built-in observability through two complementary systems:

1. **Always-on internal counters** — cheap `AtomicU64` counters maintained regardless of feature flags, exposed via `Scheduler::metrics_snapshot()`.
2. **`metrics` crate integration** (optional) — when the `metrics` Cargo feature is enabled, the scheduler emits counters, gauges, and histograms via the standard [`metrics`](https://crates.io/crates/metrics) facade. Consumers choose their exporter (Prometheus, StatsD, Datadog, etc.).

## Quick Start

### Without the `metrics` feature (default)

```rust
let snap = scheduler.metrics_snapshot().await;
println!("submitted: {}, completed: {}, failed: {}",
snap.submitted, snap.completed, snap.failed);
println!("pending: {}, running: {}, pressure: {:.2}",
snap.pending, snap.running, snap.pressure);
```

### With the `metrics` feature

```toml
[dependencies]
taskmill = { version = "0.6", features = ["metrics"] }
metrics-exporter-prometheus = "0.16"
```

```rust
// Install a Prometheus exporter (or any metrics recorder).
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
builder.install().expect("failed to install Prometheus recorder");

// Build the scheduler — metrics are automatically emitted.
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.domain(Domain::<MyApp>::new().task(MyExecutor))
.metrics_prefix("myapp") // → myapp_taskmill_*
.metrics_label("service", "ingest") // global label on every metric
.build()
.await?;
```

## MetricsSnapshot

`Scheduler::metrics_snapshot()` returns a `MetricsSnapshot` struct with:

### Counters (cumulative since scheduler creation)

| Field | Description |
|-------|-------------|
| `submitted` | Total tasks accepted into the queue |
| `dispatched` | Total tasks that entered Running state |
| `completed` | Total successful completions |
| `failed` | Total failures (retryable + permanent) |
| `failed_retryable` | Subset of `failed` that were retryable |
| `retried` | Total retry requeue attempts |
| `dead_lettered` | Tasks that exhausted retries |
| `superseded` | Tasks replaced by newer submissions with the same dedup key |
| `cancelled` | Explicit cancellations |
| `expired` | Tasks that hit TTL before dispatch |
| `preempted` | Tasks preempted by higher-priority work |
| `batches_submitted` | Total `submit_batch()` calls |
| `gate_denials` | Dispatch gate rejections (backpressure, IO budget, concurrency, rate limit) |
| `rate_limit_throttles` | Rate limit token exhaustion events |
| `group_pauses` | Group pause events |
| `group_resumes` | Group resume events |
| `dependency_failures` | Blocked tasks cancelled because a dependency failed |
| `recurring_skipped` | Recurring instances skipped due to pile-up prevention |

### Gauges (point-in-time)

| Field | Description |
|-------|-------------|
| `pending` | Current queue depth |
| `running` | Current running task count |
| `blocked` | Tasks waiting on unmet dependencies |
| `paused` | Tasks in pause state |
| `waiting` | Parent tasks waiting for children |
| `pressure` | Aggregate backpressure (0.0–1.0) |
| `max_concurrency` | Current concurrency cap |
| `groups_paused` | Number of currently paused groups |

## `metrics` Crate Metrics Reference

All metrics use the prefix `taskmill_` (customizable via `SchedulerBuilder::metrics_prefix`).

### Counters

| Metric | Labels | Description |
|--------|--------|-------------|
| `taskmill_tasks_submitted_total` | `type`, `module`, `group` | Total tasks accepted into the queue |
| `taskmill_tasks_dispatched_total` | `type`, `module`, `group` | Total tasks that entered Running state |
| `taskmill_tasks_completed_total` | `type`, `module`, `group` | Total successful completions |
| `taskmill_tasks_failed_total` | `type`, `module`, `group`, `retryable` | Total failures, split by retryability |
| `taskmill_tasks_retried_total` | `type`, `module`, `group` | Total retry requeue attempts |
| `taskmill_tasks_dead_lettered_total` | `type`, `module`, `group` | Tasks that exhausted retries |
| `taskmill_tasks_superseded_total` | `type`, `module`, `group` | Tasks replaced by newer submissions |
| `taskmill_tasks_cancelled_total` | `type`, `module`, `group` | Explicit cancellations |
| `taskmill_tasks_expired_total` | `type`, `module`, `group` | Tasks that hit TTL before dispatch |
| `taskmill_tasks_preempted_total` | `type`, `module` | Tasks preempted by higher-priority work |
| `taskmill_batches_submitted_total` | — | Total batch submission calls |
| `taskmill_gate_denials_total` | `reason` | Dispatch gate rejections by reason |
| `taskmill_rate_limit_throttles_total` | `scope_kind`, `scope` | Rate limit token exhaustion events |
| `taskmill_group_pauses_total` | `group` | Group pause events |
| `taskmill_group_resumes_total` | `group` | Group resume events |
| `taskmill_dependency_failures_total` | — | Blocked tasks cancelled because a dependency failed |
| `taskmill_recurring_skipped_total` | `type`, `module` | Recurring instances skipped |

### Gauges

Updated each dispatch cycle (~500ms default).

| Metric | Labels | Description |
|--------|--------|-------------|
| `taskmill_tasks_pending` | — | Current queue depth |
| `taskmill_tasks_running` | — | Current running task count |
| `taskmill_tasks_blocked` | — | Tasks waiting on unmet dependencies |
| `taskmill_tasks_paused` | — | Tasks in pause state |
| `taskmill_tasks_waiting` | — | Parent tasks waiting for children |
| `taskmill_max_concurrency` | — | Current concurrency cap |
| `taskmill_pressure` | — | Aggregate backpressure (0.0–1.0) |
| `taskmill_pressure_source` | `source` | Per-source pressure level |
| `taskmill_groups_paused_count` | — | Number of currently paused groups |
| `taskmill_rate_limit_tokens_available` | `scope_kind`, `scope` | Current available tokens per rate-limit bucket |
| `taskmill_module_tasks_running` | `module` | Running tasks per registered module |

### Histograms

| Metric | Labels | Description |
|--------|--------|-------------|
| `taskmill_task_duration_seconds` | `type`, `module`, `status` | Wall-clock execution time (completed/failed) |
| `taskmill_task_queue_wait_seconds` | `type`, `module` | Time from submission to dispatch start |
| `taskmill_batch_size` | — | Number of tasks per batch submission call |
| `taskmill_completion_batch_size` | — | Number of completions coalesced per drain cycle |
| `taskmill_failure_batch_size` | — | Number of failures coalesced per drain cycle |

## Recommended Dashboard Layout (Prometheus/Grafana)

### Row 1 — Throughput & Queue Health

| Panel | PromQL | Signal |
|-------|--------|--------|
| Submission rate | `rate(taskmill_tasks_submitted_total[5m])` | How fast work is arriving |
| Throughput | `rate(taskmill_tasks_completed_total[5m])` | How fast work is completing |
| Queue depth | `taskmill_tasks_pending` | Primary health signal |
| Absorption ratio | `rate(taskmill_tasks_dispatched_total[5m]) / rate(taskmill_tasks_submitted_total[5m])` | Values <1.0 = queue growing |

### Row 2 — Failure & Retry Health

| Panel | PromQL | Signal |
|-------|--------|--------|
| Failure rate | `rate(taskmill_tasks_failed_total[5m])` by `retryable` | Transient vs permanent |
| Retry ratio | `rate(taskmill_tasks_retried_total[5m]) / rate(taskmill_tasks_dispatched_total[5m])` | >10% warrants investigation |
| Dead letter rate | `rate(taskmill_tasks_dead_lettered_total[5m])` | Alert on any nonzero |
| Expiry rate | `rate(taskmill_tasks_expired_total[5m])` | Correlate with queue depth |

### Row 3 — Latency Distributions

| Panel | PromQL | Signal |
|-------|--------|--------|
| Execution p50/p95/p99 | `histogram_quantile(0.95, rate(taskmill_task_duration_seconds_bucket[5m]))` | Tail latency |
| Queue wait p50/p95/p99 | `histogram_quantile(0.95, rate(taskmill_task_queue_wait_seconds_bucket[5m]))` | Time in queue |

### Row 4 — Capacity & Bottlenecks

| Panel | PromQL | Signal |
|-------|--------|--------|
| Concurrency utilization | `taskmill_tasks_running / taskmill_max_concurrency` | Sustained 1.0 = at limit |
| Backpressure | `taskmill_pressure` | >0.8 = active throttling |
| Gate denials by reason | `rate(taskmill_gate_denials_total[5m])` by `reason` | Primary bottleneck |

## Suggested Alert Rules

| Alert | Condition | Severity |
|-------|-----------|----------|
| Queue growing | `taskmill_tasks_pending > <threshold>` for 10m | Warning |
| Dead letters | `rate(taskmill_tasks_dead_lettered_total[5m]) > 0` for 5m | Critical |
| High retry ratio | retry ratio > 0.1 for 15m | Warning |
| Sustained backpressure | `taskmill_pressure > 0.9` for 10m | Warning |
| Queue wait SLO breach | p95 queue wait > SLO for 5m | Warning |

## Builder API

```rust
Scheduler::builder()
// Prefix all metric names: "myapp_taskmill_*"
.metrics_prefix("myapp")

// Add global labels to every metric
.metrics_label("service", "ingest")
.metrics_label("env", "production")

// Suppress specific metrics (use unprefixed name)
.disable_metric("task_duration_seconds")

.build()
.await?;
```

## Design Principles

- **Zero-cost when unused** — no overhead when no `metrics` recorder is installed. Internal atomic counters cost a few cache lines.
- **Standard facade** — uses the `metrics` crate so consumers choose their exporter.
- **Source-level instrumentation** — metrics emitted where the event happens, not from a channel subscriber.
- **Bounded label cardinality** — only `type`, `module`, `group`, and `reason` appear as labels. Never `task_id`, `key`, or user-provided `tags`.
- **No allocations on the hot path** — label values are borrowed `&str` or small stack strings.
5 changes: 5 additions & 0 deletions docs/progress-and-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,8 @@ async fn scheduler_status(
scheduler.snapshot().await
}
```

## See also

- [Metrics & Observability](metrics.md) — for production monitoring with Prometheus, StatsD, or Datadog. The `metrics` crate integration emits counters, gauges, and histograms that complement the real-time event stream described above.
- [Query APIs](query-apis.md) — for polling-based dashboards and task inspection.
2 changes: 1 addition & 1 deletion docs/query-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ These methods operate across all domains and are available directly on `Schedule
| `scheduler.task(id)` | `Option<TaskRecord>` | Look up any active task by ID, regardless of which domain owns it. |
| `scheduler.snapshot()` | `SchedulerSnapshot` | Global aggregates: total running, pending, pressure, progress, and recurring schedules. |

See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-dashboard) for dashboard patterns using these APIs.
See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-dashboard) for dashboard patterns using these APIs, and [Metrics & Observability](metrics.md) for production-grade metrics export to Prometheus, StatsD, or Datadog.

## Cancellation

Expand Down
7 changes: 4 additions & 3 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ Work through the topic guides in order:
4. [Persistence & Recovery](persistence-and-recovery.md) — understand crash safety and deduplication
5. [Configuration](configuration.md) — tune for your workload
6. [Query APIs](query-apis.md) — build dashboards and debug stuck tasks
7. [Multi-Module Applications](multi-module-apps.md) — assemble multiple domains, cross-domain dependencies, tags, and dashboards
8. [Writing a Reusable Module](library-modules.md) — publish a domain as a library crate
9. [Design](design.md) — understand the architecture for advanced use
7. [Metrics & Observability](metrics.md) — internal counters, `metrics` crate integration, Prometheus dashboards
8. [Multi-Module Applications](multi-module-apps.md) — assemble multiple domains, cross-domain dependencies, tags, and dashboards
9. [Writing a Reusable Module](library-modules.md) — publish a domain as a library crate
10. [Design](design.md) — understand the architecture for advanced use
29 changes: 26 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,28 @@
//! when providing a custom [`ResourceSampler`]. Without this feature, calling
//! [`SchedulerBuilder::with_resource_monitoring`] requires a custom sampler
//! via [`resource_sampler()`](SchedulerBuilder::resource_sampler).
//! - **`metrics`**: Enables integration with the [`metrics`](https://crates.io/crates/metrics) crate.
//! When enabled, the scheduler emits counters, gauges, and histograms via the `metrics`
//! facade — consumers choose their exporter (Prometheus, StatsD, etc.). Internal atomic
//! counters and [`MetricsSnapshot`] are always available regardless of this feature.
//!
//! # Metrics
//!
//! Taskmill provides two levels of observability:
//!
//! 1. **Always-on internal counters** — [`Scheduler::metrics_snapshot()`] returns a
//! [`MetricsSnapshot`] with cumulative counters (submitted, dispatched, completed,
//! failed, retried, dead-lettered, etc.) and point-in-time gauges (pending, running,
//! pressure, max_concurrency). Available without any feature flags.
//!
//! 2. **`metrics` crate integration** (feature-gated) — when the `metrics` feature is
//! enabled, the scheduler emits ~30 metrics via the standard `metrics` facade. All
//! metric names use the `taskmill_` prefix (customizable via
//! [`SchedulerBuilder::metrics_prefix`]). Labels include `type`, `module`, `group`,
//! and `reason` (bounded cardinality). Configure with:
//! - [`SchedulerBuilder::metrics_prefix`] — custom prefix
//! - [`SchedulerBuilder::metrics_label`] — global labels
//! - [`SchedulerBuilder::disable_metric`] — suppress specific metrics

pub mod backpressure;
pub mod domain;
Expand All @@ -826,9 +848,10 @@ pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{
AgingConfig, EstimatedProgress, GroupAllocationInfo, GroupLimits, PausedGroupInfo,
ProgressReporter, RateLimit, RateLimitInfo, Scheduler, SchedulerBuilder, SchedulerConfig,
SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, TaskProgress,
AgingConfig, EstimatedProgress, GroupAllocationInfo, GroupLimits, MetricsSnapshot,
PausedGroupInfo, ProgressReporter, RateLimit, RateLimitInfo, Scheduler, SchedulerBuilder,
SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader,
TaskProgress,
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
Expand Down
Loading
Loading