Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ test-results

# Claude
.claude/settings.local.json
.playwright-mcp/
20 changes: 20 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
VatId,
VatConfig,
RemoteCommsOptions,
OnIncarnationChange,
} from '@metamask/ocap-kernel';
import {
platformServicesMethodSpecs,
Expand Down Expand Up @@ -62,6 +63,8 @@ export class PlatformServicesClient implements PlatformServices {

#remoteGiveUpHandler: ((peerId: string) => void) | undefined = undefined;

#remoteIncarnationChangeHandler: OnIncarnationChange | undefined = undefined;

/**
* **ATTN:** Prefer {@link PlatformServicesClient.make} over constructing
* this class directly.
Expand Down Expand Up @@ -96,6 +99,7 @@ export class PlatformServicesClient implements PlatformServices {
this.#rpcServer = new RpcService(kernelRemoteHandlers, {
remoteDeliver: this.#remoteDeliver.bind(this),
remoteGiveUp: this.#remoteGiveUp.bind(this),
remoteIncarnationChange: this.#remoteIncarnationChange.bind(this),
});

// Start draining messages immediately after construction
Expand Down Expand Up @@ -195,6 +199,7 @@ export class PlatformServicesClient implements PlatformServices {
* @param remoteMessageHandler - A handler function to receive remote messages.
* @param onRemoteGiveUp - Optional callback to be called when we give up on a remote.
* @param incarnationId - Unique identifier for this kernel instance.
* @param onIncarnationChange - Optional callback when a remote peer's incarnation changes.
* @returns A promise that resolves once network access has been established
* or rejects if there is some problem doing so.
*/
Expand All @@ -204,9 +209,11 @@ export class PlatformServicesClient implements PlatformServices {
remoteMessageHandler: (from: string, message: string) => Promise<string>,
onRemoteGiveUp?: (peerId: string) => void,
incarnationId?: string,
onIncarnationChange?: OnIncarnationChange,
): Promise<void> {
this.#remoteMessageHandler = remoteMessageHandler;
this.#remoteGiveUpHandler = onRemoteGiveUp;
this.#remoteIncarnationChangeHandler = onIncarnationChange;
await this.#rpcClient.call('initializeRemoteComms', {
keySeed,
...Object.fromEntries(
Expand Down Expand Up @@ -297,6 +304,19 @@ export class PlatformServicesClient implements PlatformServices {
return null;
}

/**
* Handle a remote incarnation change notification from the server.
*
* @param peerId - The peer ID of the remote that restarted.
* @returns A promise that resolves when handling is complete.
*/
async #remoteIncarnationChange(peerId: string): Promise<null> {
if (this.#remoteIncarnationChangeHandler) {
this.#remoteIncarnationChangeHandler(peerId);
}
return null;
}

/**
* Send a message to the server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ describe('PlatformServicesServer', () => {
expect.any(Function),
expect.any(Function),
undefined,
expect.any(Function),
);
});

Expand All @@ -430,6 +431,7 @@ describe('PlatformServicesServer', () => {
expect.any(Function),
expect.any(Function),
undefined,
expect.any(Function),
);
});

Expand Down
18 changes: 18 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ export class PlatformServicesServer {
this.#handleRemoteMessage.bind(this),
this.#handleRemoteGiveUp.bind(this),
incarnationId,
this.#handleRemoteIncarnationChange.bind(this),
);
this.#sendRemoteMessageFunc = sendRemoteMessage;
this.#stopRemoteCommsFunc = stop;
Expand Down Expand Up @@ -404,5 +405,22 @@ export class PlatformServicesServer {
this.#logger.error('Error notifying kernel of remote give up:', error);
});
}

/**
* Handle when a remote peer's incarnation changes (peer restarted).
* Notifies the kernel worker via RPC to reset the RemoteHandle state.
*
* @param peerId - The peer ID of the remote that restarted.
*/
#handleRemoteIncarnationChange(peerId: string): void {
this.#rpcClient
.call('remoteIncarnationChange', { peerId })
.catch((error) => {
this.#logger.error(
'Error notifying kernel of remote incarnation change:',
error,
);
});
}
}
harden(PlatformServicesServer);
33 changes: 33 additions & 0 deletions packages/nodejs/src/kernel/PlatformServices.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ describe('NodejsPlatformServices', () => {
expect.any(Function),
undefined,
undefined,
undefined,
);
});

