Skip to content

Commit

Permalink
Allow user to specify number of workers (#2829)
Browse files Browse the repository at this point in the history
* Fix bug where too many workers were created

* Add ability to specify workerCount in config

* Fix name of base rpc driver config schema
  • Loading branch information
garrettjstevens committed Mar 18, 2022
1 parent 1d6e373 commit a2a20af
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 78 deletions.
2 changes: 1 addition & 1 deletion packages/core/rpc/BaseRpcDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MockRpcDriver extends BaseRpcDriver {

workerCheckFrequency = 500

async makeWorker(_pluginManager: PluginManager) {
async makeWorker() {
return new MockWorkerHandle()
}
}
Expand Down
87 changes: 35 additions & 52 deletions packages/core/rpc/BaseRpcDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { objectFromEntries } from '../util'
import { serializeAbortSignal } from './remoteAbortSignals'
import PluginManager from '../PluginManager'
import { AnyConfigurationModel } from '../configuration/configurationSchema'
import { readConfObject } from '../configuration'

export interface WorkerHandle {
status?: string
Expand Down Expand Up @@ -70,30 +71,31 @@ function detectHardwareConcurrency() {
return 1
}
class LazyWorker {
worker?: WorkerHandle
workerP?: Promise<WorkerHandle> | undefined

constructor(public driver: BaseRpcDriver) {}

async getWorker(pluginManager: PluginManager, rpcDriverClassName: string) {
if (!this.worker) {
const worker = await this.driver.makeWorker(pluginManager)
watchWorker(worker, this.driver.maxPingTime, rpcDriverClassName).catch(
error => {
if (this.worker) {
console.error(
'worker did not respond, killing and generating new one',
)
console.error(error)
this.worker.destroy()
this.worker.status = 'killed'
this.worker.error = error
this.worker = undefined
}
},
)
this.worker = worker
async getWorker() {
if (!this.workerP) {
this.workerP = this.driver.makeWorker().then(worker => {
watchWorker(worker, this.driver.maxPingTime, this.driver.name).catch(
error => {
if (worker) {
console.error(
'worker did not respond, killing and generating new one',
)
console.error(error)
worker.destroy()
worker.status = 'killed'
worker.error = error
this.workerP = undefined
}
},
)
return worker
})
}
return this.worker
return this.workerP
}
}

Expand All @@ -104,9 +106,7 @@ export default abstract class BaseRpcDriver {

private workerAssignments = new Map<string, number>() // sessionId -> worker number

private workerCount = 0

abstract makeWorker(pluginManager: PluginManager): Promise<WorkerHandle>
abstract makeWorker(): Promise<WorkerHandle>

private workerPool?: LazyWorker[]

Expand All @@ -121,24 +121,18 @@ export default abstract class BaseRpcDriver {
}

// filter the given object and just remove any non-clonable things from it
filterArgs<THING_TYPE>(
thing: THING_TYPE,
pluginManager: PluginManager,
sessionId: string,
): THING_TYPE {
filterArgs<THING_TYPE>(thing: THING_TYPE, sessionId: string): THING_TYPE {
if (Array.isArray(thing)) {
return thing
.filter(isClonable)
.map(t =>
this.filterArgs(t, pluginManager, sessionId),
) as unknown as THING_TYPE
.map(t => this.filterArgs(t, sessionId)) as unknown as THING_TYPE
}
if (typeof thing === 'object' && thing !== null) {
// AbortSignals are specially handled
if (thing instanceof AbortSignal) {
return serializeAbortSignal(
thing,
this.remoteAbort.bind(this, pluginManager, sessionId),
this.remoteAbort.bind(this, sessionId),
) as unknown as THING_TYPE
}

Expand All @@ -155,19 +149,14 @@ export default abstract class BaseRpcDriver {
return objectFromEntries(
Object.entries(thing)
.filter(e => isClonable(e[1]))
.map(([k, v]) => [k, this.filterArgs(v, pluginManager, sessionId)]),
.map(([k, v]) => [k, this.filterArgs(v, sessionId)]),
) as THING_TYPE
}
return thing
}

async remoteAbort(
pluginManager: PluginManager,
sessionId: string,
functionName: string,
signalId: number,
) {
const worker = await this.getWorker(sessionId, pluginManager)
async remoteAbort(sessionId: string, functionName: string, signalId: number) {
const worker = await this.getWorker(sessionId)
worker.call(
functionName,
{ signalId },
Expand All @@ -179,7 +168,8 @@ export default abstract class BaseRpcDriver {
const hardwareConcurrency = detectHardwareConcurrency()

const workerCount =
this.workerCount || Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3))
readConfObject(this.config, 'workerCount') ||
Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3))

return [...new Array(workerCount)].map(() => new LazyWorker(this))
}
Expand All @@ -193,10 +183,7 @@ export default abstract class BaseRpcDriver {
return this.workerPool
}

async getWorker(
sessionId: string,
pluginManager: PluginManager,
): Promise<WorkerHandle> {
async getWorker(sessionId: string): Promise<WorkerHandle> {
const workers = this.getWorkerPool()
let workerNumber = this.workerAssignments.get(sessionId)
if (workerNumber === undefined) {
Expand All @@ -207,7 +194,7 @@ export default abstract class BaseRpcDriver {
}

// console.log(`${sessionId} -> worker ${workerNumber}`)
const worker = workers[workerNumber].getWorker(pluginManager, this.name)
const worker = workers[workerNumber].getWorker()
if (!worker) {
throw new Error('no web workers registered for RPC')
}
Expand All @@ -225,14 +212,10 @@ export default abstract class BaseRpcDriver {
throw new TypeError('sessionId is required')
}
let done = false
const worker = await this.getWorker(sessionId, pluginManager)
const worker = await this.getWorker(sessionId)
const rpcMethod = pluginManager.getRpcMethodType(functionName)
const serializedArgs = await rpcMethod.serializeArguments(args, this.name)
const filteredAndSerializedArgs = this.filterArgs(
serializedArgs,
pluginManager,
sessionId,
)
const filteredAndSerializedArgs = this.filterArgs(serializedArgs, sessionId)

// now actually call the worker
const callP = worker
Expand Down
36 changes: 23 additions & 13 deletions packages/core/rpc/RpcManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,32 @@ export default class RpcManager {
if (driver) {
return driver
}
let newDriver
const config = this.mainConfiguration.drivers.get('WebWorkerRpcDriver')
if (backendName === 'MainThreadRpcDriver') {
const backendConfiguration =
this.backendConfigurations.MainThreadRpcDriver

const backendConfiguration = this.backendConfigurations[backendName]
const DriverClassImpl = DriverClasses[backendName]

if (!DriverClassImpl) {
if (!backendConfiguration) {
throw new Error(
`requested RPC driver "${backendName}" is missing config`,
)
}
newDriver = new MainThreadRpcDriver({ ...backendConfiguration, config })
} else if (backendName === 'WebWorkerRpcDriver') {
const backendConfiguration = this.backendConfigurations.WebWorkerRpcDriver
if (!backendConfiguration) {
throw new Error(
`requested RPC driver "${backendName}" is missing config`,
)
}
newDriver = new WebWorkerRpcDriver(
{ ...backendConfiguration, config },
{ plugins: this.pluginManager.runtimePluginDefinitions },
)
} else {
throw new Error(`requested RPC driver "${backendName}" is not installed`)
}

if (!backendConfiguration) {
throw new Error(`requested RPC driver "${backendName}" is missing config`)
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const newDriver = new DriverClassImpl(backendConfiguration as any, {
plugins: this.pluginManager.runtimePluginDefinitions,
})
this.driverObjects.set(backendName, newDriver)
return newDriver
}
Expand Down
22 changes: 14 additions & 8 deletions packages/core/rpc/configSchema.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import { types } from 'mobx-state-tree'
import { ConfigurationSchema } from '../configuration'

const BaseRpcDriverConfigSchema = ConfigurationSchema(
'BaseRpcDriver',
{
workerCount: {
type: 'number',
description:
'The number of workers to use. If 0 (the default) JBrowse will decide how many workers to use.',
defaultValue: 0,
},
},
{ explicitlyTyped: true },
)
const MainThreadRpcDriverConfigSchema = ConfigurationSchema(
'MainThreadRpcDriver',
{},
{ explicitlyTyped: true },
{ explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema },
)
const WebWorkerRpcDriverConfigSchema = ConfigurationSchema(
'WebWorkerRpcDriver',
{},
{ explicitlyTyped: true },
)
const ElectronRpcDriverConfigSchema = ConfigurationSchema(
'ElectronRpcDriver',
{},
{ explicitlyTyped: true },
{ explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema },
)

export default ConfigurationSchema(
Expand All @@ -31,7 +38,6 @@ export default ConfigurationSchema(
types.union(
MainThreadRpcDriverConfigSchema,
WebWorkerRpcDriverConfigSchema,
ElectronRpcDriverConfigSchema,
),
),
{ MainThreadRpcDriver: { type: 'MainThreadRpcDriver' } },
Expand Down
14 changes: 10 additions & 4 deletions products/jbrowse-web/src/Loader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,16 @@ const Renderer = observer(
},
{ pluginManager },
)
rootModel.jbrowse.configuration.rpc.addDriverConfig(
'WebWorkerRpcDriver',
{ type: 'WebWorkerRpcDriver' },
)
if (
!rootModel.jbrowse.configuration.rpc.drivers.get(
'WebWorkerRpcDriver',
)
) {
rootModel.jbrowse.configuration.rpc.addDriverConfig(
'WebWorkerRpcDriver',
{ type: 'WebWorkerRpcDriver' },
)
}
if (!loader.configSnapshot?.configuration?.rpc?.defaultDriver) {
rootModel.jbrowse.configuration.rpc.defaultDriver.set(
'WebWorkerRpcDriver',
Expand Down

0 comments on commit a2a20af

Please sign in to comment.