From 636a399e0b239ec7f9878acc978e3587ecbcb499 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 15:18:57 +0900 Subject: [PATCH 01/20] Add benchmark mode federation option Benchmark targets need a cooperative mode that enables safe-by-default benchmarking relaxations without changing normal federation behavior. This adds the public option and threads it through both createFederation() and FederationBuilder.build(), while preserving explicit option precedence and custom loader behavior. https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/builder.test.ts | 12 +++ packages/fedify/src/federation/federation.ts | 14 ++++ .../fedify/src/federation/middleware.test.ts | 84 +++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 31 ++++++- 4 files changed, 137 insertions(+), 4 deletions(-) diff --git a/packages/fedify/src/federation/builder.test.ts b/packages/fedify/src/federation/builder.test.ts index 038741f94..f4aab0ed0 100644 --- a/packages/fedify/src/federation/builder.test.ts +++ b/packages/fedify/src/federation/builder.test.ts @@ -166,6 +166,18 @@ test("FederationBuilder", async (t) => { }, ); + await t.step("passes benchmarkMode to the built federation", async () => { + const builder = createFederationBuilder(); + const federation = await builder.build({ + kv: new MemoryKvStore(), + benchmarkMode: true, + }); + const impl = federation as FederationImpl; + assertEquals(impl.benchmarkMode, true); + assertEquals(impl.allowPrivateAddress, true); + assertEquals(impl.signatureTimeWindow, false); + }); + await t.step("should snapshot router state on build", async () => { const builder = createFederationBuilder(); const kv = new MemoryKvStore(); diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index 7d650650c..c3fa5efbc 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -934,6 +934,20 @@ export interface FederationOptions { */ allowPrivateAddress?: boolean; + /** + * Whether to enable cooperative benchmark mode. This mode exposes + * benchmark-only endpoints and relaxes selected defaults for benchmark + * targets. Do not enable this option in production. + * + * When enabled, {@link FederationOptions.allowPrivateAddress} defaults to + * `true` unless a custom document loader is configured, and + * {@link FederationOptions.signatureTimeWindow} defaults to `false`. + * + * Turned off by default. + * @since 2.3.0 + */ + benchmarkMode?: boolean; + /** * Options for making `User-Agent` strings for HTTP requests. * If a string is provided, it is used as the `User-Agent` header. diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index d669daefd..b0287cf89 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -97,6 +97,90 @@ test("createFederation()", async (t) => { }), TypeError); }); + await t.step("benchmarkMode applies cooperative benchmark defaults", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, true); + assertEquals(federation.signatureTimeWindow, false); + }); + + await t.step("benchmarkMode preserves explicit option overrides", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + allowPrivateAddress: false, + signatureTimeWindow: { minutes: 10 }, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, false); + assertEquals(federation.signatureTimeWindow, { minutes: 10 }); + }); + + await t.step("benchmarkMode leaves custom loader factories alone", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + documentLoaderFactory: () => mockDocumentLoader, + contextLoaderFactory: () => mockDocumentLoader, + authenticatedDocumentLoaderFactory: () => mockDocumentLoader, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, false); + }); + + await t.step("benchmarkMode rejects an explicit meterProvider", () => { + const [meterProvider] = createTestMeterProvider(); + assertThrows( + () => + createFederation({ + kv, + benchmarkMode: true, + meterProvider, + }), + TypeError, + "benchmarkMode requires Fedify to own the meterProvider", + ); + }); + + await t.step( + "benchmarkMode warns that benchmark-only relaxations are on", + async () => { + await withLogtapeLock(async () => { + const records: LogRecord[] = []; + await reset(); + try { + await configure({ + sinks: { + test(record) { + records.push(record); + }, + }, + loggers: [ + { + category: ["fedify", "federation", "benchmark"], + lowestLevel: "warning", + sinks: ["test"], + }, + ], + }); + createFederation({ kv, benchmarkMode: true }); + assertEquals(records.length, 1); + assertEquals(records[0].level, "warning"); + assertEquals( + records[0].rawMessage, + "Fedify benchmarkMode is enabled; benchmark-only relaxations are " + + "active and must not be used in production.", + ); + } finally { + await reset(); + } + }); + }, + ); + await t.step("origin", () => { const f = createFederation({ kv, origin: "http://example.com:8080" }); assertInstanceOf(f, FederationImpl); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 105608188..7adf331e4 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -457,9 +457,32 @@ export class FederationImpl _meterProvider: MeterProvider | undefined; firstKnock?: HttpMessageSignaturesSpec; inboxChallengePolicy?: InboxChallengePolicy; + benchmarkMode: boolean; constructor(options: FederationOptions) { super(); + const benchmarkMode = options.benchmarkMode ?? false; + const hasCustomLoaderFactory = options.documentLoaderFactory != null || + options.contextLoaderFactory != null || + options.authenticatedDocumentLoaderFactory != null; + const allowPrivateAddress = options.allowPrivateAddress ?? + (benchmarkMode && !hasCustomLoaderFactory ? true : false); + const signatureTimeWindow = options.signatureTimeWindow ?? + (benchmarkMode ? false : { hours: 1 }); + if (benchmarkMode && options.meterProvider != null) { + throw new TypeError( + "benchmarkMode requires Fedify to own the meterProvider; " + + "OpenTelemetry metric readers cannot be added after a " + + "MeterProvider is constructed.", + ); + } + if (benchmarkMode) { + getLogger(["fedify", "federation", "benchmark"]).warn( + "Fedify benchmarkMode is enabled; benchmark-only relaxations are " + + "active and must not be used in production.", + ); + } + this.benchmarkMode = benchmarkMode; this.kv = options.kv; this.kvPrefixes = { ...({ @@ -566,7 +589,7 @@ export class FederationImpl this.router.trailingSlashInsensitive = options.trailingSlashInsensitive ?? false; this._initializeRouter(); - if (options.allowPrivateAddress || options.userAgent != null) { + if (allowPrivateAddress || options.userAgent != null) { if (options.documentLoaderFactory != null) { throw new TypeError( "Cannot set documentLoaderFactory with allowPrivateAddress or " + @@ -586,8 +609,8 @@ export class FederationImpl ); } } - const { allowPrivateAddress, userAgent } = options; - this.allowPrivateAddress = allowPrivateAddress ?? false; + const { userAgent } = options; + this.allowPrivateAddress = allowPrivateAddress; // The loader factory closures below read `this._meterProvider` at // call time, not when they are created. Factories are only invoked // after the constructor has assigned `_meterProvider` (see below), so @@ -685,7 +708,7 @@ export class FederationImpl this.onOutboxError = options.onOutboxError; this.permanentFailureStatusCodes = options.permanentFailureStatusCodes ?? [404, 410]; - this.signatureTimeWindow = options.signatureTimeWindow ?? { hours: 1 }; + this.signatureTimeWindow = signatureTimeWindow; this.skipSignatureVerification = options.skipSignatureVerification ?? false; this.inboxChallengePolicy = options.inboxChallengePolicy; this.outboxRetryPolicy = options.outboxRetryPolicy ?? From 06413f8bb404fa6b7e513494a2e6fd9646148148 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 15:26:41 +0900 Subject: [PATCH 02/20] Expose benchmark metrics snapshots The benchmark CLI needs target-side metrics without requiring a separate OpenTelemetry backend. This adds the benchmark-owned in-process reader, the stats endpoint, queue depth observations, and explicit signature verification buckets for low-latency measurements. https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- packages/fedify/package.json | 2 +- packages/fedify/src/federation/bench.ts | 146 ++++++++++++++++++ .../fedify/src/federation/metrics.test.ts | 52 +++++++ packages/fedify/src/federation/metrics.ts | 101 ++++++++++++ .../fedify/src/federation/middleware.test.ts | 80 ++++++++++ packages/fedify/src/federation/middleware.ts | 29 +++- pnpm-lock.yaml | 6 +- 7 files changed, 411 insertions(+), 5 deletions(-) create mode 100644 packages/fedify/src/federation/bench.ts diff --git a/packages/fedify/package.json b/packages/fedify/package.json index d5e13579a..216e72cd2 100644 --- a/packages/fedify/package.json +++ b/packages/fedify/package.json @@ -147,6 +147,7 @@ "@logtape/logtape": "catalog:", "@opentelemetry/api": "catalog:", "@opentelemetry/core": "catalog:", + "@opentelemetry/sdk-metrics": "catalog:", "@opentelemetry/sdk-trace-base": "catalog:", "@opentelemetry/semantic-conventions": "catalog:", "byte-encodings": "catalog:", @@ -159,7 +160,6 @@ "devDependencies": { "@fedify/fixture": "workspace:*", "@fedify/vocab-tools": "workspace:^", - "@opentelemetry/sdk-metrics": "catalog:", "@std/assert": "jsr:^0.226.0", "@std/path": "catalog:", "@types/node": "^24.2.1", diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts new file mode 100644 index 000000000..219686c1b --- /dev/null +++ b/packages/fedify/src/federation/bench.ts @@ -0,0 +1,146 @@ +import { + DataPointType, + type ExponentialHistogram, + type Histogram, + MeterProvider, + type MetricData, + MetricReader, + type ResourceMetrics, + type ScopeMetrics, +} from "@opentelemetry/sdk-metrics"; + +/** + * Metric reader owned by `benchmarkMode`. + */ +export class BenchmarkMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + +export function createBenchmarkMeterProvider(): { + readonly meterProvider: MeterProvider; + readonly reader: BenchmarkMetricReader; +} { + const reader = new BenchmarkMetricReader(); + return { + meterProvider: new MeterProvider({ readers: [reader] }), + reader, + }; +} + +export interface BenchmarkMetricSnapshot { + readonly version: 1; + readonly source: "server"; + readonly generatedAt: string; + readonly scopeMetrics: readonly BenchmarkScopeMetrics[]; + readonly errors: readonly string[]; +} + +export interface BenchmarkScopeMetrics { + readonly scope: { + readonly name: string; + readonly version?: string; + }; + readonly metrics: readonly BenchmarkMetric[]; +} + +export interface BenchmarkMetric { + readonly name: string; + readonly description: string; + readonly unit: string; + readonly dataPointType: + | "histogram" + | "exponential_histogram" + | "gauge" + | "sum"; + readonly dataPoints: readonly BenchmarkDataPoint[]; +} + +export interface BenchmarkDataPoint { + readonly attributes: Record; + readonly startTime: readonly [number, number]; + readonly endTime: readonly [number, number]; + readonly value: + | number + | Histogram + | ExponentialHistogram; +} + +export async function collectBenchmarkMetrics( + reader: BenchmarkMetricReader, +): Promise { + const result = await reader.collect(); + return { + version: 1, + source: "server", + generatedAt: new Date().toISOString(), + scopeMetrics: serializeScopeMetrics(result.resourceMetrics), + errors: result.errors.map((error) => String(error)), + }; +} + +export async function handleBenchmarkStats( + request: Request, + reader: BenchmarkMetricReader, +): Promise { + if (request.method !== "GET") { + return new Response("Method not allowed", { + status: 405, + headers: { "Allow": "GET" }, + }); + } + return new Response(JSON.stringify(await collectBenchmarkMetrics(reader)), { + headers: { "Content-Type": "application/json" }, + }); +} + +function serializeScopeMetrics( + resourceMetrics: ResourceMetrics, +): readonly BenchmarkScopeMetrics[] { + return resourceMetrics.scopeMetrics.map(serializeScope); +} + +function serializeScope(scopeMetrics: ScopeMetrics): BenchmarkScopeMetrics { + return { + scope: { + name: scopeMetrics.scope.name, + version: scopeMetrics.scope.version, + }, + metrics: scopeMetrics.metrics.map(serializeMetric), + }; +} + +function serializeMetric(metric: MetricData): BenchmarkMetric { + return { + name: metric.descriptor.name, + description: metric.descriptor.description, + unit: metric.descriptor.unit, + dataPointType: serializeDataPointType(metric.dataPointType), + dataPoints: metric.dataPoints.map((point) => ({ + attributes: { ...point.attributes }, + startTime: point.startTime, + endTime: point.endTime, + value: point.value, + })), + }; +} + +function serializeDataPointType( + dataPointType: DataPointType, +): BenchmarkMetric["dataPointType"] { + switch (dataPointType) { + case DataPointType.HISTOGRAM: + return "histogram"; + case DataPointType.EXPONENTIAL_HISTOGRAM: + return "exponential_histogram"; + case DataPointType.GAUGE: + return "gauge"; + case DataPointType.SUM: + return "sum"; + } +} diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 0392fb7d5..6de1f2fa2 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -1,10 +1,17 @@ import { createTestMeterProvider, test } from "@fedify/fixture"; import { assertEquals, assertRejects } from "@std/assert"; +import { + DataPointType, + type HistogramMetricData, + MeterProvider, + MetricReader, +} from "@opentelemetry/sdk-metrics"; import type { DocumentLoader, RemoteDocument } from "@fedify/vocab-runtime"; import { FetchError } from "@fedify/vocab-runtime"; import type { MessageQueue } from "./mq.ts"; import { classifyFetchError, + getFederationMetrics, getRemoteHost, instrumentDocumentLoader, recordCircuitBreakerStateChange, @@ -22,6 +29,16 @@ import { recordWebFingerHandle, } from "./metrics.ts"; +class TestMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + const noopQueue: MessageQueue = { enqueue() { return Promise.resolve(); @@ -79,6 +96,41 @@ test("recordFanoutRecipients() omits activity type when unknown", () => { ); }); +test("signature verification duration uses explicit low-latency buckets", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + getFederationMetrics(meterProvider).recordSignatureVerificationDuration( + 7, + "http", + "verified", + ); + + const result = await reader.collect(); + const metric = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => + metric.descriptor.name === "activitypub.signature.verification.duration" + ); + assertEquals(metric?.dataPointType, DataPointType.HISTOGRAM); + const histogram = metric as HistogramMetricData | undefined; + assertEquals(histogram?.dataPoints[0].value.buckets.boundaries, [ + 0.1, + 0.25, + 0.5, + 1, + 2.5, + 5, + 10, + 25, + 50, + 100, + 250, + 500, + 1000, + ]); + await meterProvider.shutdown(); +}); + test("recordInboxActivity() records counter with result and activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); for ( diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index b69b32ac7..b12e2e01d 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -6,6 +6,7 @@ import { type Histogram, type MeterProvider, metrics, + type ObservableGauge, type UpDownCounter, } from "@opentelemetry/api"; import metadata from "../../deno.json" with { type: "json" }; @@ -92,6 +93,15 @@ export interface QueueTaskCommonAttributes { activityType?: string; } +/** + * A queue to observe for `fedify.queue.depth`. + * @since 2.3.0 + */ +export interface QueueDepthGaugeEntry { + role: QueueTaskRole; + queue?: MessageQueue; +} + /** * The kind of ActivityPub signature verified, used as the * `activitypub.signature.kind` metric attribute. @@ -478,6 +488,7 @@ class FederationMetrics { readonly queueTaskFailed: Counter; readonly queueTaskDuration: Histogram; readonly queueTaskInFlight: UpDownCounter; + readonly queueDepth: ObservableGauge; readonly fanoutRecipients: Histogram; readonly inboxActivity: Counter; readonly outboxActivity: Counter; @@ -521,6 +532,23 @@ class FederationMetrics { "Duration of ActivityPub signature verification, including local " + "key lookup and remote key fetches.", unit: "ms", + advice: { + explicitBucketBoundaries: [ + 0.1, + 0.25, + 0.5, + 1, + 2.5, + 5, + 10, + 25, + 50, + 100, + 250, + 500, + 1000, + ], + }, }, ); this.signatureKeyFetchDuration = meter.createHistogram( @@ -635,6 +663,12 @@ class FederationMetrics { unit: "{task}", }, ); + this.queueDepth = meter.createObservableGauge("fedify.queue.depth", { + description: + "Messages waiting in configured Fedify queues, as reported by the " + + "queue backend.", + unit: "{message}", + }); this.fanoutRecipients = meter.createHistogram( "activitypub.fanout.recipients", { @@ -1167,6 +1201,73 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { return name; } +/** + * Registers a callback for observing queue backend depth. + * @since 2.3.0 + */ +export function registerQueueDepthGauge( + meterProvider: MeterProvider, + entries: readonly QueueDepthGaugeEntry[], +): void { + const uniqueQueues = new Map(); + for (const { role, queue } of entries) { + if (queue?.getDepth == null) continue; + const roles = uniqueQueues.get(queue); + if (roles == null) { + uniqueQueues.set(queue, [role]); + } else if (!roles.includes(role)) { + roles.push(role); + } + } + if (uniqueQueues.size < 1) return; + const gauge = getFederationMetrics(meterProvider).queueDepth; + gauge.addCallback(async (observableResult) => { + for (const [queue, roles] of uniqueQueues) { + const depth = await queue.getDepth!(); + const attributes = buildQueueDepthAttributes(queue, roles); + observableResult.observe(depth.queued, { + ...attributes, + "fedify.queue.depth.state": "queued", + }); + if (depth.ready != null) { + observableResult.observe(depth.ready, { + ...attributes, + "fedify.queue.depth.state": "ready", + }); + } + if (depth.delayed != null) { + observableResult.observe(depth.delayed, { + ...attributes, + "fedify.queue.depth.state": "delayed", + }); + } + } + }); +} + +function buildQueueDepthAttributes( + queue: MessageQueue, + roles: readonly QueueTaskRole[], +): Attributes { + const sortedRoles = [...roles].sort(); + const role = sortedRoles.length === 1 ? sortedRoles[0] : "shared"; + const attributes: Attributes = { + "fedify.queue.role": role, + }; + if (role === "shared") { + attributes["fedify.queue.roles"] = sortedRoles.join(","); + } + const backend = getQueueBackend(queue); + if (backend != null) { + attributes["fedify.queue.backend"] = backend; + } + const nativeRetrial = queue.nativeRetrial; + if (typeof nativeRetrial === "boolean") { + attributes["fedify.queue.native_retrial"] = nativeRetrial; + } + return attributes; +} + /** * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue and, * for the initial attempt, also records diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index b0287cf89..c2b55e2a0 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -64,6 +64,7 @@ import { InboxContextImpl, KvSpecDeterminer, } from "./middleware.ts"; +import { recordInboxActivity } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage, Message, OutboxMessage } from "./queue.ts"; @@ -298,6 +299,85 @@ test("createFederation()", async (t) => { }); }); +test("benchmarkMode stats endpoint", async (t) => { + await t.step("is absent when benchmarkMode is off", async () => { + const federation = createFederation({ kv: new MemoryKvStore() }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/stats"), + { contextData: undefined }, + ); + assertEquals(response.status, 404); + }); + + await t.step("returns a v1 in-process metrics snapshot", async () => { + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 3, ready: 2, delayed: 1 }); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + benchmarkMode: true, + queue, + }); + recordInboxActivity( + (federation as FederationImpl).meterProvider, + "processed", + vocab.Create.typeId.href, + ); + + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/stats"), + { contextData: undefined }, + ); + + assertEquals(response.status, 200); + assertEquals(response.headers.get("Content-Type"), "application/json"); + const body = await response.json() as { + version: number; + source: string; + generatedAt: string; + scopeMetrics: { + metrics: { + name: string; + dataPointType: string; + dataPoints: { attributes: Record; value: unknown }[]; + }[]; + }[]; + }; + assertEquals(body.version, 1); + assertEquals(body.source, "server"); + assertEquals(Number.isNaN(Date.parse(body.generatedAt)), false); + const metrics = body.scopeMetrics.flatMap((scope) => scope.metrics); + assertExists( + metrics.find((metric) => metric.name === "activitypub.inbox.activity"), + ); + const queueDepth = metrics.find((metric) => + metric.name === "fedify.queue.depth" + ); + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, "gauge"); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })).sort((a, b) => String(a.state).localeCompare(String(b.state))), + [ + { state: "delayed", role: "shared", value: 1 }, + { state: "queued", role: "shared", value: 3 }, + { state: "ready", role: "shared", value: 2 }, + ], + ); + }); +}); + test({ name: "Federation.createContext()", permissions: { env: true, read: true }, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 7adf331e4..279760b3f 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -57,6 +57,11 @@ import type { ActivityTransformer } from "../compat/types.ts"; import { getNodeInfo, type GetNodeInfoOptions } from "../nodeinfo/client.ts"; import { handleNodeInfo, handleNodeInfoJrd } from "../nodeinfo/handler.ts"; import type { JsonValue, NodeInfo } from "../nodeinfo/types.ts"; +import { + type BenchmarkMetricReader, + createBenchmarkMeterProvider, + handleBenchmarkStats, +} from "./bench.ts"; import { type HttpMessageSignaturesSpec, type HttpMessageSignaturesSpecDeterminer, @@ -139,6 +144,7 @@ import { recordInboxActivity, recordOutboxActivity, recordOutboxEnqueue, + registerQueueDepthGauge, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -458,6 +464,7 @@ export class FederationImpl firstKnock?: HttpMessageSignaturesSpec; inboxChallengePolicy?: InboxChallengePolicy; benchmarkMode: boolean; + benchmarkMetricReader?: BenchmarkMetricReader; constructor(options: FederationOptions) { super(); @@ -718,7 +725,18 @@ export class FederationImpl this.activityTransformers = options.activityTransformers ?? getDefaultActivityTransformers(); this._tracerProvider = options.tracerProvider; - this._meterProvider = options.meterProvider; + if (benchmarkMode) { + const benchmarkMetrics = createBenchmarkMeterProvider(); + this._meterProvider = benchmarkMetrics.meterProvider; + this.benchmarkMetricReader = benchmarkMetrics.reader; + registerQueueDepthGauge(this._meterProvider, [ + { role: "inbox", queue: this.inboxQueue }, + { role: "outbox", queue: this.outboxQueue }, + { role: "fanout", queue: this.fanoutQueue }, + ]); + } else { + this._meterProvider = options.meterProvider; + } this.firstKnock = options.firstKnock; } @@ -733,6 +751,9 @@ export class FederationImpl _initializeRouter(): void { this.router.add("/.well-known/webfinger", "webfinger"); this.router.add("/.well-known/nodeinfo", "nodeInfoJrd"); + if (this.benchmarkMode) { + this.router.add("/.well-known/fedify/bench/stats", "benchmarkStats"); + } } override _getTracer(): Tracer { @@ -2345,6 +2366,8 @@ export class FederationImpl context, nodeInfoDispatcher: this.nodeInfoDispatcher!, }); + case "benchmarkStats": + return await handleBenchmarkStats(request, this.benchmarkMetricReader!); } // Routes that require JSON-LD Accepts header: @@ -2634,6 +2657,7 @@ type FedifyEndpoint = | "featured" | "featured_tags" | "collection" + | "benchmark" | "not_found" | "not_acceptable" | "error"; @@ -2676,6 +2700,9 @@ function getEndpointCategory(routeName: string): FedifyEndpoint { return "featured"; case "featuredTags": return "featured_tags"; + case "benchmarkStats": + case "benchmarkTrigger": + return "benchmark"; default: return "not_found"; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f3eba0e1b..370670ad4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1070,6 +1070,9 @@ importers: '@opentelemetry/core': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) + '@opentelemetry/sdk-metrics': + specifier: 'catalog:' + version: 2.7.1(@opentelemetry/api@1.9.1) '@opentelemetry/sdk-trace-base': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) @@ -1204,9 +1207,6 @@ importers: '@fedify/vocab-tools': specifier: workspace:^ version: link:../vocab-tools - '@opentelemetry/sdk-metrics': - specifier: 'catalog:' - version: 2.7.1(@opentelemetry/api@1.9.1) '@std/assert': specifier: jsr:^0.226.0 version: '@jsr/std__assert@0.226.0' From 339836a4b361c5057b0497534f9837423e221dcc Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 15:32:31 +0900 Subject: [PATCH 03/20] Add cooperative benchmark trigger endpoint Benchmark clients need a target-side hook for driving application sendActivity() paths while keeping recipients constrained to known sink inboxes by default. This adds the trigger endpoint, request validation, sink-list enforcement, and explicit unsafe-recipient override for benchmark-only runs. https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/bench.ts | 160 ++++++++++++++++++ .../fedify/src/federation/middleware.test.ts | 144 ++++++++++++++++ packages/fedify/src/federation/middleware.ts | 4 + 3 files changed, 308 insertions(+) diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index 219686c1b..813792727 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -1,3 +1,4 @@ +import { Activity, Object as VocabObject, type Recipient } from "@fedify/vocab"; import { DataPointType, type ExponentialHistogram, @@ -8,6 +9,8 @@ import { type ResourceMetrics, type ScopeMetrics, } from "@opentelemetry/sdk-metrics"; +import type { Context } from "./context.ts"; +import { extractInboxes } from "./send.ts"; /** * Metric reader owned by `benchmarkMode`. @@ -99,6 +102,163 @@ export async function handleBenchmarkStats( }); } +export async function handleBenchmarkTrigger( + request: Request, + context: Context, +): Promise { + if (request.method !== "POST") { + return new Response("Method not allowed", { + status: 405, + headers: { "Allow": "POST" }, + }); + } + try { + const body = asRecord(await request.json(), "request body"); + const sender = parseSender(body.sender); + const sinks = parseSinks(body.sinks); + const recipients = await parseRecipients(body.recipients, context); + const activity = await parseActivity(body.activity, context); + const inboxes = extractInboxes({ recipients }); + const inboxUrls = Object.keys(inboxes); + const unsafeInboxes = inboxUrls.filter((inbox) => !sinks.has(inbox)); + if (unsafeInboxes.length > 0 && body.allowUnsafeRecipients !== true) { + return jsonResponse( + { + error: "unsafe_recipient", + unsafeInboxes, + }, + 403, + ); + } + const triggerId = crypto.randomUUID(); + await context.sendActivity(sender, recipients, activity); + return jsonResponse( + { + version: 1, + triggerId, + activityId: activity.id?.href ?? null, + recipientCount: recipients.length, + inboxCount: inboxUrls.length, + }, + 202, + ); + } catch (error) { + if (error instanceof BenchmarkTriggerError) { + return jsonResponse({ error: error.message }, error.status); + } + if (error instanceof SyntaxError) { + return jsonResponse({ error: "Invalid JSON request body." }, 400); + } + throw error; + } +} + +class BenchmarkTriggerError extends Error { + constructor(message: string, readonly status = 400) { + super(message); + } +} + +type BenchmarkSender = { identifier: string } | { username: string }; + +function parseSender(value: unknown): BenchmarkSender { + const sender = asRecord(value, "sender"); + if (typeof sender.identifier === "string") { + return { identifier: sender.identifier }; + } + if (typeof sender.username === "string") { + return { username: sender.username }; + } + throw new BenchmarkTriggerError( + "sender must be { identifier } or { username }.", + ); +} + +function parseSinks(value: unknown): Set { + if (!Array.isArray(value)) { + throw new BenchmarkTriggerError("sinks must be an array of inbox URLs."); + } + return new Set(value.map((sink) => { + if (typeof sink !== "string") { + throw new BenchmarkTriggerError("sinks must contain only URL strings."); + } + try { + return new URL(sink).href; + } catch { + throw new BenchmarkTriggerError("sinks must contain only valid URLs."); + } + })); +} + +async function parseRecipients( + value: unknown, + context: Context, +): Promise { + if (!Array.isArray(value)) { + throw new BenchmarkTriggerError("recipients must be an array."); + } + const recipients: Recipient[] = []; + for (const item of value) { + let object: VocabObject; + try { + object = await VocabObject.fromJsonLd(item, { + documentLoader: context.documentLoader, + contextLoader: context.contextLoader, + }); + } catch (error) { + throw new BenchmarkTriggerError( + `Invalid ActivityPub recipient: ${error}`, + ); + } + if (!isRecipient(object)) { + throw new BenchmarkTriggerError( + "each recipient must be an ActivityPub actor.", + ); + } + const recipient: Recipient = object; + if (recipient.id == null || recipient.inboxId == null) { + throw new BenchmarkTriggerError( + "each recipient must have id and inbox properties.", + ); + } + recipients.push(recipient); + } + return recipients; +} + +function isRecipient(value: unknown): value is Recipient { + return value != null && typeof value === "object" && "id" in value && + "inboxId" in value; +} + +async function parseActivity( + value: unknown, + context: Context, +): Promise { + try { + return await Activity.fromJsonLd(value, { + documentLoader: context.documentLoader, + contextLoader: context.contextLoader, + }); + } catch (error) { + throw new BenchmarkTriggerError(`Invalid ActivityPub activity: ${error}`); + } +} + +function asRecord(value: unknown, name: string): Record { + if (value == null || typeof value !== "object" || Array.isArray(value)) { + throw new BenchmarkTriggerError(`${name} must be an object.`); + } + return value as Record; +} + +function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }); +} + function serializeScopeMetrics( resourceMetrics: ResourceMetrics, ): readonly BenchmarkScopeMetrics[] { diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index c2b55e2a0..38ac8f6d4 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -378,6 +378,150 @@ test("benchmarkMode stats endpoint", async (t) => { }); }); +test("benchmarkMode trigger endpoint", async (t) => { + const createTriggerTarget = () => { + const messages: OutboxMessage[] = []; + const queue: MessageQueue = { + enqueue(message: OutboxMessage) { + messages.push(message); + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + benchmarkMode: true, + contextLoaderFactory: () => mockDocumentLoader, + queue: { outbox: queue }, + }); + federation + .setActorDispatcher( + "/users/{identifier}", + (ctx, identifier) => + new vocab.Person({ + id: ctx.getActorUri(identifier), + inbox: ctx.getInboxUri(identifier), + }), + ) + .setKeyPairsDispatcher(() => [ + { privateKey: rsaPrivateKey2, publicKey: rsaPublicKey2.publicKey! }, + ]); + return { federation, messages }; + }; + + const createTriggerBody = async ( + options: { + recipientInbox?: string; + sinks?: string[]; + allowUnsafeRecipients?: boolean; + } = {}, + ) => ({ + sender: { identifier: "alice" }, + sinks: options.sinks ?? ["https://sink.example/inbox"], + recipients: [ + { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Service", + id: "https://sink.example/actors/bob", + inbox: options.recipientInbox ?? "https://sink.example/inbox", + }, + ], + activity: await new vocab.Create({ + id: new URL("https://example.com/activities/bench-1"), + actor: new URL("https://example.com/users/alice"), + object: new vocab.Note({ + id: new URL("https://example.com/notes/bench-1"), + attribution: new URL("https://example.com/users/alice"), + content: "benchmark", + }), + }).toJsonLd({ contextLoader: mockDocumentLoader }), + allowUnsafeRecipients: options.allowUnsafeRecipients, + }); + + await t.step("is absent when benchmarkMode is off", async () => { + const federation = createFederation({ kv: new MemoryKvStore() }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + }), + { contextData: undefined }, + ); + assertEquals(response.status, 404); + }); + + await t.step("rejects recipients outside the sink list", async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 403); + assertEquals(messages, []); + }); + + await t.step( + "allows unsafe recipients only with an explicit override", + async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + allowUnsafeRecipients: true, + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 202); + assertEquals(messages.length, 1); + assertEquals(messages[0].inbox, "https://not-a-sink.example/inbox"); + }, + ); + + await t.step("sends the activity to explicit sink recipients", async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(await createTriggerBody()), + }), + { contextData: undefined }, + ); + + assertEquals(response.status, 202); + const body = await response.json() as { + version: number; + triggerId: string; + activityId: string; + recipientCount: number; + inboxCount: number; + }; + assertEquals(body.version, 1); + assertEquals(typeof body.triggerId, "string"); + assertEquals(body.activityId, "https://example.com/activities/bench-1"); + assertEquals(body.recipientCount, 1); + assertEquals(body.inboxCount, 1); + assertEquals(messages.length, 1); + assertEquals(messages[0].type, "outbox"); + assertEquals(messages[0].inbox, "https://sink.example/inbox"); + }); +}); + test({ name: "Federation.createContext()", permissions: { env: true, read: true }, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 279760b3f..4bb9a78a2 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -61,6 +61,7 @@ import { type BenchmarkMetricReader, createBenchmarkMeterProvider, handleBenchmarkStats, + handleBenchmarkTrigger, } from "./bench.ts"; import { type HttpMessageSignaturesSpec, @@ -753,6 +754,7 @@ export class FederationImpl this.router.add("/.well-known/nodeinfo", "nodeInfoJrd"); if (this.benchmarkMode) { this.router.add("/.well-known/fedify/bench/stats", "benchmarkStats"); + this.router.add("/.well-known/fedify/bench/trigger", "benchmarkTrigger"); } } @@ -2368,6 +2370,8 @@ export class FederationImpl }); case "benchmarkStats": return await handleBenchmarkStats(request, this.benchmarkMetricReader!); + case "benchmarkTrigger": + return await handleBenchmarkTrigger(request, context); } // Routes that require JSON-LD Accepts header: From 35e2e062697b163eddfdd14f571273b4bd2f5343 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 15:43:42 +0900 Subject: [PATCH 04/20] Document benchmark mode endpoints Benchmark mode introduces target-side endpoint contracts and metric behavior that users need to configure intentionally. This adds manual documentation, OpenTelemetry metric references, sidebar navigation, and the 2.3.0 changelog entry for the cooperative benchmark endpoints. https://github.com/fedify-dev/fedify/issues/744 https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 13 ++++ docs/.vitepress/config.mts | 1 + docs/manual/benchmarking.md | 140 +++++++++++++++++++++++++++++++++++ docs/manual/federation.md | 21 ++++++ docs/manual/opentelemetry.md | 18 ++++- 5 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 docs/manual/benchmarking.md diff --git a/CHANGES.md b/CHANGES.md index ace4583ef..9744e5da6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -126,6 +126,17 @@ To be released. held activities call the outbox permanent failure handler with `reason: "circuit-breaker-ttl"`. [[#620], [#778]] + - Added `benchmarkMode` to `createFederation()` and + `FederationBuilder.build()` for cooperative federation benchmarking. + When enabled, Fedify exposes `GET /.well-known/fedify/bench/stats` + for in-process OpenTelemetry metric snapshots and + `POST /.well-known/fedify/bench/trigger` for driving `sendActivity()` + to explicit benchmark sink recipients. Benchmark mode also defaults + `allowPrivateAddress` to `true` when built-in loaders are used, defaults + `signatureTimeWindow` to `false`, reports queue depth through the new + `fedify.queue.depth` gauge, and adds explicit low-latency buckets to + the signature verification duration histogram. [[#744], [#782]] + - Added OpenTelemetry metrics for ActivityPub fanout and activity lifecycle events, complementing the per-recipient `activitypub.delivery.*` counters and the per-task @@ -248,6 +259,7 @@ To be released. [#740]: https://github.com/fedify-dev/fedify/issues/740 [#741]: https://github.com/fedify-dev/fedify/issues/741 [#742]: https://github.com/fedify-dev/fedify/issues/742 +[#744]: https://github.com/fedify-dev/fedify/issues/744 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 @@ -261,6 +273,7 @@ To be released. [#772]: https://github.com/fedify-dev/fedify/pull/772 [#777]: https://github.com/fedify-dev/fedify/pull/777 [#778]: https://github.com/fedify-dev/fedify/pull/778 +[#782]: https://github.com/fedify-dev/fedify/issues/782 ### @fedify/fixture diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 7fa2d3978..a71503133 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -154,6 +154,7 @@ const MANUAL = { { text: "Linting", link: "/manual/lint.md" }, { text: "Logging", link: "/manual/log.md" }, { text: "OpenTelemetry", link: "/manual/opentelemetry.md" }, + { text: "Benchmarking", link: "/manual/benchmarking.md" }, { text: "Deployment", link: "/manual/deploy.md" }, ], activeMatch: "/manual", diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md new file mode 100644 index 000000000..3d392c131 --- /dev/null +++ b/docs/manual/benchmarking.md @@ -0,0 +1,140 @@ +--- +description: >- + Fedify can expose cooperative benchmark endpoints for measuring federation + workloads without requiring an external metrics backend. +--- + +Benchmarking +============ + +*This API is available since Fedify 2.3.0.* + +Fedify can run as a cooperative benchmark target by enabling +`~FederationOptions.benchmarkMode`. This mode exposes local benchmark +endpoints under `/.well-known/fedify/bench/` and configures an in-process +OpenTelemetry metrics reader so benchmark clients can collect server-side +measurements without a separate metrics backend. + +> [!WARNING] +> Do not enable `benchmarkMode` in production. It is intended for benchmark +> targets that you control. + + +Enabling benchmark mode +----------------------- + +Set `benchmarkMode: true` when creating the `Federation` object: + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { createFederation } from "@fedify/fedify"; + +const federation = createFederation({ +// ---cut-start--- + kv: null as unknown as KvStore, +// ---cut-end--- + benchmarkMode: true, +}); +~~~~ + +When enabled, Fedify changes only benchmark-target defaults: + + - `~FederationOptions.allowPrivateAddress` defaults to `true`, unless a + custom document loader factory is configured. + - `~FederationOptions.signatureTimeWindow` defaults to `false`. + - Explicit `allowPrivateAddress` and `signatureTimeWindow` values still win. + - Inbox idempotency is unchanged. Benchmark clients that need repeated + deliveries should mint unique activity IDs. + +If you provide `meterProvider` together with `benchmarkMode`, Fedify throws a +`TypeError`. OpenTelemetry metric readers have to be attached when a +`MeterProvider` is constructed, so benchmark mode owns its in-process provider. + + +Benchmark stats endpoint +------------------------ + +`GET /.well-known/fedify/bench/stats` returns a versioned JSON snapshot of the +server-side metrics collected by the benchmark mode reader: + +~~~~ json +{ + "version": 1, + "source": "server", + "generatedAt": "2026-06-02T00:00:00.000Z", + "scopeMetrics": [], + "errors": [] +} +~~~~ + +The `scopeMetrics` field contains serialized OpenTelemetry scope metrics. +Observable queue depth is included when configured queues implement +`MessageQueue.getDepth()`. + + +Benchmark trigger endpoint +-------------------------- + +`POST /.well-known/fedify/bench/trigger` asks the target application to call +`Context.sendActivity()` with an explicit sender, recipients, and activity. +This exercises the target's normal outbox and queue path. + +The request body has this shape: + +~~~~ json +{ + "sender": { "identifier": "alice" }, + "sinks": ["https://sink.example/inbox"], + "recipients": [ + { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Service", + "id": "https://sink.example/actors/bob", + "inbox": "https://sink.example/inbox" + } + ], + "activity": { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Create", + "id": "https://example.com/activities/bench-1", + "actor": "https://example.com/users/alice", + "object": { + "type": "Note", + "id": "https://example.com/notes/bench-1", + "content": "benchmark" + } + } +} +~~~~ + +The `sender` must be either `{ "identifier": string }` or +`{ "username": string }`. Recipients are parsed as ActivityPub actors and must +have `id` and `inbox` properties. The activity is parsed as an ActivityPub +`Activity`. + +By default, every recipient inbox must appear in the `sinks` list. This keeps +benchmark traffic pointed at benchmark sink inboxes. To bypass this guard for +a controlled run, set `"allowUnsafeRecipients": true`. + +A successful trigger returns `202 Accepted`: + +~~~~ json +{ + "version": 1, + "triggerId": "018f52d9-8cc2-7b9d-9f9a-f0d777dfc14b", + "activityId": "https://example.com/activities/bench-1", + "recipientCount": 1, + "inboxCount": 1 +} +~~~~ + + +Metrics +------- + +Benchmark mode uses the same Fedify metrics documented in +[*OpenTelemetry*](./opentelemetry.md), including queue task metrics, queue +depth, HTTP server metrics, and signature verification histograms. The +benchmark endpoints themselves are classified as `fedify.endpoint=benchmark` +in `fedify.http.server.request.*` metrics. diff --git a/docs/manual/federation.md b/docs/manual/federation.md index cefea841e..42bb3c558 100644 --- a/docs/manual/federation.md +++ b/docs/manual/federation.md @@ -275,6 +275,27 @@ Turned off by default. [SSRF]: https://owasp.org/www-community/attacks/Server_Side_Request_Forgery +### `benchmarkMode` + +*This API is available since Fedify 2.3.0.* + +Whether to enable cooperative benchmark mode. When enabled, Fedify exposes +benchmark endpoints under `/.well-known/fedify/bench/` and configures an +in-process metrics reader for benchmark clients. + +This mode changes only benchmark-target defaults: + + - `allowPrivateAddress` defaults to `true`, unless a custom document loader + factory is configured. + - `signatureTimeWindow` defaults to `false`. + - Explicit option values still win. + +> [!WARNING] +> Do not enable `benchmarkMode` in production. + +See the [*Benchmarking* section](./benchmarking.md) for endpoint details and +safety rules. + ### `userAgent` *This API is available since Fedify 1.3.0.* diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index f2f51b931..ff7848030 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -370,6 +370,7 @@ Fedify records the following OpenTelemetry metrics: | `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | | `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | | `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | +| `fedify.queue.depth` | Gauge | `{message}` | Reports queued, ready, and delayed queue depth when the queue backend supports it. | ### Metric attributes @@ -841,6 +842,15 @@ Fedify records the following OpenTelemetry metrics: Fedify process*, not cross-process totals. Aggregate it across replicas in your metrics backend. +`fedify.queue.depth` +: `fedify.queue.depth.state` is always present and is one of `queued`, + `ready`, or `delayed`. `fedify.queue.role` is `inbox`, `outbox`, + `fanout`, or `shared`; `shared` means the same queue instance backs more + than one Fedify queue role, and `fedify.queue.roles` lists those roles as a + comma-separated string. `fedify.queue.backend` and + `fedify.queue.native_retrial` follow the same rules as the queue task + metrics. + The `fedify.queue.task.*` metrics describe what Fedify's workers do with queued messages. They complement the backend-side [`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which @@ -849,6 +859,10 @@ Reading both signals together (task throughput plus backlog depth) makes it possible to distinguish a small, slow queue from a large, fast one and to set alerting thresholds for delivery latency under load. +When [`benchmarkMode`](./benchmarking.md) is enabled, Fedify serves a +versioned snapshot of these in-process metrics from +`/.well-known/fedify/bench/stats`. + The `activitypub.inbox.activity`, `activitypub.outbox.activity`, and `activitypub.fanout.recipients` metrics describe what is happening at the *activity* level, complementing the per-recipient @@ -958,9 +972,11 @@ for ActivityPub: | `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | | `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | | `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | +| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `shared` for queue depth rows where one queue backs multiple roles. | `"outbox"` | | `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | | `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | +| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | | `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | | `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | | `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | From effd053e7eb96b7ca41b3f3c8f31e291f9e557e2 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 16:26:09 +0900 Subject: [PATCH 05/20] Register queue depth metrics generally Queue depth is documented as a regular Fedify OpenTelemetry metric, not as a benchmark-only signal. Register the observable gauge after selecting the meter provider so normal applications with getDepth-capable queues can export it without enabling benchmark endpoints. Add a regression test that constructs a regular federation with an explicit MeterProvider and verifies fedify.queue.depth is collected. https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/middleware.test.ts | 60 +++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 10 ++-- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 38ac8f6d4..938a85f4b 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -51,6 +51,11 @@ import { } from "../testing/keys.ts"; import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; import { SpanStatusCode } from "@opentelemetry/api"; +import { + DataPointType, + MeterProvider, + MetricReader, +} from "@opentelemetry/sdk-metrics"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; import { CircuitBreaker } from "./circuit-breaker.ts"; @@ -80,6 +85,16 @@ async function withLogtapeLock(fn: () => Promise): Promise { return await run; } +class TestMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + test("createFederation()", async (t) => { const kv = new MemoryKvStore(); @@ -378,6 +393,51 @@ test("benchmarkMode stats endpoint", async (t) => { }); }); +test("createFederation() registers queue depth for regular metrics", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 5, ready: 4, delayed: 3 }); + }, + }; + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })).sort((a, b) => String(a.state).localeCompare(String(b.state))), + [ + { state: "delayed", role: "shared", value: 3 }, + { state: "queued", role: "shared", value: 5 }, + { state: "ready", role: "shared", value: 4 }, + ], + ); + } finally { + await meterProvider.shutdown(); + } +}); + test("benchmarkMode trigger endpoint", async (t) => { const createTriggerTarget = () => { const messages: OutboxMessage[] = []; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 4bb9a78a2..12a39b7eb 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -730,14 +730,14 @@ export class FederationImpl const benchmarkMetrics = createBenchmarkMeterProvider(); this._meterProvider = benchmarkMetrics.meterProvider; this.benchmarkMetricReader = benchmarkMetrics.reader; - registerQueueDepthGauge(this._meterProvider, [ - { role: "inbox", queue: this.inboxQueue }, - { role: "outbox", queue: this.outboxQueue }, - { role: "fanout", queue: this.fanoutQueue }, - ]); } else { this._meterProvider = options.meterProvider; } + registerQueueDepthGauge(this.meterProvider, [ + { role: "inbox", queue: this.inboxQueue }, + { role: "outbox", queue: this.outboxQueue }, + { role: "fanout", queue: this.fanoutQueue }, + ]); this.firstKnock = options.firstKnock; } From c4a087b6805130e796de3189ebc2eb2007152e61 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 17:02:11 +0900 Subject: [PATCH 06/20] Document benchmark meter provider guard Benchmark mode owns its in-process metric reader, so application-provided MeterProvider instances cannot be combined with benchmarkMode: true. Make that TypeError contract visible on createFederation() and FederationBuilder.build(). Also document the common deployment pattern where one code path switches benchmark mode with an environment flag and passes the application meter provider only when benchmark mode is disabled. https://github.com/fedify-dev/fedify/issues/782 Assisted-by: Codex:gpt-5.5 --- docs/manual/benchmarking.md | 23 ++++++++++++++++++++ packages/fedify/src/federation/builder.ts | 7 ++++++ packages/fedify/src/federation/federation.ts | 3 +++ packages/fedify/src/federation/middleware.ts | 4 +++- 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md index 3d392c131..f25ae1551 100644 --- a/docs/manual/benchmarking.md +++ b/docs/manual/benchmarking.md @@ -51,6 +51,29 @@ If you provide `meterProvider` together with `benchmarkMode`, Fedify throws a `TypeError`. OpenTelemetry metric readers have to be attached when a `MeterProvider` is constructed, so benchmark mode owns its in-process provider. +If the same application code sometimes runs with benchmark mode and sometimes +runs with your normal OpenTelemetry pipeline, pass your application +`meterProvider` only when benchmark mode is off: + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +import type { MeterProvider } from "@opentelemetry/api"; +// ---cut-start--- +declare const process: { env: Record }; +const kv = null as unknown as KvStore; +const meterProvider = null as unknown as MeterProvider; +// ---cut-end--- +import { createFederation } from "@fedify/fedify"; + +const benchmarkMode = process.env.FEDIFY_BENCHMARK === "1"; + +const federation = createFederation({ + kv, + benchmarkMode, + meterProvider: benchmarkMode ? undefined : meterProvider, +}); +~~~~ + Benchmark stats endpoint ------------------------ diff --git a/packages/fedify/src/federation/builder.ts b/packages/fedify/src/federation/builder.ts index 95c989a00..7e10c1909 100644 --- a/packages/fedify/src/federation/builder.ts +++ b/packages/fedify/src/federation/builder.ts @@ -195,6 +195,13 @@ export class FederationBuilderImpl this.collectionTypeIds = {}; } + /** + * Builds the federation object. + * @param options Parameters for initializing the federation object. + * @returns The federation object. + * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * specified. + */ async build( options: FederationOptions, ): Promise> { diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index c3fa5efbc..55a9b1613 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -802,7 +802,10 @@ export interface FederationBuilder extends Federatable { /** * Builds the federation object. + * @param options Parameters for initializing the federation object. * @returns The federation object. + * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * specified. */ build( options: FederationOptions, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 12a39b7eb..2c753445f 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -424,8 +424,10 @@ export interface FederationOrigin { /** * Create a new {@link Federation} instance. - * @param parameters Parameters for initializing the instance. + * @param options Parameters for initializing the instance. * @returns A new {@link Federation} instance. + * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * specified. * @since 0.10.0 */ export function createFederation( From 48aed08935c909f576ca93817b25c01662be489b Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 17:16:35 +0900 Subject: [PATCH 07/20] Add a PR link to the changelog --- CHANGES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 9744e5da6..db2f9300f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -135,7 +135,7 @@ To be released. `allowPrivateAddress` to `true` when built-in loaders are used, defaults `signatureTimeWindow` to `false`, reports queue depth through the new `fedify.queue.depth` gauge, and adds explicit low-latency buckets to - the signature verification duration histogram. [[#744], [#782]] + the signature verification duration histogram. [[#744], [#782], [#787]] - Added OpenTelemetry metrics for ActivityPub fanout and activity lifecycle events, complementing the per-recipient @@ -274,6 +274,7 @@ To be released. [#777]: https://github.com/fedify-dev/fedify/pull/777 [#778]: https://github.com/fedify-dev/fedify/pull/778 [#782]: https://github.com/fedify-dev/fedify/issues/782 +[#787]: https://github.com/fedify-dev/fedify/pull/787 ### @fedify/fixture From e5ae8863371c7e49177511c281be62dfb237bab3 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 17:37:37 +0900 Subject: [PATCH 08/20] Refresh pnpm lockfile importers The benchmark mode change moved @opentelemetry/sdk-metrics into the @fedify/fedify runtime dependencies, but the lockfile still listed the specifier under @fedify/debugger. Frozen pnpm installs therefore failed before CI could run any checks. Move the importer entry to match the package manifests. https://github.com/fedify-dev/fedify/pull/787 Assisted-by: Codex:gpt-5.5 --- pnpm-lock.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 370670ad4..f6f7bec59 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1070,9 +1070,6 @@ importers: '@opentelemetry/core': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) - '@opentelemetry/sdk-metrics': - specifier: 'catalog:' - version: 2.7.1(@opentelemetry/api@1.9.1) '@opentelemetry/sdk-trace-base': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) @@ -1176,6 +1173,9 @@ importers: '@opentelemetry/core': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) + '@opentelemetry/sdk-metrics': + specifier: 'catalog:' + version: 2.7.1(@opentelemetry/api@1.9.1) '@opentelemetry/sdk-trace-base': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) From ef19392e279dc5be98c34a8824d9550d2022df73 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 19:31:41 +0900 Subject: [PATCH 09/20] Isolate queue depth metric failures Queue depth observation is best-effort because third-party queue backends may fail while reporting their depth. Skip failed or missing snapshots so one backend cannot poison the whole OpenTelemetry collection pass. https://github.com/fedify-dev/fedify/pull/787#discussion_r3339711524 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/metrics.test.ts | 66 +++++++++++++++++++ packages/fedify/src/federation/metrics.ts | 8 ++- 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 6de1f2fa2..2b39668ba 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -27,6 +27,7 @@ import { recordOutboxActivity, recordOutboxEnqueue, recordWebFingerHandle, + registerQueueDepthGauge, } from "./metrics.ts"; class TestMetricReader extends MetricReader { @@ -131,6 +132,71 @@ test("signature verification duration uses explicit low-latency buckets", async await meterProvider.shutdown(); }); +test("registerQueueDepthGauge() skips unavailable depth snapshots", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const throwingQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + throw new TypeError("backend unavailable"); + }, + }; + const nullDepthQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve(null as never); + }, + }; + const healthyQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 7 }); + }, + }; + + registerQueueDepthGauge(meterProvider, [ + { role: "inbox", queue: throwingQueue }, + { role: "outbox", queue: nullDepthQueue }, + { role: "fanout", queue: healthyQueue }, + ]); + + const result = await reader.collect(); + assertEquals(result.errors, []); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + assertEquals(queueDepth?.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth?.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })), + [ + { state: "queued", role: "fanout", value: 7 }, + ], + ); + } finally { + await meterProvider.shutdown(); + } +}); + test("recordInboxActivity() records counter with result and activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); for ( diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index b12e2e01d..ad2db15c4 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -1223,7 +1223,13 @@ export function registerQueueDepthGauge( const gauge = getFederationMetrics(meterProvider).queueDepth; gauge.addCallback(async (observableResult) => { for (const [queue, roles] of uniqueQueues) { - const depth = await queue.getDepth!(); + let depth; + try { + depth = await queue.getDepth!(); + } catch { + continue; + } + if (depth == null) continue; const attributes = buildQueueDepthAttributes(queue, roles); observableResult.observe(depth.queued, { ...attributes, From 9989d6dbdca4a1f4ecf8fab9f8ea2f1514aafe5c Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 19:32:01 +0900 Subject: [PATCH 10/20] Reject unreadable benchmark trigger bodies Some runtimes throw non-SyntaxError exceptions while parsing invalid or empty JSON request bodies. Parse the trigger body in its own guarded step so those requests consistently receive a 400 response. https://github.com/fedify-dev/fedify/pull/787#discussion_r3339711544 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/bench.ts | 11 +++++++---- .../fedify/src/federation/middleware.test.ts | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index 813792727..0178dca47 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -112,8 +112,14 @@ export async function handleBenchmarkTrigger( headers: { "Allow": "POST" }, }); } + let json: unknown; try { - const body = asRecord(await request.json(), "request body"); + json = await request.json(); + } catch { + return jsonResponse({ error: "Invalid JSON request body." }, 400); + } + try { + const body = asRecord(json, "request body"); const sender = parseSender(body.sender); const sinks = parseSinks(body.sinks); const recipients = await parseRecipients(body.recipients, context); @@ -146,9 +152,6 @@ export async function handleBenchmarkTrigger( if (error instanceof BenchmarkTriggerError) { return jsonResponse({ error: error.message }, error.status); } - if (error instanceof SyntaxError) { - return jsonResponse({ error: "Invalid JSON request body." }, 400); - } throw error; } } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 938a85f4b..ba793f6ed 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -58,6 +58,7 @@ import { } from "@opentelemetry/sdk-metrics"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; import { CircuitBreaker } from "./circuit-breaker.ts"; +import { handleBenchmarkTrigger } from "./bench.ts"; const documentLoader = getDocumentLoader(); import type { Context, GetActorOptions } from "./context.ts"; @@ -511,6 +512,23 @@ test("benchmarkMode trigger endpoint", async (t) => { assertEquals(response.status, 404); }); + await t.step("rejects unreadable JSON request bodies", async () => { + const request = { + method: "POST", + json() { + throw new TypeError("body is unavailable"); + }, + } as unknown as Request; + const response = await handleBenchmarkTrigger( + request, + {} as Context, + ); + assertEquals(response.status, 400); + assertEquals(await response.json(), { + error: "Invalid JSON request body.", + }); + }); + await t.step("rejects recipients outside the sink list", async () => { const { federation, messages } = createTriggerTarget(); const response = await federation.fetch( From d67d80d6733a631515512d46c6ced664d72e094f Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 19:32:11 +0900 Subject: [PATCH 11/20] Preserve benchmark defaults for auth loaders An authenticated document loader only affects authenticated key fetching. It should not disable benchmarkMode's private-address default for the regular document and context loaders, whose local benchmark behavior is still needed. Keep the explicit allowPrivateAddress and userAgent guard intact while avoiding that guard for the benchmark default. https://github.com/fedify-dev/fedify/pull/787#discussion_r3339731864 https://github.com/fedify-dev/fedify/pull/787#discussion_r3339812868 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/middleware.test.ts | 13 +++++++++++++ packages/fedify/src/federation/middleware.ts | 5 ++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index ba793f6ed..9e7c4fb5f 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -148,6 +148,19 @@ test("createFederation()", async (t) => { assertEquals(federation.allowPrivateAddress, false); }); + await t.step( + "benchmarkMode keeps private-address default with auth loader only", + () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + authenticatedDocumentLoaderFactory: () => mockDocumentLoader, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, true); + }, + ); + await t.step("benchmarkMode rejects an explicit meterProvider", () => { const [meterProvider] = createTestMeterProvider(); assertThrows( diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 2c753445f..2c98cc7fe 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -473,8 +473,7 @@ export class FederationImpl super(); const benchmarkMode = options.benchmarkMode ?? false; const hasCustomLoaderFactory = options.documentLoaderFactory != null || - options.contextLoaderFactory != null || - options.authenticatedDocumentLoaderFactory != null; + options.contextLoaderFactory != null; const allowPrivateAddress = options.allowPrivateAddress ?? (benchmarkMode && !hasCustomLoaderFactory ? true : false); const signatureTimeWindow = options.signatureTimeWindow ?? @@ -599,7 +598,7 @@ export class FederationImpl this.router.trailingSlashInsensitive = options.trailingSlashInsensitive ?? false; this._initializeRouter(); - if (allowPrivateAddress || options.userAgent != null) { + if (options.allowPrivateAddress === true || options.userAgent != null) { if (options.documentLoaderFactory != null) { throw new TypeError( "Cannot set documentLoaderFactory with allowPrivateAddress or " + From 63f9707766ece4bfcc3a538d780891bc9a4b6d6c Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 19:58:43 +0900 Subject: [PATCH 12/20] Tighten benchmark review behavior Remove the benchmark trigger ID from the response because it is not propagated through the outbound activity path. Clarify benchmark-mode defaults around loader factories, verify explicit overrides, and make the benchmark warning name the effective relaxed protections while keeping the LogTape properties structured for machine-readable logging. Use a deterministic circuit breaker clock for the expired held activity test so the TTL assertion does not depend on wall-clock time. https://github.com/fedify-dev/fedify/pull/787 Assisted-by: Codex:gpt-5.5 --- docs/manual/benchmarking.md | 1 - packages/fedify/src/federation/bench.ts | 2 - .../fedify/src/federation/builder.test.ts | 11 +++ packages/fedify/src/federation/federation.ts | 3 +- .../fedify/src/federation/middleware.test.ts | 41 +++++++-- packages/fedify/src/federation/middleware.ts | 83 ++++++++++++++++++- 6 files changed, 126 insertions(+), 15 deletions(-) diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md index f25ae1551..742c474ef 100644 --- a/docs/manual/benchmarking.md +++ b/docs/manual/benchmarking.md @@ -145,7 +145,6 @@ A successful trigger returns `202 Accepted`: ~~~~ json { "version": 1, - "triggerId": "018f52d9-8cc2-7b9d-9f9a-f0d777dfc14b", "activityId": "https://example.com/activities/bench-1", "recipientCount": 1, "inboxCount": 1 diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index 0178dca47..86097bd39 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -136,12 +136,10 @@ export async function handleBenchmarkTrigger( 403, ); } - const triggerId = crypto.randomUUID(); await context.sendActivity(sender, recipients, activity); return jsonResponse( { version: 1, - triggerId, activityId: activity.id?.href ?? null, recipientCount: recipients.length, inboxCount: inboxUrls.length, diff --git a/packages/fedify/src/federation/builder.test.ts b/packages/fedify/src/federation/builder.test.ts index f4aab0ed0..a2af0d6a8 100644 --- a/packages/fedify/src/federation/builder.test.ts +++ b/packages/fedify/src/federation/builder.test.ts @@ -176,6 +176,17 @@ test("FederationBuilder", async (t) => { assertEquals(impl.benchmarkMode, true); assertEquals(impl.allowPrivateAddress, true); assertEquals(impl.signatureTimeWindow, false); + + const overridden = await builder.build({ + kv: new MemoryKvStore(), + benchmarkMode: true, + allowPrivateAddress: false, + signatureTimeWindow: { minutes: 10 }, + }); + const overriddenImpl = overridden as FederationImpl; + assertEquals(overriddenImpl.benchmarkMode, true); + assertEquals(overriddenImpl.allowPrivateAddress, false); + assertEquals(overriddenImpl.signatureTimeWindow, { minutes: 10 }); }); await t.step("should snapshot router state on build", async () => { diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index 55a9b1613..4517bb270 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -943,7 +943,8 @@ export interface FederationOptions { * targets. Do not enable this option in production. * * When enabled, {@link FederationOptions.allowPrivateAddress} defaults to - * `true` unless a custom document loader is configured, and + * `true` unless {@link FederationOptions.documentLoaderFactory} or + * {@link FederationOptions.contextLoaderFactory} is configured, and * {@link FederationOptions.signatureTimeWindow} defaults to `false`. * * Turned off by default. diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 9e7c4fb5f..8023c6f5a 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -201,8 +201,26 @@ test("createFederation()", async (t) => { assertEquals(records[0].level, "warning"); assertEquals( records[0].rawMessage, - "Fedify benchmarkMode is enabled; benchmark-only relaxations are " + - "active and must not be used in production.", + "Fedify benchmarkMode is enabled; private address checks " + + "disabled (allowPrivateAddress=true); HTTP Signature time " + + "window disabled (signatureTimeWindow=false). Benchmark " + + "endpoints are active and must not be used in production.", + ); + assertEquals( + records[0].properties.relaxations, + [ + { + protection: "private_address_checks", + effect: "disabled", + effectiveValue: true, + }, + { + protection: "http_signature_time_window", + effect: "disabled", + effectiveValue: false, + secureDefaultSeconds: 3600, + }, + ], ); } finally { await reset(); @@ -597,13 +615,11 @@ test("benchmarkMode trigger endpoint", async (t) => { assertEquals(response.status, 202); const body = await response.json() as { version: number; - triggerId: string; activityId: string; recipientCount: number; inboxCount: number; }; assertEquals(body.version, 1); - assertEquals(typeof body.triggerId, "string"); assertEquals(body.activityId, "https://example.com/activities/bench-1"); assertEquals(body.recipientCount, 1); assertEquals(body.inboxCount, 1); @@ -8016,13 +8032,20 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { await t.step("expired held activity is dropped", async () => { fetchMock.hardReset(); fetchMock.spyGlobal(); + const now = Temporal.Instant.from("2026-05-25T00:00:02Z"); let dropped: { remoteHost: string; heldSince: Temporal.Instant } | null = null; - const { federation, queued } = setup({ - failureThreshold: 1, - heldActivityTtl: { seconds: 1 }, - onActivityDrop(remoteHost, details) { - dropped = { remoteHost, heldSince: details.heldSince }; + const { federation, queued, kv } = setup(false); + federation.circuitBreaker = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 1, + heldActivityTtl: { seconds: 1 }, + onActivityDrop(remoteHost, details) { + dropped = { remoteHost, heldSince: details.heldSince }; + }, }, }); let permanentFailureReason: unknown; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 2c98cc7fe..380c269a5 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -220,6 +220,78 @@ function clampNegativeDelay(delay: Temporal.Duration): Temporal.Duration { return delay.sign < 0 ? Temporal.Duration.from({ seconds: 0 }) : delay; } +type BenchmarkRelaxation = + | { + readonly protection: "private_address_checks"; + readonly effect: "disabled"; + readonly effectiveValue: true; + } + | { + readonly protection: "http_signature_time_window"; + readonly effect: "disabled"; + readonly effectiveValue: false; + readonly secureDefaultSeconds: 3600; + } + | { + readonly protection: "http_signature_time_window"; + readonly effect: "changed"; + readonly effectiveSeconds: number; + readonly secureDefaultSeconds: 3600; + }; + +function getBenchmarkRelaxations( + allowPrivateAddress: boolean, + signatureTimeWindow: Temporal.Duration | Temporal.DurationLike | false, +): BenchmarkRelaxation[] { + const relaxations: BenchmarkRelaxation[] = []; + if (allowPrivateAddress) { + relaxations.push({ + protection: "private_address_checks", + effect: "disabled", + effectiveValue: true, + }); + } + if (signatureTimeWindow === false) { + relaxations.push({ + protection: "http_signature_time_window", + effect: "disabled", + effectiveValue: false, + secureDefaultSeconds: 3600, + }); + } else { + const seconds = Temporal.Duration.from(signatureTimeWindow).total({ + unit: "seconds", + }); + if (seconds !== 3600) { + relaxations.push({ + protection: "http_signature_time_window", + effect: "changed", + effectiveSeconds: seconds, + secureDefaultSeconds: 3600, + }); + } + } + return relaxations; +} + +function formatBenchmarkRelaxations( + relaxations: readonly BenchmarkRelaxation[], +): string { + if (relaxations.length < 1) return "no benchmark-only protections relaxed"; + return relaxations.map((relaxation) => { + switch (relaxation.protection) { + case "private_address_checks": + return "private address checks disabled (allowPrivateAddress=true)"; + case "http_signature_time_window": + if (relaxation.effect === "disabled") { + return `HTTP Signature time window disabled (signatureTimeWindow=false)`; + } + return `HTTP Signature time window set to ${relaxation.effectiveSeconds}s ` + + `(secure default: ${relaxation.secureDefaultSeconds}s)`; + } + }).join("; "); +} + function maxDelay( first: Temporal.Duration, second: Temporal.Duration, @@ -486,9 +558,16 @@ export class FederationImpl ); } if (benchmarkMode) { + const relaxations = getBenchmarkRelaxations( + allowPrivateAddress, + signatureTimeWindow, + ); + const relaxationSummary = formatBenchmarkRelaxations(relaxations); getLogger(["fedify", "federation", "benchmark"]).warn( - "Fedify benchmarkMode is enabled; benchmark-only relaxations are " + - "active and must not be used in production.", + `Fedify benchmarkMode is enabled; ${relaxationSummary}. Benchmark endpoints are active and must not be used in production.`, + { + relaxations, + }, ); } this.benchmarkMode = benchmarkMode; From 3c8d3d930dc2280ca02512d5e3a3c7546f2a6061 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 19:58:52 +0900 Subject: [PATCH 13/20] Tidy metrics test teardown Ensure the low-latency signature histogram test always shuts down its MeterProvider, even if an assertion fails. Also avoid the never cast in the queue depth mock by casting through the queue depth type instead. https://github.com/fedify-dev/fedify/pull/787#discussion_r3340510722 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/metrics.test.ts | 66 ++++++++++--------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 2b39668ba..309286e5d 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -8,7 +8,7 @@ import { } from "@opentelemetry/sdk-metrics"; import type { DocumentLoader, RemoteDocument } from "@fedify/vocab-runtime"; import { FetchError } from "@fedify/vocab-runtime"; -import type { MessageQueue } from "./mq.ts"; +import type { MessageQueue, MessageQueueDepth } from "./mq.ts"; import { classifyFetchError, getFederationMetrics, @@ -100,36 +100,40 @@ test("recordFanoutRecipients() omits activity type when unknown", () => { test("signature verification duration uses explicit low-latency buckets", async () => { const reader = new TestMetricReader(); const meterProvider = new MeterProvider({ readers: [reader] }); - getFederationMetrics(meterProvider).recordSignatureVerificationDuration( - 7, - "http", - "verified", - ); - - const result = await reader.collect(); - const metric = result.resourceMetrics.scopeMetrics - .flatMap((scope) => scope.metrics) - .find((metric) => - metric.descriptor.name === "activitypub.signature.verification.duration" + try { + getFederationMetrics(meterProvider).recordSignatureVerificationDuration( + 7, + "http", + "verified", ); - assertEquals(metric?.dataPointType, DataPointType.HISTOGRAM); - const histogram = metric as HistogramMetricData | undefined; - assertEquals(histogram?.dataPoints[0].value.buckets.boundaries, [ - 0.1, - 0.25, - 0.5, - 1, - 2.5, - 5, - 10, - 25, - 50, - 100, - 250, - 500, - 1000, - ]); - await meterProvider.shutdown(); + + const result = await reader.collect(); + const metric = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => + metric.descriptor.name === + "activitypub.signature.verification.duration" + ); + assertEquals(metric?.dataPointType, DataPointType.HISTOGRAM); + const histogram = metric as HistogramMetricData | undefined; + assertEquals(histogram?.dataPoints[0].value.buckets.boundaries, [ + 0.1, + 0.25, + 0.5, + 1, + 2.5, + 5, + 10, + 25, + 50, + 100, + 250, + 500, + 1000, + ]); + } finally { + await meterProvider.shutdown(); + } }); test("registerQueueDepthGauge() skips unavailable depth snapshots", async () => { @@ -155,7 +159,7 @@ test("registerQueueDepthGauge() skips unavailable depth snapshots", async () => return Promise.resolve(); }, getDepth() { - return Promise.resolve(null as never); + return Promise.resolve(null as unknown as MessageQueueDepth); }, }; const healthyQueue: MessageQueue = { From 0dc8e0f38cac7b92a15ac38f7239bd7b7a6f55df Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 20:16:04 +0900 Subject: [PATCH 14/20] Constrain benchmark trigger delivery Make benchmark trigger delivery use server-configured sink inboxes instead of trusting allowlists or bypass flags from the request body. This keeps the trigger endpoint from being used to make a target sign and enqueue traffic to arbitrary inboxes. Also make benchmark warning generation tolerate calendar-aware signature time windows by skipping the seconds summary when Temporal cannot convert the value without a relative reference. https://github.com/fedify-dev/fedify/pull/787#discussion_r3340679284 https://github.com/fedify-dev/fedify/pull/787#discussion_r3340698783 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 11 +-- docs/manual/benchmarking.md | 25 ++++-- packages/fedify/src/federation/bench.ts | 29 +++---- packages/fedify/src/federation/builder.ts | 2 +- packages/fedify/src/federation/federation.ts | 28 ++++++- .../fedify/src/federation/middleware.test.ts | 84 ++++++++++++++----- packages/fedify/src/federation/middleware.ts | 48 +++++++++-- 7 files changed, 163 insertions(+), 64 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index db2f9300f..7dfe26199 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -131,11 +131,12 @@ To be released. When enabled, Fedify exposes `GET /.well-known/fedify/bench/stats` for in-process OpenTelemetry metric snapshots and `POST /.well-known/fedify/bench/trigger` for driving `sendActivity()` - to explicit benchmark sink recipients. Benchmark mode also defaults - `allowPrivateAddress` to `true` when built-in loaders are used, defaults - `signatureTimeWindow` to `false`, reports queue depth through the new - `fedify.queue.depth` gauge, and adds explicit low-latency buckets to - the signature verification duration histogram. [[#744], [#782], [#787]] + to server-configured benchmark sink recipients. Benchmark mode also + defaults `allowPrivateAddress` to `true` when built-in loaders are used, + defaults `signatureTimeWindow` to `false`, reports queue depth through + the new `fedify.queue.depth` gauge, and adds explicit low-latency + buckets to the signature verification duration histogram. + [[#744], [#782], [#787]] - Added OpenTelemetry metrics for ActivityPub fanout and activity lifecycle events, complementing the per-recipient diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md index 742c474ef..423c7ae4e 100644 --- a/docs/manual/benchmarking.md +++ b/docs/manual/benchmarking.md @@ -23,7 +23,8 @@ measurements without a separate metrics backend. Enabling benchmark mode ----------------------- -Set `benchmarkMode: true` when creating the `Federation` object: +Enable `benchmarkMode` when creating the `Federation` object. If you use the +benchmark trigger endpoint, configure the sink inboxes on the server: ~~~~ typescript twoslash import type { KvStore } from "@fedify/fedify"; @@ -34,7 +35,9 @@ const federation = createFederation({ // ---cut-start--- kv: null as unknown as KvStore, // ---cut-end--- - benchmarkMode: true, + benchmarkMode: { + triggerSinks: ["https://sink.example/inbox"], + }, }); ~~~~ @@ -65,12 +68,14 @@ const meterProvider = null as unknown as MeterProvider; // ---cut-end--- import { createFederation } from "@fedify/fedify"; -const benchmarkMode = process.env.FEDIFY_BENCHMARK === "1"; +const benchmarkEnabled = process.env.FEDIFY_BENCHMARK === "1"; const federation = createFederation({ kv, - benchmarkMode, - meterProvider: benchmarkMode ? undefined : meterProvider, + benchmarkMode: benchmarkEnabled + ? { triggerSinks: ["https://sink.example/inbox"] } + : false, + meterProvider: benchmarkEnabled ? undefined : meterProvider, }); ~~~~ @@ -108,7 +113,6 @@ The request body has this shape: ~~~~ json { "sender": { "identifier": "alice" }, - "sinks": ["https://sink.example/inbox"], "recipients": [ { "@context": "https://www.w3.org/ns/activitystreams", @@ -136,9 +140,12 @@ The `sender` must be either `{ "identifier": string }` or have `id` and `inbox` properties. The activity is parsed as an ActivityPub `Activity`. -By default, every recipient inbox must appear in the `sinks` list. This keeps -benchmark traffic pointed at benchmark sink inboxes. To bypass this guard for -a controlled run, set `"allowUnsafeRecipients": true`. +By default, every recipient inbox must appear in the server-configured +`~FederationBenchmarkOptions.triggerSinks` list. This keeps benchmark traffic +pointed at benchmark sink inboxes and prevents callers from choosing their own +allowlist. To bypass this guard for a controlled run, set +`~FederationBenchmarkOptions.allowUnsafeTriggerRecipients` to `true` in the +application configuration. A successful trigger returns `202 Accepted`: diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index 86097bd39..7f998efe0 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -105,6 +105,7 @@ export async function handleBenchmarkStats( export async function handleBenchmarkTrigger( request: Request, context: Context, + options: BenchmarkTriggerOptions = {}, ): Promise { if (request.method !== "POST") { return new Response("Method not allowed", { @@ -121,13 +122,14 @@ export async function handleBenchmarkTrigger( try { const body = asRecord(json, "request body"); const sender = parseSender(body.sender); - const sinks = parseSinks(body.sinks); const recipients = await parseRecipients(body.recipients, context); const activity = await parseActivity(body.activity, context); const inboxes = extractInboxes({ recipients }); const inboxUrls = Object.keys(inboxes); - const unsafeInboxes = inboxUrls.filter((inbox) => !sinks.has(inbox)); - if (unsafeInboxes.length > 0 && body.allowUnsafeRecipients !== true) { + const unsafeInboxes = options.allowUnsafeRecipients + ? [] + : inboxUrls.filter((inbox) => !options.sinks?.has(inbox)); + if (unsafeInboxes.length > 0) { return jsonResponse( { error: "unsafe_recipient", @@ -154,6 +156,11 @@ export async function handleBenchmarkTrigger( } } +export interface BenchmarkTriggerOptions { + readonly sinks?: ReadonlySet; + readonly allowUnsafeRecipients?: boolean; +} + class BenchmarkTriggerError extends Error { constructor(message: string, readonly status = 400) { super(message); @@ -175,22 +182,6 @@ function parseSender(value: unknown): BenchmarkSender { ); } -function parseSinks(value: unknown): Set { - if (!Array.isArray(value)) { - throw new BenchmarkTriggerError("sinks must be an array of inbox URLs."); - } - return new Set(value.map((sink) => { - if (typeof sink !== "string") { - throw new BenchmarkTriggerError("sinks must contain only URL strings."); - } - try { - return new URL(sink).href; - } catch { - throw new BenchmarkTriggerError("sinks must contain only valid URLs."); - } - })); -} - async function parseRecipients( value: unknown, context: Context, diff --git a/packages/fedify/src/federation/builder.ts b/packages/fedify/src/federation/builder.ts index 7e10c1909..6aa15016b 100644 --- a/packages/fedify/src/federation/builder.ts +++ b/packages/fedify/src/federation/builder.ts @@ -199,7 +199,7 @@ export class FederationBuilderImpl * Builds the federation object. * @param options Parameters for initializing the federation object. * @returns The federation object. - * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * @throws {TypeError} If benchmark mode and `meterProvider` are both * specified. */ async build( diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index 4517bb270..e1d5e9d40 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -804,7 +804,7 @@ export interface FederationBuilder * Builds the federation object. * @param options Parameters for initializing the federation object. * @returns The federation object. - * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * @throws {TypeError} If benchmark mode and `meterProvider` are both * specified. */ build( @@ -849,6 +849,27 @@ export interface InboxChallengePolicy { nonceTtlSeconds?: number; } +/** + * Options for cooperative benchmark mode. + * @since 2.3.0 + */ +export interface FederationBenchmarkOptions { + /** + * Server-controlled inbox URLs that the benchmark trigger endpoint may + * deliver to. + */ + triggerSinks?: readonly (string | URL)[]; + + /** + * Whether the benchmark trigger endpoint may deliver to recipients outside + * {@link FederationBenchmarkOptions.triggerSinks}. + * + * Do not enable this option unless the benchmark endpoint is only reachable + * by a trusted benchmark controller. + */ + allowUnsafeTriggerRecipients?: boolean; +} + /** * Options for creating a {@link Federation} object. * @template TContextData The context data to pass to the {@link Context}. @@ -940,7 +961,8 @@ export interface FederationOptions { /** * Whether to enable cooperative benchmark mode. This mode exposes * benchmark-only endpoints and relaxes selected defaults for benchmark - * targets. Do not enable this option in production. + * targets. Pass an object to configure benchmark trigger delivery. + * Do not enable this option in production. * * When enabled, {@link FederationOptions.allowPrivateAddress} defaults to * `true` unless {@link FederationOptions.documentLoaderFactory} or @@ -950,7 +972,7 @@ export interface FederationOptions { * Turned off by default. * @since 2.3.0 */ - benchmarkMode?: boolean; + benchmarkMode?: boolean | FederationBenchmarkOptions; /** * Options for making `User-Agent` strings for HTTP requests. diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 8023c6f5a..093dad67a 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -136,6 +136,16 @@ test("createFederation()", async (t) => { assertEquals(federation.signatureTimeWindow, { minutes: 10 }); }); + await t.step("benchmarkMode tolerates calendar time windows", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + signatureTimeWindow: { months: 1 }, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.signatureTimeWindow, { months: 1 }); + }); + await t.step("benchmarkMode leaves custom loader factories alone", () => { const federation = createFederation({ kv, @@ -471,7 +481,9 @@ test("createFederation() registers queue depth for regular metrics", async () => }); test("benchmarkMode trigger endpoint", async (t) => { - const createTriggerTarget = () => { + const createTriggerTarget = ( + options: { allowUnsafeTriggerRecipients?: boolean } = {}, + ) => { const messages: OutboxMessage[] = []; const queue: MessageQueue = { enqueue(message: OutboxMessage) { @@ -484,7 +496,10 @@ test("benchmarkMode trigger endpoint", async (t) => { }; const federation = createFederation({ kv: new MemoryKvStore(), - benchmarkMode: true, + benchmarkMode: { + triggerSinks: ["https://sink.example/inbox"], + allowUnsafeTriggerRecipients: options.allowUnsafeTriggerRecipients, + }, contextLoaderFactory: () => mockDocumentLoader, queue: { outbox: queue }, }); @@ -511,7 +526,7 @@ test("benchmarkMode trigger endpoint", async (t) => { } = {}, ) => ({ sender: { identifier: "alice" }, - sinks: options.sinks ?? ["https://sink.example/inbox"], + sinks: options.sinks, recipients: [ { "@context": "https://www.w3.org/ns/activitystreams", @@ -560,26 +575,29 @@ test("benchmarkMode trigger endpoint", async (t) => { }); }); - await t.step("rejects recipients outside the sink list", async () => { - const { federation, messages } = createTriggerTarget(); - const response = await federation.fetch( - new Request("https://example.com/.well-known/fedify/bench/trigger", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify( - await createTriggerBody({ - recipientInbox: "https://not-a-sink.example/inbox", - }), - ), - }), - { contextData: undefined }, - ); - assertEquals(response.status, 403); - assertEquals(messages, []); - }); + await t.step( + "rejects recipients outside configured trigger sinks", + async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 403); + assertEquals(messages, []); + }, + ); await t.step( - "allows unsafe recipients only with an explicit override", + "does not trust request-provided trigger sinks or bypasses", async () => { const { federation, messages } = createTriggerTarget(); const response = await federation.fetch( @@ -589,12 +607,36 @@ test("benchmarkMode trigger endpoint", async (t) => { body: JSON.stringify( await createTriggerBody({ recipientInbox: "https://not-a-sink.example/inbox", + sinks: ["https://not-a-sink.example/inbox"], allowUnsafeRecipients: true, }), ), }), { contextData: undefined }, ); + assertEquals(response.status, 403); + assertEquals(messages, []); + }, + ); + + await t.step( + "allows unsafe recipients only with a server override", + async () => { + const { federation, messages } = createTriggerTarget({ + allowUnsafeTriggerRecipients: true, + }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + }), + ), + }), + { contextData: undefined }, + ); assertEquals(response.status, 202); assertEquals(messages.length, 1); assertEquals(messages[0].inbox, "https://not-a-sink.example/inbox"); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 380c269a5..fbc45fb6d 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -59,6 +59,7 @@ import { handleNodeInfo, handleNodeInfoJrd } from "../nodeinfo/handler.ts"; import type { JsonValue, NodeInfo } from "../nodeinfo/types.ts"; import { type BenchmarkMetricReader, + type BenchmarkTriggerOptions, createBenchmarkMeterProvider, handleBenchmarkStats, handleBenchmarkTrigger, @@ -111,6 +112,7 @@ import type { import type { ConstructorWithTypeId, Federation, + FederationBenchmarkOptions, FederationFetchOptions, FederationOptions, FederationStartQueueOptions, @@ -259,9 +261,14 @@ function getBenchmarkRelaxations( secureDefaultSeconds: 3600, }); } else { - const seconds = Temporal.Duration.from(signatureTimeWindow).total({ - unit: "seconds", - }); + let seconds: number; + try { + seconds = Temporal.Duration.from(signatureTimeWindow).total({ + unit: "seconds", + }); + } catch { + return relaxations; + } if (seconds !== 3600) { relaxations.push({ protection: "http_signature_time_window", @@ -292,6 +299,23 @@ function formatBenchmarkRelaxations( }).join("; "); } +function getBenchmarkTriggerOptions( + benchmarkOptions: FederationBenchmarkOptions, +): BenchmarkTriggerOptions { + const sinks = benchmarkOptions.triggerSinks?.map((sink) => { + try { + return new URL(sink).href; + } catch { + throw new TypeError("benchmarkMode.triggerSinks must contain only URLs."); + } + }); + return { + sinks: sinks == null ? undefined : new Set(sinks), + allowUnsafeRecipients: + benchmarkOptions.allowUnsafeTriggerRecipients === true, + }; +} + function maxDelay( first: Temporal.Duration, second: Temporal.Duration, @@ -498,7 +522,7 @@ export interface FederationOrigin { * Create a new {@link Federation} instance. * @param options Parameters for initializing the instance. * @returns A new {@link Federation} instance. - * @throws {TypeError} If `benchmarkMode: true` and `meterProvider` are both + * @throws {TypeError} If benchmark mode and `meterProvider` are both * specified. * @since 0.10.0 */ @@ -540,10 +564,15 @@ export class FederationImpl inboxChallengePolicy?: InboxChallengePolicy; benchmarkMode: boolean; benchmarkMetricReader?: BenchmarkMetricReader; + benchmarkTriggerOptions: BenchmarkTriggerOptions; constructor(options: FederationOptions) { super(); - const benchmarkMode = options.benchmarkMode ?? false; + const benchmarkMode = options.benchmarkMode != null && + options.benchmarkMode !== false; + const benchmarkOptions = typeof options.benchmarkMode === "object" + ? options.benchmarkMode + : {}; const hasCustomLoaderFactory = options.documentLoaderFactory != null || options.contextLoaderFactory != null; const allowPrivateAddress = options.allowPrivateAddress ?? @@ -571,6 +600,9 @@ export class FederationImpl ); } this.benchmarkMode = benchmarkMode; + this.benchmarkTriggerOptions = benchmarkMode + ? getBenchmarkTriggerOptions(benchmarkOptions) + : {}; this.kv = options.kv; this.kvPrefixes = { ...({ @@ -2451,7 +2483,11 @@ export class FederationImpl case "benchmarkStats": return await handleBenchmarkStats(request, this.benchmarkMetricReader!); case "benchmarkTrigger": - return await handleBenchmarkTrigger(request, context); + return await handleBenchmarkTrigger( + request, + context, + this.benchmarkTriggerOptions, + ); } // Routes that require JSON-LD Accepts header: From d831b2a5dc7d26a931e5fb67e4903228c15d92dc Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 20:16:17 +0900 Subject: [PATCH 15/20] Parallelize queue depth snapshots Collect queue depth snapshots in parallel so a slow queue backend does not block other queue depth observations during an OpenTelemetry collection pass. Add a regression test that verifies the fast queue starts while another queue is still pending. https://github.com/fedify-dev/fedify/pull/787#discussion_r3340678318 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/metrics.test.ts | 56 +++++++++++++++++++ packages/fedify/src/federation/metrics.ts | 9 +-- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 309286e5d..c0bd54d6a 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -201,6 +201,62 @@ test("registerQueueDepthGauge() skips unavailable depth snapshots", async () => } }); +test("registerQueueDepthGauge() queries queue depths in parallel", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + let releaseSlowDepth: ((depth: MessageQueueDepth) => void) | undefined; + let fastDepthStarted = false; + try { + const slowQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return new Promise((resolve) => { + releaseSlowDepth = resolve; + }); + }, + }; + const fastQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + fastDepthStarted = true; + return Promise.resolve({ queued: 5 }); + }, + }; + + registerQueueDepthGauge(meterProvider, [ + { role: "inbox", queue: slowQueue }, + { role: "outbox", queue: fastQueue }, + ]); + + const collection = reader.collect(); + await Promise.resolve(); + assertEquals(fastDepthStarted, true); + releaseSlowDepth?.({ queued: 3 }); + + const result = await collection; + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + assertEquals( + queueDepth?.dataPoints.map((point) => point.value).sort(), + [3, 5], + ); + } finally { + releaseSlowDepth?.({ queued: 0 }); + await meterProvider.shutdown(); + } +}); + test("recordInboxActivity() records counter with result and activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); for ( diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index ad2db15c4..d1c62fc70 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -1220,16 +1220,17 @@ export function registerQueueDepthGauge( } } if (uniqueQueues.size < 1) return; + const queueEntries = Array.from(uniqueQueues.entries()); const gauge = getFederationMetrics(meterProvider).queueDepth; gauge.addCallback(async (observableResult) => { - for (const [queue, roles] of uniqueQueues) { + await Promise.all(queueEntries.map(async ([queue, roles]) => { let depth; try { depth = await queue.getDepth!(); } catch { - continue; + return; } - if (depth == null) continue; + if (depth == null) return; const attributes = buildQueueDepthAttributes(queue, roles); observableResult.observe(depth.queued, { ...attributes, @@ -1247,7 +1248,7 @@ export function registerQueueDepthGauge( "fedify.queue.depth.state": "delayed", }); } - } + })); }); } From a0174f84a7b81f603d6b7cf9e1060be98715ba2d Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 20:17:49 +0900 Subject: [PATCH 16/20] Reject empty benchmark triggers Reject trigger requests that produce no recipient inboxes so benchmark clients get immediate feedback instead of a successful no-op. Parse recipients in parallel as part of the trigger request so remote document or context work does not serialize across recipients. https://github.com/fedify-dev/fedify/pull/787#discussion_r3340678287 https://github.com/fedify-dev/fedify/pull/787#discussion_r3340678310 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/bench.ts | 13 +++++++----- .../fedify/src/federation/middleware.test.ts | 21 ++++++++++++++++++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index 7f998efe0..f75dfd767 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -126,6 +126,11 @@ export async function handleBenchmarkTrigger( const activity = await parseActivity(body.activity, context); const inboxes = extractInboxes({ recipients }); const inboxUrls = Object.keys(inboxes); + if (inboxUrls.length < 1) { + throw new BenchmarkTriggerError( + "No valid recipient inboxes found. The recipients list must not be empty.", + ); + } const unsafeInboxes = options.allowUnsafeRecipients ? [] : inboxUrls.filter((inbox) => !options.sinks?.has(inbox)); @@ -189,8 +194,7 @@ async function parseRecipients( if (!Array.isArray(value)) { throw new BenchmarkTriggerError("recipients must be an array."); } - const recipients: Recipient[] = []; - for (const item of value) { + return await Promise.all(value.map(async (item) => { let object: VocabObject; try { object = await VocabObject.fromJsonLd(item, { @@ -213,9 +217,8 @@ async function parseRecipients( "each recipient must have id and inbox properties.", ); } - recipients.push(recipient); - } - return recipients; + return recipient; + })); } function isRecipient(value: unknown): value is Recipient { diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 093dad67a..8d4548eb3 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -521,13 +521,14 @@ test("benchmarkMode trigger endpoint", async (t) => { const createTriggerBody = async ( options: { recipientInbox?: string; + recipients?: unknown[]; sinks?: string[]; allowUnsafeRecipients?: boolean; } = {}, ) => ({ sender: { identifier: "alice" }, sinks: options.sinks, - recipients: [ + recipients: options.recipients ?? [ { "@context": "https://www.w3.org/ns/activitystreams", type: "Service", @@ -575,6 +576,24 @@ test("benchmarkMode trigger endpoint", async (t) => { }); }); + await t.step("rejects empty recipient lists", async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(await createTriggerBody({ recipients: [] })), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 400); + assertEquals(await response.json(), { + error: + "No valid recipient inboxes found. The recipients list must not be empty.", + }); + assertEquals(messages, []); + }); + await t.step( "rejects recipients outside configured trigger sinks", async () => { From 968d687a486715509964773d83ce95536e3f7f23 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 22:34:18 +0900 Subject: [PATCH 17/20] Document benchmark response shapes Add JSDoc for the benchmark metric snapshot and endpoint helpers so the new public exports describe their response shapes and collection behavior. Also make the trigger response explicitly include a nullable queue correlation field, matching the current Context.sendActivity() API which does not return a queue or batch identifier. Assisted-by: Codex:gpt-5.5 --- docs/manual/benchmarking.md | 1 + packages/fedify/src/federation/bench.ts | 83 +++++++++++++++++++ packages/fedify/src/federation/metrics.ts | 12 ++- .../fedify/src/federation/middleware.test.ts | 2 + 4 files changed, 97 insertions(+), 1 deletion(-) diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md index 423c7ae4e..535bdf2ca 100644 --- a/docs/manual/benchmarking.md +++ b/docs/manual/benchmarking.md @@ -153,6 +153,7 @@ A successful trigger returns `202 Accepted`: { "version": 1, "activityId": "https://example.com/activities/bench-1", + "queueCorrelationId": null, "recipientCount": 1, "inboxCount": 1 } diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index f75dfd767..fe44dca6b 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -14,6 +14,7 @@ import { extractInboxes } from "./send.ts"; /** * Metric reader owned by `benchmarkMode`. + * @since 2.3.0 */ export class BenchmarkMetricReader extends MetricReader { protected onShutdown(): Promise { @@ -25,6 +26,11 @@ export class BenchmarkMetricReader extends MetricReader { } } +/** + * Creates the in-process OpenTelemetry meter provider used by benchmark mode. + * @returns The meter provider and the metric reader attached to it. + * @since 2.3.0 + */ export function createBenchmarkMeterProvider(): { readonly meterProvider: MeterProvider; readonly reader: BenchmarkMetricReader; @@ -36,44 +42,92 @@ export function createBenchmarkMeterProvider(): { }; } +/** + * A serialized snapshot of all benchmark-mode OpenTelemetry metrics. + * + * The `scopeMetrics` field contains the collected metrics grouped by + * instrumentation scope. The `errors` field contains stringified collection + * errors reported by the metric reader. + * @since 2.3.0 + */ export interface BenchmarkMetricSnapshot { + /** The schema version of this snapshot shape. */ readonly version: 1; + /** The snapshot source. Always `"server"` for Fedify benchmark targets. */ readonly source: "server"; + /** The ISO 8601 time when the snapshot was generated. */ readonly generatedAt: string; + /** Metrics grouped by OpenTelemetry instrumentation scope. */ readonly scopeMetrics: readonly BenchmarkScopeMetrics[]; + /** Stringified metric collection errors, if any. */ readonly errors: readonly string[]; } +/** + * Metrics collected from one OpenTelemetry instrumentation scope. + * @since 2.3.0 + */ export interface BenchmarkScopeMetrics { + /** The OpenTelemetry instrumentation scope descriptor. */ readonly scope: { + /** The instrumentation scope name. */ readonly name: string; + /** The instrumentation scope version, if provided. */ readonly version?: string; }; + /** The metrics emitted by the scope. */ readonly metrics: readonly BenchmarkMetric[]; } +/** + * A serialized OpenTelemetry metric in a benchmark snapshot. + * @since 2.3.0 + */ export interface BenchmarkMetric { + /** The OpenTelemetry metric name. */ readonly name: string; + /** The OpenTelemetry metric description. */ readonly description: string; + /** The OpenTelemetry metric unit, such as `ms` or `{count}`. */ readonly unit: string; + /** The metric data point kind. */ readonly dataPointType: | "histogram" | "exponential_histogram" | "gauge" | "sum"; + /** The serialized data points for the metric. */ readonly dataPoints: readonly BenchmarkDataPoint[]; } +/** + * A serialized OpenTelemetry metric data point. + * + * The timestamp fields use OpenTelemetry high-resolution time tuples. + * Histogram values preserve their SDK histogram shape, including bucket + * boundaries and counts. + * @since 2.3.0 + */ export interface BenchmarkDataPoint { + /** The metric attributes attached to the data point. */ readonly attributes: Record; + /** The OpenTelemetry data point start time. */ readonly startTime: readonly [number, number]; + /** The OpenTelemetry data point end time. */ readonly endTime: readonly [number, number]; + /** The data point value or histogram payload. */ readonly value: | number | Histogram | ExponentialHistogram; } +/** + * Collects and serializes benchmark-mode metrics from a benchmark reader. + * @param reader The benchmark metric reader to collect from. + * @returns A server metric snapshot with any collection errors stringified. + * @since 2.3.0 + */ export async function collectBenchmarkMetrics( reader: BenchmarkMetricReader, ): Promise { @@ -87,6 +141,13 @@ export async function collectBenchmarkMetrics( }; } +/** + * Handles `GET /.well-known/fedify/bench/stats`. + * @param request The HTTP request to handle. + * @param reader The benchmark metric reader to collect from. + * @returns A JSON metric snapshot response, or `405 Method Not Allowed`. + * @since 2.3.0 + */ export async function handleBenchmarkStats( request: Request, reader: BenchmarkMetricReader, @@ -102,6 +163,18 @@ export async function handleBenchmarkStats( }); } +/** + * Handles `POST /.well-known/fedify/bench/trigger`. + * + * The handler validates a benchmark trigger request, checks recipients against + * server-controlled trigger options, and calls `Context.sendActivity()` to use + * the target's normal outbox path. + * @param request The HTTP request to handle. + * @param context The Fedify context used to resolve actors and send activity. + * @param options Server-controlled benchmark trigger delivery options. + * @returns A JSON response describing the sent activity, or a validation error. + * @since 2.3.0 + */ export async function handleBenchmarkTrigger( request: Request, context: Context, @@ -148,6 +221,7 @@ export async function handleBenchmarkTrigger( { version: 1, activityId: activity.id?.href ?? null, + queueCorrelationId: null, recipientCount: recipients.length, inboxCount: inboxUrls.length, }, @@ -161,8 +235,17 @@ export async function handleBenchmarkTrigger( } } +/** + * Server-controlled options for benchmark trigger delivery. + * @since 2.3.0 + */ export interface BenchmarkTriggerOptions { + /** Inbox URLs that the trigger endpoint may deliver to. */ readonly sinks?: ReadonlySet; + /** + * Whether recipients outside {@link BenchmarkTriggerOptions.sinks} may be + * used. + */ readonly allowUnsafeRecipients?: boolean; } diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index d1c62fc70..6c8735472 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -94,11 +94,21 @@ export interface QueueTaskCommonAttributes { } /** - * A queue to observe for `fedify.queue.depth`. + * An entry for observing one queue role in `fedify.queue.depth`. + * + * This public API is used by {@link registerQueueDepthGauge()} to associate a + * queue depth source with the task role it represents. * @since 2.3.0 */ export interface QueueDepthGaugeEntry { + /** + * The task role whose queue depth is observed. + */ role: QueueTaskRole; + + /** + * The message queue to observe, or `undefined` when the role has no queue. + */ queue?: MessageQueue; } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 8d4548eb3..51ab397bb 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -677,11 +677,13 @@ test("benchmarkMode trigger endpoint", async (t) => { const body = await response.json() as { version: number; activityId: string; + queueCorrelationId: string | null; recipientCount: number; inboxCount: number; }; assertEquals(body.version, 1); assertEquals(body.activityId, "https://example.com/activities/bench-1"); + assertEquals(body.queueCorrelationId, null); assertEquals(body.recipientCount, 1); assertEquals(body.inboxCount, 1); assertEquals(messages.length, 1); From 42a7c52a05a1f27c25ea3eb273cfa88e09d02352 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 22:55:24 +0900 Subject: [PATCH 18/20] Return benchmark queue correlation ids Use the triggered activity ID as the benchmark queue correlation handle so successful trigger responses return a value that is also preserved on queued fanout and outbox work. https://github.com/fedify-dev/fedify/pull/787#discussion_r3341629912 Assisted-by: Codex:gpt-5.5 --- docs/manual/benchmarking.md | 5 ++++- packages/fedify/src/federation/bench.ts | 8 ++++++-- packages/fedify/src/federation/middleware.test.ts | 8 ++++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md index 535bdf2ca..022ba37a1 100644 --- a/docs/manual/benchmarking.md +++ b/docs/manual/benchmarking.md @@ -153,12 +153,15 @@ A successful trigger returns `202 Accepted`: { "version": 1, "activityId": "https://example.com/activities/bench-1", - "queueCorrelationId": null, + "queueCorrelationId": "https://example.com/activities/bench-1", "recipientCount": 1, "inboxCount": 1 } ~~~~ +The `queueCorrelationId` is the activity ID preserved on the queued fanout or +outbox work. + Metrics ------- diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts index fe44dca6b..6a0b94e35 100644 --- a/packages/fedify/src/federation/bench.ts +++ b/packages/fedify/src/federation/bench.ts @@ -197,6 +197,10 @@ export async function handleBenchmarkTrigger( const sender = parseSender(body.sender); const recipients = await parseRecipients(body.recipients, context); const activity = await parseActivity(body.activity, context); + if (activity.id == null) { + throw new BenchmarkTriggerError("activity must have an id."); + } + const activityId = activity.id.href; const inboxes = extractInboxes({ recipients }); const inboxUrls = Object.keys(inboxes); if (inboxUrls.length < 1) { @@ -220,8 +224,8 @@ export async function handleBenchmarkTrigger( return jsonResponse( { version: 1, - activityId: activity.id?.href ?? null, - queueCorrelationId: null, + activityId, + queueCorrelationId: activityId, recipientCount: recipients.length, inboxCount: inboxUrls.length, }, diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 51ab397bb..10a18368e 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -677,17 +677,21 @@ test("benchmarkMode trigger endpoint", async (t) => { const body = await response.json() as { version: number; activityId: string; - queueCorrelationId: string | null; + queueCorrelationId: string; recipientCount: number; inboxCount: number; }; assertEquals(body.version, 1); assertEquals(body.activityId, "https://example.com/activities/bench-1"); - assertEquals(body.queueCorrelationId, null); + assertEquals( + body.queueCorrelationId, + "https://example.com/activities/bench-1", + ); assertEquals(body.recipientCount, 1); assertEquals(body.inboxCount, 1); assertEquals(messages.length, 1); assertEquals(messages[0].type, "outbox"); + assertEquals(messages[0].activityId, body.queueCorrelationId); assertEquals(messages[0].inbox, "https://sink.example/inbox"); }); }); From 96d30f4c5c20b3219ad8b501549119b85f3addb0 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 23:22:00 +0900 Subject: [PATCH 19/20] Keep queue depth metrics distinct Give each Federation instance an opaque queue depth source ID so two instances sharing one MeterProvider do not emit identical gauge attribute sets. Register queue depth callbacks again when the effective global MeterProvider changes, matching the fallback used by other metrics. https://github.com/fedify-dev/fedify/pull/787#discussion_r3341744777 https://github.com/fedify-dev/fedify/pull/787#discussion_r3341759746 Assisted-by: Codex:gpt-5.5 --- docs/manual/opentelemetry.md | 5 +- packages/fedify/src/federation/metrics.ts | 19 +++- .../fedify/src/federation/middleware.test.ts | 100 +++++++++++++++++- packages/fedify/src/federation/middleware.ts | 23 +++- 4 files changed, 141 insertions(+), 6 deletions(-) diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index ff7848030..548afe2cc 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -849,7 +849,9 @@ Fedify records the following OpenTelemetry metrics: than one Fedify queue role, and `fedify.queue.roles` lists those roles as a comma-separated string. `fedify.queue.backend` and `fedify.queue.native_retrial` follow the same rules as the queue task - metrics. + metrics. `fedify.federation.instance_id` is an opaque per-Federation + instance identifier that keeps queue depth series distinct when multiple + Federation instances share one [`MeterProvider`]. The `fedify.queue.task.*` metrics describe what Fedify's workers do with queued messages. They complement the backend-side @@ -965,6 +967,7 @@ for ActivityPub: | `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | | `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | | `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"8b4e6d24-8a66-4d04-9f63-7850c99f6ed6"` | | `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | | `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | | `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 6c8735472..c4acadb35 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -112,6 +112,18 @@ export interface QueueDepthGaugeEntry { queue?: MessageQueue; } +/** + * Options for observing queue depth metrics. + * @since 2.3.0 + */ +export interface QueueDepthGaugeOptions { + /** + * An opaque source identifier to distinguish queue depth series registered on + * the same meter provider. + */ + sourceId?: string; +} + /** * The kind of ActivityPub signature verified, used as the * `activitypub.signature.kind` metric attribute. @@ -1218,6 +1230,7 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { export function registerQueueDepthGauge( meterProvider: MeterProvider, entries: readonly QueueDepthGaugeEntry[], + options: QueueDepthGaugeOptions = {}, ): void { const uniqueQueues = new Map(); for (const { role, queue } of entries) { @@ -1241,7 +1254,7 @@ export function registerQueueDepthGauge( return; } if (depth == null) return; - const attributes = buildQueueDepthAttributes(queue, roles); + const attributes = buildQueueDepthAttributes(queue, roles, options); observableResult.observe(depth.queued, { ...attributes, "fedify.queue.depth.state": "queued", @@ -1265,12 +1278,16 @@ export function registerQueueDepthGauge( function buildQueueDepthAttributes( queue: MessageQueue, roles: readonly QueueTaskRole[], + options: QueueDepthGaugeOptions, ): Attributes { const sortedRoles = [...roles].sort(); const role = sortedRoles.length === 1 ? sortedRoles[0] : "shared"; const attributes: Attributes = { "fedify.queue.role": role, }; + if (options.sourceId != null) { + attributes["fedify.federation.instance_id"] = options.sourceId; + } if (role === "shared") { attributes["fedify.queue.roles"] = sortedRoles.join(","); } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 10a18368e..ad073b5df 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -50,7 +50,7 @@ import { rsaPublicKey3, } from "../testing/keys.ts"; import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; -import { SpanStatusCode } from "@opentelemetry/api"; +import { metrics, SpanStatusCode } from "@opentelemetry/api"; import { DataPointType, MeterProvider, @@ -480,6 +480,104 @@ test("createFederation() registers queue depth for regular metrics", async () => } }); +test("createFederation() registers queue depth after global meterProvider is set", async () => { + metrics.disable(); + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 8 }); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + queue, + }); + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + metrics.setGlobalMeterProvider(meterProvider); + (federation as FederationImpl).meterProvider; + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })), + [ + { state: "queued", role: "shared", value: 8 }, + ], + ); + } finally { + metrics.disable(); + await meterProvider.shutdown(); + } +}); + +test("createFederation() distinguishes queue depth series per federation", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const createQueue = (queued: number): MessageQueue => ({ + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued }); + }, + }); + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue: createQueue(1), + }); + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue: createQueue(2), + }); + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + const queuedPoints = queueDepth.dataPoints.filter((point) => + point.attributes["fedify.queue.depth.state"] === "queued" + ); + assertEquals( + queuedPoints.map((point) => point.value).sort(), + [1, 2], + ); + const instanceIds = queuedPoints.map((point) => + point.attributes["fedify.federation.instance_id"] + ); + assertEquals( + instanceIds.every((id) => typeof id === "string"), + true, + ); + assertEquals(new Set(instanceIds).size, 2); + } finally { + await meterProvider.shutdown(); + } +}); + test("benchmarkMode trigger endpoint", async (t) => { const createTriggerTarget = ( options: { allowUnsafeTriggerRecipients?: boolean } = {}, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index fbc45fb6d..dfd3e426a 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -139,6 +139,7 @@ import { getRemoteHost, instrumentDocumentLoader, isAbortError, + type QueueDepthGaugeEntry, type QueueTaskCommonAttributes, type QueueTaskResult, recordCircuitBreakerStateChange, @@ -565,6 +566,9 @@ export class FederationImpl benchmarkMode: boolean; benchmarkMetricReader?: BenchmarkMetricReader; benchmarkTriggerOptions: BenchmarkTriggerOptions; + readonly #queueDepthGaugeSourceId = crypto.randomUUID(); + #queueDepthGaugeEntries: readonly QueueDepthGaugeEntry[] = []; + #queueDepthGaugeMeterProvider?: MeterProvider; constructor(options: FederationOptions) { super(); @@ -845,11 +849,14 @@ export class FederationImpl } else { this._meterProvider = options.meterProvider; } - registerQueueDepthGauge(this.meterProvider, [ + this.#queueDepthGaugeEntries = [ { role: "inbox", queue: this.inboxQueue }, { role: "outbox", queue: this.outboxQueue }, { role: "fanout", queue: this.fanoutQueue }, - ]); + ]; + this.#registerQueueDepthGauge( + this._meterProvider ?? metrics.getMeterProvider(), + ); this.firstKnock = options.firstKnock; } @@ -858,7 +865,17 @@ export class FederationImpl } get meterProvider(): MeterProvider { - return this._meterProvider ?? metrics.getMeterProvider(); + const meterProvider = this._meterProvider ?? metrics.getMeterProvider(); + this.#registerQueueDepthGauge(meterProvider); + return meterProvider; + } + + #registerQueueDepthGauge(meterProvider: MeterProvider): void { + if (meterProvider === this.#queueDepthGaugeMeterProvider) return; + registerQueueDepthGauge(meterProvider, this.#queueDepthGaugeEntries, { + sourceId: this.#queueDepthGaugeSourceId, + }); + this.#queueDepthGaugeMeterProvider = meterProvider; } _initializeRouter(): void { From b9fb66d2e8d42fd23e081ad2069f2744f8aac84e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 2 Jun 2026 23:49:09 +0900 Subject: [PATCH 20/20] Avoid Workers global random IDs Cloudflare Workers forbids generating random values while evaluating the global scope. The queue depth gauge only needs an opaque per-process Federation identifier to keep series distinct on a shared MeterProvider, so use a local counter instead of crypto.randomUUID(). Update the OpenTelemetry attribute example so it does not imply that this identifier is a UUID. Assisted-by: Codex:gpt-5.5 --- docs/manual/opentelemetry.md | 2 +- packages/fedify/src/federation/middleware.ts | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 548afe2cc..b0d7476bf 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -967,7 +967,7 @@ for ActivityPub: | `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | | `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | | `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | -| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"8b4e6d24-8a66-4d04-9f63-7850c99f6ed6"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | | `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | | `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | | `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index dfd3e426a..6285b3849 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -170,6 +170,7 @@ import { handleWebFinger } from "./webfinger.ts"; import { hasMalformedKnownTemporalLiteral } from "./temporal.ts"; const circuitBreakerCasWarningKvStores = new WeakSet(); +let nextQueueDepthGaugeSourceId = 0; const retryAfterHttpDate = new RegExp( "^(?:" + "(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), \\d{2} " + @@ -566,7 +567,9 @@ export class FederationImpl benchmarkMode: boolean; benchmarkMetricReader?: BenchmarkMetricReader; benchmarkTriggerOptions: BenchmarkTriggerOptions; - readonly #queueDepthGaugeSourceId = crypto.randomUUID(); + readonly #queueDepthGaugeSourceId = `fedify-${ + (++nextQueueDepthGaugeSourceId).toString(36) + }`; #queueDepthGaugeEntries: readonly QueueDepthGaugeEntry[] = []; #queueDepthGaugeMeterProvider?: MeterProvider;