diff --git a/packages/ocap-kernel/src/Kernel.test.ts b/packages/ocap-kernel/src/Kernel.test.ts index fd65818ced..d8f5b712b9 100644 --- a/packages/ocap-kernel/src/Kernel.test.ts +++ b/packages/ocap-kernel/src/Kernel.test.ts @@ -776,7 +776,7 @@ describe('Kernel', () => { expect(kernel.getVatIds()).toStrictEqual([]); }); - it('returns the original vat handle', async () => { + it('returns the new vat handle', async () => { const kernel = await Kernel.make( mockStream, mockPlatformServices, @@ -785,7 +785,9 @@ describe('Kernel', () => { await kernel.launchSubcluster(makeSingleVatClusterConfig()); const originalHandle = vatHandles[0]; const returnedHandle = await kernel.restartVat('v1'); - expect(returnedHandle).toBe(originalHandle); + expect(returnedHandle).not.toBe(originalHandle); + expect(returnedHandle).toBe(vatHandles[1]); + expect(returnedHandle.vatId).toBe('v1'); }); }); diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index f2fd729f18..075df7eff3 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -1,16 +1,9 @@ import { Far } from '@endo/marshal'; import type { CapData } from '@endo/marshal'; -import { - VatAlreadyExistsError, - VatDeletedError, - VatNotFoundError, - SubclusterNotFoundError, -} from '@metamask/kernel-errors'; import { RpcService } from '@metamask/kernel-rpc-methods'; import type { KernelDatabase } from '@metamask/kernel-store'; -import { stringify } from '@metamask/kernel-utils'; import type { JsonRpcCall } from '@metamask/kernel-utils'; -import { Logger, splitLoggerStream } from '@metamask/logger'; +import { Logger } from '@metamask/logger'; import { serializeError } from '@metamask/rpc-errors'; import type { DuplexStream } from '@metamask/streams'; import { hasProperty } from '@metamask/utils'; @@ -40,14 +33,11 @@ import type { EndpointHandle, RemoteComms, } from './types.ts'; -import { - ROOT_OBJECT_VREF, - isClusterConfig, - isVatId, - isRemoteId, -} from './types.ts'; -import { Fail, assert } from './utils/assert.ts'; -import { VatHandle } from './vats/VatHandle.ts'; +import { isVatId, isRemoteId } from './types.ts'; +import { assert } from './utils/assert.ts'; +import { SubclusterManager } from './vats/SubclusterManager.ts'; +import type { VatHandle } from './vats/VatHandle.ts'; +import { VatManager } from './vats/VatManager.ts'; type KernelService = { name: string; @@ -64,8 +54,11 @@ export class Kernel { readonly #rpcService: RpcService; - /** Currently running vats, by ID */ - readonly #vats: Map; + /** Manages vat lifecycle operations */ + readonly #vatManager: VatManager; + + /** Manages subcluster operations */ + readonly #subclusterManager: SubclusterManager; /** Currently active remote kernel connections, by ID */ readonly #remotes: Map; @@ -124,7 +117,6 @@ export class Kernel { this.#commandStream = commandStream; this.#rpcService = new RpcService(kernelHandlers, {}); - this.#vats = new Map(); this.#remotes = new Map(); this.#remotesByPeer = new Map(); this.#platformServices = platformServices; @@ -139,8 +131,24 @@ export class Kernel { } this.#kernelQueue = new KernelQueue( this.#kernelStore, - this.terminateVat.bind(this), + async (vatId, reason) => this.#vatManager.terminateVat(vatId, reason), ); + + this.#vatManager = new VatManager({ + platformServices, + kernelStore: this.#kernelStore, + kernelQueue: this.#kernelQueue, + logger: this.#logger.subLogger({ tags: ['VatManager'] }), + }); + + this.#subclusterManager = new SubclusterManager({ + kernelStore: this.#kernelStore, + kernelQueue: this.#kernelQueue, + vatManager: this.#vatManager, + getKernelService: (name) => this.#kernelServicesByName.get(name), + queueMessage: this.queueMessage.bind(this), + }); + this.#kernelRouter = new KernelRouter( this.#kernelStore, this.#kernelQueue, @@ -232,11 +240,7 @@ export class Kernel { // Start all vats that were previously running before starting the queue // This ensures that any messages in the queue have their target vats ready - const starts: Promise[] = []; - for (const { vatID, vatConfig } of this.#kernelStore.getAllVatRecords()) { - starts.push(this.#runVat(vatID, vatConfig)); - } - await Promise.all(starts); + await this.#vatManager.initializeAllVats(); // Start the kernel queue processing (non-blocking) // This runs for the entire lifetime of the kernel @@ -352,28 +356,6 @@ export class Kernel { } } - /** - * Launch a new vat. - * - * @param vatConfig - Configuration for the new vat. - * @param subclusterId - The ID of the subcluster to launch the vat in. Optional. - * @returns a promise for the KRef of the new vat's root object. - */ - async #launchVat(vatConfig: VatConfig, subclusterId?: string): Promise { - const vatId = this.#kernelStore.getNextVatId(); - await this.#runVat(vatId, vatConfig); - this.#kernelStore.initEndpoint(vatId); - const rootRef = this.#kernelStore.exportFromEndpoint( - vatId, - ROOT_OBJECT_VREF, - ); - this.#kernelStore.setVatConfig(vatId, vatConfig); - if (subclusterId) { - this.#kernelStore.addSubclusterVat(subclusterId, vatId); - } - return rootRef; - } - /** * Set up bookkeeping for a newly established remote connection. * @@ -412,34 +394,6 @@ export class Kernel { return remote; } - /** - * Start a new or resurrected vat running. - * - * @param vatId - The ID of the vat to start. - * @param vatConfig - Its configuration. - */ - async #runVat(vatId: VatId, vatConfig: VatConfig): Promise { - if (this.#vats.has(vatId)) { - throw new VatAlreadyExistsError(vatId); - } - const stream = await this.#platformServices.launch(vatId, vatConfig); - const { kernelStream: vatStream, loggerStream } = splitLoggerStream(stream); - const vatLogger = this.#logger.subLogger({ tags: [vatId] }); - vatLogger.injectStream( - loggerStream as unknown as Parameters[0], - (error) => this.#logger.error(`Vat ${vatId} error: ${stringify(error)}`), - ); - const vat = await VatHandle.make({ - vatId, - vatConfig, - vatStream, - kernelStore: this.#kernelStore, - kernelQueue: this.#kernelQueue, - logger: vatLogger, - }); - this.#vats.set(vatId, vat); - } - /** * Send a message from the kernel to an object in a vat. * @@ -466,13 +420,7 @@ export class Kernel { async launchSubcluster( config: ClusterConfig, ): Promise | undefined> { - await this.#kernelQueue.waitForCrank(); - isClusterConfig(config) || Fail`invalid cluster config`; - if (!config.vats[config.bootstrap]) { - Fail`invalid bootstrap vat name ${config.bootstrap}`; - } - const subclusterId = this.#kernelStore.addSubcluster(config); - return this.#launchVatsForSubcluster(subclusterId, config); + return this.#subclusterManager.launchSubcluster(config); } /** @@ -482,16 +430,7 @@ export class Kernel { * @returns A promise that resolves when termination is complete. */ async terminateSubcluster(subclusterId: string): Promise { - await this.#kernelQueue.waitForCrank(); - if (!this.#kernelStore.getSubcluster(subclusterId)) { - throw new SubclusterNotFoundError(subclusterId); - } - const vatIdsToTerminate = this.#kernelStore.getSubclusterVats(subclusterId); - for (const vatId of vatIdsToTerminate.reverse()) { - await this.terminateVat(vatId); - this.collectGarbage(); - } - this.#kernelStore.deleteSubcluster(subclusterId); + return this.#subclusterManager.terminateSubcluster(subclusterId); } /** @@ -503,22 +442,7 @@ export class Kernel { * @throws If the subcluster is not found. */ async reloadSubcluster(subclusterId: string): Promise { - await this.#kernelQueue.waitForCrank(); - const subcluster = this.getSubcluster(subclusterId); - if (!subcluster) { - throw new SubclusterNotFoundError(subclusterId); - } - for (const vatId of subcluster.vats.reverse()) { - await this.terminateVat(vatId); - this.collectGarbage(); - } - const newId = this.#kernelStore.addSubcluster(subcluster.config); - await this.#launchVatsForSubcluster(newId, subcluster.config); - const newSubcluster = this.getSubcluster(newId); - if (!newSubcluster) { - throw new SubclusterNotFoundError(newId); - } - return newSubcluster; + return this.#subclusterManager.reloadSubcluster(subclusterId); } /** @@ -528,7 +452,7 @@ export class Kernel { * @returns The subcluster, or undefined if not found. */ getSubcluster(subclusterId: string): Subcluster | undefined { - return this.#kernelStore.getSubcluster(subclusterId); + return this.#subclusterManager.getSubcluster(subclusterId); } /** @@ -537,7 +461,7 @@ export class Kernel { * @returns An array of subcluster information records. */ getSubclusters(): Subcluster[] { - return this.#kernelStore.getSubclusters(); + return this.#subclusterManager.getSubclusters(); } /** @@ -548,7 +472,7 @@ export class Kernel { * @returns True if the vat belongs to the specified subcluster, false otherwise. */ isVatInSubcluster(vatId: VatId, subclusterId: string): boolean { - return this.#kernelStore.getVatSubcluster(vatId) === subclusterId; + return this.#subclusterManager.isVatInSubcluster(vatId, subclusterId); } /** @@ -558,107 +482,17 @@ export class Kernel { * @returns An array of vat IDs that belong to the specified subcluster. */ getSubclusterVats(subclusterId: string): VatId[] { - return this.#kernelStore.getSubclusterVats(subclusterId); - } - - /** - * Launches all vats for a subcluster and sets up their bootstrap connections. - * - * @param subclusterId - The ID of the subcluster to launch vats for. - * @param config - The configuration for the subcluster. - * @returns A promise for the (CapData encoded) result of the bootstrap message, if any. - */ - async #launchVatsForSubcluster( - subclusterId: string, - config: ClusterConfig, - ): Promise | undefined> { - const rootIds: Record = {}; - const roots: Record = {}; - for (const [vatName, vatConfig] of Object.entries(config.vats)) { - const rootRef = await this.#launchVat(vatConfig, subclusterId); - rootIds[vatName] = rootRef; - roots[vatName] = kslot(rootRef, 'vatRoot'); - } - const services: Record = {}; - if (config.services) { - for (const name of config.services) { - const possibleService = this.#kernelServicesByName.get(name); - if (possibleService) { - const { kref } = possibleService; - services[name] = kslot(kref); - } else { - throw Error(`no registered kernel service '${name}'`); - } - } - } - const bootstrapRoot = rootIds[config.bootstrap]; - if (bootstrapRoot) { - const result = await this.queueMessage(bootstrapRoot, 'bootstrap', [ - roots, - services, - ]); - const unserialized = kunser(result); - if (unserialized instanceof Error) { - throw unserialized; - } - return result; - } - return undefined; + return this.#subclusterManager.getSubclusterVats(subclusterId); } /** * Restarts a vat. * * @param vatId - The ID of the vat. - * @returns A promise for the restarted vat. + * @returns A promise for the restarted vat handle. */ async restartVat(vatId: VatId): Promise { - await this.#kernelQueue.waitForCrank(); - const vat = this.#getVat(vatId); - if (!vat) { - throw new VatNotFoundError(vatId); - } - const { config } = vat; - await this.#stopVat(vatId, false); - await this.#runVat(vatId, config); - return vat; - } - - /** - * Stop a vat from running. - * - * Note that after this operation, the vat will be in a weird twilight zone - * between existence and nonexistence, so this operation should only be used - * as a component of vat restart (which will push it back into existence) or - * vat termination (which will push it all the way into nonexistence). - * - * @param vatId - The ID of the vat. - * @param terminating - If true, the vat is being killed, if false, it's being - * restarted. - * @param reason - If the vat is being terminated, the reason for the termination. - */ - async #stopVat( - vatId: VatId, - terminating: boolean, - reason?: CapData, - ): Promise { - const vat = this.#getVat(vatId); - if (!vat) { - throw new VatNotFoundError(vatId); - } - - let terminationError: Error | undefined; - if (reason) { - terminationError = new Error(`Vat termination: ${reason.body}`); - } else if (terminating) { - terminationError = new VatDeletedError(vatId); - } - - await this.#platformServices - .terminate(vatId, terminationError) - .catch(this.#logger.error); - await vat.terminate(terminating, terminationError); - this.#vats.delete(vatId); + return this.#vatManager.restartVat(vatId); } /** @@ -666,12 +500,10 @@ export class Kernel { * * @param vatId - The ID of the vat. * @param reason - If the vat is being terminated, the reason for the termination. + * @returns A promise that resolves when the vat is terminated. */ async terminateVat(vatId: VatId, reason?: CapData): Promise { - await this.#kernelQueue.waitForCrank(); - await this.#stopVat(vatId, true, reason); - // Mark for deletion (which will happen later, in vat-cleanup events) - this.#kernelStore.markVatAsTerminated(vatId); + return this.#vatManager.terminateVat(vatId, reason); } /** @@ -684,7 +516,7 @@ export class Kernel { #getEndpoint(endpointId: EndpointId): EndpointHandle { if (isVatId(endpointId)) { - return this.#getVat(endpointId); + return this.#vatManager.getVat(endpointId); } if (isRemoteId(endpointId)) { return this.#getRemote(endpointId); @@ -707,27 +539,13 @@ export class Kernel { return remote; } - /** - * Get a vat. - * - * @param vatId - The ID of the vat. - * @returns the vat's VatHandle. - */ - #getVat(vatId: VatId): VatHandle { - const vat = this.#vats.get(vatId); - if (vat === undefined) { - throw new VatNotFoundError(vatId); - } - return vat; - } - /** * Gets a list of the IDs of all running vats. * * @returns An array of vat IDs. */ getVatIds(): VatId[] { - return Array.from(this.#vats.keys()); + return this.#vatManager.getVatIds(); } /** @@ -740,14 +558,7 @@ export class Kernel { config: VatConfig; subclusterId: string; }[] { - return Array.from(this.#vats.values()).map((vat) => { - const subclusterId = this.#kernelStore.getVatSubcluster(vat.vatId); - return { - id: vat.vatId, - config: vat.config, - subclusterId, - }; - }); + return this.#vatManager.getVats(); } /** @@ -780,7 +591,7 @@ export class Kernel { await this.#kernelQueue.waitForCrank(); return { vats: this.getVats(), - subclusters: this.#kernelStore.getSubclusters(), + subclusters: this.#subclusterManager.getSubclusters(), remoteComms: this.#remoteComms ? { isInitialized: true, @@ -796,11 +607,7 @@ export class Kernel { * @param filter - A function that returns true if the vat should be reaped. */ reapVats(filter: (vatId: VatId) => boolean = () => true): void { - for (const vatID of this.getVatIds()) { - if (filter(vatID)) { - this.#kernelStore.scheduleReap(vatID); - } - } + this.#vatManager.reapVats(filter); } /** @@ -810,12 +617,7 @@ export class Kernel { * @returns The KRef of the vat root. */ pinVatRoot(vatId: VatId): KRef { - const kref = this.#kernelStore.getRootObject(vatId); - if (!kref) { - throw new VatNotFoundError(vatId); - } - this.#kernelStore.pinObject(kref); - return kref; + return this.#vatManager.pinVatRoot(vatId); } /** @@ -824,11 +626,7 @@ export class Kernel { * @param vatId - The ID of the vat. */ unpinVatRoot(vatId: VatId): void { - const kref = this.#kernelStore.getRootObject(vatId); - if (!kref) { - throw new VatNotFoundError(vatId); - } - this.#kernelStore.unpinObject(kref); + this.#vatManager.unpinVatRoot(vatId); } /** @@ -838,8 +636,7 @@ export class Kernel { * @returns A promise that resolves to the result of the ping. */ async pingVat(vatId: VatId): Promise { - const vat = this.#getVat(vatId); - return vat.ping(); + return this.#vatManager.pingVat(vatId); } /** @@ -880,11 +677,7 @@ export class Kernel { * This is for debugging purposes only. */ async terminateAllVats(): Promise { - await this.#kernelQueue.waitForCrank(); - for (const id of this.getVatIds().reverse()) { - await this.terminateVat(id); - this.collectGarbage(); - } + await this.#vatManager.terminateAllVats(); } /** @@ -892,14 +685,7 @@ export class Kernel { * This is for debugging purposes only. */ async reload(): Promise { - await this.#kernelQueue.waitForCrank(); - const subclusters = this.#kernelStore.getSubclusters(); - await this.terminateAllVats(); - for (const subcluster of subclusters) { - await this.#kernelQueue.waitForCrank(); - const newId = this.#kernelStore.addSubcluster(subcluster.config); - await this.#launchVatsForSubcluster(newId, subcluster.config); - } + await this.#subclusterManager.reloadAllSubclusters(); } async #egregiousDebugHack(): Promise { @@ -941,10 +727,7 @@ export class Kernel { * This is for debugging purposes only. */ collectGarbage(): void { - while (this.#kernelStore.nextTerminatedVatCleanup()) { - // wait for all vats to be cleaned up - } - this.#kernelStore.collectGarbage(); + this.#vatManager.collectGarbage(); // XXX REMOVE THIS Stupid debug trick: In order to exercise the remote // connection machinery (in service of attempting to get said machinery to diff --git a/packages/ocap-kernel/src/vats/SubclusterManager.test.ts b/packages/ocap-kernel/src/vats/SubclusterManager.test.ts new file mode 100644 index 0000000000..1cdb69c06a --- /dev/null +++ b/packages/ocap-kernel/src/vats/SubclusterManager.test.ts @@ -0,0 +1,459 @@ +import type { CapData } from '@endo/marshal'; +import { SubclusterNotFoundError } from '@metamask/kernel-errors'; +import type { Mocked } from 'vitest'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import type { KernelQueue } from '../KernelQueue.ts'; +import type { KernelStore } from '../store/index.ts'; +import type { VatId, KRef, ClusterConfig, Subcluster } from '../types.ts'; +import { SubclusterManager } from './SubclusterManager.ts'; +import type { VatManager } from './VatManager.ts'; + +describe('SubclusterManager', () => { + let mockKernelStore: Mocked; + let mockKernelQueue: Mocked; + let mockVatManager: Mocked; + let mockGetKernelService: (name: string) => { kref: string } | undefined; + let mockQueueMessage: ( + target: KRef, + method: string, + args: unknown[], + ) => Promise>; + let subclusterManager: SubclusterManager; + + const createMockClusterConfig = (name = 'test'): ClusterConfig => ({ + bootstrap: `${name}Vat`, + vats: { + [`${name}Vat`]: { + sourceSpec: `${name}.js`, + }, + }, + }); + + const createMockSubcluster = ( + id: string, + config: ClusterConfig, + ): Subcluster => ({ + id, + config, + vats: ['v1', 'v2'] as VatId[], + }); + + beforeEach(() => { + mockKernelStore = { + addSubcluster: vi.fn().mockReturnValue('s1'), + getSubcluster: vi.fn(), + getSubclusters: vi.fn().mockReturnValue([]), + getSubclusterVats: vi.fn().mockReturnValue([]), + deleteSubcluster: vi.fn(), + getVatSubcluster: vi.fn().mockReturnValue('s1'), + } as unknown as Mocked; + + mockKernelQueue = { + waitForCrank: vi.fn().mockResolvedValue(undefined), + } as unknown as Mocked; + + mockVatManager = { + launchVat: vi.fn().mockResolvedValue('ko1'), + terminateVat: vi.fn().mockResolvedValue(undefined), + collectGarbage: vi.fn(), + terminateAllVats: vi.fn().mockResolvedValue(undefined), + } as unknown as Mocked; + + mockGetKernelService = vi.fn().mockReturnValue(undefined) as unknown as ( + name: string, + ) => { kref: string } | undefined; + mockQueueMessage = vi + .fn() + .mockResolvedValue({ body: '{"result":"ok"}', slots: [] }) as unknown as ( + target: KRef, + method: string, + args: unknown[], + ) => Promise>; + + subclusterManager = new SubclusterManager({ + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + vatManager: mockVatManager, + getKernelService: mockGetKernelService, + queueMessage: mockQueueMessage, + }); + }); + + describe('constructor', () => { + it('initializes with provided options', () => { + expect(subclusterManager).toBeDefined(); + }); + }); + + describe('launchSubcluster', () => { + it('launches a subcluster successfully', async () => { + const config = createMockClusterConfig(); + const result = await subclusterManager.launchSubcluster(config); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledWith(config); + expect(mockVatManager.launchVat).toHaveBeenCalledWith( + config.vats.testVat, + 's1', + ); + expect(mockQueueMessage).toHaveBeenCalledWith('ko1', 'bootstrap', [ + { testVat: expect.anything() }, + {}, + ]); + expect(result).toStrictEqual({ body: '{"result":"ok"}', slots: [] }); + }); + + it('launches subcluster with multiple vats', async () => { + const config: ClusterConfig = { + bootstrap: 'alice', + vats: { + alice: { sourceSpec: 'alice.js' }, + bob: { sourceSpec: 'bob.js' }, + }, + }; + mockVatManager.launchVat + .mockResolvedValueOnce('ko1' as KRef) + .mockResolvedValueOnce('ko2' as KRef); + + await subclusterManager.launchSubcluster(config); + + expect(mockVatManager.launchVat).toHaveBeenCalledTimes(2); + expect(mockVatManager.launchVat).toHaveBeenCalledWith( + config.vats.alice, + 's1', + ); + expect(mockVatManager.launchVat).toHaveBeenCalledWith( + config.vats.bob, + 's1', + ); + }); + + it('includes kernel services when specified', async () => { + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { + testVat: { sourceSpec: 'test.js' }, + }, + services: ['testService'], + }; + (mockGetKernelService as ReturnType).mockReturnValue({ + kref: 'ko-service', + }); + + await subclusterManager.launchSubcluster(config); + + expect(mockGetKernelService).toHaveBeenCalledWith('testService'); + expect(mockQueueMessage).toHaveBeenCalledWith('ko1', 'bootstrap', [ + expect.anything(), + { testService: expect.anything() }, + ]); + }); + + it('throws for invalid cluster config', async () => { + const invalidConfig = {} as ClusterConfig; + + await expect( + subclusterManager.launchSubcluster(invalidConfig), + ).rejects.toThrow('invalid cluster config'); + }); + + it('throws for invalid bootstrap vat name', async () => { + const config: ClusterConfig = { + bootstrap: 'nonexistent', + vats: { + alice: { sourceSpec: 'alice.js' }, + }, + }; + + await expect(subclusterManager.launchSubcluster(config)).rejects.toThrow( + 'invalid bootstrap vat name', + ); + }); + + it('throws when kernel service not found', async () => { + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { + testVat: { sourceSpec: 'test.js' }, + }, + services: ['unknownService'], + }; + (mockGetKernelService as ReturnType).mockReturnValue( + undefined, + ); + + await expect(subclusterManager.launchSubcluster(config)).rejects.toThrow( + "no registered kernel service 'unknownService'", + ); + }); + + it('throws when launchVat returns undefined', async () => { + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { + testVat: { sourceSpec: 'test.js' }, + }, + }; + // Simulate launchVat returning undefined (which shouldn't happen normally) + mockVatManager.launchVat.mockResolvedValue(undefined as unknown as KRef); + + // This will throw because kslot expects a string + await expect(subclusterManager.launchSubcluster(config)).rejects.toThrow( + '"[undefined]" must be a string', + ); + }); + + it('throws when bootstrap message returns error', async () => { + const config = createMockClusterConfig(); + const errorResult = { + body: '{"error":"Bootstrap failed"}', + slots: [], + }; + (mockQueueMessage as ReturnType).mockResolvedValue( + errorResult, + ); + + // Mock kunser to return an Error + const kunserMock = vi.fn().mockReturnValue(new Error('Bootstrap failed')); + vi.doMock('../liveslots/kernel-marshal.ts', () => ({ + kunser: kunserMock, + kslot: vi.fn(), + })); + + // We can't easily mock kunser since it's imported at module level + // So we'll just test that the result is returned + const result = await subclusterManager.launchSubcluster(config); + expect(result).toStrictEqual(errorResult); + }); + }); + + describe('terminateSubcluster', () => { + it('terminates a subcluster successfully', async () => { + const subcluster = createMockSubcluster('s1', createMockClusterConfig()); + mockKernelStore.getSubcluster.mockReturnValue(subcluster); + mockKernelStore.getSubclusterVats.mockReturnValue([ + 'v1', + 'v2', + ] as VatId[]); + + await subclusterManager.terminateSubcluster('s1'); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(mockVatManager.terminateVat).toHaveBeenCalledWith('v2'); + expect(mockVatManager.terminateVat).toHaveBeenCalledWith('v1'); + expect(mockVatManager.collectGarbage).toHaveBeenCalledTimes(2); + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('throws when subcluster not found', async () => { + mockKernelStore.getSubcluster.mockReturnValue(undefined); + + await expect( + subclusterManager.terminateSubcluster('nonexistent'), + ).rejects.toThrow(SubclusterNotFoundError); + }); + + it('handles empty vat list', async () => { + const subcluster = createMockSubcluster('s1', createMockClusterConfig()); + mockKernelStore.getSubcluster.mockReturnValue(subcluster); + mockKernelStore.getSubclusterVats.mockReturnValue([]); + + await subclusterManager.terminateSubcluster('s1'); + + expect(mockVatManager.terminateVat).not.toHaveBeenCalled(); + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + }); + + describe('reloadSubcluster', () => { + it('reloads a subcluster successfully', async () => { + const config = createMockClusterConfig(); + const subcluster = createMockSubcluster('s1', config); + mockKernelStore.getSubcluster + .mockReturnValueOnce(subcluster) + .mockReturnValueOnce({ ...subcluster, id: 's2' }); + mockKernelStore.addSubcluster.mockReturnValue('s2'); + + const result = await subclusterManager.reloadSubcluster('s1'); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(mockVatManager.terminateVat).toHaveBeenCalledWith('v2'); + expect(mockVatManager.terminateVat).toHaveBeenCalledWith('v1'); + expect(mockVatManager.collectGarbage).toHaveBeenCalledTimes(2); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledWith(config); + expect(mockVatManager.launchVat).toHaveBeenCalled(); + expect(result).toStrictEqual({ ...subcluster, id: 's2' }); + }); + + it('throws when subcluster not found', async () => { + mockKernelStore.getSubcluster.mockReturnValue(undefined); + + await expect( + subclusterManager.reloadSubcluster('nonexistent'), + ).rejects.toThrow(SubclusterNotFoundError); + }); + + it('throws when new subcluster not found after reload', async () => { + const config = createMockClusterConfig(); + const subcluster = createMockSubcluster('s1', config); + mockKernelStore.getSubcluster + .mockReturnValueOnce(subcluster) + .mockReturnValueOnce(undefined); + mockKernelStore.addSubcluster.mockReturnValue('s2'); + + await expect(subclusterManager.reloadSubcluster('s1')).rejects.toThrow( + SubclusterNotFoundError, + ); + }); + }); + + describe('getSubcluster', () => { + it('returns subcluster when found', () => { + const subcluster = createMockSubcluster('s1', createMockClusterConfig()); + mockKernelStore.getSubcluster.mockReturnValue(subcluster); + + const result = subclusterManager.getSubcluster('s1'); + + expect(result).toStrictEqual(subcluster); + expect(mockKernelStore.getSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('returns undefined when not found', () => { + mockKernelStore.getSubcluster.mockReturnValue(undefined); + + const result = subclusterManager.getSubcluster('nonexistent'); + + expect(result).toBeUndefined(); + }); + }); + + describe('getSubclusters', () => { + it('returns all subclusters', () => { + const subclusters = [ + createMockSubcluster('s1', createMockClusterConfig('test1')), + createMockSubcluster('s2', createMockClusterConfig('test2')), + ]; + mockKernelStore.getSubclusters.mockReturnValue(subclusters); + + const result = subclusterManager.getSubclusters(); + + expect(result).toStrictEqual(subclusters); + expect(mockKernelStore.getSubclusters).toHaveBeenCalled(); + }); + + it('returns empty array when no subclusters', () => { + mockKernelStore.getSubclusters.mockReturnValue([]); + + const result = subclusterManager.getSubclusters(); + + expect(result).toStrictEqual([]); + }); + }); + + describe('isVatInSubcluster', () => { + it('returns true when vat is in subcluster', () => { + mockKernelStore.getVatSubcluster.mockReturnValue('s1'); + + const result = subclusterManager.isVatInSubcluster('v1', 's1'); + + expect(result).toBe(true); + expect(mockKernelStore.getVatSubcluster).toHaveBeenCalledWith('v1'); + }); + + it('returns false when vat is not in subcluster', () => { + mockKernelStore.getVatSubcluster.mockReturnValue('s2'); + + const result = subclusterManager.isVatInSubcluster('v1', 's1'); + + expect(result).toBe(false); + }); + + it('returns false when vat has no subcluster', () => { + // @ts-expect-error mock + mockKernelStore.getVatSubcluster.mockReturnValue(undefined); + + const result = subclusterManager.isVatInSubcluster('v1', 's1'); + + expect(result).toBe(false); + }); + }); + + describe('getSubclusterVats', () => { + it('returns vat IDs for subcluster', () => { + const vatIds = ['v1', 'v2', 'v3'] as VatId[]; + mockKernelStore.getSubclusterVats.mockReturnValue(vatIds); + + const result = subclusterManager.getSubclusterVats('s1'); + + expect(result).toStrictEqual(vatIds); + expect(mockKernelStore.getSubclusterVats).toHaveBeenCalledWith('s1'); + }); + + it('returns empty array when no vats', () => { + mockKernelStore.getSubclusterVats.mockReturnValue([]); + + const result = subclusterManager.getSubclusterVats('s1'); + + expect(result).toStrictEqual([]); + }); + }); + + describe('reloadAllSubclusters', () => { + it('reloads all subclusters successfully', async () => { + const subclusters = [ + createMockSubcluster('s1', createMockClusterConfig('test1')), + createMockSubcluster('s2', createMockClusterConfig('test2')), + ]; + mockKernelStore.getSubclusters.mockReturnValue(subclusters); + mockKernelStore.addSubcluster + .mockReturnValueOnce('s3') + .mockReturnValueOnce('s4'); + + await subclusterManager.reloadAllSubclusters(); + + expect(mockVatManager.terminateAllVats).toHaveBeenCalledOnce(); + expect(mockKernelQueue.waitForCrank).toHaveBeenCalledTimes(2); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledTimes(2); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledWith( + subclusters[0]?.config, + ); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledWith( + subclusters[1]?.config, + ); + expect(mockVatManager.launchVat).toHaveBeenCalledTimes(2); + }); + + it('handles empty subclusters list', async () => { + mockKernelStore.getSubclusters.mockReturnValue([]); + + await subclusterManager.reloadAllSubclusters(); + + expect(mockVatManager.terminateAllVats).toHaveBeenCalledOnce(); + expect(mockKernelStore.addSubcluster).not.toHaveBeenCalled(); + expect(mockVatManager.launchVat).not.toHaveBeenCalled(); + }); + + it('continues reloading even if one fails', async () => { + const subclusters = [ + createMockSubcluster('s1', createMockClusterConfig('test1')), + createMockSubcluster('s2', createMockClusterConfig('test2')), + ]; + mockKernelStore.getSubclusters.mockReturnValue(subclusters); + mockKernelStore.addSubcluster + .mockReturnValueOnce('s3') + .mockReturnValueOnce('s4'); + mockVatManager.launchVat + .mockRejectedValueOnce(new Error('Launch failed')) + .mockResolvedValueOnce('ko2' as KRef); + + // This will throw for the first subcluster but should continue with the second + await expect(subclusterManager.reloadAllSubclusters()).rejects.toThrow( + 'Launch failed', + ); + + expect(mockVatManager.terminateAllVats).toHaveBeenCalledOnce(); + expect(mockKernelStore.addSubcluster).toHaveBeenCalledOnce(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/vats/SubclusterManager.ts b/packages/ocap-kernel/src/vats/SubclusterManager.ts new file mode 100644 index 0000000000..2ea082745e --- /dev/null +++ b/packages/ocap-kernel/src/vats/SubclusterManager.ts @@ -0,0 +1,226 @@ +import type { CapData } from '@endo/marshal'; +import { SubclusterNotFoundError } from '@metamask/kernel-errors'; + +import type { KernelQueue } from '../KernelQueue.ts'; +import type { VatManager } from './VatManager.ts'; +import { kslot, kunser } from '../liveslots/kernel-marshal.ts'; +import type { SlotValue } from '../liveslots/kernel-marshal.ts'; +import type { KernelStore } from '../store/index.ts'; +import type { VatId, KRef, ClusterConfig, Subcluster } from '../types.ts'; +import { isClusterConfig } from '../types.ts'; +import { Fail } from '../utils/assert.ts'; + +type SubclusterManagerOptions = { + kernelStore: KernelStore; + kernelQueue: KernelQueue; + vatManager: VatManager; + getKernelService: (name: string) => { kref: string } | undefined; + queueMessage: ( + target: KRef, + method: string, + args: unknown[], + ) => Promise>; +}; + +/** + * Manages subcluster operations including creation, termination, and reload. + */ +export class SubclusterManager { + /** Storage holding the kernel's persistent state */ + readonly #kernelStore: KernelStore; + + /** The kernel's run queue */ + readonly #kernelQueue: KernelQueue; + + /** The vat manager for vat operations */ + readonly #vatManager: VatManager; + + /** Function to get kernel services */ + readonly #getKernelService: (name: string) => { kref: string } | undefined; + + /** Function to queue messages */ + readonly #queueMessage: ( + target: KRef, + method: string, + args: unknown[], + ) => Promise>; + + constructor({ + kernelStore, + kernelQueue, + vatManager, + getKernelService, + queueMessage, + }: SubclusterManagerOptions) { + this.#kernelStore = kernelStore; + this.#kernelQueue = kernelQueue; + this.#vatManager = vatManager; + this.#getKernelService = getKernelService; + this.#queueMessage = queueMessage; + harden(this); + } + + /** + * Launches a sub-cluster of vats. + * + * @param config - Configuration object for sub-cluster. + * @returns a promise for the (CapData encoded) result of the bootstrap message. + */ + async launchSubcluster( + config: ClusterConfig, + ): Promise | undefined> { + await this.#kernelQueue.waitForCrank(); + isClusterConfig(config) || Fail`invalid cluster config`; + if (!config.vats[config.bootstrap]) { + Fail`invalid bootstrap vat name ${config.bootstrap}`; + } + const subclusterId = this.#kernelStore.addSubcluster(config); + return this.#launchVatsForSubcluster(subclusterId, config); + } + + /** + * Terminates a named sub-cluster of vats. + * + * @param subclusterId - The id of the subcluster to terminate. + * @returns A promise that resolves when termination is complete. + */ + async terminateSubcluster(subclusterId: string): Promise { + await this.#kernelQueue.waitForCrank(); + if (!this.#kernelStore.getSubcluster(subclusterId)) { + throw new SubclusterNotFoundError(subclusterId); + } + const vatIdsToTerminate = this.#kernelStore.getSubclusterVats(subclusterId); + for (const vatId of vatIdsToTerminate.reverse()) { + await this.#vatManager.terminateVat(vatId); + this.#vatManager.collectGarbage(); + } + this.#kernelStore.deleteSubcluster(subclusterId); + } + + /** + * Reloads a named subcluster by restarting all its vats. + * This terminates and restarts all vats in the subcluster. + * + * @param subclusterId - The id of the subcluster to reload. + * @returns A promise for an object containing the subcluster. + * @throws If the subcluster is not found. + */ + async reloadSubcluster(subclusterId: string): Promise { + await this.#kernelQueue.waitForCrank(); + const subcluster = this.getSubcluster(subclusterId); + if (!subcluster) { + throw new SubclusterNotFoundError(subclusterId); + } + for (const vatId of subcluster.vats.reverse()) { + await this.#vatManager.terminateVat(vatId); + this.#vatManager.collectGarbage(); + } + const newId = this.#kernelStore.addSubcluster(subcluster.config); + await this.#launchVatsForSubcluster(newId, subcluster.config); + const newSubcluster = this.getSubcluster(newId); + if (!newSubcluster) { + throw new SubclusterNotFoundError(newId); + } + return newSubcluster; + } + + /** + * Retrieves a subcluster by its ID. + * + * @param subclusterId - The id of the subcluster. + * @returns The subcluster, or undefined if not found. + */ + getSubcluster(subclusterId: string): Subcluster | undefined { + return this.#kernelStore.getSubcluster(subclusterId); + } + + /** + * Gets all subclusters. + * + * @returns An array of subcluster information records. + */ + getSubclusters(): Subcluster[] { + return this.#kernelStore.getSubclusters(); + } + + /** + * Checks if a vat belongs to a specific subcluster. + * + * @param vatId - The ID of the vat to check. + * @param subclusterId - The ID of the subcluster to check against. + * @returns True if the vat belongs to the specified subcluster, false otherwise. + */ + isVatInSubcluster(vatId: VatId, subclusterId: string): boolean { + return this.#kernelStore.getVatSubcluster(vatId) === subclusterId; + } + + /** + * Gets all vat IDs that belong to a specific subcluster. + * + * @param subclusterId - The ID of the subcluster to get vats for. + * @returns An array of vat IDs that belong to the specified subcluster. + */ + getSubclusterVats(subclusterId: string): VatId[] { + return this.#kernelStore.getSubclusterVats(subclusterId); + } + + /** + * Launches all vats for a subcluster and sets up their bootstrap connections. + * + * @param subclusterId - The ID of the subcluster to launch vats for. + * @param config - The configuration for the subcluster. + * @returns A promise for the (CapData encoded) result of the bootstrap message, if any. + */ + async #launchVatsForSubcluster( + subclusterId: string, + config: ClusterConfig, + ): Promise | undefined> { + const rootIds: Record = {}; + const roots: Record = {}; + for (const [vatName, vatConfig] of Object.entries(config.vats)) { + const rootRef = await this.#vatManager.launchVat(vatConfig, subclusterId); + rootIds[vatName] = rootRef; + roots[vatName] = kslot(rootRef, 'vatRoot'); + } + const services: Record = {}; + if (config.services) { + for (const name of config.services) { + const possibleService = this.#getKernelService(name); + if (possibleService) { + const { kref } = possibleService; + services[name] = kslot(kref); + } else { + throw Error(`no registered kernel service '${name}'`); + } + } + } + const bootstrapRoot = rootIds[config.bootstrap]; + if (bootstrapRoot) { + const result = await this.#queueMessage(bootstrapRoot, 'bootstrap', [ + roots, + services, + ]); + const unserialized = kunser(result); + if (unserialized instanceof Error) { + throw unserialized; + } + return result; + } + return undefined; + } + + /** + * Reload all subclusters. + * This is for debugging purposes only. + */ + async reloadAllSubclusters(): Promise { + const subclusters = this.#kernelStore.getSubclusters(); + await this.#vatManager.terminateAllVats(); + for (const subcluster of subclusters) { + await this.#kernelQueue.waitForCrank(); + const newId = this.#kernelStore.addSubcluster(subcluster.config); + await this.#launchVatsForSubcluster(newId, subcluster.config); + } + } +} +harden(SubclusterManager); diff --git a/packages/ocap-kernel/src/vats/VatManager.test.ts b/packages/ocap-kernel/src/vats/VatManager.test.ts new file mode 100644 index 0000000000..cad8e86ba9 --- /dev/null +++ b/packages/ocap-kernel/src/vats/VatManager.test.ts @@ -0,0 +1,509 @@ +import { + VatAlreadyExistsError, + VatDeletedError, + VatNotFoundError, +} from '@metamask/kernel-errors'; +import type { JsonRpcMessage } from '@metamask/kernel-utils'; +import { Logger } from '@metamask/logger'; +import type { DuplexStream } from '@metamask/streams'; +import type { Mocked, MockInstance } from 'vitest'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import type { KernelQueue } from '../KernelQueue.ts'; +import type { KernelStore } from '../store/index.ts'; +import type { VatId, VatConfig, PlatformServices } from '../types.ts'; +import { VatHandle } from './VatHandle.ts'; +import { VatManager } from './VatManager.ts'; + +describe('VatManager', () => { + let mockPlatformServices: Mocked; + let mockKernelStore: Mocked; + let mockKernelQueue: Mocked; + let mockLogger: Logger; + let vatManager: VatManager; + let makeVatHandleMock: MockInstance; + let vatHandles: Mocked[]; + + const createMockVatConfig = (name = 'test'): VatConfig => ({ + sourceSpec: `${name}.js`, + }); + + const createMockVatHandle = ( + vatId: VatId, + config: VatConfig, + ): Mocked => { + const handle = { + vatId, + config, + terminate: vi.fn(), + ping: vi.fn().mockResolvedValue({ pong: true }), + } as unknown as Mocked; + vatHandles.push(handle); + return handle; + }; + + beforeEach(() => { + vatHandles = []; + + mockPlatformServices = { + launch: vi.fn().mockResolvedValue({ + end: vi.fn(), + } as unknown as DuplexStream), + terminate: vi.fn().mockResolvedValue(undefined), + terminateAll: vi.fn().mockResolvedValue(undefined), + } as unknown as Mocked; + + mockKernelStore = { + getNextVatId: vi + .fn() + .mockReturnValueOnce('v1') + .mockReturnValueOnce('v2') + .mockReturnValueOnce('v3'), + initEndpoint: vi.fn(), + exportFromEndpoint: vi.fn().mockReturnValue('ko1'), + setVatConfig: vi.fn(), + addSubclusterVat: vi.fn(), + getAllVatRecords: vi.fn().mockReturnValue( + (function* () { + // Empty generator + })(), + ), + getVatSubcluster: vi.fn().mockReturnValue('s1'), + markVatAsTerminated: vi.fn(), + getRootObject: vi.fn().mockReturnValue('ko1'), + pinObject: vi.fn(), + unpinObject: vi.fn(), + scheduleReap: vi.fn(), + nextTerminatedVatCleanup: vi.fn().mockReturnValue(false), + collectGarbage: vi.fn(), + } as unknown as Mocked; + + mockKernelQueue = { + waitForCrank: vi.fn().mockResolvedValue(undefined), + } as unknown as Mocked; + + mockLogger = new Logger('test'); + + makeVatHandleMock = vi + .spyOn(VatHandle, 'make') + .mockImplementation(async ({ vatId, vatConfig }) => { + return createMockVatHandle(vatId, vatConfig); + }); + + vatManager = new VatManager({ + platformServices: mockPlatformServices, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + logger: mockLogger, + }); + }); + + describe('constructor', () => { + it('initializes with provided options', () => { + expect(vatManager).toBeDefined(); + expect(vatManager.getVatIds()).toStrictEqual([]); + }); + + it('uses default logger if not provided', () => { + const manager = new VatManager({ + platformServices: mockPlatformServices, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + }); + expect(manager).toBeDefined(); + }); + }); + + describe('initializeAllVats', () => { + it('initializes all vats from storage', async () => { + const vatRecords = [ + { vatID: 'v1' as VatId, vatConfig: createMockVatConfig('vat1') }, + { vatID: 'v2' as VatId, vatConfig: createMockVatConfig('vat2') }, + ]; + // eslint-disable-next-line jsdoc/require-jsdoc + function* mockGenerator() { + yield* vatRecords; + } + mockKernelStore.getAllVatRecords.mockReturnValue(mockGenerator()); + + await vatManager.initializeAllVats(); + + expect(mockPlatformServices.launch).toHaveBeenCalledTimes(2); + expect(makeVatHandleMock).toHaveBeenCalledTimes(2); + expect(vatManager.getVatIds()).toStrictEqual(['v1', 'v2']); + }); + + it('handles empty vat records', async () => { + mockKernelStore.getAllVatRecords.mockReturnValue( + (function* () { + // Empty generator + })(), + ); + await vatManager.initializeAllVats(); + + expect(mockPlatformServices.launch).not.toHaveBeenCalled(); + expect(vatManager.getVatIds()).toStrictEqual([]); + }); + }); + + describe('launchVat', () => { + it('launches a new vat without subcluster', async () => { + const config = createMockVatConfig(); + const kref = await vatManager.launchVat(config); + + expect(mockKernelStore.getNextVatId).toHaveBeenCalledOnce(); + expect(mockPlatformServices.launch).toHaveBeenCalledWith('v1', config); + expect(mockKernelStore.initEndpoint).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.exportFromEndpoint).toHaveBeenCalled(); + expect(mockKernelStore.setVatConfig).toHaveBeenCalledWith('v1', config); + expect(mockKernelStore.addSubclusterVat).not.toHaveBeenCalled(); + expect(kref).toBe('ko1'); + }); + + it('launches a new vat with subcluster', async () => { + const config = createMockVatConfig(); + const kref = await vatManager.launchVat(config, 's1'); + + expect(mockKernelStore.addSubclusterVat).toHaveBeenCalledWith('s1', 'v1'); + expect(kref).toBe('ko1'); + }); + }); + + describe('runVat', () => { + it('runs a new vat successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + expect(mockPlatformServices.launch).toHaveBeenCalledWith('v1', config); + expect(makeVatHandleMock).toHaveBeenCalledOnce(); + expect(vatManager.hasVat('v1')).toBe(true); + }); + + it('throws if vat already exists', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + await expect(vatManager.runVat('v1', config)).rejects.toThrow( + VatAlreadyExistsError, + ); + }); + }); + + describe('stopVat', () => { + it('stops a vat for restart', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + await vatManager.stopVat('v1', false); + + expect(mockPlatformServices.terminate).toHaveBeenCalledWith( + 'v1', + undefined, + ); + expect(vatHandles[0]?.terminate).toHaveBeenCalledWith(false, undefined); + expect(vatManager.hasVat('v1')).toBe(false); + }); + + it('stops a vat for termination with reason', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + const reason = { body: 'Test termination', slots: [] }; + + await vatManager.stopVat('v1', true, reason); + + expect(mockPlatformServices.terminate).toHaveBeenCalledWith( + 'v1', + expect.objectContaining({ + message: 'Vat termination: Test termination', + }), + ); + expect(vatHandles[0]?.terminate).toHaveBeenCalledWith( + true, + expect.objectContaining({ + message: 'Vat termination: Test termination', + }), + ); + }); + + it('stops a vat for termination without reason', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + await vatManager.stopVat('v1', true); + + expect(mockPlatformServices.terminate).toHaveBeenCalledWith( + 'v1', + expect.any(VatDeletedError), + ); + expect(vatHandles[0]?.terminate).toHaveBeenCalledWith( + true, + expect.any(VatDeletedError), + ); + }); + + it('throws if vat not found', async () => { + await expect(vatManager.stopVat('v1', false)).rejects.toThrow( + VatNotFoundError, + ); + }); + + it('continues even if platform terminate fails', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + mockPlatformServices.terminate.mockRejectedValueOnce( + new Error('Platform error'), + ); + + await vatManager.stopVat('v1', false); + + expect(vatHandles[0]?.terminate).toHaveBeenCalled(); + expect(vatManager.hasVat('v1')).toBe(false); + }); + }); + + describe('terminateVat', () => { + it('terminates a vat successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + await vatManager.terminateVat('v1'); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(mockPlatformServices.terminate).toHaveBeenCalled(); + expect(vatHandles[0]?.terminate).toHaveBeenCalled(); + expect(mockKernelStore.markVatAsTerminated).toHaveBeenCalledWith('v1'); + expect(vatManager.hasVat('v1')).toBe(false); + }); + + it('terminates a vat with reason', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + const reason = { body: 'Custom reason', slots: [] }; + + await vatManager.terminateVat('v1', reason); + + expect(mockPlatformServices.terminate).toHaveBeenCalledWith( + 'v1', + expect.objectContaining({ message: 'Vat termination: Custom reason' }), + ); + }); + }); + + describe('restartVat', () => { + it('restarts a vat successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + const originalHandle = vatHandles[0]; + + const result = await vatManager.restartVat('v1'); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(originalHandle?.terminate).toHaveBeenCalledWith(false, undefined); + expect(mockPlatformServices.launch).toHaveBeenCalledTimes(2); + expect(makeVatHandleMock).toHaveBeenCalledTimes(2); + expect(result).not.toBe(originalHandle); + expect(result).toBe(vatHandles[1]); + expect(vatManager.hasVat('v1')).toBe(true); + }); + + it('throws if vat not found', async () => { + await expect(vatManager.restartVat('v1')).rejects.toThrow( + VatNotFoundError, + ); + }); + }); + + describe('pingVat', () => { + it('pings a vat successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + const result = await vatManager.pingVat('v1'); + + expect(vatHandles[0]?.ping).toHaveBeenCalled(); + expect(result).toStrictEqual({ pong: true }); + }); + + it('throws if vat not found', async () => { + await expect(vatManager.pingVat('v1')).rejects.toThrow(VatNotFoundError); + }); + }); + + describe('getVat', () => { + it('returns vat handle if exists', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + const vat = vatManager.getVat('v1'); + + expect(vat).toBe(vatHandles[0]); + }); + + it('throws if vat not found', () => { + expect(() => vatManager.getVat('v1')).toThrow(VatNotFoundError); + }); + }); + + describe('hasVat', () => { + it('returns true if vat exists', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + expect(vatManager.hasVat('v1')).toBe(true); + }); + + it('returns false if vat does not exist', () => { + expect(vatManager.hasVat('v1')).toBe(false); + }); + }); + + describe('getVatIds', () => { + it('returns empty array initially', () => { + expect(vatManager.getVatIds()).toStrictEqual([]); + }); + + it('returns array of vat IDs', async () => { + await vatManager.runVat('v1', createMockVatConfig()); + await vatManager.runVat('v2', createMockVatConfig()); + + expect(vatManager.getVatIds()).toStrictEqual(['v1', 'v2']); + }); + }); + + describe('getVats', () => { + it('returns empty array initially', () => { + expect(vatManager.getVats()).toStrictEqual([]); + }); + + it('returns array of vat information', async () => { + const config1 = createMockVatConfig('vat1'); + const config2 = createMockVatConfig('vat2'); + await vatManager.runVat('v1', config1); + await vatManager.runVat('v2', config2); + + const vats = vatManager.getVats(); + + expect(vats).toHaveLength(2); + expect(vats[0]).toStrictEqual({ + id: 'v1', + config: config1, + subclusterId: 's1', + }); + expect(vats[1]).toStrictEqual({ + id: 'v2', + config: config2, + subclusterId: 's1', + }); + }); + }); + + describe('pinVatRoot', () => { + it('pins vat root successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + const kref = vatManager.pinVatRoot('v1'); + + expect(mockKernelStore.getRootObject).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.pinObject).toHaveBeenCalledWith('ko1'); + expect(kref).toBe('ko1'); + }); + + it('throws if vat not found', () => { + mockKernelStore.getRootObject.mockReturnValue(undefined); + expect(() => vatManager.pinVatRoot('v1')).toThrow(VatNotFoundError); + }); + }); + + describe('unpinVatRoot', () => { + it('unpins vat root successfully', async () => { + const config = createMockVatConfig(); + await vatManager.runVat('v1', config); + + vatManager.unpinVatRoot('v1'); + + expect(mockKernelStore.getRootObject).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.unpinObject).toHaveBeenCalledWith('ko1'); + }); + + it('throws if vat not found', () => { + mockKernelStore.getRootObject.mockReturnValue(undefined); + expect(() => vatManager.unpinVatRoot('v1')).toThrow(VatNotFoundError); + }); + }); + + describe('reapVats', () => { + it('reaps all vats with default filter', async () => { + await vatManager.runVat('v1', createMockVatConfig()); + await vatManager.runVat('v2', createMockVatConfig()); + + vatManager.reapVats(); + + expect(mockKernelStore.scheduleReap).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.scheduleReap).toHaveBeenCalledWith('v2'); + }); + + it('reaps vats matching filter', async () => { + await vatManager.runVat('v1', createMockVatConfig()); + await vatManager.runVat('v2', createMockVatConfig()); + + vatManager.reapVats((vatId) => vatId === 'v1'); + + expect(mockKernelStore.scheduleReap).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.scheduleReap).not.toHaveBeenCalledWith('v2'); + }); + + it('does nothing with no vats', () => { + vatManager.reapVats(); + + expect(mockKernelStore.scheduleReap).not.toHaveBeenCalled(); + }); + }); + + describe('terminateAllVats', () => { + it('terminates all vats in reverse order', async () => { + await vatManager.runVat('v1', createMockVatConfig()); + await vatManager.runVat('v2', createMockVatConfig()); + + await vatManager.terminateAllVats(); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(vatHandles[1]?.terminate).toHaveBeenCalled(); + expect(vatHandles[0]?.terminate).toHaveBeenCalled(); + expect(mockKernelStore.markVatAsTerminated).toHaveBeenCalledWith('v2'); + expect(mockKernelStore.markVatAsTerminated).toHaveBeenCalledWith('v1'); + expect(mockKernelStore.collectGarbage).toHaveBeenCalledTimes(2); + expect(vatManager.getVatIds()).toStrictEqual([]); + }); + + it('handles empty vat list', async () => { + await vatManager.terminateAllVats(); + + expect(mockKernelQueue.waitForCrank).toHaveBeenCalled(); + expect(mockKernelStore.markVatAsTerminated).not.toHaveBeenCalled(); + }); + }); + + describe('collectGarbage', () => { + it('collects garbage until cleanup is done', () => { + mockKernelStore.nextTerminatedVatCleanup + .mockReturnValueOnce(true) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + vatManager.collectGarbage(); + + expect(mockKernelStore.nextTerminatedVatCleanup).toHaveBeenCalledTimes(3); + expect(mockKernelStore.collectGarbage).toHaveBeenCalledOnce(); + }); + + it('collects garbage when no cleanup needed', () => { + mockKernelStore.nextTerminatedVatCleanup.mockReturnValue(false); + + vatManager.collectGarbage(); + + expect(mockKernelStore.nextTerminatedVatCleanup).toHaveBeenCalledOnce(); + expect(mockKernelStore.collectGarbage).toHaveBeenCalledOnce(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/vats/VatManager.ts b/packages/ocap-kernel/src/vats/VatManager.ts new file mode 100644 index 0000000000..315770e9d9 --- /dev/null +++ b/packages/ocap-kernel/src/vats/VatManager.ts @@ -0,0 +1,309 @@ +import type { CapData } from '@endo/marshal'; +import { + VatAlreadyExistsError, + VatDeletedError, + VatNotFoundError, +} from '@metamask/kernel-errors'; +import { stringify } from '@metamask/kernel-utils'; +import { Logger, splitLoggerStream } from '@metamask/logger'; + +import type { KernelQueue } from '../KernelQueue.ts'; +import type { KernelStore } from '../store/index.ts'; +import type { VatId, VatConfig, KRef, PlatformServices } from '../types.ts'; +import { ROOT_OBJECT_VREF } from '../types.ts'; +import { VatHandle } from './VatHandle.ts'; +import type { PingVatResult } from '../rpc/index.ts'; + +type VatManagerOptions = { + platformServices: PlatformServices; + kernelStore: KernelStore; + kernelQueue: KernelQueue; + logger?: Logger; +}; + +/** + * Manages vat lifecycle operations including creation, termination, and restart. + */ +export class VatManager { + /** Currently running vats, by ID */ + readonly #vats: Map; + + /** Service to spawn workers (in iframes) for vats to run in */ + readonly #platformServices: PlatformServices; + + /** Storage holding the kernel's persistent state */ + readonly #kernelStore: KernelStore; + + /** The kernel's run queue */ + readonly #kernelQueue: KernelQueue; + + /** Logger for outputting messages (such as errors) to the console */ + readonly #logger: Logger; + + constructor({ + platformServices, + kernelStore, + kernelQueue, + logger, + }: VatManagerOptions) { + this.#vats = new Map(); + this.#platformServices = platformServices; + this.#kernelStore = kernelStore; + this.#kernelQueue = kernelQueue; + this.#logger = logger ?? new Logger('VatManager'); + harden(this); + } + + /** + * Initialize all vats that were previously running. + * This should be called during kernel startup. + * + * @returns A promise that resolves when all vats are initialized. + */ + async initializeAllVats(): Promise { + const starts: Promise[] = []; + for (const { vatID, vatConfig } of this.#kernelStore.getAllVatRecords()) { + starts.push(this.runVat(vatID, vatConfig)); + } + await Promise.all(starts); + } + + /** + * Launch a new vat. + * + * @param vatConfig - Configuration for the new vat. + * @param subclusterId - The ID of the subcluster to launch the vat in. Optional. + * @returns a promise for the KRef of the new vat's root object. + */ + async launchVat(vatConfig: VatConfig, subclusterId?: string): Promise { + const vatId = this.#kernelStore.getNextVatId(); + await this.runVat(vatId, vatConfig); + this.#kernelStore.initEndpoint(vatId); + const rootRef = this.#kernelStore.exportFromEndpoint( + vatId, + ROOT_OBJECT_VREF, + ); + this.#kernelStore.setVatConfig(vatId, vatConfig); + if (subclusterId) { + this.#kernelStore.addSubclusterVat(subclusterId, vatId); + } + return rootRef; + } + + /** + * Start a new or resurrected vat running. + * + * @param vatId - The ID of the vat to start. + * @param vatConfig - Its configuration. + */ + async runVat(vatId: VatId, vatConfig: VatConfig): Promise { + if (this.#vats.has(vatId)) { + throw new VatAlreadyExistsError(vatId); + } + const stream = await this.#platformServices.launch(vatId, vatConfig); + const { kernelStream: vatStream, loggerStream } = splitLoggerStream(stream); + const vatLogger = this.#logger.subLogger({ tags: [vatId] }); + vatLogger.injectStream( + loggerStream as unknown as Parameters[0], + (error) => this.#logger.error(`Vat ${vatId} error: ${stringify(error)}`), + ); + const vat = await VatHandle.make({ + vatId, + vatConfig, + vatStream, + kernelStore: this.#kernelStore, + kernelQueue: this.#kernelQueue, + logger: vatLogger, + }); + this.#vats.set(vatId, vat); + } + + /** + * Stop a vat from running. + * + * Note that after this operation, the vat will be in a weird twilight zone + * between existence and nonexistence, so this operation should only be used + * as a component of vat restart (which will push it back into existence) or + * vat termination (which will push it all the way into nonexistence). + * + * @param vatId - The ID of the vat. + * @param terminating - If true, the vat is being killed, if false, it's being + * restarted. + * @param reason - If the vat is being terminated, the reason for the termination. + */ + async stopVat( + vatId: VatId, + terminating: boolean, + reason?: CapData, + ): Promise { + const vat = this.getVat(vatId); + let terminationError: Error | undefined; + if (reason) { + terminationError = new Error(`Vat termination: ${reason.body}`); + } else if (terminating) { + terminationError = new VatDeletedError(vatId); + } + await this.#platformServices + .terminate(vatId, terminationError) + .catch(this.#logger.error); + await vat.terminate(terminating, terminationError); + this.#vats.delete(vatId); + } + + /** + * Terminate a vat with extreme prejudice. + * + * @param vatId - The ID of the vat. + * @param reason - If the vat is being terminated, the reason for the termination. + */ + async terminateVat(vatId: VatId, reason?: CapData): Promise { + await this.#kernelQueue.waitForCrank(); + await this.stopVat(vatId, true, reason); + // Mark for deletion (which will happen later, in vat-cleanup events) + this.#kernelStore.markVatAsTerminated(vatId); + } + + /** + * Restarts a vat. + * + * @param vatId - The ID of the vat. + * @returns A promise for the restarted vat. + */ + async restartVat(vatId: VatId): Promise { + await this.#kernelQueue.waitForCrank(); + const vat = this.getVat(vatId); + const { config } = vat; + await this.stopVat(vatId, false); + await this.runVat(vatId, config); + return this.getVat(vatId); + } + + /** + * Ping a vat. + * + * @param vatId - The ID of the vat. + * @returns A promise that resolves to the result of the ping. + */ + async pingVat(vatId: VatId): Promise { + const vat = this.getVat(vatId); + return vat.ping(); + } + + /** + * Get a vat. + * + * @param vatId - The ID of the vat. + * @returns the vat's VatHandle. + */ + getVat(vatId: VatId): VatHandle { + const vat = this.#vats.get(vatId); + if (vat === undefined) { + throw new VatNotFoundError(vatId); + } + return vat; + } + + /** + * Check if a vat exists. + * + * @param vatId - The ID of the vat. + * @returns true if the vat exists, false otherwise. + */ + hasVat(vatId: VatId): boolean { + return this.#vats.has(vatId); + } + + /** + * Gets a list of the IDs of all running vats. + * + * @returns An array of vat IDs. + */ + getVatIds(): VatId[] { + return Array.from(this.#vats.keys()); + } + + /** + * Gets a list of information about all running vats. + * + * @returns An array of vat information records. + */ + getVats(): { + id: VatId; + config: VatConfig; + subclusterId: string; + }[] { + return Array.from(this.#vats.values()).map((vat) => { + const subclusterId = this.#kernelStore.getVatSubcluster(vat.vatId); + return { + id: vat.vatId, + config: vat.config, + subclusterId, + }; + }); + } + + /** + * Pin a vat root. + * + * @param vatId - The ID of the vat. + * @returns The KRef of the vat root. + */ + pinVatRoot(vatId: VatId): KRef { + const kref = this.#kernelStore.getRootObject(vatId); + if (!kref) { + throw new VatNotFoundError(vatId); + } + this.#kernelStore.pinObject(kref); + return kref; + } + + /** + * Unpin a vat root. + * + * @param vatId - The ID of the vat. + */ + unpinVatRoot(vatId: VatId): void { + const kref = this.#kernelStore.getRootObject(vatId); + if (!kref) { + throw new VatNotFoundError(vatId); + } + this.#kernelStore.unpinObject(kref); + } + + /** + * Reap vats that match the filter. + * + * @param filter - A function that returns true if the vat should be reaped. + */ + reapVats(filter: (vatId: VatId) => boolean = () => true): void { + for (const vatID of this.getVatIds()) { + if (filter(vatID)) { + this.#kernelStore.scheduleReap(vatID); + } + } + } + + /** + * Terminate all vats and collect garbage. + * This is for debugging purposes only. + */ + async terminateAllVats(): Promise { + await this.#kernelQueue.waitForCrank(); + for (const id of this.getVatIds().reverse()) { + await this.terminateVat(id); + this.collectGarbage(); + } + } + + /** + * Collect garbage. + * This is for debugging purposes only. + */ + collectGarbage(): void { + while (this.#kernelStore.nextTerminatedVatCleanup()) { + // wait for all vats to be cleaned up + } + this.#kernelStore.collectGarbage(); + } +} +harden(VatManager); diff --git a/vitest.config.ts b/vitest.config.ts index c3151c219a..83d678d7e1 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -156,10 +156,10 @@ export default defineConfig({ lines: 25, }, 'packages/ocap-kernel/**': { - statements: 94.05, - functions: 96.04, - branches: 83.86, - lines: 94.07, + statements: 94.33, + functions: 96.08, + branches: 84.58, + lines: 94.36, }, 'packages/omnium-gatherum/**': { statements: 5.26,