Optional queue depth reporting for OpenTelemetry metrics #735

@dahlia

Summary

Add an optional MessageQueue API for reporting queue depth, so Fedify can export backlog metrics through OpenTelemetry.

The current observability milestone includes queue depth metrics. The deployment guide already treats queue depth as one of the main federation health signals, but the current MessageQueue interface has no way to ask a queue how many messages are waiting.

Current state

MessageQueue currently exposes only nativeRetrial, enqueue(), optional enqueueMany(), and listen(). That is enough to process messages, but not enough to emit backlog metrics from Fedify.

FederationQueueOptions can use separate queues for inbox, outbox, and fanout, or a single shared queue for all three. That distinction matters for metrics: if one queue instance is shared, reporting the same depth once per logical role would double- or triple-count the backlog.

The built-in queues differ in what they can count cheaply:

  • InProcessMessageQueue: count the in-memory pending array.
  • RedisMessageQueue: count the sorted set used for queued messages, with ready and delayed counts derived from score if that matches the current schema.
  • PostgresMessageQueue, MysqlMessageQueue, and SqliteMessageQueue: count rows in their queue tables, with ready and delayed counts based on created + delay, deliver_after, or scheduled.
  • AmqpMessageQueue: use RabbitMQ queue declarations/checks for ready message counts where queue names can be enumerated reliably.
  • DenoKvMessageQueue and WorkersMessageQueue: likely cannot expose depth from their underlying queue APIs, so they should be allowed to omit this API.

Proposed solution

Add an optional queue introspection method to MessageQueue. One possible API is:

export interface MessageQueueDepth {
  readonly queued: number;
  readonly ready?: number;
  readonly delayed?: number;
}

export interface MessageQueue {
  readonly nativeRetrial?: boolean;
  enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void>;
  enqueueMany?: (
    messages: readonly any[],
    options?: MessageQueueEnqueueOptions,
  ) => Promise<void>;
  listen(
    handler: (message: any) => Promise<void> | void,
    options?: MessageQueueListenOptions,
  ): Promise<void>;
  getDepth?(): Promise<MessageQueueDepth>;
}

The fields should mean:

  • queued: messages still waiting in the backend queue. It should not include messages currently being handled by a worker.
  • ready: messages eligible for immediate processing.
  • delayed: messages scheduled for later delivery.

Backends that cannot split ready and delayed messages can return only queued.
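
As a rough illustration of these semantics, here is a minimal sketch of how an in-memory queue could count its pending entries. It is not the actual InProcessMessageQueue implementation: `PendingEntry`, `computeDepth`, `SketchQueue`, and `takeReady` are hypothetical names, and the assumption that each pending entry carries a `readyAt` timestamp is made up for the example.

```typescript
// Mirrors the MessageQueueDepth shape proposed in this issue.
interface MessageQueueDepth {
  readonly queued: number;
  readonly ready?: number;
  readonly delayed?: number;
}

// Hypothetical pending entry: the message plus the time it becomes eligible.
interface PendingEntry {
  readonly message: unknown;
  readonly readyAt: number; // epoch milliseconds
}

// Pure helper: split pending entries into ready and delayed relative to `now`.
function computeDepth(
  pending: readonly PendingEntry[],
  now: number,
): MessageQueueDepth {
  let ready = 0;
  for (const entry of pending) {
    if (entry.readyAt <= now) ready++;
  }
  return { queued: pending.length, ready, delayed: pending.length - ready };
}

// Hypothetical in-memory queue used only to illustrate the counting rules.
class SketchQueue {
  private readonly pending: PendingEntry[] = [];

  enqueue(message: unknown, delayMs = 0): void {
    this.pending.push({ message, readyAt: Date.now() + delayMs });
  }

  // Removes and returns the first ready message. Once taken, the message is
  // in flight and is no longer counted by getDepth().
  takeReady(now: number = Date.now()): unknown {
    const index = this.pending.findIndex((entry) => entry.readyAt <= now);
    return index < 0 ? undefined : this.pending.splice(index, 1)[0].message;
  }

  getDepth(): Promise<MessageQueueDepth> {
    return Promise.resolve(computeDepth(this.pending, Date.now()));
  }
}
```

Keeping the counting in a pure function that takes `now` as a parameter makes the ready/delayed boundary easy to test without waiting on real timers.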

Once #619 adds OpenTelemetry metrics support, Fedify can register an observable gauge such as fedify.queue.depth for queues that implement this method.

Suggested attributes:

  • fedify.queue.role: inbox, outbox, fanout, or shared
  • fedify.queue.state: queued, ready, or delayed
  • fedify.queue.backend: best-effort backend name such as redis, postgres, sqlite, or amqp

When the same queue instance is used for multiple roles, Fedify should emit a single shared series rather than one series per role.
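
The deduplication rule could be sketched roughly as follows. This is only one possible approach, not Fedify's implementation: `groupQueuesByInstance`, `collectDepthSamples`, `QueueSeries`, and `DepthSample` are hypothetical names, and the sketch assumes queue sharing can be detected by object identity.

```typescript
type QueueRole = "inbox" | "outbox" | "fanout";

interface MessageQueueDepth {
  readonly queued: number;
  readonly ready?: number;
  readonly delayed?: number;
}

// Only the part of MessageQueue this sketch needs.
interface DepthCapableQueue {
  getDepth?(): Promise<MessageQueueDepth>;
}

interface QueueSeries<Q> {
  readonly role: QueueRole | "shared";
  readonly queue: Q;
}

interface DepthSample {
  readonly value: number;
  readonly attributes: Record<string, string>;
}

// Groups roles by queue instance (object identity). A queue used for more
// than one role yields a single series labeled "shared".
function groupQueuesByInstance<Q extends object>(
  queues: Partial<Record<QueueRole, Q>>,
): QueueSeries<Q>[] {
  const rolesByQueue = new Map<Q, QueueRole[]>();
  for (const role of ["inbox", "outbox", "fanout"] as const) {
    const queue = queues[role];
    if (queue == null) continue;
    const roles = rolesByQueue.get(queue) ?? [];
    roles.push(role);
    rolesByQueue.set(queue, roles);
  }
  const series: QueueSeries<Q>[] = [];
  rolesByQueue.forEach((roles, queue) => {
    series.push({ role: roles.length > 1 ? "shared" : roles[0], queue });
  });
  return series;
}

// Reads depth once per distinct queue and flattens it into gauge samples.
// Queues without getDepth() are skipped.
async function collectDepthSamples(
  queues: Partial<Record<QueueRole, DepthCapableQueue>>,
): Promise<DepthSample[]> {
  const samples: DepthSample[] = [];
  for (const { role, queue } of groupQueuesByInstance(queues)) {
    if (queue.getDepth == null) continue;
    const depth = await queue.getDepth();
    for (const state of ["queued", "ready", "delayed"] as const) {
      const value = depth[state];
      if (value == null) continue;
      samples.push({
        value,
        attributes: {
          "fedify.queue.role": role,
          "fedify.queue.state": state,
        },
      });
    }
  }
  return samples;
}
```

Inside an observable gauge callback, each sample would presumably become one `observe(value, attributes)` call. The fedify.queue.backend attribute is left out here because how a backend name would be determined is not specified in this issue.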

Scope

Implement the optional method for queue backends where depth can be read without changing their storage model:

  • InProcessMessageQueue
  • RedisMessageQueue
  • PostgresMessageQueue
  • MysqlMessageQueue
  • SqliteMessageQueue
  • ParallelMessageQueue, by delegating to the wrapped queue
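
One wrinkle with delegation is that a wrapper should probably expose getDepth() only when the wrapped queue has it, so that callers checking for the method are not misled. A minimal sketch of that idea, using a hypothetical `WrappingQueue` class as a stand-in rather than the real ParallelMessageQueue:

```typescript
interface MessageQueueDepth {
  readonly queued: number;
  readonly ready?: number;
  readonly delayed?: number;
}

// Only the part of MessageQueue this sketch needs.
interface DepthCapableQueue {
  getDepth?(): Promise<MessageQueueDepth>;
}

// Hypothetical wrapper: getDepth is defined only if the wrapped queue
// implements it, so `wrapper.getDepth != null` reflects real support.
class WrappingQueue implements DepthCapableQueue {
  readonly queue: DepthCapableQueue;
  readonly getDepth?: () => Promise<MessageQueueDepth>;

  constructor(queue: DepthCapableQueue) {
    this.queue = queue;
    const inner = queue.getDepth;
    if (inner != null) {
      // Call with the wrapped queue as `this`, since getDepth is a method.
      this.getDepth = () => inner.call(queue);
    }
  }
}
```

Callers would then test `queue.getDepth != null` before registering a gauge, the same check they would use for an unwrapped queue.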

Implement AmqpMessageQueue only for queue names Fedify can enumerate reliably.

Do not add getDepth() to DenoKvMessageQueue or WorkersMessageQueue unless their platform APIs expose a reliable count.

Update docs/manual/mq.md to explain what queue depth means, which implementations support it, and why some platform queues cannot expose it.

Acceptance criteria

  • MessageQueue has a non-breaking optional API for reading queue depth.
  • Supported built-in queue implementations report at least total queued messages.
  • Ready and delayed counts are reported where the backend can distinguish them cheaply.
  • Shared queue instances are not double-counted when exported as metrics.
  • Tests cover InProcessMessageQueue and at least one SQL or Redis-backed queue.
  • The message queue documentation describes the API and implementation support matrix.

Open questions

  • Should the method be named getDepth(), getStats(), or something else?
  • Should in-flight or unacknowledged messages be part of this issue, or should they be tracked separately as worker processing metrics?
  • Should this API try to expose per-message-type depth for shared queues, or should users who need per-role depth use separate queue backends?
