## Summary

Integrate with the Rust `metrics` crate (or provide a generic metrics trait) to emit counters, gauges, and histograms for scheduler health, throughput, and task lifecycle — enabling Prometheus, StatsD, or custom monitoring backends.

Updated for v0.6 — the original issue was filed against v0.5. This revision accounts for token-bucket rate limiting (#82), group-level pause/resume (#81), batch dispatch coalescing (#80), the `DispatchGate` admission control system, and the expanded `SchedulerEvent` enum.
## Motivation
For long-running scheduler instances, operators need to monitor:
- Is the queue growing or draining? (Are we keeping up?)
- What's the current throughput? (tasks/sec)
- How many retries are happening? (Is the network degrading?)
- Are tasks expiring or being superseded? (Is the workload too bursty?)
- Are rate limits firing? (Are we over-provisioning or under-provisioning token budgets?)
- Are group pauses effective? (How many tasks are held back, and for how long?)
- Is the dispatch gate admitting or denying work? (Where's the bottleneck?)
TaskMill's `SchedulerEvent` broadcast provides real-time events, and `SchedulerSnapshot` provides point-in-time state, but building dashboards and alerting on top of these requires consumers to maintain their own counters and aggregation logic. Emitting standard `metrics` crate metrics gives instant compatibility with Prometheus (`metrics-exporter-prometheus`), StatsD, and any other backend.
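Concretely, a consumer who wants even a simple completion count and a per-module failure breakdown has to hand-roll something like the sketch below; the `subscribe()` method and the variant payloads are assumptions made for illustration, not confirmed API:

```rust
use std::collections::HashMap;

// Hand-maintained aggregation a consumer needs today. `scheduler.subscribe()`
// and the variant payloads (a TaskEventHeader) are assumed for illustration.
let mut completed: u64 = 0;
let mut failed_by_module: HashMap<String, u64> = HashMap::new();
let mut events = scheduler.subscribe();
while let Ok(event) = events.recv().await {
    match event {
        SchedulerEvent::Completed(_) => completed += 1,
        SchedulerEvent::Failed(header) => {
            *failed_by_module.entry(header.module).or_default() += 1;
        }
        _ => {} // ...plus whichever of the other variants the dashboard needs
    }
}
```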
## Current Observability State
**What exists today (v0.6):**

- ~80 `tracing::*!()` calls across the codebase (debug/info/warn/error levels)
- `SchedulerEvent` enum with 19 variants covering the full task lifecycle (`Dispatched`, `Completed`, `Failed`, `Preempted`, `Cancelled`, `Superseded`, `Progress`, `Waiting`, `BatchSubmitted`, `TaskExpired`, `RecurringSkipped`, `RecurringCompleted`, `TaskUnblocked`, `DeadLettered`, `DependencyFailed`, `GroupPaused`, `GroupResumed`, `Paused`, `Resumed`)
- `SchedulerSnapshot` struct for point-in-time dashboard queries (includes `rate_limits: Vec<RateLimitInfo>` and `paused_groups: Vec<PausedGroupInfo>`)
- `RateLimitInfo` snapshot with `available_tokens` per scope
- `TaskEventHeader` with `task_type`, `module`, `key`, `label`, and `tags` metadata
**What does NOT exist:**

- No `metrics` crate, `prometheus`, or `opentelemetry` dependency
- No counters, gauges, or histograms
- No `#[instrument]` annotations
- No continuous metrics for rate limiting (only point-in-time snapshots)
- No gate admission/denial tracking
- No task duration or queue-wait-time measurement
- No batch size distribution tracking
## Proposed Metrics

### Counters (monotonically increasing)
| Metric | Labels | Description |
| --- | --- | --- |
| `taskmill_tasks_submitted_total` | `type`, `module`, `group` | Total tasks submitted |
| `taskmill_tasks_dispatched_total` | `type`, `module`, `group` | Total tasks dispatched (entered running state) |
| `taskmill_tasks_completed_total` | `type`, `module`, `group` | Total tasks completed successfully |
| `taskmill_tasks_failed_total` | `type`, `module`, `group`, `retryable` (true/false) | Total task failures |
| `taskmill_tasks_retried_total` | `type`, `module`, `group` | Total retry requeue attempts |
| `taskmill_tasks_dead_lettered_total` | `type`, `module`, `group` | Tasks that exhausted retries |
| `taskmill_tasks_superseded_total` | `type`, `module`, `group` | Tasks replaced by supersede |
| `taskmill_tasks_cancelled_total` | `type`, `module`, `group` | Tasks cancelled by application |
| `taskmill_tasks_expired_total` | `type`, `module`, `group` | Tasks expired via TTL |
| `taskmill_tasks_preempted_total` | `type`, `module` | Tasks preempted by higher-priority work |
| `taskmill_batches_submitted_total` | — | Total batch submissions |
| `taskmill_gate_denials_total` | `reason` (backpressure, io_budget_disk, io_budget_net, group_paused, group_concurrency, module_concurrency) | Dispatch gate admission denials by reason |
| `taskmill_rate_limit_denials_total` | `scope_kind` (type/group), `scope` | Rate limit token acquisition failures |
| `taskmill_group_pauses_total` | `group` | Group pause events |
| `taskmill_group_resumes_total` | `group` | Group resume events (including auto-resume) |
| `taskmill_dependency_failures_total` | — | Blocked tasks cancelled due to dependency failure |
| `taskmill_recurring_skipped_total` | `type`, `module` | Recurring instances skipped (pile-up prevention) |
### Gauges (current value)
| Metric | Labels | Description |
| --- | --- | --- |
| `taskmill_tasks_pending` | — | Current pending task count |
| `taskmill_tasks_running` | — | Current running task count |
| `taskmill_tasks_blocked` | — | Tasks waiting on dependencies |
| `taskmill_tasks_paused` | — | Tasks in preemption-paused state |
| `taskmill_tasks_waiting` | — | Parent tasks waiting for children |
| `taskmill_max_concurrency` | — | Current max concurrency setting |
| `taskmill_pressure` | — | Aggregate backpressure level (0.0–1.0) |
| `taskmill_pressure_source` | `source` | Per-source pressure breakdown |
| `taskmill_groups_paused` | — | Number of currently paused groups |
| `taskmill_rate_limit_tokens_available` | `scope_kind`, `scope` | Current available tokens per rate-limit scope |
| `taskmill_module_running` | `module` | Running tasks per module (atomic counters already exist in code) |
### Histograms (distribution)
| Metric | Labels | Description |
| --- | --- | --- |
| `taskmill_task_duration_seconds` | `type`, `module`, `status` (completed/failed) | Execution wall-clock time (`started_at` → now) |
| `taskmill_task_queue_wait_seconds` | `type`, `module` | Time from submission to dispatch start |
| `taskmill_batch_size` | — | Number of tasks per batch submission |
| `taskmill_completion_batch_size` | — | Number of completions per drain cycle |
| `taskmill_failure_batch_size` | — | Number of failures per drain cycle |
## Proposed API

### Feature-gated integration

```toml
[dependencies]
taskmill = { version = "0.6", features = ["metrics"] }
```

When the `metrics` feature is enabled, the scheduler automatically emits metrics via the `metrics` crate's global recorder. Consumers configure the backend:
```rust
// Consumer sets up their preferred exporter
let recorder = PrometheusBuilder::new().build_recorder();
metrics::set_global_recorder(recorder)?;

// TaskMill automatically emits to whatever recorder is installed
let scheduler = Scheduler::builder()
    .metrics_prefix("myapp") // optional prefix: "myapp_taskmill_tasks_submitted_total"
    .build()
    .await?;
```
### Custom labels
Consumers can add global labels that appear on all metrics:
```rust
Scheduler::builder()
    .metrics_label("service", "mantle")
    .metrics_label("instance", hostname)
    .build()
    .await?;
```
### Opt-out per metric
For high-cardinality scenarios (many groups, many task types), consumers can disable specific metrics:
```rust
Scheduler::builder()
    .disable_metric("taskmill_task_duration_seconds") // histograms are expensive
    .build()
    .await?;
```
## Implementation Strategy

### Where to emit metrics
Metrics should be emitted from the following instrumentation points (not from the event broadcast channel, to avoid coupling and ensure zero-cost when no recorder is installed):
- **Dispatch gate (`gate.rs`)** — increment `gate_denials_total` with a `reason` label at each `Admission::Deny` / `Admission::RateLimited` return; also increment `rate_limit_denials_total` per scope (see the emission sketch after this list).
- **Run loop (`run_loop.rs`)** — update gauges in the `poll_and_dispatch()` cycle: pending/running/blocked/paused counts (from `SchedulerSnapshot` fields) and the pressure level. Record the `completion_batch_size` and `failure_batch_size` histograms in `drain_completions()` / `drain_failures()`.
- **Task spawn (`spawn/mod.rs`)** — increment the `dispatched_total` counter when a task enters the running state; record the queue-wait histogram (`started_at - created_at`).
- **Completion handler (`spawn/completion.rs`)** — increment `completed_total`; record the `task_duration_seconds` histogram.
- **Failure handler (`spawn/failure.rs`)** — increment `failed_total` (with the `retryable` label), `retried_total`, and `dead_lettered_total`; record the duration histogram for failed tasks.
- **Submit path (`submit.rs`)** — increment `submitted_total`, `superseded_total`, and `batches_submitted_total`; record the `batch_size` histogram.
- **Control operations (`control.rs`)** — increment `group_pauses_total` / `group_resumes_total`; update the `groups_paused` gauge.
- **Rate limit snapshots (`rate_limit.rs`)** — update the `rate_limit_tokens_available` gauges from `snapshot_info()` periodically (e.g. each poll cycle).
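A minimal sketch of the emission style at these points, using the `metrics` facade macros (the helper functions and their signatures are illustrative, not actual TaskMill internals):

```rust
use metrics::{counter, histogram};

// Illustrative helpers only: these function names and signatures are not
// TaskMill internals; they sketch the emission style at each point.
fn on_gate_denial(reason: &'static str) {
    // Effectively a no-op when no global recorder is installed.
    counter!("taskmill_gate_denials_total", "reason" => reason).increment(1);
}

fn on_task_completed(task_type: String, module: String, group: String, secs: f64) {
    counter!("taskmill_tasks_completed_total",
        "type" => task_type.clone(), "module" => module.clone(), "group" => group)
        .increment(1);
    histogram!("taskmill_task_duration_seconds",
        "type" => task_type, "module" => module, "status" => "completed")
        .record(secs);
}
```

Each call resolves against the globally installed recorder and amounts to a no-op when none is installed, which keeps the hot paths cheap.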
### `MetricsSnapshot` for non-metrics consumers
For CLIs and UIs that want stats without the `metrics` crate, provide a `MetricsSnapshot` struct that aggregates counters from atomics:
```rust
// Available without the "metrics" feature
let snapshot = scheduler.metrics_snapshot();
println!("dispatched: {}, completed: {}", snapshot.dispatched, snapshot.completed);
```
This can be built from internal `AtomicU64` counters that are always maintained (they're cheap), with the `metrics` feature adding the bridge to the `metrics` facade.
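One possible shape, assuming the counters are plain `AtomicU64` fields; apart from `dispatched` and `completed` (used in the example above), the field names are illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Internal counters that are always maintained; field names beyond
// `dispatched`/`completed` (shown above) are assumptions for this sketch.
#[derive(Default)]
pub struct Counters {
    submitted: AtomicU64,
    dispatched: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
}

// The always-available, plain-data view handed to CLIs and UIs.
pub struct MetricsSnapshot {
    pub submitted: u64,
    pub dispatched: u64,
    pub completed: u64,
    pub failed: u64,
}

impl Counters {
    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            submitted: self.submitted.load(Ordering::Relaxed),
            dispatched: self.dispatched.load(Ordering::Relaxed),
            completed: self.completed.load(Ordering::Relaxed),
            failed: self.failed.load(Ordering::Relaxed),
        }
    }
}
```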
## Design Considerations
- The `metrics` crate is the standard Rust metrics facade (like `log`/`tracing` for logging). It's effectively zero-cost when no recorder is installed, making the feature flag lightweight.
- Label cardinality must be bounded — `type`, `module`, and `group` are fine, but `task_id`, `dedup_key`, or `key` as labels would blow up storage. User-provided metadata `tags` should NOT automatically become metric labels.
- Histogram buckets for duration metrics should have sensible defaults for IO-bound workloads (e.g. 0.01s, 0.05s, 0.1s, 0.25s, 0.5s, 1s, 2.5s, 5s, 10s, 30s, 60s, 300s); an exporter-side sketch follows this list.
- Gate denial reasons are a new dimension that didn't exist in v0.5 — the `DefaultDispatchGate` has 7 distinct denial paths (backpressure, disk IO budget, network IO budget, group paused, group concurrency, module concurrency, rate limited) that should each be tracked separately.
- Batch coalescing (completions and failures are batched via channels and processed in `drain_completions`/`drain_failures`) means metrics must be emitted per task within the batch, not per batch.
- The fast dispatch path (batch `pop_next_batch` when no gate checks are needed) bypasses the gate entirely, so gate denial metrics won't fire — this is correct behavior, not a gap.
- Module-level running counts already exist as `AtomicUsize` in the scheduler (`module_running: HashMap<String, AtomicUsize>`) — these can feed the `module_running` gauge directly.
- The existing `SchedulerSnapshot` struct already computes many of the gauge values (`pending_count`, `paused_count`, `waiting_count`, `blocked_count`, `pressure`, `rate_limits`) — the metrics layer should reuse this computation rather than duplicate it.
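On the consumer side, bucket defaults can also be overridden at the exporter. A minimal sketch with `metrics-exporter-prometheus`, assumed to run inside a function that returns a `Result` (the bucket values mirror the proposed defaults):

```rust
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};

// Override buckets for the duration histogram before installing the recorder;
// the values mirror the IO-bound defaults proposed above.
let recorder = PrometheusBuilder::new()
    .set_buckets_for_metric(
        Matcher::Full("taskmill_task_duration_seconds".to_string()),
        &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 300.0],
    )?
    .build_recorder();
metrics::set_global_recorder(recorder)?;
```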
metricscrate is the standard Rust metrics facade (likelog/tracingfor logging). It's zero-cost when no recorder is installed, making the feature flag lightweight.type,module, andgroupare fine, buttask_id,dedup_key, orkeyas labels would blow up storage. User-provided metadatatagsshould NOT automatically become metric labels.DefaultDispatchGatehas 7 distinct denial paths (backpressure, disk IO budget, network IO budget, group paused, group concurrency, module concurrency, rate limited) that should each be separately tracked.drain_completions/drain_failures) means metrics must be emitted per-task within the batch, not per-batch.pop_next_batchwhen no gate checks are needed) bypasses the gate entirely, so gate denial metrics won't fire — this is correct behavior, not a gap.AtomicUsizein the scheduler (module_running: HashMap<String, AtomicUsize>) — these can feed themodule_runninggauge directly.SchedulerSnapshotstruct already computes many of the gauge values (pending_count, paused_count, waiting_count, blocked_count, pressure, rate_limits) — the metrics layer should reuse this computation rather than duplicate it.