Expand All @@ -272,6 +273,7 @@ describe('NodejsPlatformServices', () => {
expect.any(Function),
undefined,
undefined,
undefined,
);
});

Expand All @@ -296,6 +298,7 @@ describe('NodejsPlatformServices', () => {
expect.any(Function),
giveUpHandler,
undefined,
undefined,
);
});

Expand All @@ -322,6 +325,36 @@ describe('NodejsPlatformServices', () => {
expect.any(Function),
giveUpHandler,
incarnationId,
undefined,
);
});

it('initializes remote comms with onIncarnationChange callback', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const keySeed = '0x1234567890abcdef';
const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'];
const remoteHandler = vi.fn(async () => 'response');
const giveUpHandler = vi.fn();
const incarnationId = 'test-incarnation-id';
const incarnationChangeHandler = vi.fn();

await service.initializeRemoteComms(
keySeed,
{ relays },
remoteHandler,
giveUpHandler,
incarnationId,
incarnationChangeHandler,
);

const { initTransport } = await import('@metamask/ocap-kernel');
expect(initTransport).toHaveBeenCalledWith(
keySeed,
{ relays },
expect.any(Function),
giveUpHandler,
incarnationId,
incarnationChangeHandler,
);
});

Expand Down
4 changes: 4 additions & 0 deletions packages/nodejs/src/kernel/PlatformServices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
SendRemoteMessage,
StopRemoteComms,
RemoteCommsOptions,
OnIncarnationChange,
} from '@metamask/ocap-kernel';
import { initTransport } from '@metamask/ocap-kernel';
import { NodeWorkerDuplexStream } from '@metamask/streams';
Expand Down Expand Up @@ -228,6 +229,7 @@ export class NodejsPlatformServices implements PlatformServices {
* @param remoteMessageHandler - A handler function to receive remote messages.
* @param onRemoteGiveUp - Optional callback to be called when we give up on a remote.
* @param incarnationId - This kernel's incarnation ID for handshake protocol.
* @param onIncarnationChange - Optional callback when a remote peer's incarnation changes.
* @returns A promise that resolves once network access has been established
* or rejects if there is some problem doing so.
*/
Expand All @@ -237,6 +239,7 @@ export class NodejsPlatformServices implements PlatformServices {
remoteMessageHandler: (from: string, message: string) => Promise<string>,
onRemoteGiveUp?: (peerId: string) => void,
incarnationId?: string,
onIncarnationChange?: OnIncarnationChange,
): Promise<void> {
if (this.#sendRemoteMessageFunc) {
throw Error('remote comms already initialized');
Expand All @@ -254,6 +257,7 @@ export class NodejsPlatformServices implements PlatformServices {
this.#handleRemoteMessage.bind(this),
onRemoteGiveUp,
incarnationId,
onIncarnationChange,
);
this.#sendRemoteMessageFunc = sendRemoteMessage;
this.#stopRemoteCommsFunc = stop;
Expand Down
4 changes: 2 additions & 2 deletions packages/nodejs/test/e2e/remote-comms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,8 @@ describe.sequential('Remote Communications E2E', () => {
const response = kunser(result);

// The message should fail because incarnation changed.
// The handshake detects the new incarnation and triggers onRemoteGiveUp,
// which rejects pending promises with a "Remote connection lost" error.
// The handshake detects the new incarnation and triggers onIncarnationChange,
// which resets RemoteHandle state and rejects pending work.
expect(response).toBeInstanceOf(Error);
expect((response as Error).message).toMatch(/Remote connection lost/u);
},
Expand Down
1 change: 1 addition & 0 deletions packages/ocap-kernel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type {
SendRemoteMessage,
StopRemoteComms,
RemoteCommsOptions,
OnIncarnationChange,
} from './remotes/types.ts';
export type { RemoteMessageBase } from './remotes/kernel/RemoteHandle.ts';
export {
Expand Down
67 changes: 67 additions & 0 deletions packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1166,4 +1166,71 @@ describe('RemoteHandle', () => {
expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq
});
});

describe('handlePeerRestart', () => {
it('resets sequence numbers for fresh start', async () => {
const remote = makeRemote();

// Build up some state by sending and receiving messages
const promiseRRef = 'rp+3';
const resolutions: VatOneResolution[] = [
[promiseRRef, false, { body: '"resolved value"', slots: [] }],
];
await remote.deliverNotify(resolutions);
await remote.handleRemoteMessage(
JSON.stringify({
seq: 5,
method: 'deliver',
params: ['notify', resolutions],
}),
);

// Call handlePeerRestart
remote.handlePeerRestart();

// Send a new message - should start from seq=1
vi.mocked(mockRemoteComms.sendRemoteMessage).mockClear();
await remote.deliverNotify(resolutions);

const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock
.calls[0]![1];
const parsed = JSON.parse(sentString);
expect(parsed.seq).toBe(1);
// ack should not be included since highestReceivedSeq was reset to 0
expect(parsed.ack).toBeUndefined();
});

it('clears persisted sequence state', async () => {
const remote = makeRemote();

// Build up state
const promiseRRef = 'rp+3';
const resolutions: VatOneResolution[] = [
[promiseRRef, false, { body: '"resolved value"', slots: [] }],
];
await remote.deliverNotify(resolutions);

// Verify state exists before restart
expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeDefined();

// Call handlePeerRestart
remote.handlePeerRestart();

// Verify state was cleared
expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeUndefined();
});

it('rejects pending URL redemptions', async () => {
const remote = makeRemote();

// Start a redemption but don't resolve it
const redeemPromise = remote.redeemOcapURL('ocap:test@peer,relay');

// Call handlePeerRestart
remote.handlePeerRestart();

// The pending redemption should be rejected
await expect(redeemPromise).rejects.toThrow('Remote peer restarted');
});
});
});
36 changes: 36 additions & 0 deletions packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -931,4 +931,40 @@ export class RemoteHandle implements EndpointHandle {
this.#clearDelayedAck();
this.rejectPendingRedemptions('Remote connection cleanup');
}

/**
* Handle a peer restart (incarnation change).
* Resets all state for a fresh start: clears timers, rejects pending messages
* and redemptions, resets sequence numbers, and clears persisted seq state.
* Called when the handshake detects that the remote peer has restarted.
*/
handlePeerRestart(): void {
this.#logger.log(
`${this.#peerId.slice(0, 8)}:: handling peer restart, resetting state`,
);

// Clear timers
this.#clearAckTimeout();
this.#clearDelayedAck();

// Reject all pending messages - they will never be ACKed by the restarted peer
if (this.#hasPendingMessages()) {
this.#logger.log(
`${this.#peerId.slice(0, 8)}:: rejecting ${this.#getPendingCount()} pending messages due to peer restart`,
);
this.#rejectAllPending('Remote peer restarted');
}

// Reject pending URL redemptions - the remote won't have context for them
this.rejectPendingRedemptions('Remote peer restarted');

// Reset sequence numbers for fresh start
this.#nextSendSeq = 0;
this.#highestReceivedSeq = 0;
this.#startSeq = 0;
this.#retryCount = 0;

// Clear persisted sequence state
this.#kernelStore.clearRemoteSeqState(this.remoteId);
}
}
Loading
Loading