From c8d31126b47eb2563469d12ef46a834c60622cf8 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 31 Mar 2026 13:39:37 +0200 Subject: [PATCH 1/2] perf: make remote-comms timing constants configurable, optimize e2e tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add reconnectionBaseDelayMs, reconnectionMaxDelayMs, handshakeTimeoutMs, writeTimeoutMs, and ackTimeoutMs to RemoteCommsOptions so that e2e tests can use fast values instead of the production defaults (500ms-10s backoff, 10s ACK/handshake/write timeouts). Thread the new options through the full stack: RemoteCommsOptions → RPC spec → transport → ReconnectionManager/HandshakeDeps/RemoteHandle, and update the browser runtime comms-query-string serializer. Restructure the e2e test lifecycle: move the relay server to beforeAll/afterAll, remove unnecessary delay() calls, reduce waitUntilQuiescent intervals, and set fileParallelism: false in the e2e vitest config to prevent port conflicts between relay-dependent test files. Reduces remote-comms e2e suite from ~215s to ~97s (55% reduction). Closes #904 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/utils/comms-query-string.ts | 5 + .../test/e2e/remote-comms.test.ts | 196 +++++++++++++----- .../test/helpers/remote-comms.ts | 19 +- .../kernel-node-runtime/vitest.config.e2e.ts | 3 + .../src/remotes/kernel/RemoteHandle.ts | 20 +- .../src/remotes/kernel/RemoteManager.ts | 6 + .../src/remotes/platform/handshake.ts | 16 +- .../src/remotes/platform/reconnection.test.ts | 22 +- .../src/remotes/platform/reconnection.ts | 31 ++- .../src/remotes/platform/transport.ts | 19 +- packages/ocap-kernel/src/remotes/types.ts | 25 +++ .../initializeRemoteComms.ts | 25 +++ 12 files changed, 323 insertions(+), 64 deletions(-) diff --git a/packages/kernel-browser-runtime/src/utils/comms-query-string.ts b/packages/kernel-browser-runtime/src/utils/comms-query-string.ts index 382d57bf8a..48b947c875 100644 --- a/packages/kernel-browser-runtime/src/utils/comms-query-string.ts +++ b/packages/kernel-browser-runtime/src/utils/comms-query-string.ts @@ -52,6 +52,11 @@ const NUMBER_PARAM_NAMES = [ 'stalePeerTimeoutMs', 'maxMessagesPerSecond', 'maxConnectionAttemptsPerMinute', + 'reconnectionBaseDelayMs', + 'reconnectionMaxDelayMs', + 'handshakeTimeoutMs', + 'writeTimeoutMs', + 'ackTimeoutMs', ] as const satisfies readonly NumberParamKey[]; const NonNegativeInteger = min(integer(), 0); diff --git a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts index d2b287263e..5e38b72741 100644 --- a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts +++ b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts @@ -8,7 +8,15 @@ import { delay } from '@ocap/repo-tools/test-utils'; import { mkdtemp, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { + describe, + it, + expect, + beforeAll, + afterAll, + beforeEach, + afterEach, +} from 'vitest'; import { makeTestKernel, runTestVats } from '../helpers/kernel.ts'; import { @@ -31,6 +39,15 @@ const NETWORK_TIMEOUT = 30_000; const relayPeerId = '12D3KooWJBDqsyHQF2MWiCdU4kdqx4zTsSTLRdShg7Ui6CRWB4uc'; const testRelays = [`/ip4/127.0.0.1/tcp/9001/ws/p2p/${relayPeerId}`]; +// Fast timing options for tests to avoid waiting on real delays +const testBackoffOptions = { + reconnectionBaseDelayMs: 10, + reconnectionMaxDelayMs: 50, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, + ackTimeoutMs: 2_000, +}; + describe.sequential('Remote Communications E2E', () => { let relay: Libp2p; let kernel1: Kernel; @@ -41,12 +58,18 @@ describe.sequential('Remote Communications E2E', () => { let kernelStore1: ReturnType; let kernelStore2: ReturnType; - beforeEach(async () => { - // Start the relay server + beforeAll(async () => { + // Start the relay server once for all tests (stateless, reusable) relay = await startRelay(console); - // Wait for relay to be fully initialized - await delay(1000); + }); + + afterAll(async () => { + if (relay) { + await relay.stop(); + } + }); + beforeEach(async () => { // Create temp directory for database files tempDir = await mkdtemp(join(tmpdir(), 'ocap-e2e-')); dbFilename1 = join(tempDir, 'kernel1.db'); @@ -69,10 +92,8 @@ describe.sequential('Remote Communications E2E', () => { afterEach(async () => { const STOP_TIMEOUT = 3000; - // Stop in parallel to speed up cleanup + // Stop kernels in parallel to speed up cleanup await Promise.all([ - relay && - stopWithTimeout(async () => relay.stop(), STOP_TIMEOUT, 'relay.stop'), kernel1 && stopWithTimeout( async () => kernel1.stop(), @@ -89,15 +110,20 @@ describe.sequential('Remote Communications E2E', () => { if (tempDir) { await rm(tempDir, { recursive: true, force: true }); } - await delay(200); }); describe('Basic Connectivity', () => { it( 'initializes remote comms on both kernels', async () => { - await kernel1.initRemoteComms({ relays: testRelays }); - await kernel2.initRemoteComms({ relays: testRelays }); + await kernel1.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); const status1 = await kernel1.getStatus(); const status2 = await kernel2.getStatus(); @@ -120,6 +146,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const response = await sendRemoteMessage( @@ -143,6 +170,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const aliceToBob = await sendRemoteMessage( @@ -172,8 +200,14 @@ describe.sequential('Remote Communications E2E', () => { 'remote relationships should survive kernel restart', async () => { // Initialize remote comms - await kernel1.initRemoteComms({ relays: testRelays }); - await kernel2.initRemoteComms({ relays: testRelays }); + await kernel1.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); // Launch client vat on kernel1 const clientConfig = makeMaasClientConfig('client1', true); @@ -219,7 +253,10 @@ describe.sequential('Remote Communications E2E', () => { await makeSQLKernelDatabase({ dbFilename: dbFilename2 }), { resetStorage: false }, ); - await serverKernel.initRemoteComms({ relays: testRelays }); + await serverKernel.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); // Tell the client to talk to the server a second time expectedCount += 1; @@ -238,7 +275,10 @@ describe.sequential('Remote Communications E2E', () => { await makeSQLKernelDatabase({ dbFilename: dbFilename1 }), { resetStorage: false }, ); - await clientKernel.initRemoteComms({ relays: testRelays }); + await clientKernel.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); // Tell the client to talk to the server a third time expectedCount += 1; @@ -269,6 +309,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Verify initial connectivity @@ -298,6 +339,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -331,7 +373,10 @@ describe.sequential('Remote Communications E2E', () => { it( 'handles connection failure to non-existent peer', async () => { - await kernel1.initRemoteComms({ relays: testRelays }); + await kernel1.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); const aliceConfig = makeRemoteVatConfig('Alice'); await launchVatAndGetURL(kernel1, aliceConfig); @@ -368,6 +413,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const initialMessage = await sendRemoteMessage( @@ -382,8 +428,7 @@ describe.sequential('Remote Communications E2E', () => { await kernel2.stop(); // Send a message which will queue and trigger reconnection attempts - // The reconnection will use exponential backoff with base delay of 500ms - // and max delay of 10s. With jitter, delays will be randomized. + // The reconnection will use exponential backoff (with fast test config). const messagePromise = kernel1.queueMessage( aliceRef, 'sendRemoteMessage', @@ -400,6 +445,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -414,10 +460,8 @@ describe.sequential('Remote Communications E2E', () => { ); // Verify that reconnection took some time (indicating backoff delays) - // With exponential backoff, even with jitter, we expect at least - // one delay period (~500ms base) before reconnection succeeds - // We allow for some variance due to jitter and network timing - expect(totalReconnectTime).toBeGreaterThan(1000); + // Even with fast test backoff, kernel restart + reconnection takes measurable time + expect(totalReconnectTime).toBeGreaterThan(100); const followUpMessage = await sendRemoteMessage( kernel1, @@ -436,8 +480,14 @@ describe.sequential('Remote Communications E2E', () => { it( 'queues messages when connection is not established', async () => { - await kernel1.initRemoteComms({ relays: testRelays }); - await kernel2.initRemoteComms({ relays: testRelays }); + await kernel1.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); const aliceConfig = makeRemoteVatConfig('Alice'); await launchVatAndGetURL(kernel1, aliceConfig); @@ -463,6 +513,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -492,6 +543,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send multiple messages in sequence using sendSequence @@ -526,7 +578,11 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, - { maxMessagesPerSecond: 500, maxConnectionAttemptsPerMinute: 500 }, + { + maxMessagesPerSecond: 500, + maxConnectionAttemptsPerMinute: 500, + ...testBackoffOptions, + }, ); await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); @@ -551,6 +607,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -595,10 +652,19 @@ describe.sequential('Remote Communications E2E', () => { let kernel3: Kernel | undefined; try { - await kernel1.initRemoteComms({ relays: testRelays }); - await kernel2.initRemoteComms({ relays: testRelays }); + await kernel1.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); kernel3 = await makeTestKernel(kernelDatabase3); - await kernel3.initRemoteComms({ relays: testRelays }); + await kernel3.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); const aliceConfig = makeRemoteVatConfig('Alice'); const bobConfigInitial = makeRemoteVatConfig('Bob'); @@ -642,6 +708,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfigRestart, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult2.kernel; @@ -652,6 +719,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, charlieConfigRestart, + testBackoffOptions, ) ).kernel; @@ -703,6 +771,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const initialMessage = await sendRemoteMessage( @@ -729,6 +798,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -752,6 +822,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const initialMessage = await sendRemoteMessage( @@ -773,6 +844,7 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; @@ -800,6 +872,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); const initialMessage = await sendRemoteMessage( @@ -830,7 +903,7 @@ describe.sequential('Remote Communications E2E', () => { // Manually reconnect await kernel1.reconnectPeer(peerId2); - await delay(2000); + await delay(200); // Send message after manual reconnect - should succeed const messageAfterManualReconnect = await sendRemoteMessage( @@ -852,12 +925,24 @@ describe.sequential('Remote Communications E2E', () => { it( 'detects incarnation change when peer restarts with fresh state', async () => { - // Initialize with low retry attempts to trigger give-up on incarnation change + // Initialize with low retry attempts to trigger give-up on incarnation change. + // Use moderate backoff (not the fast test defaults) so reconnection attempts + // don't exhaust before the restarted peer is reachable. await kernel1.initRemoteComms({ relays: testRelays, maxRetryAttempts: 2, + reconnectionBaseDelayMs: 200, + reconnectionMaxDelayMs: 500, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + reconnectionBaseDelayMs: 200, + reconnectionMaxDelayMs: 500, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, }); - await kernel2.initRemoteComms({ relays: testRelays }); const aliceConfig = makeRemoteVatConfig('Alice'); const bobConfig = makeRemoteVatConfig('Bob'); @@ -881,7 +966,10 @@ describe.sequential('Remote Communications E2E', () => { const freshKernel2 = await makeTestKernel(freshDb2); // eslint-disable-next-line require-atomic-updates kernel2 = freshKernel2; - await kernel2.initRemoteComms({ relays: testRelays }); + await kernel2.initRemoteComms({ + relays: testRelays, + ...testBackoffOptions, + }); // Launch Bob again (fresh vat, no previous state) await launchVatAndGetURL(kernel2, bobConfig); @@ -912,12 +1000,24 @@ describe.sequential('Remote Communications E2E', () => { it( 'rejects promises when remote connection is lost after max retries', async () => { - // Initialize kernel1 with a low maxRetryAttempts to trigger give-up quickly + // Initialize kernel1 with a low maxRetryAttempts to trigger give-up. + // Use moderate backoff so reconnection attempts don't exhaust before + // the message can be queued and become a pending promise. await kernel1.initRemoteComms({ relays: testRelays, - maxRetryAttempts: 1, // Only 1 retry attempt before giving up + maxRetryAttempts: 3, + reconnectionBaseDelayMs: 200, + reconnectionMaxDelayMs: 1_000, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, + }); + await kernel2.initRemoteComms({ + relays: testRelays, + reconnectionBaseDelayMs: 200, + reconnectionMaxDelayMs: 1_000, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, }); - await kernel2.initRemoteComms({ relays: testRelays }); // Set up Alice and Bob manually (can't use setupAliceAndBob as it reinitializes comms) const aliceConfig = makeRemoteVatConfig('Alice'); @@ -934,13 +1034,9 @@ describe.sequential('Remote Communications E2E', () => { // Now stop kernel2 to trigger connection loss await kernel2.stop(); - // Wait for connection loss to be detected and reconnection attempts to fail - await delay(2000); - - // Send a message that will trigger promise creation and eventual rejection - // The message will create a promise with the remote as decider (from URL redemption) - // When we give up on the remote, that promise should be rejected - // The vat should then propagate that rejection to the promise returned here + // Send a message immediately - it will be queued while reconnection is attempted. + // With maxRetryAttempts: 3, the reconnection will give up and reject + // all pending promises including this message. await expect( kernel1.queueMessage(aliceRef, 'sendRemoteMessage', [ bobURL, @@ -963,6 +1059,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send a message that creates a promise with remote as decider @@ -976,7 +1073,7 @@ describe.sequential('Remote Communications E2E', () => { await kernel2.stop(); // Wait a bit for connection loss to be detected - await delay(500); + await delay(100); // Restart kernel2 quickly (before max retries, since default is infinite) // The promise should remain unresolved and resolve normally after reconnection @@ -986,12 +1083,13 @@ describe.sequential('Remote Communications E2E', () => { false, testRelays, bobConfig, + testBackoffOptions, ); // eslint-disable-next-line require-atomic-updates kernel2 = restartResult.kernel; // Wait for reconnection - await delay(2000); + await delay(200); // The message should eventually be delivered and resolved // The promise was never rejected because retries weren't exhausted @@ -1012,6 +1110,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send a message to create cross-kernel object references @@ -1038,6 +1137,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send a message to create cross-kernel refs @@ -1051,7 +1151,7 @@ describe.sequential('Remote Communications E2E', () => { // and allow the remote to process it and respond for (let i = 0; i < 3; i++) { await kernel1.queueMessage(aliceRef, 'ping', []); - await waitUntilQuiescent(500); + await waitUntilQuiescent(100); } // Verify communication still works after DGC @@ -1076,6 +1176,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send messages in both directions to create refs on both sides @@ -1088,7 +1189,7 @@ describe.sequential('Remote Communications E2E', () => { // Trigger cranks to process the reap and allow BOYD to flow for (let i = 0; i < 3; i++) { await kernel2.queueMessage(bobRef, 'ping', []); - await waitUntilQuiescent(500); + await waitUntilQuiescent(100); } // Verify communication still works after DGC from both directions @@ -1122,6 +1223,7 @@ describe.sequential('Remote Communications E2E', () => { kernelStore1, kernelStore2, testRelays, + testBackoffOptions, ); // Send messages to establish refs on both sides @@ -1140,7 +1242,7 @@ describe.sequential('Remote Communications E2E', () => { kernel1.queueMessage(aliceRef, 'ping', []), kernel2.queueMessage(bobRef, 'ping', []), ]); - await waitUntilQuiescent(500); + await waitUntilQuiescent(100); } // Verify continued bidirectional communication works - this proves diff --git a/packages/kernel-node-runtime/test/helpers/remote-comms.ts b/packages/kernel-node-runtime/test/helpers/remote-comms.ts index 0323a758ed..ec7d0dcbe3 100644 --- a/packages/kernel-node-runtime/test/helpers/remote-comms.ts +++ b/packages/kernel-node-runtime/test/helpers/remote-comms.ts @@ -10,6 +10,12 @@ import type { import { makeTestKernel } from './kernel.ts'; +/** + * Options forwarded to initRemoteComms by restart helpers. + * Omits 'relays' which is passed as a separate parameter. + */ +type RestartRemoteCommsOptions = Omit; + /** * Extract peerId from remoteComms status, returning undefined for * disconnected state. @@ -154,16 +160,18 @@ export async function sendRemoteMessage( * @param dbFilename - The database filename to open a fresh connection to. * @param resetStorage - Whether to reset storage. * @param relays - Array of relay addresses. + * @param remoteCommsOptions - Additional options forwarded to initRemoteComms. * @returns The restarted kernel. */ export async function restartKernel( dbFilename: string, resetStorage: boolean, relays: string[], + remoteCommsOptions?: RestartRemoteCommsOptions, ): Promise { const kernelDatabase = await makeSQLKernelDatabase({ dbFilename }); const kernel = await makeTestKernel(kernelDatabase, { resetStorage }); - await kernel.initRemoteComms({ relays }); + await kernel.initRemoteComms({ relays, ...remoteCommsOptions }); return kernel; } @@ -174,6 +182,7 @@ export async function restartKernel( * @param resetStorage - Whether to reset storage. * @param relays - Array of relay addresses. * @param config - Cluster configuration for the vat. + * @param remoteCommsOptions - Additional options forwarded to initRemoteComms. * @returns Object with the restarted kernel and its ocap URL. */ export async function restartKernelAndReloadVat( @@ -181,8 +190,14 @@ export async function restartKernelAndReloadVat( resetStorage: boolean, relays: string[], config: ClusterConfig, + remoteCommsOptions?: RestartRemoteCommsOptions, ): Promise<{ kernel: Kernel; url: string }> { - const kernel = await restartKernel(dbFilename, resetStorage, relays); + const kernel = await restartKernel( + dbFilename, + resetStorage, + relays, + remoteCommsOptions, + ); const url = await launchVatAndGetURL(kernel, config); return { kernel, url }; } diff --git a/packages/kernel-node-runtime/vitest.config.e2e.ts b/packages/kernel-node-runtime/vitest.config.e2e.ts index 50fe7c16eb..a7edcddcc6 100644 --- a/packages/kernel-node-runtime/vitest.config.e2e.ts +++ b/packages/kernel-node-runtime/vitest.config.e2e.ts @@ -12,6 +12,9 @@ export default defineConfig((args) => { test: { name: 'nodejs:e2e', pool: 'forks', + // Run test files sequentially to prevent port conflicts between + // relay-dependent test suites (remote-comms, relay-connectivity, quic-transport). + fileParallelism: false, setupFiles: [ fileURLToPath( import.meta.resolve('@metamask/kernel-shims/endoify-node'), diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 5dc97f3ce8..344a43335d 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -40,6 +40,7 @@ type RemoteHandleConstructorProps = { locationHints?: string[] | undefined; logger?: Logger | undefined; onGiveUp?: ((peerId: string) => void) | undefined; + ackTimeoutMs?: number | undefined; }; type MessageDelivery = ['message', string, Message]; @@ -149,6 +150,9 @@ export class RemoteHandle implements EndpointHandle { /** Callback invoked when we give up on this remote (for promise rejection). */ readonly #onGiveUp: ((peerId: string) => void) | undefined; + /** How long to wait for ACK before retransmitting (ms). Defaults to ACK_TIMEOUT_MS. */ + readonly #ackTimeoutMs: number; + /** * Construct a new RemoteHandle instance. * @@ -161,6 +165,7 @@ export class RemoteHandle implements EndpointHandle { * @param params.locationHints - Possible contact points to reach the other end. * @param params.logger - Optional logger for diagnostic output. * @param params.onGiveUp - Optional callback when we give up on this remote. + * @param params.ackTimeoutMs - Optional ACK timeout in ms. Defaults to ACK_TIMEOUT_MS. */ // eslint-disable-next-line no-restricted-syntax private constructor({ @@ -172,6 +177,7 @@ export class RemoteHandle implements EndpointHandle { locationHints, logger, onGiveUp, + ackTimeoutMs, }: RemoteHandleConstructorProps) { this.remoteId = remoteId; this.#peerId = peerId; @@ -182,6 +188,7 @@ export class RemoteHandle implements EndpointHandle { this.#myCrankResult = { didDelivery: remoteId }; this.#logger = logger ?? new Logger(`RemoteHandle:${peerId.slice(0, 8)}`); this.#onGiveUp = onGiveUp; + this.#ackTimeoutMs = ackTimeoutMs ?? ACK_TIMEOUT_MS; } /** @@ -345,7 +352,7 @@ export class RemoteHandle implements EndpointHandle { if (this.#hasPendingMessages()) { this.#ackTimeoutHandle = setTimeout(() => { this.#handleAckTimeout(); - }, ACK_TIMEOUT_MS); + }, this.#ackTimeoutMs); } } @@ -1002,8 +1009,11 @@ export class RemoteHandle implements EndpointHandle { const { promise, resolve, reject } = makePromiseKit(); this.#pendingRedemptions.set(replyKey, [resolve, reject]); - // Set up timeout handling with AbortSignal - const timeoutSignal = AbortSignal.timeout(30_000); + // Set up timeout handling with AbortSignal. + // Use (MAX_RETRIES + 1)× ACK timeout as the redemption deadline: enough + // time for the full ACK retry cycle (initial + retransmissions) to complete. + const redemptionTimeoutMs = this.#ackTimeoutMs * (MAX_RETRIES + 1); + const timeoutSignal = AbortSignal.timeout(redemptionTimeoutMs); let abortHandler: (() => void) | undefined; const timeoutPromise = new Promise((_resolve, _reject) => { abortHandler = () => { @@ -1011,7 +1021,9 @@ export class RemoteHandle implements EndpointHandle { if (this.#pendingRedemptions.has(replyKey)) { this.#pendingRedemptions.delete(replyKey); } - _reject(new Error('URL redemption timed out after 30 seconds')); + _reject( + new Error(`URL redemption timed out after ${redemptionTimeoutMs}ms`), + ); }; timeoutSignal.addEventListener('abort', abortHandler); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index 5330f6a332..b23fd56939 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -51,6 +51,9 @@ export class RemoteManager { /** Optional mnemonic for seed derivation */ readonly #mnemonic: string | undefined; + /** Optional ACK timeout override for RemoteHandle instances */ + #ackTimeoutMs: number | undefined; + /** * Unique identifier for this kernel instance. * Used to detect when a remote peer has lost its state and reconnected. @@ -242,6 +245,8 @@ export class RemoteManager { mnemonic: options?.mnemonic ?? this.#mnemonic, }; + this.#ackTimeoutMs = mergedOptions.ackTimeoutMs; + this.#remoteComms = await initRemoteComms( this.#kernelStore, this.#platformServices, @@ -347,6 +352,7 @@ export class RemoteManager { locationHints: hints, logger: this.#logger, onGiveUp: this.#handleRemoteGiveUp.bind(this), + ackTimeoutMs: this.#ackTimeoutMs, }); this.#remotes.set(remoteId, remote); this.#remotesByPeer.set(peerId, remote); diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.ts b/packages/ocap-kernel/src/remotes/platform/handshake.ts index ff95565f73..9e8ccd2627 100644 --- a/packages/ocap-kernel/src/remotes/platform/handshake.ts +++ b/packages/ocap-kernel/src/remotes/platform/handshake.ts @@ -32,6 +32,10 @@ export type HandshakeDeps = { logger: Logger; /** Set the incarnation ID for a peer. Returns true if it changed. */ setRemoteIncarnation: (peerId: string, incarnationId: string) => boolean; + /** Timeout in ms for handshake read operations. Defaults to HANDSHAKE_TIMEOUT_MS. */ + handshakeTimeoutMs?: number | undefined; + /** Timeout in ms for channel write operations. Defaults to DEFAULT_WRITE_TIMEOUT_MS. */ + writeTimeoutMs?: number | undefined; }; /** @@ -112,6 +116,8 @@ export async function performOutboundHandshake( deps: HandshakeDeps, ): Promise { const { localIncarnationId, logger, setRemoteIncarnation } = deps; + const hsTimeout = deps.handshakeTimeoutMs ?? HANDSHAKE_TIMEOUT_MS; + const wtTimeout = deps.writeTimeoutMs ?? DEFAULT_WRITE_TIMEOUT_MS; const { peerId } = channel; const shortPeerId = peerId.slice(0, 8); const shortIncarnation = localIncarnationId.slice(0, 8); @@ -127,12 +133,12 @@ export async function performOutboundHandshake( await writeWithTimeout( channel, fromString(JSON.stringify(handshakeMsg)), - DEFAULT_WRITE_TIMEOUT_MS, + wtTimeout, ); // Wait for handshakeAck logger.log(`${shortPeerId}:: waiting for handshakeAck`); - const response = await readWithTimeout(channel, HANDSHAKE_TIMEOUT_MS); + const response = await readWithTimeout(channel, hsTimeout); const parsed = JSON.parse(response); if (!isHandshakeMessage(parsed) || parsed.method !== 'handshakeAck') { @@ -164,12 +170,14 @@ export async function performInboundHandshake( deps: HandshakeDeps, ): Promise { const { localIncarnationId, logger, setRemoteIncarnation } = deps; + const hsTimeout = deps.handshakeTimeoutMs ?? HANDSHAKE_TIMEOUT_MS; + const wtTimeout = deps.writeTimeoutMs ?? DEFAULT_WRITE_TIMEOUT_MS; const { peerId } = channel; const shortPeerId = peerId.slice(0, 8); // Wait for handshake logger.log(`${shortPeerId}:: waiting for handshake`); - const message = await readWithTimeout(channel, HANDSHAKE_TIMEOUT_MS); + const message = await readWithTimeout(channel, hsTimeout); const parsed = JSON.parse(message); if (!isHandshakeMessage(parsed) || parsed.method !== 'handshake') { @@ -192,7 +200,7 @@ export async function performInboundHandshake( await writeWithTimeout( channel, fromString(JSON.stringify(ackMsg)), - DEFAULT_WRITE_TIMEOUT_MS, + wtTimeout, ); const incarnationChanged = setRemoteIncarnation(peerId, remoteIncarnationId); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts index 1c57112c3e..2c86a65015 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts @@ -244,22 +244,38 @@ describe('ReconnectionManager', () => { // No attempts yet (attemptCount = 0) const backoff0 = manager.calculateBackoff('peer1'); - expect(calculateReconnectionBackoff).toHaveBeenCalledWith(0); + expect(calculateReconnectionBackoff).toHaveBeenCalledWith(0, undefined); expect(backoff0).toBe(50); // 100 * 2^(-1) = 50 // After first increment (attemptCount = 1) manager.incrementAttempt('peer1'); const backoff1 = manager.calculateBackoff('peer1'); - expect(calculateReconnectionBackoff).toHaveBeenCalledWith(1); + expect(calculateReconnectionBackoff).toHaveBeenCalledWith(1, undefined); expect(backoff1).toBe(100); // After second increment (attemptCount = 2) manager.incrementAttempt('peer1'); const backoff2 = manager.calculateBackoff('peer1'); - expect(calculateReconnectionBackoff).toHaveBeenCalledWith(2); + expect(calculateReconnectionBackoff).toHaveBeenCalledWith(2, undefined); expect(backoff2).toBe(200); }); + it('passes custom backoff options to calculateReconnectionBackoff', () => { + const { calculateReconnectionBackoff } = vi.mocked(kernelUtils); + const customManager = new ReconnectionManager({ + backoffBaseDelayMs: 10, + backoffMaxDelayMs: 50, + }); + + customManager.incrementAttempt('peer1'); + customManager.calculateBackoff('peer1'); + + expect(calculateReconnectionBackoff).toHaveBeenCalledWith(1, { + baseDelayMs: 10, + maxDelayMs: 50, + }); + }); + it('calculates independently for different peers', () => { manager.incrementAttempt('peer1'); manager.incrementAttempt('peer1'); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.ts index 83fe586db0..1cf2654031 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.ts @@ -35,20 +35,34 @@ export class ReconnectionManager { readonly #consecutiveErrorThreshold: number; + readonly #backoffBaseDelayMs: number | undefined; + + readonly #backoffMaxDelayMs: number | undefined; + /** * Creates a new ReconnectionManager. * * @param options - Configuration options. * @param options.consecutiveErrorThreshold - Number of consecutive identical errors * before marking a peer as permanently failed. Default is 5. Must be at least 1. + * @param options.backoffBaseDelayMs - Base delay in ms for exponential backoff. + * If not provided, uses DEFAULT_BASE_DELAY_MS (500ms). + * @param options.backoffMaxDelayMs - Maximum delay in ms for exponential backoff. + * If not provided, uses DEFAULT_MAX_DELAY_MS (10s). */ - constructor(options?: { consecutiveErrorThreshold?: number }) { + constructor(options?: { + consecutiveErrorThreshold?: number; + backoffBaseDelayMs?: number; + backoffMaxDelayMs?: number; + }) { const threshold = options?.consecutiveErrorThreshold ?? DEFAULT_CONSECUTIVE_ERROR_THRESHOLD; if (threshold < 1) { throw new Error('consecutiveErrorThreshold must be at least 1'); } this.#consecutiveErrorThreshold = threshold; + this.#backoffBaseDelayMs = options?.backoffBaseDelayMs; + this.#backoffMaxDelayMs = options?.backoffMaxDelayMs; } /** @@ -163,7 +177,20 @@ export class ReconnectionManager { */ calculateBackoff(peerId: string): number { const state = this.#getState(peerId); - return calculateReconnectionBackoff(state.attemptCount); + return calculateReconnectionBackoff( + state.attemptCount, + this.#backoffBaseDelayMs !== undefined || + this.#backoffMaxDelayMs !== undefined + ? { + ...(this.#backoffBaseDelayMs !== undefined && { + baseDelayMs: this.#backoffBaseDelayMs, + }), + ...(this.#backoffMaxDelayMs !== undefined && { + maxDelayMs: this.#backoffMaxDelayMs, + }), + } + : undefined, + ); } /** diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index a630c6c3b6..903185ced1 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -87,6 +87,10 @@ export async function initTransport( stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, maxMessagesPerSecond = DEFAULT_MESSAGE_RATE_LIMIT, maxConnectionAttemptsPerMinute = DEFAULT_CONNECTION_RATE_LIMIT, + reconnectionBaseDelayMs, + reconnectionMaxDelayMs, + handshakeTimeoutMs, + writeTimeoutMs, directTransports, allowedWsHosts, } = options; @@ -95,7 +99,16 @@ export async function initTransport( const { signal } = stopController; const logger = new Logger(); const outputError = makeErrorLogger(logger); - const reconnectionManager = new ReconnectionManager(); + const reconnectionManagerOpts: ConstructorParameters< + typeof ReconnectionManager + >[0] = {}; + if (reconnectionBaseDelayMs !== undefined) { + reconnectionManagerOpts.backoffBaseDelayMs = reconnectionBaseDelayMs; + } + if (reconnectionMaxDelayMs !== undefined) { + reconnectionManagerOpts.backoffMaxDelayMs = reconnectionMaxDelayMs; + } + const reconnectionManager = new ReconnectionManager(reconnectionManagerOpts); const peerStateManager = new PeerStateManager(logger, stalePeerTimeoutMs); const validateMessageSize = makeMessageSizeValidator(maxMessageSizeBytes); const checkConnectionLimit = makeConnectionLimitChecker( @@ -137,6 +150,8 @@ export async function initTransport( logger, setRemoteIncarnation: (peerId: string, incarnationId: string) => peerStateManager.setRemoteIncarnation(peerId, incarnationId), + handshakeTimeoutMs, + writeTimeoutMs, } : undefined; @@ -536,7 +551,7 @@ export async function initTransport( await writeWithTimeout( channel, fromString(message), - DEFAULT_WRITE_TIMEOUT_MS, + writeTimeoutMs ?? DEFAULT_WRITE_TIMEOUT_MS, ); peerStateManager.updateConnectionTime(targetPeerId); reconnectionManager.resetBackoff(targetPeerId); diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index c881e74f98..136ad2c11e 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -102,6 +102,31 @@ export type RemoteCommsOptions = { * Uses a sliding 1-minute window. */ maxConnectionAttemptsPerMinute?: number | undefined; + /** + * Base delay in milliseconds for reconnection exponential backoff (default: 500ms). + * Used as the starting delay that doubles with each subsequent attempt. + */ + reconnectionBaseDelayMs?: number | undefined; + /** + * Maximum delay in milliseconds for reconnection exponential backoff (default: 10s). + * The backoff delay is capped at this value regardless of attempt count. + */ + reconnectionMaxDelayMs?: number | undefined; + /** + * Timeout in milliseconds for handshake operations (default: 10s). + * Controls how long to wait for a handshake or handshakeAck response. + */ + handshakeTimeoutMs?: number | undefined; + /** + * Timeout in milliseconds for channel write operations (default: 10s). + * Controls how long to wait for a message to be written to a channel. + */ + writeTimeoutMs?: number | undefined; + /** + * Timeout in milliseconds for ACK before retransmitting a message (default: 10s). + * When a sent message is not acknowledged within this timeout, it will be retransmitted. + */ + ackTimeoutMs?: number | undefined; /** * Hostnames or IP addresses permitted for plain ws:// relay connections, * in addition to RFC 1918 / loopback addresses which are always allowed. diff --git a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts index 849c457d4f..6ba0e14725 100644 --- a/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts +++ b/packages/ocap-kernel/src/rpc/platform-services/initializeRemoteComms.ts @@ -16,6 +16,11 @@ const initializeRemoteCommsParamsStruct = object({ maxRetryAttempts: optional(number()), maxQueue: optional(number()), allowedWsHosts: optional(array(string())), + reconnectionBaseDelayMs: optional(number()), + reconnectionMaxDelayMs: optional(number()), + handshakeTimeoutMs: optional(number()), + writeTimeoutMs: optional(number()), + ackTimeoutMs: optional(number()), incarnationId: optional(string()), }); @@ -25,6 +30,11 @@ type InitializeRemoteCommsParams = { maxRetryAttempts?: number; maxQueue?: number; allowedWsHosts?: string[]; + reconnectionBaseDelayMs?: number; + reconnectionMaxDelayMs?: number; + handshakeTimeoutMs?: number; + writeTimeoutMs?: number; + ackTimeoutMs?: number; incarnationId?: string; }; @@ -74,6 +84,21 @@ export const initializeRemoteCommsHandler: InitializeRemoteCommsHandler = { if (params.allowedWsHosts !== undefined) { options.allowedWsHosts = params.allowedWsHosts; } + if (params.reconnectionBaseDelayMs !== undefined) { + options.reconnectionBaseDelayMs = params.reconnectionBaseDelayMs; + } + if (params.reconnectionMaxDelayMs !== undefined) { + options.reconnectionMaxDelayMs = params.reconnectionMaxDelayMs; + } + if (params.handshakeTimeoutMs !== undefined) { + options.handshakeTimeoutMs = params.handshakeTimeoutMs; + } + if (params.writeTimeoutMs !== undefined) { + options.writeTimeoutMs = params.writeTimeoutMs; + } + if (params.ackTimeoutMs !== undefined) { + options.ackTimeoutMs = params.ackTimeoutMs; + } return await initializeRemoteComms( params.keySeed, options, From bdb3a4afd364640b56b30f613ab8612624344caf Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 31 Mar 2026 13:45:14 +0200 Subject: [PATCH 2/2] fix: update RemoteHandle timeout tests for dynamic redemption timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The redemption timeout is now derived from ackTimeoutMs * (MAX_RETRIES + 1) instead of a hardcoded 30s, so the expected value changes from 30_000 to 40_000 (default ACK timeout 10s × 4) and the error message format changes. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/remotes/kernel/RemoteHandle.test.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 2b5e52e231..ba6f8a9f68 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -818,7 +818,7 @@ describe('RemoteHandle', () => { vi.restoreAllMocks(); }); - it('sets up 30-second timeout using AbortSignal.timeout', async () => { + it('sets up redemption timeout derived from ACK timeout and max retries', async () => { const remote = makeRemote(); const mockOcapURL = 'ocap:test@peer'; @@ -830,9 +830,9 @@ describe('RemoteHandle', () => { const urlPromise = remote.redeemOcapURL(mockOcapURL); - // Verify AbortSignal.timeout was called with 30 seconds - expect(AbortSignal.timeout).toHaveBeenCalledWith(30_000); - expect(mockSignal?.timeoutMs).toBe(30_000); + // Default: ACK_TIMEOUT_MS (10_000) * (MAX_RETRIES (3) + 1) = 40_000 + expect(AbortSignal.timeout).toHaveBeenCalledWith(40_000); + expect(mockSignal?.timeoutMs).toBe(40_000); // Wait for sendRemoteMessage to be called await new Promise((resolve) => queueMicrotask(() => resolve())); @@ -922,9 +922,9 @@ describe('RemoteHandle', () => { // Wait for the abort handler to execute await new Promise((resolve) => queueMicrotask(() => resolve())); - // Verify the promise rejects + // Verify the promise rejects with dynamic timeout message await expect(urlPromise).rejects.toThrow( - 'URL redemption timed out after 30 seconds', + 'URL redemption timed out after 40000ms', ); // Verify cleanup happened - trying to handle a reply with the same key should fail