diff --git a/packages/core/rpc/BaseRpcDriver.test.ts b/packages/core/rpc/BaseRpcDriver.test.ts index 5e7adfc7a1..7256232372 100644 --- a/packages/core/rpc/BaseRpcDriver.test.ts +++ b/packages/core/rpc/BaseRpcDriver.test.ts @@ -121,7 +121,7 @@ class MockRpcDriver extends BaseRpcDriver { workerCheckFrequency = 500 - async makeWorker(_pluginManager: PluginManager) { + async makeWorker() { return new MockWorkerHandle() } } diff --git a/packages/core/rpc/BaseRpcDriver.ts b/packages/core/rpc/BaseRpcDriver.ts index 1b0e131cc4..715e5edb16 100644 --- a/packages/core/rpc/BaseRpcDriver.ts +++ b/packages/core/rpc/BaseRpcDriver.ts @@ -3,6 +3,7 @@ import { objectFromEntries } from '../util' import { serializeAbortSignal } from './remoteAbortSignals' import PluginManager from '../PluginManager' import { AnyConfigurationModel } from '../configuration/configurationSchema' +import { readConfObject } from '../configuration' export interface WorkerHandle { status?: string @@ -70,30 +71,31 @@ function detectHardwareConcurrency() { return 1 } class LazyWorker { - worker?: WorkerHandle + workerP?: Promise | undefined constructor(public driver: BaseRpcDriver) {} - async getWorker(pluginManager: PluginManager, rpcDriverClassName: string) { - if (!this.worker) { - const worker = await this.driver.makeWorker(pluginManager) - watchWorker(worker, this.driver.maxPingTime, rpcDriverClassName).catch( - error => { - if (this.worker) { - console.error( - 'worker did not respond, killing and generating new one', - ) - console.error(error) - this.worker.destroy() - this.worker.status = 'killed' - this.worker.error = error - this.worker = undefined - } - }, - ) - this.worker = worker + async getWorker() { + if (!this.workerP) { + this.workerP = this.driver.makeWorker().then(worker => { + watchWorker(worker, this.driver.maxPingTime, this.driver.name).catch( + error => { + if (worker) { + console.error( + 'worker did not respond, killing and generating new one', + ) + console.error(error) + worker.destroy() + worker.status = 'killed' + worker.error = error + this.workerP = undefined + } + }, + ) + return worker + }) } - return this.worker + return this.workerP } } @@ -104,9 +106,7 @@ export default abstract class BaseRpcDriver { private workerAssignments = new Map() // sessionId -> worker number - private workerCount = 0 - - abstract makeWorker(pluginManager: PluginManager): Promise + abstract makeWorker(): Promise private workerPool?: LazyWorker[] @@ -121,24 +121,18 @@ export default abstract class BaseRpcDriver { } // filter the given object and just remove any non-clonable things from it - filterArgs( - thing: THING_TYPE, - pluginManager: PluginManager, - sessionId: string, - ): THING_TYPE { + filterArgs(thing: THING_TYPE, sessionId: string): THING_TYPE { if (Array.isArray(thing)) { return thing .filter(isClonable) - .map(t => - this.filterArgs(t, pluginManager, sessionId), - ) as unknown as THING_TYPE + .map(t => this.filterArgs(t, sessionId)) as unknown as THING_TYPE } if (typeof thing === 'object' && thing !== null) { // AbortSignals are specially handled if (thing instanceof AbortSignal) { return serializeAbortSignal( thing, - this.remoteAbort.bind(this, pluginManager, sessionId), + this.remoteAbort.bind(this, sessionId), ) as unknown as THING_TYPE } @@ -155,19 +149,14 @@ export default abstract class BaseRpcDriver { return objectFromEntries( Object.entries(thing) .filter(e => isClonable(e[1])) - .map(([k, v]) => [k, this.filterArgs(v, pluginManager, sessionId)]), + .map(([k, v]) => [k, this.filterArgs(v, sessionId)]), ) as THING_TYPE } return thing } - async remoteAbort( - pluginManager: PluginManager, - sessionId: string, - functionName: string, - signalId: number, - ) { - const worker = await this.getWorker(sessionId, pluginManager) + async remoteAbort(sessionId: string, functionName: string, signalId: number) { + const worker = await this.getWorker(sessionId) worker.call( functionName, { signalId }, @@ -179,7 +168,8 @@ export default abstract class BaseRpcDriver { const hardwareConcurrency = detectHardwareConcurrency() const workerCount = - this.workerCount || Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3)) + readConfObject(this.config, 'workerCount') || + Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3)) return [...new Array(workerCount)].map(() => new LazyWorker(this)) } @@ -193,10 +183,7 @@ export default abstract class BaseRpcDriver { return this.workerPool } - async getWorker( - sessionId: string, - pluginManager: PluginManager, - ): Promise { + async getWorker(sessionId: string): Promise { const workers = this.getWorkerPool() let workerNumber = this.workerAssignments.get(sessionId) if (workerNumber === undefined) { @@ -207,7 +194,7 @@ export default abstract class BaseRpcDriver { } // console.log(`${sessionId} -> worker ${workerNumber}`) - const worker = workers[workerNumber].getWorker(pluginManager, this.name) + const worker = workers[workerNumber].getWorker() if (!worker) { throw new Error('no web workers registered for RPC') } @@ -225,14 +212,10 @@ export default abstract class BaseRpcDriver { throw new TypeError('sessionId is required') } let done = false - const worker = await this.getWorker(sessionId, pluginManager) + const worker = await this.getWorker(sessionId) const rpcMethod = pluginManager.getRpcMethodType(functionName) const serializedArgs = await rpcMethod.serializeArguments(args, this.name) - const filteredAndSerializedArgs = this.filterArgs( - serializedArgs, - pluginManager, - sessionId, - ) + const filteredAndSerializedArgs = this.filterArgs(serializedArgs, sessionId) // now actually call the worker const callP = worker diff --git a/packages/core/rpc/RpcManager.ts b/packages/core/rpc/RpcManager.ts index a03d574069..f59caeae09 100644 --- a/packages/core/rpc/RpcManager.ts +++ b/packages/core/rpc/RpcManager.ts @@ -43,22 +43,32 @@ export default class RpcManager { if (driver) { return driver } + let newDriver + const config = this.mainConfiguration.drivers.get('WebWorkerRpcDriver') + if (backendName === 'MainThreadRpcDriver') { + const backendConfiguration = + this.backendConfigurations.MainThreadRpcDriver - const backendConfiguration = this.backendConfigurations[backendName] - const DriverClassImpl = DriverClasses[backendName] - - if (!DriverClassImpl) { + if (!backendConfiguration) { + throw new Error( + `requested RPC driver "${backendName}" is missing config`, + ) + } + newDriver = new MainThreadRpcDriver({ ...backendConfiguration, config }) + } else if (backendName === 'WebWorkerRpcDriver') { + const backendConfiguration = this.backendConfigurations.WebWorkerRpcDriver + if (!backendConfiguration) { + throw new Error( + `requested RPC driver "${backendName}" is missing config`, + ) + } + newDriver = new WebWorkerRpcDriver( + { ...backendConfiguration, config }, + { plugins: this.pluginManager.runtimePluginDefinitions }, + ) + } else { throw new Error(`requested RPC driver "${backendName}" is not installed`) } - - if (!backendConfiguration) { - throw new Error(`requested RPC driver "${backendName}" is missing config`) - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const newDriver = new DriverClassImpl(backendConfiguration as any, { - plugins: this.pluginManager.runtimePluginDefinitions, - }) this.driverObjects.set(backendName, newDriver) return newDriver } diff --git a/packages/core/rpc/configSchema.ts b/packages/core/rpc/configSchema.ts index 317ddb5821..e1885b8871 100644 --- a/packages/core/rpc/configSchema.ts +++ b/packages/core/rpc/configSchema.ts @@ -1,20 +1,27 @@ import { types } from 'mobx-state-tree' import { ConfigurationSchema } from '../configuration' +const BaseRpcDriverConfigSchema = ConfigurationSchema( + 'BaseRpcDriver', + { + workerCount: { + type: 'number', + description: + 'The number of workers to use. If 0 (the default) JBrowse will decide how many workers to use.', + defaultValue: 0, + }, + }, + { explicitlyTyped: true }, +) const MainThreadRpcDriverConfigSchema = ConfigurationSchema( 'MainThreadRpcDriver', {}, - { explicitlyTyped: true }, + { explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema }, ) const WebWorkerRpcDriverConfigSchema = ConfigurationSchema( 'WebWorkerRpcDriver', {}, - { explicitlyTyped: true }, -) -const ElectronRpcDriverConfigSchema = ConfigurationSchema( - 'ElectronRpcDriver', - {}, - { explicitlyTyped: true }, + { explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema }, ) export default ConfigurationSchema( @@ -31,7 +38,6 @@ export default ConfigurationSchema( types.union( MainThreadRpcDriverConfigSchema, WebWorkerRpcDriverConfigSchema, - ElectronRpcDriverConfigSchema, ), ), { MainThreadRpcDriver: { type: 'MainThreadRpcDriver' } }, diff --git a/products/jbrowse-web/src/Loader.tsx b/products/jbrowse-web/src/Loader.tsx index 8d1da457d0..bc4cedf3b1 100644 --- a/products/jbrowse-web/src/Loader.tsx +++ b/products/jbrowse-web/src/Loader.tsx @@ -293,10 +293,16 @@ const Renderer = observer( }, { pluginManager }, ) - rootModel.jbrowse.configuration.rpc.addDriverConfig( - 'WebWorkerRpcDriver', - { type: 'WebWorkerRpcDriver' }, - ) + if ( + !rootModel.jbrowse.configuration.rpc.drivers.get( + 'WebWorkerRpcDriver', + ) + ) { + rootModel.jbrowse.configuration.rpc.addDriverConfig( + 'WebWorkerRpcDriver', + { type: 'WebWorkerRpcDriver' }, + ) + } if (!loader.configSnapshot?.configuration?.rpc?.defaultDriver) { rootModel.jbrowse.configuration.rpc.defaultDriver.set( 'WebWorkerRpcDriver',